13template <
typename Result,
typename Input,
typename Model>
class Instance {
17 std::shared_ptr<std::promise<Result>>
pro;
24 volatile bool run_ =
false;
39 item.pro->set_value(Result());
50 virtual std::shared_future<Result>
commit(
const Input &input) {
53 item.
pro.reset(
new std::promise<Result>());
59 return item.
pro->get_future();
62 virtual std::vector<std::shared_future<Result>>
63 commits(
const std::vector<Input> &inputs) {
64 std::vector<std::shared_future<Result>> output;
67 for (
int i = 0; i < (int)inputs.size(); ++i) {
69 item.
input = inputs[i];
70 item.
pro.reset(
new std::promise<Result>());
71 output.emplace_back(item.
pro->get_future());
79 template <
typename LoadMethod>
80 bool start(
const LoadMethod &loadmethod,
int max_items_processed = 1,
81 void *stream =
nullptr) {
84 this->stream_ = stream;
85 this->max_items_processed_ = max_items_processed;
86 std::promise<bool> status;
88 std::make_shared<std::thread>(&Instance::worker<LoadMethod>,
this,
89 std::ref(loadmethod), std::ref(status));
90 return status.get_future().get();
94 template <
typename LoadMethod>
95 void worker(
const LoadMethod &loadmethod, std::promise<bool> &status) {
96 std::shared_ptr<Model> model = loadmethod();
97 if (model ==
nullptr) {
98 status.set_value(
false);
103 status.set_value(
true);
105 std::vector<Item> fetch_items;
106 std::vector<Input> inputs;
108 inputs.resize(fetch_items.size());
109 std::transform(fetch_items.begin(), fetch_items.end(), inputs.begin(),
110 [](Item &item) { return item.input; });
112 auto ret = model->forwards(inputs,
stream_);
113 for (
int i = 0; i < (int)fetch_items.size(); ++i) {
114 if (i < (
int)ret.size()) {
115 fetch_items[i].pro->set_value(ret[i]);
117 fetch_items[i].pro->set_value(Result());
127 virtual bool get_items_and_wait(std::vector<Item> &fetch_items,
136 for (
int i = 0; i < max_size && !
input_queue_.empty(); ++i) {
137 fetch_items.emplace_back(std::move(
input_queue_.front()));
143 virtual bool get_item_and_wait(
Item &fetch_item) {