Newer
Older
#include <ftl/rgbd/group.hpp>
#include <ftl/rgbd/source.hpp>
#include <ftl/timer.hpp>
#include <chrono>
using ftl::rgbd::Group;
using ftl::rgbd::Source;
using ftl::rgbd::kFrameBufferSize;
using std::vector;
using std::chrono::milliseconds;
using std::this_thread::sleep_for;
/**
 * Set up the frameset ring buffer, then run the group's shutdown steps.
 *
 * framesets_ is created with kFrameBufferSize entries and head_ starts at
 * slot 0. The first slot's timestamp is set to -1, the value that
 * _addFrameset() checks for to detect that no frameset has been added yet.
 *
 * NOTE(review): everything from the sources_ loop onwards (callback removal,
 * timer cancellation, waiting on jobs_) reads like destructor logic. This
 * listing may have lost the end of the constructor and the start of
 * ~Group() -- confirm against the original file before relying on it.
 */
Group::Group() : framesets_(kFrameBufferSize), head_(0) {
framesets_[0].timestamp = -1;
//setFPS(20);
// The per-frame interval comes from the shared timer instead of being set here.
mspf_ = ftl::timer::getInterval();
// Remove the callback from every source currently in sources_.
for (auto s : sources_) {
s->removeCallback();
}
// Cancel the three handles that sync() obtains from ftl::timer::add().
main_id_.cancel();
swap_id_.cancel();
cap_id_.cancel();
// mutex_ stays locked (via lk) for the whole wait below.
UNIQUE_LOCK(mutex_, lk);
// Make sure all jobs have finished
// Polls every 10 ms; jobs_ is decremented by the pool tasks queued in sync().
while (jobs_ > 0) {
sleep_for(milliseconds(10));
}
}
//void Group::setFPS(int fps) {
// mspf_ = 1000 / fps;
// ftl::timer::setInterval(mspf_);
//}
/**
 * Register a source with the group and install a frame callback on it.
 *
 * The source is appended to sources_; its position at that moment (ix) is
 * captured by the callback and used as its slot in each frameset's
 * channel1/channel2 vectors.
 *
 * The callback receives (timestamp, rgb, depth) and files the two mats into
 * the buffered frameset whose timestamp matches.
 *
 * NOTE(review): this listing is cut off before the closing braces of the
 * callback and of the function, and no statement that increments fs.count
 * is visible. Confirm against the original file.
 *
 * @param src Source to add; must provide setCallback().
 */
