diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index d077c5313744e5f2f42c3d33784b5f0a2094555b..2e838f140835858bb0b68b4fcb0b886f7a8c3c91 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -158,17 +158,22 @@ static void run(ftl::Configurable *root) { receiver->setStream(outstream); receiver->registerBuilder(creatorptr); + // Which channels should be encoded + std::unordered_set<Channel> encodable; + // Send channels on flush - auto flushhandle = pool.onFlushSet([sender](ftl::data::FrameSet &fs, ftl::codecs::Channel c) { - if ((int)c >= 64) sender->post(fs, c); + auto flushhandle = pool.onFlushSet([sender,&encodable](ftl::data::FrameSet &fs, ftl::codecs::Channel c) { + if ((int)c >= 32) sender->post(fs, c); else { - switch (c) { - case Channel::AudioStereo : - case Channel::AudioMono : - case Channel::Colour : - case Channel::Colour2 : - case Channel::Depth : sender->post(fs, c); break; - default: break; + if (encodable.count(c)) { + sender->post(fs, c); + } else { + switch (c) { + case Channel::Colour : + case Channel::Colour2 : + case Channel::Depth : sender->post(fs, c, true); break; + default: break; + } } } return true; @@ -186,7 +191,7 @@ static void run(ftl::Configurable *root) { bool busy = false; - auto h = creator->onFrameSet([sender,&stats_count,&latency,&frames,&stats_time,pipeline,&busy](const ftl::data::FrameSetPtr &fs) { + auto h = creator->onFrameSet([sender,outstream,&stats_count,&latency,&frames,&stats_time,pipeline,&busy,&encodable](const ftl::data::FrameSetPtr &fs) { if (busy) { LOG(INFO) << "Frame drop due to pipeline: " << fs->timestamp(); return true; @@ -198,32 +203,43 @@ static void run(ftl::Configurable *root) { LOG(INFO) << "Got control: " << fs->firstFrame().get<int>(ftl::codecs::Channel::Control); } - // Do all processing in another thread... - ftl::pool.push([sender,&stats_count,&latency,&frames,&stats_time,pipeline,&busy,fs](int id) { - // Do pipeline here... - pipeline->apply(*fs, *fs); - - // Send any remaining channels... - // Note: Ensures these send now, otherwise waits until destructor - fs->flush(Channel::Depth); - - ++frames; - latency += float(ftl::timer::get_time() - fs->timestamp()); - - if (--stats_count <= 0) { - latency /= float(frames); - int64_t nowtime = ftl::timer::get_time(); - stats_time = nowtime - stats_time; - float fps = float(frames) / (float(stats_time) / 1000.0f); - LOG(INFO) << "Frame rate: " << fps << ", Latency: " << latency; - stats_count = 20; - frames = 0; - latency = 0.0f; - stats_time = nowtime; - } + encodable.clear(); + // Decide what to encode here. + const auto sel = outstream->selectedNoExcept(fs->frameset()); + std::vector<Channel> sortedsel(sel.begin(), sel.end()); + std::sort(sortedsel.begin(),sortedsel.end()); + + if (sortedsel.size() > 0) encodable.emplace(sortedsel[0]); + if (sortedsel.size() > 1) encodable.emplace(sortedsel[1]); + + // Do all processing in another thread... only if encoding of depth + //if (encodable.find(Channel::Depth) != encodable.end()) { + ftl::pool.push([sender,&stats_count,&latency,&frames,&stats_time,pipeline,&busy,fs](int id) { + // Do pipeline here... + pipeline->apply(*fs, *fs); + + // Send any remaining channels... + // Note: Ensures these send now, otherwise waits until destructor + fs->flush(Channel::Depth); + + ++frames; + latency += float(ftl::timer::get_time() - fs->timestamp()); + + if (--stats_count <= 0) { + latency /= float(frames); + int64_t nowtime = ftl::timer::get_time(); + stats_time = nowtime - stats_time; + float fps = float(frames) / (float(stats_time) / 1000.0f); + LOG(INFO) << "Frame rate: " << fps << ", Latency: " << latency; + stats_count = 20; + frames = 0; + latency = 0.0f; + stats_time = nowtime; + } - busy = false; - }); + busy = false; + }); + //} // Lock colour right now to encode in parallel fs->flush(ftl::codecs::Channel::Colour); diff --git a/components/codecs/include/ftl/codecs/codecs.hpp b/components/codecs/include/ftl/codecs/codecs.hpp index 8dd2b18a075e5e1ffa76ac78ff0063bb0cb8f4d7..53ae2456870016312cde9e30b14863c87dedd647 100644 --- a/components/codecs/include/ftl/codecs/codecs.hpp +++ b/components/codecs/include/ftl/codecs/codecs.hpp @@ -22,6 +22,8 @@ static constexpr uint8_t kFlagPartial = 0x10; // This frameset is not complete static constexpr uint8_t kFlagStereo = 0x20; // Left-Right stereo in single channel static constexpr uint8_t kFlagMultiple = 0x80; // Multiple video frames in single packet +static constexpr uint8_t kFlagRequest = 0x01; // Used for empty data packets to mark a request for data + /** * Compression format used. */ diff --git a/components/streams/include/ftl/streams/sender.hpp b/components/streams/include/ftl/streams/sender.hpp index 45574e731424e8cfb3f5fa50ceb2b322e6345342..e9f52fb8fe059874409c0a8475fa832bc2d58920 100644 --- a/components/streams/include/ftl/streams/sender.hpp +++ b/components/streams/include/ftl/streams/sender.hpp @@ -25,9 +25,16 @@ class Sender : public ftl::Configurable { /** * Encode and transmit an entire frame set. Frames may already contain - * an encoded form, in which case that is used instead. + * an encoded form, in which case that is used instead. If `noencode` is + * set to true then encoding is not performed if required and instead the + * channel is sent with empty data to mark availability. */ - void post(ftl::data::FrameSet &fs, ftl::codecs::Channel c); + void post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode=false); + + /** + * Make the channel available in the stream even if not available locally. + */ + void forceAvailable(ftl::data::FrameSet &fs, ftl::codecs::Channel c); void post(ftl::data::Frame &f, ftl::codecs::Channel c); diff --git a/components/streams/include/ftl/streams/stream.hpp b/components/streams/include/ftl/streams/stream.hpp index d49aab60a2657875537e032851dc302b16cf34de..5624c7123c38888be2ea9e31f511e2b882ca1044 100644 --- a/components/streams/include/ftl/streams/stream.hpp +++ b/components/streams/include/ftl/streams/stream.hpp @@ -74,6 +74,8 @@ class Stream : public ftl::Configurable { */ const std::unordered_set<ftl::codecs::Channel> &selected(int fs) const; + std::unordered_set<ftl::codecs::Channel> selectedNoExcept(int fs) const; + /** * Change the video channel selection for a frameset. */ diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp index 2f2046a18be77dfe7943d59beabc21d365b7a758..aa0dc042b61b0460f5e1f1a3c2ab0ff6c61c8044 100644 --- a/components/streams/src/netstream.cpp +++ b/components/streams/src/netstream.cpp @@ -199,7 +199,7 @@ bool Net::begin() { // If hosting and no data then it is a request for data // Note: a non host can receive empty data, meaning data is available // but that you did not request it - if (host_ && pkt.data.size() == 0) { + if (host_ && pkt.data.size() == 0 && pkt.flags == ftl::codecs::kFlagRequest) { // FIXME: Allow unselecting ...? if (spkt.frameSetID() == 255) { for (size_t i=0; i<size(); ++i) { @@ -271,7 +271,8 @@ bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t coun codec_t::Any, // TODO: Allow specific codec requests definition_t::Any, // TODO: Allow specific definition requests count, - bitrate + bitrate, + ftl::codecs::kFlagRequest }; StreamPacket spkt = { diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index 9287dad97f4604e2242a8d1c639d67a835ee990a..3f3e6c73b3bbd901da9d88ab59f4ed594bd50fdd 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -336,8 +336,8 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { //auto &frame = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+i); auto &frame = fs->frames[spkt.frame_number+i]; - const auto *cs = stream_; - const auto &sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID()); + //const auto *cs = stream_; + const auto &sel = stream_->selected(spkt.frameSetID()); // & cs->available(spkt.frameSetID()); //frame.create<cv::cuda::GpuMat>(spkt.channel); @@ -390,10 +390,20 @@ void Receiver::setStream(ftl::stream::Stream *s) { // No data packet means channel is only available. if (pkt.data.size() == 0) { - if (spkt.streamID < 255 && spkt.frame_number < 255) { + if (spkt.streamID < 255 && pkt.flags == 0) { // Get the frameset auto fs = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); - fs->frames[spkt.frame_number].markAvailable(spkt.channel); + const auto &sel = stream_->selected(spkt.frameSetID()); + + for (auto &frame : fs->frames) { + //LOG(INFO) << "MARK " << frame.source() << " " << (int)spkt.channel; + frame.markAvailable(spkt.channel); + + if (frame.availableAll(sel)) { + //LOG(INFO) << "FRAME COMPLETED " << frame.source(); + fs->completed(frame.source()); + } + } } return true; } diff --git a/components/streams/src/sender.cpp b/components/streams/src/sender.cpp index 26e1b1d6ab5e854f21cb0a17ae1e9203839c8109..505d0dd1de0a2d5052e8b6473edd111ac955da8f 100644 --- a/components/streams/src/sender.cpp +++ b/components/streams/src/sender.cpp @@ -106,7 +106,7 @@ static void mergeNALUnits(const std::list<ftl::codecs::Packet> &pkts, ftl::codec } } -void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { +void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode) { if (!stream_) return; std::unordered_set<ftl::codecs::Channel> selected; @@ -122,6 +122,9 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { if (fs.timestamp() > timestamp_) timestamp_ = fs.timestamp(); bool do_iframe = injection_timestamp_ == fs.timestamp(); + // FIXME: Not ideal + if (do_iframe) do_inject = true; + FTL_Profile("SenderPost", 0.02); // Send any frameset data channels @@ -149,6 +152,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { const auto &frame = fs.frames[i]; if (!frame.hasOwn(c)) continue; + // TODO: Send entire persistent session on inject if (do_inject) { //LOG(INFO) << "Force inject calibration"; if (frame.has(Channel::Calibration)) injectCalibration(stream_, fs, i); @@ -156,9 +160,9 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { if (frame.has(Channel::Pose)) injectPose(stream_, fs, i); //injectConfig(stream_, fs, i); } else { - if (frame.changed(Channel::Pose)) injectPose(stream_, fs, i); - if (frame.changed(Channel::Calibration)) injectCalibration(stream_, fs, i); - if (frame.changed(Channel::Calibration2)) injectCalibration(stream_, fs, i, true); + //if (frame.changed(Channel::Pose)) injectPose(stream_, fs, i); + //if (frame.changed(Channel::Calibration)) injectCalibration(stream_, fs, i); + //if (frame.changed(Channel::Calibration2)) injectCalibration(stream_, fs, i, true); //if (frame.changed(Channel::Configuration)) injectConfig(stream_, fs, i); } @@ -182,7 +186,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { //for (auto ic : frame.changed()) { //auto c = ic.first; - if (selected.find(c) != selected.end() || (int)c >= 64) { + if (selected.find(c) != selected.end() || (int)c >= 32) { // FIXME: Sends high res colour, but receive end currently broken //auto cc = (c == Channel::Colour && frame.hasChannel(Channel::ColourHighRes)) ? Channel::ColourHighRes : c; auto cc = c; @@ -218,6 +222,12 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { } + // Don't transmit if noencode and needs encoding + if (needs_encoding && noencode) { + needs_encoding = false; + available = true; + } + if (available) { // Not selected so send an empty packet... StreamPacket spkt; @@ -231,22 +241,31 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { pkt.codec = codec_t::Any; pkt.frame_count = 1; pkt.bitrate = 0; + pkt.flags = 0; stream_->post(spkt, pkt); } Channel sec_chan = Channel::Right; if (needs_encoding) { - // TODO: One thread per channel. - if ((int)c >= 32 || c == Channel::Colour || c == sec_chan) { - LOG(INFO) << "ENCODING " << (int)c; - _encodeChannel(fs, c, do_iframe); - } else { - LOG(WARNING) << "Not encoding channel: " << (int)c; - } + _encodeChannel(fs, c, do_iframe); } +} - //do_inject_ = false; +void Sender::forceAvailable(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { + StreamPacket spkt; + spkt.version = 4; + spkt.timestamp = fs.timestamp(); + spkt.streamID = fs.frameset(); + spkt.frame_number = 255; + spkt.channel = c; + + Packet pkt; + pkt.codec = codec_t::Any; + pkt.frame_count = 1; + pkt.bitrate = 0; + pkt.flags = 0; + stream_->post(spkt, pkt); } void Sender::post(ftl::data::Frame &frame, ftl::codecs::Channel c) { diff --git a/components/streams/src/stream.cpp b/components/streams/src/stream.cpp index 70e17a3d09688ad81bb686539dde1f524c81d55c..a4a273ce4e73b9a66c63794eeb1ce77144f36447 100644 --- a/components/streams/src/stream.cpp +++ b/components/streams/src/stream.cpp @@ -44,6 +44,12 @@ const std::unordered_set<ftl::codecs::Channel> &Stream::selected(int fs) const { return state_[fs].selected; } +std::unordered_set<ftl::codecs::Channel> Stream::selectedNoExcept(int fs) const { + SHARED_LOCK(mtx_, lk); + if (fs < 0 || static_cast<uint32_t>(fs) >= state_.size()) return {}; + return state_[fs].selected; +} + void Stream::select(int fs, const std::unordered_set<ftl::codecs::Channel> &s, bool make) { UNIQUE_LOCK(mtx_, lk); if (fs < 0 || (!make && static_cast<uint32_t>(fs) >= state_.size())) throw FTL_Error("Frameset index out-of-bounds: " << fs);