diff --git a/components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp b/components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp index 1abb624c96e6a6871c2058b2e69574331183e7b4..eb9c64b99a03c66f331381a595d308f1345fa100 100644 --- a/components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp @@ -14,8 +14,7 @@ namespace detail { * Also maintains statistics about the frame transmission for later analysis. */ struct NetFrame { - cv::Mat channel1; - cv::Mat channel2; + cv::Mat channel[2]; volatile int64_t timestamp; std::atomic<int> chunk_count[2]; std::atomic<int> channel_count; diff --git a/components/rgbd-sources/src/sources/net/net.cpp b/components/rgbd-sources/src/sources/net/net.cpp index d6d3c78042a99b72adbabe96152c016a5941cb88..16c5994d491bd1d7794166a26a6174511307cc9b 100644 --- a/components/rgbd-sources/src/sources/net/net.cpp +++ b/components/rgbd-sources/src/sources/net/net.cpp @@ -52,8 +52,8 @@ NetFrame &NetFrameQueue::getFrame(int64_t ts, const cv::Size &s, int c1type, int f.chunk_total[1] = 0; f.channel_count = 0; f.tx_size = 0; - f.channel1.create(s, c1type); - f.channel2.create(s, c2type); + f.channel[0].create(s, c1type); + f.channel[1].create(s, c2type); return f; } oldest = (f.timestamp < oldest) ? f.timestamp : oldest; @@ -72,8 +72,8 @@ NetFrame &NetFrameQueue::getFrame(int64_t ts, const cv::Size &s, int c1type, int f.chunk_total[1] = 0; f.channel_count = 0; f.tx_size = 0; - f.channel1.create(s, c1type); - f.channel2.create(s, c2type); + f.channel[0].create(s, c1type); + f.channel[1].create(s, c2type); return f; } } @@ -100,8 +100,8 @@ NetSource::NetSource(ftl::rgbd::Source *host) params_right_.width = 0; has_calibration_ = false; - decoder_c1_ = nullptr; - decoder_c2_ = nullptr; + decoder_[0] = nullptr; + decoder_[1] = nullptr; host->on("gamma", [this,host](const ftl::config::Event&) { gamma_ = host->value("gamma", 1.0f); @@ -173,8 +173,8 @@ NetSource::NetSource(ftl::rgbd::Source *host) } NetSource::~NetSource() { - if (decoder_c1_) ftl::codecs::free(decoder_c1_); - if (decoder_c2_) ftl::codecs::free(decoder_c2_); + if (decoder_[0]) ftl::codecs::free(decoder_[0]); + if (decoder_[1]) ftl::codecs::free(decoder_[1]); if (uri_.size() > 0) { host_->getNet()->unbind(uri_); @@ -212,22 +212,47 @@ NetSource::~NetSource() { void NetSource::_createDecoder(int chan, const ftl::codecs::Packet &pkt) { UNIQUE_LOCK(mutex_,lk); - auto *decoder = (chan == 0) ? decoder_c1_ : decoder_c2_; + auto *decoder = decoder_[chan]; if (decoder) { if (!decoder->accepts(pkt)) { - ftl::codecs::free((chan == 0) ? decoder_c1_ : decoder_c2_); + ftl::codecs::free(decoder_[chan]); } else { return; } } - if (chan == 0) { - decoder_c1_ = ftl::codecs::allocateDecoder(pkt); + decoder_[chan] = ftl::codecs::allocateDecoder(pkt); +} + +void NetSource::_processConfig(const ftl::codecs::Packet &pkt) { + std::tuple<std::string, std::string> cfg; + auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size()); + unpacked.get().convert(cfg); + + LOG(INFO) << "Config Received: " << std::get<1>(cfg); + // TODO: This needs to be put in safer / better location + host_->set(std::get<0>(cfg), nlohmann::json::parse(std::get<1>(cfg))); +} + +void NetSource::_processCalibration(const ftl::codecs::Packet &pkt) { + std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, ftl::rgbd::capability_t> params; + auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size()); + unpacked.get().convert(params); + + if (std::get<1>(params) == Channel::Left) { + params_ = std::get<0>(params); + capabilities_ = std::get<2>(params); + has_calibration_ = true; + LOG(INFO) << "Got Calibration channel: " << params_.width << "x" << params_.height; } else { - decoder_c2_ = ftl::codecs::allocateDecoder(pkt); + params_right_ = std::get<0>(params); } } +void NetSource::_processPose(const ftl::codecs::Packet &pkt) { + LOG(INFO) << "Got POSE channel"; +} + void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { // Capture time here for better net latency estimate int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count(); @@ -240,32 +265,11 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk const ftl::codecs::Channel rchan = spkt.channel; const int channum = (rchan == Channel::Colour) ? 0 : 1; - // Should config items be parsed here? - if (rchan == Channel::Configuration) { - std::tuple<std::string, std::string> cfg; - auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size()); - unpacked.get().convert(cfg); - - LOG(INFO) << "Config Received: " << std::get<1>(cfg); - } - else if (rchan == Channel::Calibration) { - std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, ftl::rgbd::capability_t> params; - auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size()); - unpacked.get().convert(params); - - if (std::get<1>(params) == Channel::Left) { - params_ = std::get<0>(params); - capabilities_ = std::get<2>(params); - has_calibration_ = true; - LOG(INFO) << "Got Calibration channel: " << params_.width << "x" << params_.height; - } else { - params_right_ = std::get<0>(params); - } - - return; - } else if (rchan == Channel::Pose) { - LOG(INFO) << "Got POSE channel"; - return; + // Deal with the special channels... + switch (rchan) { + case Channel::Configuration : _processConfig(pkt); return; + case Channel::Calibration : _processCalibration(pkt); return; + case Channel::Pose : _processPose(pkt); return; } if (!has_calibration_) { @@ -281,19 +285,19 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk // Only decode if this channel is wanted. if (rchan == Channel::Colour || rchan == chan) { _createDecoder(channum, pkt); - auto *decoder = (rchan == Channel::Colour) ? decoder_c1_ : decoder_c2_; + auto *decoder = decoder_[channum]; if (!decoder) { LOG(ERROR) << "No frame decoder available"; return; } - decoder->decode(pkt, (rchan == Channel::Colour) ? frame.channel1 : frame.channel2); + decoder->decode(pkt, frame.channel[channum]); } else if (chan != Channel::None && rchan != Channel::Colour) { // Didn't receive correct second channel so just clear the images if (isFloatChannel(chan)) { - frame.channel2.setTo(cv::Scalar(0.0f)); + frame.channel[1].setTo(cv::Scalar(0.0f)); } else { - frame.channel2.setTo(cv::Scalar(0,0,0)); + frame.channel[1].setTo(cv::Scalar(0,0,0)); } } @@ -314,7 +318,6 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk ++frame.chunk_count[channum]; if (frame.chunk_count[channum] == frame.chunk_total[channum]) ++frame.channel_count; - if (frame.chunk_count[channum] > frame.chunk_total[channum]) LOG(FATAL) << "TOO MANY CHUNKS"; // Capture tx time of first received chunk @@ -326,7 +329,7 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk } } - // Last chunk of both channels now received + // Last chunk of both channels now received, so we are done. if (frame.channel_count == spkt.channel_count) { _completeFrame(frame, now-(spkt.timestamp+frame.tx_latency)); } @@ -343,7 +346,7 @@ void NetSource::_completeFrame(NetFrame &frame, int64_t latency) { // Note: Not used currently adaptive_ = abr_.selectBitrate(frame); - host_->notify(frame.timestamp, frame.channel1, frame.channel2); + host_->notify(frame.timestamp, frame.channel[0], frame.channel[1]); queue_.freeFrame(frame); N_--; diff --git a/components/rgbd-sources/src/sources/net/net.hpp b/components/rgbd-sources/src/sources/net/net.hpp index 5d0b4e9afcc9a514433164526a9b43209dec11c3..5cef2726d2cdc5c34c161a74b25d45234f55ce48 100644 --- a/components/rgbd-sources/src/sources/net/net.hpp +++ b/components/rgbd-sources/src/sources/net/net.hpp @@ -68,8 +68,7 @@ class NetSource : public detail::Source { //NvPipe *nv_channel2_decoder_; //#endif - ftl::codecs::Decoder *decoder_c1_; - ftl::codecs::Decoder *decoder_c2_; + ftl::codecs::Decoder *decoder_[2]; // Adaptive bitrate functionality ftl::rgbd::detail::bitrate_t adaptive_; // Current adapted bitrate @@ -87,6 +86,9 @@ class NetSource : public detail::Source { //void _checkAdaptive(int64_t); void _createDecoder(int chan, const ftl::codecs::Packet &); void _completeFrame(NetFrame &frame, int64_t now); + void _processCalibration(const ftl::codecs::Packet &pkt); + void _processConfig(const ftl::codecs::Packet &pkt); + void _processPose(const ftl::codecs::Packet &pkt); }; }