Skip to content
Snippets Groups Projects
Commit 4b1b154b authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Use thread for second encoder

parent c6bb2f10
No related branches found
No related tags found
1 merge request!201Implements #170 concurrent encoding
Pipeline #17198 passed
...@@ -69,6 +69,12 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out ...@@ -69,6 +69,12 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out
// No I-Frame yet so don't attempt to decode P-Frames. // No I-Frame yet so don't attempt to decode P-Frames.
if (!seen_iframe_) return false; if (!seen_iframe_) return false;
// Final checks for validity
if (pkt.data.size() == 0 || tmp_.empty()) {
LOG(ERROR) << "Failed to decode packet";
return false;
}
int rc = NvPipe_Decode(nv_decoder_, pkt.data.data(), pkt.data.size(), tmp_.data, tmp_.cols, tmp_.rows, tmp_.step); int rc = NvPipe_Decode(nv_decoder_, pkt.data.data(), pkt.data.size(), tmp_.data, tmp_.cols, tmp_.rows, tmp_.step);
if (rc == 0) LOG(ERROR) << "NvPipe decode error: " << NvPipe_GetError(nv_decoder_); if (rc == 0) LOG(ERROR) << "NvPipe decode error: " << NvPipe_GetError(nv_decoder_);
......
...@@ -214,8 +214,8 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { ...@@ -214,8 +214,8 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) {
try { try {
cb(*fs); cb(*fs);
//LOG(INFO) << "Frameset processed (" << name_ << "): " << fs->timestamp; //LOG(INFO) << "Frameset processed (" << name_ << "): " << fs->timestamp;
} catch(...) { } catch(std::exception &e) {
LOG(ERROR) << "Exception in group sync callback"; LOG(ERROR) << "Exception in group sync callback: " << e.what();
} }
// The buffers are invalid after callback so mark stale // The buffers are invalid after callback so mark stale
......
...@@ -319,21 +319,21 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk ...@@ -319,21 +319,21 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
frame.chunk_total[channum] = pkt.block_total; frame.chunk_total[channum] = pkt.block_total;
} }
++frame.chunk_count[channum];
if (frame.chunk_count[channum] == frame.chunk_total[channum]) ++frame.channel_count;
if (frame.chunk_count[channum] > frame.chunk_total[channum]) LOG(FATAL) << "TOO MANY CHUNKS";
// Capture tx time of first received chunk // Capture tx time of first received chunk
// FIXME: This seems broken if (frame.chunk_count[0] == 0 && frame.chunk_count[1] == 0) {
if (channum == 1 && frame.chunk_count[channum] == 1) {
UNIQUE_LOCK(frame.mtx, flk); UNIQUE_LOCK(frame.mtx, flk);
if (frame.chunk_count[channum] == 1) { if (frame.chunk_count[0] == 0 && frame.chunk_count[1] == 0) {
frame.tx_latency = int64_t(ttimeoff); frame.tx_latency = int64_t(ttimeoff);
} }
} }
++frame.chunk_count[channum];
if (frame.chunk_count[channum] == frame.chunk_total[channum]) ++frame.channel_count;
if (frame.chunk_count[channum] > frame.chunk_total[channum]) LOG(ERROR) << "TOO MANY CHUNKS";
// Last chunk of both channels now received, so we are done. // Last chunk of both channels now received, so we are done.
if (frame.channel_count == spkt.channel_count) { if (frame.channel_count == spkt.channel_count) {
LOG(INFO) << "COMPLETED FRAME " << frame.channel_count << ", " << (int)spkt.channel_count;
_completeFrame(frame, now-(spkt.timestamp+frame.tx_latency)); _completeFrame(frame, now-(spkt.timestamp+frame.tx_latency));
} }
} }
......
...@@ -464,27 +464,43 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -464,27 +464,43 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
auto *enc1 = src->hq_encoder_c1; auto *enc1 = src->hq_encoder_c1;
auto *enc2 = src->hq_encoder_c2; auto *enc2 = src->hq_encoder_c2;
// Important to send channel 2 first if needed... MUTEX mtx;
// Receiver only waits for channel 1 by default std::condition_variable cv;
// TODO: Each encode could be done in own thread bool chan2done = false;
if (hasChan2) { if (hasChan2) {
ftl::pool.push([this,&fs,enc2,src,hasChan2,&cv,j,&chan2done](int id) {
// TODO: Stagger the reset between nodes... random phasing // TODO: Stagger the reset between nodes... random phasing
if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc2->reset(); if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc2->reset();
auto chan = fs.sources[j]->getChannel(); auto chan = fs.sources[j]->getChannel();
enc2->encode(fs.frames[j].get<cv::cuda::GpuMat>(chan), src->hq_bitrate, [this,src,hasChan2,chan](const ftl::codecs::Packet &blk){ try {
enc2->encode(fs.frames[j].get<cv::cuda::GpuMat>(chan), src->hq_bitrate, [this,src,hasChan2,chan,&cv,&chan2done](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, chan, hasChan2, Quality::High); _transmitPacket(src, blk, chan, hasChan2, Quality::High);
chan2done = true;
cv.notify_one();
});
} catch (std::exception &e) {
LOG(ERROR) << "Exception in encoder: " << e.what();
chan2done = true;
cv.notify_one();
}
}); });
} else { } else {
if (enc2) enc2->reset(); if (enc2) enc2->reset();
chan2done = true;
} }
// TODO: Stagger the reset between nodes... random phasing // TODO: Stagger the reset between nodes... random phasing
if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc1->reset(); if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc1->reset();
enc1->encode(fs.frames[j].get<cv::cuda::GpuMat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ enc1->encode(fs.frames[j].get<cv::cuda::GpuMat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2,&mtx](const ftl::codecs::Packet &blk){
_transmitPacket(src, blk, Channel::Colour, hasChan2, Quality::High); _transmitPacket(src, blk, Channel::Colour, hasChan2, Quality::High);
}); });
// Ensure both channels have been completed.
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [&chan2done]{ return chan2done; });
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment