diff --git a/components/codecs/src/nvpipe_decoder.cpp b/components/codecs/src/nvpipe_decoder.cpp index 77a3105f88b84f2b9c00f5dba152bbc9814c70db..93261854d88e75e79878a41dd5f6f0b71bcf6e9f 100644 --- a/components/codecs/src/nvpipe_decoder.cpp +++ b/components/codecs/src/nvpipe_decoder.cpp @@ -69,6 +69,12 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out // No I-Frame yet so don't attempt to decode P-Frames. if (!seen_iframe_) return false; + // Final checks for validity + if (pkt.data.size() == 0 || tmp_.empty()) { + LOG(ERROR) << "Failed to decode packet"; + return false; + } + int rc = NvPipe_Decode(nv_decoder_, pkt.data.data(), pkt.data.size(), tmp_.data, tmp_.cols, tmp_.rows, tmp_.step); if (rc == 0) LOG(ERROR) << "NvPipe decode error: " << NvPipe_GetError(nv_decoder_); diff --git a/components/rgbd-sources/src/group.cpp b/components/rgbd-sources/src/group.cpp index 625d62e2c9767ee6164d2835e832de20994ec983..aad850d2cf9d501d6d655b1f77978de6f1bab39e 100644 --- a/components/rgbd-sources/src/group.cpp +++ b/components/rgbd-sources/src/group.cpp @@ -214,8 +214,8 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { try { cb(*fs); //LOG(INFO) << "Frameset processed (" << name_ << "): " << fs->timestamp; - } catch(...) { - LOG(ERROR) << "Exception in group sync callback"; + } catch(std::exception &e) { + LOG(ERROR) << "Exception in group sync callback: " << e.what(); } // The buffers are invalid after callback so mark stale diff --git a/components/rgbd-sources/src/sources/net/net.cpp b/components/rgbd-sources/src/sources/net/net.cpp index 694aa50f884210664aa33bc3a4cb39dbb6f9d3b2..51ecbc01e592e217f229512e624a079e68bcff8a 100644 --- a/components/rgbd-sources/src/sources/net/net.cpp +++ b/components/rgbd-sources/src/sources/net/net.cpp @@ -317,23 +317,23 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk // Calculate how many packets to expect for this channel if (frame.chunk_total[channum] == 0) { frame.chunk_total[channum] = pkt.block_total; - } - - ++frame.chunk_count[channum]; - if (frame.chunk_count[channum] == frame.chunk_total[channum]) ++frame.channel_count; - if (frame.chunk_count[channum] > frame.chunk_total[channum]) LOG(FATAL) << "TOO MANY CHUNKS"; + } // Capture tx time of first received chunk - // FIXME: This seems broken - if (channum == 1 && frame.chunk_count[channum] == 1) { + if (frame.chunk_count[0] == 0 && frame.chunk_count[1] == 0) { UNIQUE_LOCK(frame.mtx, flk); - if (frame.chunk_count[channum] == 1) { + if (frame.chunk_count[0] == 0 && frame.chunk_count[1] == 0) { frame.tx_latency = int64_t(ttimeoff); } - } + } + + ++frame.chunk_count[channum]; + if (frame.chunk_count[channum] == frame.chunk_total[channum]) ++frame.channel_count; + if (frame.chunk_count[channum] > frame.chunk_total[channum]) LOG(ERROR) << "TOO MANY CHUNKS"; // Last chunk of both channels now received, so we are done. if (frame.channel_count == spkt.channel_count) { + LOG(INFO) << "COMPLETED FRAME " << frame.channel_count << ", " << (int)spkt.channel_count; _completeFrame(frame, now-(spkt.timestamp+frame.tx_latency)); } } diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index a4b02f34554898ced490c22f5f6eb5f365080c41..4c70127d352847d91136a59e2a491e2fd0eebbce 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -464,27 +464,43 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { auto *enc1 = src->hq_encoder_c1; auto *enc2 = src->hq_encoder_c2; - // Important to send channel 2 first if needed... - // Receiver only waits for channel 1 by default - // TODO: Each encode could be done in own thread - if (hasChan2) { - // TODO: Stagger the reset between nodes... random phasing - if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc2->reset(); + MUTEX mtx; + std::condition_variable cv; + bool chan2done = false; - auto chan = fs.sources[j]->getChannel(); - - enc2->encode(fs.frames[j].get<cv::cuda::GpuMat>(chan), src->hq_bitrate, [this,src,hasChan2,chan](const ftl::codecs::Packet &blk){ - _transmitPacket(src, blk, chan, hasChan2, Quality::High); + if (hasChan2) { + ftl::pool.push([this,&fs,enc2,src,hasChan2,&cv,j,&chan2done](int id) { + // TODO: Stagger the reset between nodes... random phasing + if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc2->reset(); + + auto chan = fs.sources[j]->getChannel(); + + try { + enc2->encode(fs.frames[j].get<cv::cuda::GpuMat>(chan), src->hq_bitrate, [this,src,hasChan2,chan,&cv,&chan2done](const ftl::codecs::Packet &blk){ + _transmitPacket(src, blk, chan, hasChan2, Quality::High); + chan2done = true; + cv.notify_one(); + }); + } catch (std::exception &e) { + LOG(ERROR) << "Exception in encoder: " << e.what(); + chan2done = true; + cv.notify_one(); + } }); } else { if (enc2) enc2->reset(); + chan2done = true; } // TODO: Stagger the reset between nodes... random phasing if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc1->reset(); - enc1->encode(fs.frames[j].get<cv::cuda::GpuMat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ + enc1->encode(fs.frames[j].get<cv::cuda::GpuMat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2,&mtx](const ftl::codecs::Packet &blk){ _transmitPacket(src, blk, Channel::Colour, hasChan2, Quality::High); }); + + // Ensure both channels have been completed. + std::unique_lock<std::mutex> lk(mtx); + cv.wait(lk, [&chan2done]{ return chan2done; }); } }