ReUseX  0.0.1
3D Point Cloud Processing for Building Reuse
Loading...
Searching...
No Matches
cpm.hpp
Go to the documentation of this file.
1#pragma once
2// Comsumer Producer Model
3
4#include <algorithm>
5#include <condition_variable>
6#include <future>
7#include <memory>
8#include <queue>
9#include <thread>
10
12
13template <typename Result, typename Input, typename Model> class Instance {
14 protected:
15 struct Item {
16 Input input;
17 std::shared_ptr<std::promise<Result>> pro;
18 };
19
20 std::condition_variable cond_;
21 std::queue<Item> input_queue_;
22 std::mutex queue_lock_;
23 std::shared_ptr<std::thread> worker_;
24 volatile bool run_ = false;
25 volatile int max_items_processed_ = 0;
26 void *stream_ = nullptr;
27
28 public:
29 virtual ~Instance() { stop(); }
30
31 void stop() {
32 run_ = false;
33 cond_.notify_one();
34 {
35 std::unique_lock<std::mutex> l(queue_lock_);
36 while (!input_queue_.empty()) {
37 auto &item = input_queue_.front();
38 if (item.pro)
39 item.pro->set_value(Result());
40 input_queue_.pop();
41 }
42 };
43
44 if (worker_) {
45 worker_->join();
46 worker_.reset();
47 }
48 }
49
50 virtual std::shared_future<Result> commit(const Input &input) {
51 Item item;
52 item.input = input;
53 item.pro.reset(new std::promise<Result>());
54 {
55 std::unique_lock<std::mutex> __lock_(queue_lock_);
56 input_queue_.push(item);
57 }
58 cond_.notify_one();
59 return item.pro->get_future();
60 }
61
62 virtual std::vector<std::shared_future<Result>>
63 commits(const std::vector<Input> &inputs) {
64 std::vector<std::shared_future<Result>> output;
65 {
66 std::unique_lock<std::mutex> __lock_(queue_lock_);
67 for (int i = 0; i < (int)inputs.size(); ++i) {
68 Item item;
69 item.input = inputs[i];
70 item.pro.reset(new std::promise<Result>());
71 output.emplace_back(item.pro->get_future());
72 input_queue_.push(item);
73 }
74 }
75 cond_.notify_one();
76 return output;
77 }
78
79 template <typename LoadMethod>
80 bool start(const LoadMethod &loadmethod, int max_items_processed = 1,
81 void *stream = nullptr) {
82 stop();
83
84 this->stream_ = stream;
85 this->max_items_processed_ = max_items_processed;
86 std::promise<bool> status;
87 worker_ =
88 std::make_shared<std::thread>(&Instance::worker<LoadMethod>, this,
89 std::ref(loadmethod), std::ref(status));
90 return status.get_future().get();
91 }
92
93 private:
94 template <typename LoadMethod>
95 void worker(const LoadMethod &loadmethod, std::promise<bool> &status) {
96 std::shared_ptr<Model> model = loadmethod();
97 if (model == nullptr) {
98 status.set_value(false);
99 return;
100 }
101
102 run_ = true;
103 status.set_value(true);
104
105 std::vector<Item> fetch_items;
106 std::vector<Input> inputs;
107 while (get_items_and_wait(fetch_items, max_items_processed_)) {
108 inputs.resize(fetch_items.size());
109 std::transform(fetch_items.begin(), fetch_items.end(), inputs.begin(),
110 [](Item &item) { return item.input; });
111
112 auto ret = model->forwards(inputs, stream_);
113 for (int i = 0; i < (int)fetch_items.size(); ++i) {
114 if (i < (int)ret.size()) {
115 fetch_items[i].pro->set_value(ret[i]);
116 } else {
117 fetch_items[i].pro->set_value(Result());
118 }
119 }
120 inputs.clear();
121 fetch_items.clear();
122 }
123 model.reset();
124 run_ = false;
125 }
126
127 virtual bool get_items_and_wait(std::vector<Item> &fetch_items,
128 int max_size) {
129 std::unique_lock<std::mutex> l(queue_lock_);
130 cond_.wait(l, [&]() { return !run_ || !input_queue_.empty(); });
131
132 if (!run_)
133 return false;
134
135 fetch_items.clear();
136 for (int i = 0; i < max_size && !input_queue_.empty(); ++i) {
137 fetch_items.emplace_back(std::move(input_queue_.front()));
138 input_queue_.pop();
139 }
140 return true;
141 }
142
143 virtual bool get_item_and_wait(Item &fetch_item) {
144 std::unique_lock<std::mutex> l(queue_lock_);
145 cond_.wait(l, [&]() { return !run_ || !input_queue_.empty(); });
146
147 if (!run_)
148 return false;
149
150 fetch_item = std::move(input_queue_.front());
151 input_queue_.pop();
152 return true;
153 }
154};
155}; // namespace ReUseX::vision::tensor_rt::cpm
std::shared_ptr< std::thread > worker_
Definition cpm.hpp:23
bool start(const LoadMethod &loadmethod, int max_items_processed=1, void *stream=nullptr)
Definition cpm.hpp:80
virtual std::shared_future< Result > commit(const Input &input)
Definition cpm.hpp:50
std::condition_variable cond_
Definition cpm.hpp:20
virtual std::vector< std::shared_future< Result > > commits(const std::vector< Input > &inputs)
Definition cpm.hpp:63
std::shared_ptr< std::promise< Result > > pro
Definition cpm.hpp:17