Skip to content
Snippets Groups Projects

Reduce latency in device capture and parallel encoding

Merged Nicolas Pope requested to merge feature/send-parallel into master
15 files
+ 157
115
Compare changes
  • Side-by-side
  • Inline
Files
15
@@ -14,6 +14,7 @@
#include <vector>
#include <fstream>
#include <thread>
#include <set>
#include <opencv2/opencv.hpp>
#include <ftl/rgbd.hpp>
@@ -148,7 +149,7 @@ static void run(ftl::Configurable *root) {
outstream->begin();
sender->setStream(outstream);
ftl::audio::Source *audioSrc = ftl::create<ftl::audio::Source>(root, "audio_test");
ftl::audio::Source *audioSrc = ftl::create<ftl::audio::Source>(root, "audio");
ftl::data::Pool pool(root->value("mempool_min", 2),root->value("mempool_max", 5));
auto *creator = new ftl::streams::IntervalSourceBuilder(&pool, 0, {source, audioSrc});
@@ -159,7 +160,8 @@ static void run(ftl::Configurable *root) {
receiver->registerBuilder(creatorptr);
// Which channels should be encoded
std::unordered_set<Channel> encodable;
std::set<Channel> encodable;
std::set<Channel> previous_encodable;
// Send channels on flush
auto flushhandle = pool.onFlushSet([sender,&encodable](ftl::data::FrameSet &fs, ftl::codecs::Channel c) {
@@ -192,39 +194,43 @@ static void run(ftl::Configurable *root) {
pipeline->append<ftl::operators::ArUco>("aruco")->value("enabled", false);
pipeline->append<ftl::operators::DepthChannel>("depth"); // Ensure there is a depth channel
bool busy = false;
auto h = creator->onFrameSet([sender,outstream,&stats_count,&latency,&frames,&stats_time,pipeline,&busy,&encodable](const ftl::data::FrameSetPtr &fs) {
if (busy) {
LOG(WARNING) << "Depth pipeline drop: " << fs->timestamp();
fs->firstFrame().message(ftl::data::Message::Warning_PIPELINE_DROP, "Depth pipeline drop");
return true;
}
busy = true;
std::atomic_flag busy;
busy.clear();
auto h = creator->onFrameSet([sender,outstream,&stats_count,&latency,&frames,&stats_time,pipeline,&busy,&encodable,&previous_encodable](const ftl::data::FrameSetPtr &fs) {
encodable.clear();
// Decide what to encode here.
const auto sel = outstream->selectedNoExcept(fs->frameset());
std::vector<Channel> sortedsel(sel.begin(), sel.end());
std::sort(sortedsel.begin(),sortedsel.end());
if (sortedsel.size() > 0) encodable.emplace(sortedsel[0]);
if (sortedsel.size() > 1) encodable.emplace(sortedsel[1]);
encodable.insert(sel.begin(), sel.end());
// Only allow the two encoders to exist
if (encodable.size() > 2) {
auto enciter = encodable.begin();
std::advance(enciter, 2);
encodable.erase(enciter, encodable.end());
}
// This ensures we cleanup other encoders
sender->setActiveEncoders(fs->frameset(), encodable);
if (encodable != previous_encodable) sender->resetEncoders(fs->frameset());
previous_encodable = encodable;
// Do all processing in another thread... only if encoding of depth
//if (encodable.find(Channel::Depth) != encodable.end()) {
ftl::pool.push([sender,&stats_count,&latency,&frames,&stats_time,pipeline,&busy,fs](int id) mutable {
// Do pipeline here...
// Do pipeline here... if not still busy doing it
if (busy.test_and_set()) {
LOG(WARNING) << "Depth pipeline drop: " << fs->timestamp();
fs->firstFrame().message(ftl::data::Message::Warning_PIPELINE_DROP, "Depth pipeline drop");
return;
}
pipeline->apply(*fs, *fs);
busy.clear();
++frames;
latency += float(ftl::timer::get_time() - fs->timestamp());
// Destruct frameset as soon as possible to send the data...
if (fs->hasAnyChanged(Channel::Depth)) fs->flush(Channel::Depth);
const_cast<ftl::data::FrameSetPtr&>(fs).reset();
if (!quiet && --stats_count <= 0) {
@@ -238,8 +244,6 @@ static void run(ftl::Configurable *root) {
latency = 0.0f;
stats_time = nowtime;
}
busy = false;
});
//} else {
//LOG(INFO) << "NOT DOING DEPTH";
@@ -248,8 +252,10 @@ static void run(ftl::Configurable *root) {
//}
// Lock colour right now to encode in parallel
fs->flush(ftl::codecs::Channel::Colour);
fs->flush(ftl::codecs::Channel::AudioStereo);
ftl::pool.push([fs](int id){ fs->flush(ftl::codecs::Channel::Colour); });
if (fs->hasAnyChanged(Channel::Audio)) {
ftl::pool.push([fs](int id){ fs->flush(ftl::codecs::Channel::Audio); });
}
return true;
});
Loading