ReUseX
0.0.5
3D Point Cloud Processing for Building Reuse
Toggle main menu visibility
Loading...
Searching...
No Matches
cpm.hpp
Go to the documentation of this file.
1
#pragma once
2
#include <algorithm>
3
#include <atomic>
4
#include <condition_variable>
5
#include <future>
6
#include <memory>
7
#include <queue>
8
#include <thread>
9
10
// Consumer Producer Model
11
12
namespace
reusex::vision::tensor_rt::cpm
{
13
14
/// @brief Consumer/producer queue that feeds batches of inputs to a model on a
///        single background worker thread.
///
/// Producers call commit() / commits() to enqueue inputs and receive a
/// std::shared_future per input. The worker thread, created by start(), pulls up
/// to `max_items_processed_` queued items at a time, passes them to
/// `Model::forwards(inputs, stream_)` and fulfils each item's promise with the
/// matching element of the returned container.
///
/// @tparam Result Value delivered through the futures. `Result()` is delivered
///                when the model returns fewer results than inputs, and for
///                items still queued when stop() runs.
/// @tparam Input  Input type; default-constructed inside Item and then
///                copy-assigned from the caller's value.
/// @tparam Model  Held as `std::shared_ptr<Model>`; must provide
///                `forwards(std::vector<Input>&, void*)` whose return value
///                supports `size()` and `operator[]`.
template <typename Result, typename Input, typename Model>
class Instance {
protected:
  /// One queued request: the input plus the promise used to publish its result.
  struct Item {
    Input input;
    std::shared_ptr<std::promise<Result>> pro;
  };

  std::condition_variable cond_;          ///< Signalled on new items and on stop().
  std::queue<Item> input_queue_;          ///< Pending requests, guarded by queue_lock_.
  std::mutex queue_lock_;                 ///< Protects input_queue_ and the wait predicate.
  std::shared_ptr<std::thread> worker_;   ///< Background worker; empty when never started / stopped.
  std::atomic_bool run_{false};           ///< True while the worker loop should keep running.
  std::atomic_int max_items_processed_{0}; ///< Upper bound on items handed to one forwards() call.
  void *stream_ = nullptr;                ///< Opaque handle forwarded unchanged to Model::forwards().

public:
  /// Stops the worker (if any) before the members are destroyed.
  virtual ~Instance() { stop(); }

  /// @brief Stop the worker thread and resolve every still-queued request.
  ///
  /// Each item remaining in the queue has its promise fulfilled with `Result()`.
  /// Safe to call when no worker is running.
  void stop() {
    {
      // run_ is part of the worker's wait predicate, so it must be changed
      // while holding the mutex. Otherwise the worker could evaluate the
      // predicate, miss the notification below, and then block forever,
      // which would make join() hang.
      std::lock_guard<std::mutex> lock(queue_lock_);
      run_ = false;
      while (!input_queue_.empty()) {
        auto &item = input_queue_.front();
        if (item.pro) {
          item.pro->set_value(Result());
        }
        input_queue_.pop();
      }
    }
    cond_.notify_all();

    if (worker_) {
      worker_->join();
      worker_.reset();
    }
  }

  /// @brief Enqueue a single input.
  /// @param input Value copied into the queue.
  /// @return Future that receives the result for @p input.
  virtual std::shared_future<Result> commit(const Input &input) {
    Item item;
    item.input = input;
    item.pro = std::make_shared<std::promise<Result>>();
    // Take the future before the item is handed over to the queue.
    std::shared_future<Result> result = item.pro->get_future();
    {
      std::lock_guard<std::mutex> lock(queue_lock_);
      input_queue_.push(std::move(item));
    }
    cond_.notify_one();
    return result;
  }

  /// @brief Enqueue several inputs under a single lock acquisition.
  /// @param inputs Values copied into the queue, in order.
  /// @return One future per input, in the same order as @p inputs.
  virtual std::vector<std::shared_future<Result>>
  commits(const std::vector<Input> &inputs) {
    std::vector<std::shared_future<Result>> output;
    output.reserve(inputs.size());
    {
      std::lock_guard<std::mutex> lock(queue_lock_);
      for (const Input &input : inputs) {
        Item item;
        item.input = input;
        item.pro = std::make_shared<std::promise<Result>>();
        output.emplace_back(item.pro->get_future());
        input_queue_.push(std::move(item));
      }
    }
    cond_.notify_one();
    return output;
  }

  /// @brief (Re)start the worker thread.
  ///
  /// Any previous worker is stopped first (see stop()). The call blocks until
  /// the worker has invoked @p loadmethod and reported whether it produced a
  /// model.
  ///
  /// @param loadmethod          Callable returning something convertible to
  ///                            `std::shared_ptr<Model>`; invoked on the worker thread.
  /// @param max_items_processed Maximum number of items per forwards() call.
  ///                            Values below 1 are treated as 1, because a
  ///                            non-positive limit would make the worker spin
  ///                            on empty batches without ever resolving a future.
  /// @param stream              Opaque pointer passed through to Model::forwards().
  /// @return true if the model was loaded and the worker loop is running,
  ///         false if @p loadmethod returned a null pointer.
  template <typename LoadMethod>
  bool start(const LoadMethod &loadmethod, int max_items_processed = 1,
             void *stream = nullptr) {
    stop();

    stream_ = stream;
    max_items_processed_ = std::max(1, max_items_processed);

    std::promise<bool> status;
    std::future<bool> loaded = status.get_future();
    // loadmethod and status are passed by reference: the worker only touches
    // them before it publishes the load status, and this function does not
    // return until that status has been published.
    worker_ = std::make_shared<std::thread>(&Instance::worker<LoadMethod>, this,
                                            std::ref(loadmethod),
                                            std::ref(status));
    return loaded.get();
  }

private:
  /// @brief Worker thread body: load the model, then serve batches until stopped.
  /// @param loadmethod Factory for the model (used only before @p status is set).
  /// @param status     Receives true once the model is loaded, false on a null model.
  template <typename LoadMethod>
  void worker(const LoadMethod &loadmethod, std::promise<bool> &status) {
    std::shared_ptr<Model> model = loadmethod();
    if (model == nullptr) {
      status.set_value(false);
      return;
    }

    run_ = true;
    status.set_value(true);
    // From here on neither loadmethod nor status may be used: start() may
    // already have returned and destroyed them.

    std::vector<Item> fetch_items;
    std::vector<Input> inputs;
    while (get_items_and_wait(fetch_items, max_items_processed_.load())) {
      inputs.clear();
      inputs.reserve(fetch_items.size());
      for (Item &item : fetch_items) {
        // The queued copy of the input is not needed after this batch.
        inputs.push_back(std::move(item.input));
      }

      auto ret = model->forwards(inputs, stream_);
      const std::size_t produced = ret.size();
      for (std::size_t i = 0; i < fetch_items.size(); ++i) {
        if (i < produced) {
          fetch_items[i].pro->set_value(ret[i]);
        } else {
          // The model returned fewer results than inputs: fall back to a
          // default-constructed Result so no future is left pending.
          fetch_items[i].pro->set_value(Result());
        }
      }
      inputs.clear();
      fetch_items.clear();
    }
    model.reset();
    run_ = false;
  }

  /// @brief Block until items are available or the worker is asked to stop.
  /// @param fetch_items Cleared and then filled with up to @p max_size items.
  /// @param max_size    Maximum number of items to dequeue.
  /// @return false if the worker should exit, true otherwise.
  virtual bool get_items_and_wait(std::vector<Item> &fetch_items,
                                  int max_size) {
    std::unique_lock<std::mutex> lock(queue_lock_);
    cond_.wait(lock, [&]() { return !run_ || !input_queue_.empty(); });

    if (!run_) {
      return false;
    }

    fetch_items.clear();
    for (int i = 0; i < max_size && !input_queue_.empty(); ++i) {
      fetch_items.emplace_back(std::move(input_queue_.front()));
      input_queue_.pop();
    }
    return true;
  }

  /// @brief Block until one item is available or the worker is asked to stop.
  /// @param fetch_item Receives the dequeued item.
  /// @return false if the worker should exit, true otherwise.
  virtual bool get_item_and_wait(Item &fetch_item) {
    std::unique_lock<std::mutex> lock(queue_lock_);
    cond_.wait(lock, [&]() { return !run_ || !input_queue_.empty(); });

    if (!run_) {
      return false;
    }

    fetch_item = std::move(input_queue_.front());
    input_queue_.pop();
    return true;
  }
};
156
};
// namespace reusex::vision::tensor_rt::cpm
reusex::vision::tensor_rt::cpm::Instance
Definition
cpm.hpp:14
reusex::vision::tensor_rt::cpm::Instance::cond_
std::condition_variable cond_
Definition
cpm.hpp:21
reusex::vision::tensor_rt::cpm::Instance::start
bool start(const LoadMethod &loadmethod, int max_items_processed=1, void *stream=nullptr)
Definition
cpm.hpp:81
reusex::vision::tensor_rt::cpm::Instance::commits
virtual std::vector< std::shared_future< Result > > commits(const std::vector< Input > &inputs)
Definition
cpm.hpp:64
reusex::vision::tensor_rt::cpm::Instance::stop
void stop()
Definition
cpm.hpp:32
reusex::vision::tensor_rt::cpm::Instance::max_items_processed_
std::atomic_int max_items_processed_
Definition
cpm.hpp:26
reusex::vision::tensor_rt::cpm::Instance::~Instance
virtual ~Instance()
Definition
cpm.hpp:30
reusex::vision::tensor_rt::cpm::Instance::commit
virtual std::shared_future< Result > commit(const Input &input)
Definition
cpm.hpp:51
reusex::vision::tensor_rt::cpm::Instance::input_queue_
std::queue< Item > input_queue_
Definition
cpm.hpp:22
reusex::vision::tensor_rt::cpm::Instance::run_
std::atomic_bool run_
Definition
cpm.hpp:25
reusex::vision::tensor_rt::cpm::Instance::worker_
std::shared_ptr< std::thread > worker_
Definition
cpm.hpp:24
reusex::vision::tensor_rt::cpm::Instance::stream_
void * stream_
Definition
cpm.hpp:27
reusex::vision::tensor_rt::cpm::Instance::queue_lock_
std::mutex queue_lock_
Definition
cpm.hpp:23
reusex::vision::tensor_rt::cpm
Definition
cpm.hpp:12
reusex::vision::tensor_rt::cpm::Instance::Item
Definition
cpm.hpp:16
reusex::vision::tensor_rt::cpm::Instance::Item::input
Input input
Definition
cpm.hpp:17
reusex::vision::tensor_rt::cpm::Instance::Item::pro
std::shared_ptr< std::promise< Result > > pro
Definition
cpm.hpp:18
libs
reusex
include
vision
tensor_rt
common
cpm.hpp
Generated by
1.17.0