void Group::addSource(ftl::rgbd::Source *src) {
UNIQUE_LOCK(mutex_, lk);
// Index this source will occupy in sources_ once pushed.
size_t ix = sources_.size();
sources_.push_back(src);
src->setCallback([this,ix,src](int64_t timestamp, cv::Mat &rgb, cv::Mat &depth) {
//LOG(INFO) << "SRC CB: " << timestamp << " (" << framesets_[head_].timestamp << ")";
// Group-level lock; released below before the per-frameset lock is taken.
UNIQUE_LOCK(mutex_, lk);
if (timestamp > framesets_[head_].timestamp) {
// Add new frameset
_addFrameset(timestamp);
} else if (framesets_[(head_+1)%kFrameBufferSize].timestamp > timestamp) {
// Too old, just ditch it
// The comparison is against the slot that follows head_ in the ring.
// NOTE(review): presumably that is the oldest buffered entry -- confirm.
LOG(WARNING) << "Received frame too old for buffer";
return;
}
// Search backwards to find match
// i = 0 is head_ itself; each step moves one slot back around the ring.
for (size_t i=0; i<kFrameBufferSize; ++i) {
FrameSet &fs = framesets_[(head_+kFrameBufferSize-i) % kFrameBufferSize];
if (fs.timestamp == timestamp) {
// Swap the group lock for this frameset's own lock.
lk.unlock();
UNIQUE_LOCK(fs.mtx, lk2);
//LOG(INFO) << "Adding frame: " << ix << " for " << timestamp;
// Ensure channels match source mat format
fs.channel1[ix].create(rgb.size(), rgb.type());
fs.channel2[ix].create(depth.size(), depth.type());
// Exchange buffers: the frameset takes the new images and the caller's
// mats receive the frameset's previous ones.
cv::swap(rgb, fs.channel1[ix]);
cv::swap(depth, fs.channel2[ix]);
// Both branches below hold only commented-out logging.
if (fs.count == sources_.size()) {
//LOG(INFO) << "COMPLETE SET: " << fs.timestamp;
} else {
//LOG(INFO) << "INCOMPLETE SET (" << ix << "): " << fs.timestamp;
}
// With a callback installed and every source counted in, hand the
// frameset over; a true return resets the count.
if (callback_ && fs.count == sources_.size()) {
if (callback_(fs)) {
// Reset count to prevent multiple reads of these frames
fs.count = 0;
// NOTE(review): the braces that would place the next statement are missing
// from this listing; it is presumably the fall-through after the search loop.
DLOG(WARNING) << "Frame timestamp not found in buffer";
/**
 * Placeholder for adding another group.
 *
 * The implementation is empty in this file: the argument is not used and no
 * state is changed.
 *
 * @param grp Group passed by the caller (ignored).
 */
void Group::addGroup(Group *grp) {
    (void)grp;  // nothing is done with the argument
}
/**
 * Run src->retrieve() and log any exception it throws.
 *
 * sync() queues this on ftl::pool once per source per main timer step.
 * A std::exception is logged with its what() text and is not rethrown in
 * the handler shown here.
 *
 * NOTE(review): this listing is cut off inside the catch (...) handler; the
 * closing braces of the handler and of the function are missing.
 *
 * @param src Source whose retrieve() is invoked.
 */
void Group::_retrieveJob(ftl::rgbd::Source *src) {
try {
src->retrieve();
} catch (std::exception &ex) {
LOG(ERROR) << "Exception when retrieving frame";
LOG(ERROR) << ex.what();
}
// Anything not derived from std::exception is logged without detail.
catch (...) {
LOG(ERROR) << "Unknown exception when retrieving frame";
/**
 * Run src->compute() and log any exception it throws.
 *
 * sync() queues this on ftl::pool once per source per main timer step.
 * Both handlers only log; nothing is rethrown.
 *
 * NOTE(review): the function's closing brace is missing from this listing.
 *
 * @param src Source whose compute() is invoked.
 */
void Group::_computeJob(ftl::rgbd::Source *src) {
try {
src->compute();
} catch (std::exception &ex) {
LOG(ERROR) << "Exception when computing frame";
LOG(ERROR) << ex.what();
}
// Anything not derived from std::exception is logged without detail.
catch (...) {
LOG(ERROR) << "Unknown exception when computing frame";
}
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/**
 * Register the capture / swap / main timer callbacks for this group.
 *
 * How cb is delivered depends on latency_:
 *  - latency_ == 0: cb is stored in callback_, which the source callback
 *    installed by addSource() invokes once a frameset's count equals
 *    sources_.size().
 *  - latency_ > 0: cb is invoked from the main timer step on the frameset
 *    returned by _getFrameset(latency_); its return value is ignored there.
 *
 * The handles returned by ftl::timer::add() are kept in cap_id_, swap_id_
 * and main_id_.
 *
 * NOTE(review): this listing is interrupted partway through the main timer
 * callback; the else branch below and the enclosing blocks are not closed
 * before the gap.
 *
 * @param cb Callback that receives a frameset.
 */
void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) {
if (latency_ == 0) {
callback_ = cb;
}
// 1. Capture camera frames with high precision
cap_id_ = ftl::timer::add(ftl::timer::kTimerHighPrecision, [this](int64_t ts) {
// skip_ is decided here and read again by the swap and main steps.
skip_ = jobs_ != 0; // Last frame not finished so skip all steps
if (skip_) return;
// Remembered so _getFrameset() can compute its target timestamp.
last_ts_ = ts;
for (auto s : sources_) {
s->capture(ts);
}
});
// 2. After capture, swap any internal source double buffers
swap_id_ = ftl::timer::add(ftl::timer::kTimerSwap, [this](int64_t ts) {
if (skip_) return;
for (auto s : sources_) {
s->swap();
}
});
// 3. Issue IO retrieve and compute jobs before finding a valid
// frame at required latency to pass to callback.
main_id_ = ftl::timer::add(ftl::timer::kTimerMain, [this,cb](int64_t ts) {
if (skip_) return;
// Counts this callback itself as a job; the matching jobs_-- follows later
// in the callback.
jobs_++;
for (auto s : sources_) {
// Two pool tasks per source; each one decrements jobs_ when it finishes.
jobs_ += 2;
ftl::pool.push([this,s](int id) {
_retrieveJob(s);
--jobs_;
});
ftl::pool.push([this,s](int id) {
_computeJob(s);
--jobs_;
});
}
// Find a previous frameset at specified latency and do the sync
// callback with that frameset.
if (latency_ > 0) {
ftl::rgbd::FrameSet *fs = nullptr;
UNIQUE_LOCK(mutex_, lk);
fs = _getFrameset(latency_);
if (fs) {
// The frameset's own lock is taken before the group lock is released.
UNIQUE_LOCK(fs->mtx, lk2);
lk.unlock();
cb(*fs);
// The buffers are invalid after callback so mark stale
fs->stale = true;
} else {
//LOG(INFO) << "NO FRAME FOUND: " << last_ts_ - latency_*mspf_;
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
jobs_--;
});
ftl::timer::start(true);
}
//ftl::rgbd::FrameSet &Group::_getRelativeFrameset(int rel) {
// int idx = (rel < 0) ? (head_+kFrameBufferSize+rel)%kFrameBufferSize : (head_+rel)%kFrameBufferSize;
// return framesets_[idx];
//}
/**
 * Look up a complete frameset at, or older than, a requested latency.
 *
 * The target timestamp is last_ts_ - f * mspf_. Slots are visited starting
 * with the one just behind head_ and moving backwards around the ring; the
 * head_ slot itself is never examined.
 *
 * For each visited slot, in this order:
 *  1. If its timestamp equals the target but its count differs from
 *     sources_.size(), a message is logged and the search moves on to the
 *     next older slot.
 *  2. Otherwise, if the slot is marked stale, the search ends with nullptr.
 *  3. Otherwise, if its count equals sources_.size() and its timestamp is
 *     at or before the target, that slot is returned.
 *
 * sync() calls this while holding mutex_.
 *
 * @param f Number of mspf_ intervals to step back from last_ts_.
 * @return Pointer to an entry of framesets_, or nullptr if none qualified.
 */
ftl::rgbd::FrameSet *Group::_getFrameset(int f) {
    const int64_t target = last_ts_-f*mspf_;

    for (size_t back = 1; back < kFrameBufferSize; ++back) {
        const int slot = (head_+kFrameBufferSize-back)%kFrameBufferSize;
        auto &candidate = framesets_[slot];

        // Exact timestamp hit that has not yet heard from every source.
        if (candidate.timestamp == target && candidate.count != sources_.size()) {
            LOG(INFO) << "Required frame not complete (mask " << (candidate.timestamp) << ")";
            continue;
        }

        // A stale slot ends the search outright.
        if (candidate.stale) return nullptr;

        // Covers both the exact match and the "older than target" fallback.
        if (candidate.timestamp <= target && candidate.count == sources_.size()) {
            return &candidate;
        }
    }

    return nullptr;
}
void Group::_addFrameset(int64_t timestamp) {
int count = (framesets_[head_].timestamp == -1) ? 200 : (timestamp - framesets_[head_].timestamp) / mspf_;
//LOG(INFO) << "Massive timestamp difference: " << count;
// Allow for massive timestamp changes (Windows clock adjust)
// Only add a single frameset for large changes
if (count < -int(kFrameBufferSize) || count >= kFrameBufferSize-1) {
if (!framesets_[head_].mtx.try_lock()) {
LOG(ERROR) << "Frameset in use!!";
return;
}
framesets_[head_].timestamp = timestamp;
framesets_[head_].count = 0;
framesets_[head_].mask = 0;
framesets_[head_].stale = false;
framesets_[head_].channel1.resize(sources_.size());
framesets_[head_].channel2.resize(sources_.size());
if (framesets_[head_].sources.size() != sources_.size()) {
framesets_[head_].sources.clear();
for (auto s : sources_) framesets_[head_].sources.push_back(s);
}
framesets_[head_].mtx.unlock();
return;
}
if (count < 1) return;
// Must make sure to also insert missing framesets
int64_t lt = (framesets_[head_].timestamp == -1) ? timestamp-mspf_ : framesets_[head_].timestamp;
if (!framesets_[head_].mtx.try_lock()) {
LOG(ERROR) << "Frameset in use!!";
break;
}
framesets_[head_].timestamp = lt+mspf_;
framesets_[head_].count = 0;
framesets_[head_].mask = 0;
framesets_[head_].channel1.resize(sources_.size());
framesets_[head_].channel2.resize(sources_.size());
if (framesets_[head_].sources.size() != sources_.size()) {
framesets_[head_].sources.clear();
for (auto s : sources_) framesets_[head_].sources.push_back(s);
}
framesets_[head_].mtx.unlock();