diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index 0ae963ae1a24686eee7bb88aa18c6a0653d806f4..49717205fbd7b7c306217dd4878c9f23569d2ea5 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -165,18 +165,14 @@ static void run(ftl::Configurable *root) { // Send channels on flush auto flushhandle = pool.onFlushSet([sender,&encodable](ftl::data::FrameSet &fs, ftl::codecs::Channel c) { + // Always send data channels if ((int)c >= 32) sender->post(fs, c); else { + // Only encode some of the video channels if (encodable.count(c)) { sender->post(fs, c); } else { - //switch (c) { - //case Channel::Colour : - //case Channel::Colour2 : - //case Channel::Depth : - sender->post(fs, c, true); //break; - //default : sender->fakePost(fs, c); - //} + sender->post(fs, c, true); } } return true; @@ -198,15 +194,18 @@ static void run(ftl::Configurable *root) { busy.clear(); auto h = creator->onFrameSet([sender,outstream,&stats_count,&latency,&frames,&stats_time,pipeline,&busy,&encodable,&previous_encodable](const ftl::data::FrameSetPtr &fs) { - encodable.clear(); - // Decide what to encode here. + + // Decide what to encode here, based upon what remote users select const auto sel = outstream->selectedNoExcept(fs->frameset()); + encodable.clear(); encodable.insert(sel.begin(), sel.end()); - // Only allow the two encoders to exist - if (encodable.size() > 2) { + // Only allow the two encoders to exist, remove the rest + int max_encodeable = sender->value("max_encodeable", 2); + + if (encodable.size() > max_encodeable) { auto enciter = encodable.begin(); - std::advance(enciter, 2); + std::advance(enciter, max_encodeable); encodable.erase(enciter, encodable.end()); } @@ -214,45 +213,40 @@ static void run(ftl::Configurable *root) { if (encodable != previous_encodable) sender->resetEncoders(fs->frameset()); previous_encodable = encodable; - // Do all processing in another thread... only if encoding of depth - //if (encodable.find(Channel::Depth) != encodable.end()) { - ftl::pool.push([sender,&stats_count,&latency,&frames,&stats_time,pipeline,&busy,fs](int id) mutable { - // Do pipeline here... if not still busy doing it - if (busy.test_and_set()) { - LOG(WARNING) << "Depth pipeline drop: " << fs->timestamp(); - fs->firstFrame().message(ftl::data::Message::Warning_PIPELINE_DROP, "Depth pipeline drop"); - return; - } - pipeline->apply(*fs, *fs); - busy.clear(); - - ++frames; - latency += float(ftl::timer::get_time() - fs->timestamp()); - - // Destruct frameset as soon as possible to send the data... - if (fs->hasAnyChanged(Channel::Depth)) fs->flush(Channel::Depth); - const_cast<ftl::data::FrameSetPtr&>(fs).reset(); - - if (!quiet && --stats_count <= 0) { - latency /= float(frames); - int64_t nowtime = ftl::timer::get_time(); - stats_time = nowtime - stats_time; - float fps = float(frames) / (float(stats_time) / 1000.0f); - LOG(INFO) << "Frame rate: " << fps << ", Latency: " << latency; - stats_count = 20; - frames = 0; - latency = 0.0f; - stats_time = nowtime; - } - }); - //} else { - //LOG(INFO) << "NOT DOING DEPTH"; - // sender->forceAvailable(*fs, Channel::Depth); - // busy = false; - //} - - // Lock colour right now to encode in parallel + // Do all processing in another thread... + ftl::pool.push([sender,&stats_count,&latency,&frames,&stats_time,pipeline,&busy,fs](int id) mutable { + // Do pipeline here... if not still busy doing it + if (busy.test_and_set()) { + LOG(WARNING) << "Depth pipeline drop: " << fs->timestamp(); + fs->firstFrame().message(ftl::data::Message::Warning_PIPELINE_DROP, "Depth pipeline drop"); + return; + } + pipeline->apply(*fs, *fs); + busy.clear(); + + ++frames; + latency += float(ftl::timer::get_time() - fs->timestamp()); + + // Destruct frameset as soon as possible to send the data... + if (fs->hasAnyChanged(Channel::Depth)) fs->flush(Channel::Depth); + const_cast<ftl::data::FrameSetPtr&>(fs).reset(); + + if (!quiet && --stats_count <= 0) { + latency /= float(frames); + int64_t nowtime = ftl::timer::get_time(); + stats_time = nowtime - stats_time; + float fps = float(frames) / (float(stats_time) / 1000.0f); + LOG(INFO) << "Frame rate: " << fps << ", Latency: " << latency; + stats_count = 20; + frames = 0; + latency = 0.0f; + stats_time = nowtime; + } + }); + + // Lock colour right now to encode in parallel, same for audio ftl::pool.push([fs](int id){ fs->flush(ftl::codecs::Channel::Colour); }); + if (fs->hasAnyChanged(Channel::Audio)) { ftl::pool.push([fs](int id){ fs->flush(ftl::codecs::Channel::Audio); }); } diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp index 0d6fba6570be8bec847ac9d49994ce3079e115fb..6974066b7ed68c12f7bb4779e2256a71b455c2f5 100644 --- a/components/streams/src/netstream.cpp +++ b/components/streams/src/netstream.cpp @@ -67,6 +67,10 @@ Net::Net(nlohmann::json &config, ftl::net::Universe *net) : Stream(config), acti Net::~Net() { end(); + + // FIXME: Wait to ensure no net callbacks are active. + // Do something better than this + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { @@ -173,7 +177,6 @@ bool Net::begin() { spkt.localTimestamp = now - int64_t(ttimeoff); spkt.hint_capability = 0; spkt.hint_source_total = 0; - //LOG(INFO) << "LATENCY: " << ftl::timer::get_time() - spkt.localTimestamp() << " : " << spkt.timestamp << " - " << clock_adjust_; spkt.version = 4; // Manage recuring requests @@ -457,7 +460,7 @@ bool Net::end() { } active_ = false; - net_->unbind(uri_); + net_->unbind(base_uri_); return true; } diff --git a/components/streams/src/sender.cpp b/components/streams/src/sender.cpp index 9b7a60816ef0fb670275ca118e20fcd586e2764d..ae56595b7c30275fdb3d6766b455cc5dccf6eb17 100644 --- a/components/streams/src/sender.cpp +++ b/components/streams/src/sender.cpp @@ -212,6 +212,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode StreamPacket spkt; spkt.version = 5; spkt.timestamp = fs.timestamp(); + spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); //fs.id; spkt.frame_number = i; spkt.channel = c; @@ -235,6 +236,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode StreamPacket spkt; spkt.version = 5; spkt.timestamp = fs.timestamp(); + spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); spkt.frame_number = 255; spkt.channel = c; @@ -257,6 +259,7 @@ void Sender::forceAvailable(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { StreamPacket spkt; spkt.version = 5; spkt.timestamp = fs.timestamp(); + spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); spkt.frame_number = 255; spkt.channel = c; @@ -305,6 +308,7 @@ void Sender::post(ftl::data::Frame &frame, ftl::codecs::Channel c) { StreamPacket spkt; spkt.version = 5; spkt.timestamp = frame.timestamp(); + spkt.localTimestamp = spkt.timestamp; spkt.streamID = frame.frameset(); //fs.id; spkt.frame_number = frame.source(); spkt.channel = c; @@ -335,6 +339,7 @@ void Sender::post(ftl::data::Frame &frame, ftl::codecs::Channel c) { StreamPacket spkt; spkt.version = 5; spkt.timestamp = frame.timestamp(); + spkt.localTimestamp = spkt.timestamp; spkt.streamID = frame.frameset(); spkt.frame_number = 255; spkt.channel = c; @@ -419,6 +424,7 @@ void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset, StreamPacket spkt; spkt.version = 5; spkt.timestamp = fs.timestamp(); + spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); spkt.frame_number = offset; spkt.channel = c; @@ -513,6 +519,7 @@ void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset, StreamPacket spkt; spkt.version = 5; spkt.timestamp = fs.timestamp(); + spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); spkt.frame_number = i; spkt.channel = c; @@ -552,6 +559,7 @@ void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset, StreamPacket spkt; spkt.version = 5; spkt.timestamp = fs.timestamp(); + spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); spkt.frame_number = i++; spkt.channel = c; @@ -578,6 +586,7 @@ void Sender::_encodeDataChannel(ftl::data::Frame &f, Channel c, bool reset) { StreamPacket spkt; spkt.version = 5; spkt.timestamp = f.timestamp(); + spkt.localTimestamp = spkt.timestamp; spkt.streamID = f.frameset(); spkt.frame_number = f.source(); spkt.channel = c;