diff --git a/applications/gui/src/camera.cpp b/applications/gui/src/camera.cpp index acebcdc9f4ba43ea0b485ebe163fe6c106efbd0a..a4880a550661c8225bb431c34acb06f11dfb60d4 100644 --- a/applications/gui/src/camera.cpp +++ b/applications/gui/src/camera.cpp @@ -226,10 +226,14 @@ ftl::gui::Camera::~Camera() { //delete fileout_; } -void ftl::gui::Camera::draw(ftl::rgbd::FrameSet &fs) { +void ftl::gui::Camera::draw(std::vector<ftl::rgbd::FrameSet*> &fss) { if (fid_ != 255) return; + if (fsid_ >= fss.size()) return; + + auto &fs = *fss[fsid_]; - UNIQUE_LOCK(mutex_, lk); + UNIQUE_LOCK(fs.mtx,lk); + UNIQUE_LOCK(mutex_, lk2); _draw(fs); } @@ -307,9 +311,12 @@ void ftl::gui::Camera::_downloadFrames(ftl::rgbd::Frame *frame) { } } -void ftl::gui::Camera::update(ftl::rgbd::FrameSet &fs) { +void ftl::gui::Camera::update(std::vector<ftl::rgbd::FrameSet*> &fss) { UNIQUE_LOCK(mutex_, lk); + if (fss.size() <= fsid_) return; + auto &fs = *fss[fsid_]; + ftl::rgbd::Frame *frame = nullptr; if (fid_ == 255) { diff --git a/applications/gui/src/camera.hpp b/applications/gui/src/camera.hpp index 1c98790e26274452dcf221912f3a57de97ea6d2b..e1b16583bc4b5fe33b791ea02addb8e5322bdcbe 100644 --- a/applications/gui/src/camera.hpp +++ b/applications/gui/src/camera.hpp @@ -52,14 +52,14 @@ class Camera { /** * Main function to obtain latest frames. */ - void update(ftl::rgbd::FrameSet &fs); + void update(std::vector<ftl::rgbd::FrameSet *> &fss); /** * Update the available channels. */ void update(const ftl::codecs::Channels<0> &c) { channels_ = (isVirtual()) ? c + ftl::codecs::Channel::Right : c; } - void draw(ftl::rgbd::FrameSet &fs); + void draw(std::vector<ftl::rgbd::FrameSet*> &fss); const GLTexture &captureFrame(); const GLTexture &getLeft() const { return texture1_; } diff --git a/applications/gui/src/src_window.cpp b/applications/gui/src/src_window.cpp index 95f9240a1dd8f3e008f1e1d70caf33e14081b96d..68369dfee50cb55a8a73403b996b727d731e3268 100644 --- a/applications/gui/src/src_window.cpp +++ b/applications/gui/src/src_window.cpp @@ -77,31 +77,31 @@ SourceWindow::SourceWindow(ftl::gui::Screen *screen) if (recorder_->active() && pkt.data.size() > 0) recorder_->post(spkt, pkt); }); - pre_pipeline_ = ftl::config::create<ftl::operators::Graph>(screen->root(), "pre_filters"); - //pre_pipeline_->append<ftl::operators::HFSmoother>("hfnoise"); - //pre_pipeline_->append<ftl::operators::ColourChannels>("colour"); // Convert BGR to BGRA - pre_pipeline_->append<ftl::operators::CrossSupport>("cross"); - pre_pipeline_->append<ftl::operators::DiscontinuityMask>("discontinuity"); - pre_pipeline_->append<ftl::operators::CullDiscontinuity>("remove_discontinuity"); - pre_pipeline_->append<ftl::operators::VisCrossSupport>("viscross")->set("enabled", false); - paused_ = false; cycle_ = 0; receiver_->onFrameSet([this](ftl::rgbd::FrameSet &fs) { + // Request the channels required by current camera configuration + interceptor_->select(fs.id, _aggregateChannels()); + + /*if (fs.id > 0) { + LOG(INFO) << "Got frameset: " << fs.id; + return true; + }*/ + + // Make sure there are enough framesets allocated + _checkFrameSets(fs.id); + if (!paused_) { // Enforce interpolated colour for (int i=0; i<fs.frames.size(); ++i) { fs.frames[i].createTexture<uchar4>(Channel::Colour, true); } - pre_pipeline_->apply(fs, fs, 0); + pre_pipelines_[fs.id]->apply(fs, fs, 0); - fs.swapTo(frameset_); + fs.swapTo(*framesets_[fs.id]); } - // Request the channels required by current camera configuration - interceptor_->select(frameset_.id, _aggregateChannels()); - /*if (fs.frames[0].hasChannel(Channel::Data)) { int data = 0; fs.frames[0].get(Channel::Data, data); @@ -109,7 +109,7 @@ SourceWindow::SourceWindow(ftl::gui::Screen *screen) }*/ const auto *cstream = interceptor_; - _createDefaultCameras(frameset_, cstream->available(fs.id).has(Channel::Depth)); + _createDefaultCameras(*framesets_[fs.id], cstream->available(fs.id).has(Channel::Depth)); //LOG(INFO) << "Channels = " << (unsigned int)cstream->available(fs.id); @@ -117,9 +117,9 @@ SourceWindow::SourceWindow(ftl::gui::Screen *screen) for (auto cam : cameras_) { // Only update the camera periodically unless the active camera if (screen_->activeCamera() == cam.second.camera || - (screen_->activeCamera() == nullptr && cycle_ % cameras_.size() == i++)) cam.second.camera->update(frameset_); + (screen_->activeCamera() == nullptr && cycle_ % cameras_.size() == i++)) cam.second.camera->update(framesets_); - cam.second.camera->update(cstream->available(frameset_.id)); + cam.second.camera->update(cstream->available(fs.id)); } ++cycle_; @@ -129,8 +129,9 @@ SourceWindow::SourceWindow(ftl::gui::Screen *screen) speaker_ = ftl::create<ftl::audio::Speaker>(screen_->root(), "speaker_test"); receiver_->onAudio([this](ftl::audio::FrameSet &fs) { + if (framesets_.size() == 0) return true; //LOG(INFO) << "Audio delay required = " << (ts - frameset_.timestamp) << "ms"; - speaker_->setDelay(fs.timestamp - frameset_.timestamp + ftl::timer::getInterval()); // Add Xms for local render time + speaker_->setDelay(fs.timestamp - framesets_[0]->timestamp + ftl::timer::getInterval()); // Add Xms for local render time speaker_->queue(fs.timestamp, fs.frames[0]); return true; }); @@ -139,8 +140,7 @@ SourceWindow::SourceWindow(ftl::gui::Screen *screen) auto *c = screen_->activeCamera(); // Only offer full framerate render on active camera. if (c) { - UNIQUE_LOCK(frameset_.mtx,lk); - c->draw(frameset_); + c->draw(framesets_); } return true; }); @@ -151,6 +151,7 @@ SourceWindow::SourceWindow(ftl::gui::Screen *screen) // Check paths for FTL files to load. auto paths = (*screen->root()->get<nlohmann::json>("paths")); + int ftl_count = 0; for (auto &x : paths.items()) { std::string path = x.value().get<std::string>(); auto eix = path.find_last_of('.'); @@ -159,15 +160,30 @@ SourceWindow::SourceWindow(ftl::gui::Screen *screen) // Command line path is ftl file if (ext == "ftl") { LOG(INFO) << "Found FTL file: " << path; - auto *fstream = ftl::create<ftl::stream::File>(screen->root(), "ftlfile"); + auto *fstream = ftl::create<ftl::stream::File>(screen->root(), std::string("ftlfile-")+std::to_string(ftl_count+1)); fstream->set("filename", path); - stream_->add(fstream); + stream_->add(fstream, ftl_count++); } } stream_->begin(); } +void SourceWindow::_checkFrameSets(int id) { + while (framesets_.size() <= id) { + auto *p = ftl::config::create<ftl::operators::Graph>(screen_->root(), "pre_filters"); + //pre_pipeline_->append<ftl::operators::HFSmoother>("hfnoise"); + //pre_pipeline_->append<ftl::operators::ColourChannels>("colour"); // Convert BGR to BGRA + p->append<ftl::operators::CrossSupport>("cross"); + p->append<ftl::operators::DiscontinuityMask>("discontinuity"); + p->append<ftl::operators::CullDiscontinuity>("remove_discontinuity"); + p->append<ftl::operators::VisCrossSupport>("viscross")->set("enabled", false); + + pre_pipelines_.push_back(p); + framesets_.push_back(new ftl::rgbd::FrameSet); + } +} + void SourceWindow::recordVideo(const std::string &filename) { if (!recorder_->active()) { recorder_->set("filename", filename); @@ -201,9 +217,9 @@ ftl::codecs::Channels<0> SourceWindow::_aggregateChannels() { void SourceWindow::_createDefaultCameras(ftl::rgbd::FrameSet &fs, bool makevirtual) { for (int i=0; i<fs.frames.size(); ++i) { - int id = i; // TODO: Include frameset id + int id = (fs.id << 8) + i; if (cameras_.find(id) == cameras_.end()) { - auto *cam = new ftl::gui::Camera(screen_, 0, i); + auto *cam = new ftl::gui::Camera(screen_, fs.id, i); cameras_[id] = { cam, nullptr @@ -211,9 +227,9 @@ void SourceWindow::_createDefaultCameras(ftl::rgbd::FrameSet &fs, bool makevirtu } } - if (makevirtual && cameras_.find(-1) == cameras_.end()) { - auto *cam = new ftl::gui::Camera(screen_, 0, 255); - cameras_[-1] = { + if (makevirtual && cameras_.find((fs.id << 8) + 255) == cameras_.end()) { + auto *cam = new ftl::gui::Camera(screen_, fs.id, 255); + cameras_[(fs.id << 8) + 255] = { cam, nullptr }; diff --git a/applications/gui/src/src_window.hpp b/applications/gui/src/src_window.hpp index 6cbcb7c316fc1db0e8a1e3272f151f4afc3260bf..fccab65c35b00f0b6f99f9bb24bb15f5f04d57f1 100644 --- a/applications/gui/src/src_window.hpp +++ b/applications/gui/src/src_window.hpp @@ -69,17 +69,18 @@ class SourceWindow : public nanogui::Window { bool refresh_thumbs_; nanogui::Widget *ipanel_; int cycle_; - ftl::operators::Graph *pre_pipeline_; + std::vector<ftl::operators::Graph*> pre_pipelines_; MUTEX mutex_; ftl::audio::Speaker *speaker_; - ftl::rgbd::FrameSet frameset_; + std::vector<ftl::rgbd::FrameSet*> framesets_; bool paused_; void _updateCameras(const std::vector<std::string> &netcams); void _createDefaultCameras(ftl::rgbd::FrameSet &fs, bool makevirtual); ftl::codecs::Channels<0> _aggregateChannels(); + void _checkFrameSets(int id); }; diff --git a/components/codecs/src/nvpipe_decoder.cpp b/components/codecs/src/nvpipe_decoder.cpp index 5a61969a748547b9a1a666227433b9eec0f90d5c..3f41ce7a6f359f6fbd99de64a8cab8b9bd7a6136 100644 --- a/components/codecs/src/nvpipe_decoder.cpp +++ b/components/codecs/src/nvpipe_decoder.cpp @@ -133,7 +133,7 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out //LOG(WARNING) << "Decode of multiple frames: " << count; } else { if (!_checkIFrame(pkt.codec, pkt.data.data(), pkt.data.size())) { - LOG(WARNING) << "P-Frame without I-Frame in decoder"; + LOG(WARNING) << "P-Frame without I-Frame in decoder: " << pkt.data.size(); return false; } rc = NvPipe_Decode(nv_decoder_, pkt.data.data(), pkt.data.size(), out.data, out.cols, out.rows, out.step); diff --git a/components/rgbd-sources/include/ftl/rgbd/frameset.hpp b/components/rgbd-sources/include/ftl/rgbd/frameset.hpp index a1e5396622017622a5de9dceed80059012b7106a..1fcbf4f3f9a4322c27bc065886e142fef678adfa 100644 --- a/components/rgbd-sources/include/ftl/rgbd/frameset.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/frameset.hpp @@ -105,6 +105,8 @@ class Builder : public Generator { ftl::rgbd::FrameState &state(size_t ix) override; + inline void setID(int id) { id_ = id; } + void onFrameSet(const ftl::rgbd::VideoCallback &) override; /** @@ -137,6 +139,7 @@ class Builder : public Generator { int stats_count_; int64_t last_ts_; int64_t last_frame_; + int id_; std::atomic<int> jobs_; volatile bool skip_; ftl::timer::TimerHandle main_id_; diff --git a/components/rgbd-sources/src/frameset.cpp b/components/rgbd-sources/src/frameset.cpp index 740af42ff10cb35a6af7aab29ecf83be2eca5979..acf9e1c2f91dad2fab01b4f29281cb1f42058898 100644 --- a/components/rgbd-sources/src/frameset.cpp +++ b/components/rgbd-sources/src/frameset.cpp @@ -59,7 +59,7 @@ void FrameSet::resetFull() { // ============================================================================= -Builder::Builder() : head_(0) { +Builder::Builder() : head_(0), id_(0) { jobs_ = 0; skip_ = false; //setFPS(20); @@ -102,7 +102,7 @@ ftl::rgbd::Frame &Builder::get(int64_t timestamp, size_t ix) { //states_[ix] = frame.origin(); if (timestamp <= last_frame_) { - throw FTL_Error("Frameset already completed"); + throw FTL_Error("Frameset already completed: " << timestamp); } auto *fs = _findFrameset(timestamp); @@ -374,7 +374,7 @@ ftl::rgbd::FrameSet *Builder::_addFrameset(int64_t timestamp) { allocated_.pop_front(); newf->timestamp = timestamp; - newf->id = 0; + newf->id = id_; newf->count = 0; newf->mask = 0; newf->stale = false; diff --git a/components/streams/include/ftl/streams/receiver.hpp b/components/streams/include/ftl/streams/receiver.hpp index a4bbfd917608a5c66fdf512ef6488fffc15f6319..f3396311887669ddf60cfe4ee82e09a8ff6cf73c 100644 --- a/components/streams/include/ftl/streams/receiver.hpp +++ b/components/streams/include/ftl/streams/receiver.hpp @@ -38,13 +38,18 @@ class Receiver : public ftl::Configurable, public ftl::rgbd::Generator { */ void onFrameSet(const ftl::rgbd::VideoCallback &cb) override; + /** + * Add a frameset handler to a specific stream ID. + */ + void onFrameSet(int s, const ftl::rgbd::VideoCallback &cb); + void onAudio(const ftl::audio::FrameSet::Callback &cb); private: ftl::stream::Stream *stream_; ftl::rgbd::VideoCallback fs_callback_; ftl::audio::FrameSet::Callback audio_cb_; - ftl::rgbd::Builder builder_; + ftl::rgbd::Builder builder_[ftl::stream::kMaxStreams]; ftl::codecs::Channel second_channel_; int64_t timestamp_; SHARED_MUTEX mutex_; @@ -71,8 +76,8 @@ class Receiver : public ftl::Configurable, public ftl::rgbd::Generator { ftl::codecs::Channels<0> completed; }; - std::vector<InternalVideoStates*> video_frames_; - std::vector<InternalAudioStates*> audio_frames_; + std::vector<InternalVideoStates*> video_frames_[ftl::stream::kMaxStreams]; + std::vector<InternalAudioStates*> audio_frames_[ftl::stream::kMaxStreams]; void _processConfig(InternalVideoStates &frame, const ftl::codecs::Packet &pkt); void _processState(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt); diff --git a/components/streams/include/ftl/streams/stream.hpp b/components/streams/include/ftl/streams/stream.hpp index c90b5ce4f68faad388bf3bc7f37e3b9a4e45f4b6..7929f1ee2acf5f6621ae542f09c8d176239facfe 100644 --- a/components/streams/include/ftl/streams/stream.hpp +++ b/components/streams/include/ftl/streams/stream.hpp @@ -101,6 +101,8 @@ class Stream : public ftl::Configurable { mutable SHARED_MUTEX mtx_; }; +static constexpr size_t kMaxStreams = 5; + /** * Combine multiple streams into a single stream. StreamPackets are modified * by mapping the stream identifiers consistently to new values. Both reading @@ -133,13 +135,13 @@ class Muxer : public Stream { }; std::vector<StreamEntry> streams_; - std::vector<std::pair<int,int>> revmap_; - int nid_; + std::vector<std::pair<int,int>> revmap_[kMaxStreams]; + int nid_[kMaxStreams]; StreamCallback cb_; SHARED_MUTEX mutex_; void _notify(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt); - int _lookup(int sid, int ssid); + int _lookup(int fsid, int sid, int ssid); }; /** diff --git a/components/streams/src/injectors.cpp b/components/streams/src/injectors.cpp index f75945576e349ff86f010b10aeddf800fb6119c2..c9ccce40121ca3789bbf030ccc6a037ac760f4d7 100644 --- a/components/streams/src/injectors.cpp +++ b/components/streams/src/injectors.cpp @@ -5,7 +5,7 @@ using ftl::codecs::Channel; using ftl::util::FTLVectorBuffer; void ftl::stream::injectCalibration(ftl::stream::Stream *stream, const ftl::rgbd::FrameSet &fs, int ix, bool right) { - ftl::stream::injectCalibration(stream, fs.frames[ix], fs.timestamp, ix, right); + ftl::stream::injectCalibration(stream, fs.frames[ix], fs.timestamp, fs.id, ix, right); } void ftl::stream::injectPose(ftl::stream::Stream *stream, const ftl::rgbd::FrameSet &fs, int ix) { @@ -58,11 +58,11 @@ void ftl::stream::injectPose(ftl::stream::Stream *stream, const ftl::rgbd::Frame stream->post(spkt, pkt); } -void ftl::stream::injectCalibration(ftl::stream::Stream *stream, const ftl::rgbd::Frame &f, int64_t ts, int ix, bool right) { +void ftl::stream::injectCalibration(ftl::stream::Stream *stream, const ftl::rgbd::Frame &f, int64_t ts, int fsid, int ix, bool right) { ftl::codecs::StreamPacket spkt = { 4, ts, - 0, + fsid, static_cast<uint8_t>(ix), (right) ? Channel::Calibration2 : Channel::Calibration }; diff --git a/components/streams/src/injectors.hpp b/components/streams/src/injectors.hpp index 6fbb76031c7213545fb1cb9e9708764b936a00ac..0d2a35aeee928d89362c133f641b7b8f118061a8 100644 --- a/components/streams/src/injectors.hpp +++ b/components/streams/src/injectors.hpp @@ -11,7 +11,7 @@ void injectCalibration(ftl::stream::Stream *stream, const ftl::rgbd::FrameSet &f void injectConfig(ftl::stream::Stream *stream, const ftl::rgbd::FrameSet &fs, int ix); void injectPose(ftl::stream::Stream *stream, const ftl::rgbd::Frame &fs, int64_t ts, int ix); -void injectCalibration(ftl::stream::Stream *stream, const ftl::rgbd::Frame &fs, int64_t ts, int ix, bool right=false); +void injectCalibration(ftl::stream::Stream *stream, const ftl::rgbd::Frame &fs, int64_t ts, int fsid, int ix, bool right=false); } } diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index 62c96c78d920683c6a22d3125430174d61e11e58..cb1d62844c8b3798e1149efe38e644bedcba5d85 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -24,6 +24,10 @@ using ftl::stream::injectPose; Receiver::Receiver(nlohmann::json &config) : ftl::Configurable(config), stream_(nullptr) { timestamp_ = 0; second_channel_ = Channel::Depth; + + for (int i=0; i<ftl::stream::kMaxStreams; ++i) { + builder_[i].setID(i); + } } Receiver::~Receiver() { @@ -31,7 +35,7 @@ Receiver::~Receiver() { // stream_->onPacket(nullptr); //} - builder_.onFrameSet(nullptr); + builder_[0].onFrameSet(nullptr); } void Receiver::onAudio(const ftl::audio::FrameSet::Callback &cb) { @@ -49,7 +53,7 @@ void Receiver::onAudio(const ftl::audio::FrameSet::Callback &cb) { }*/ void Receiver::_createDecoder(InternalVideoStates &frame, int chan, const ftl::codecs::Packet &pkt) { - //UNIQUE_LOCK(mutex_,lk); + UNIQUE_LOCK(frame.mutex,lk); auto *decoder = frame.decoders[chan]; if (decoder) { if (!decoder->accepts(pkt)) { @@ -73,12 +77,12 @@ Receiver::InternalVideoStates &Receiver::_getVideoFrame(const StreamPacket &spkt uint32_t fn = spkt.frameNumber()+ix; UNIQUE_LOCK(mutex_, lk); - while (video_frames_.size() <= fn) { + while (video_frames_[spkt.streamID].size() <= fn) { //frames_.resize(spkt.frameNumber()+1); - video_frames_.push_back(new InternalVideoStates); - video_frames_[video_frames_.size()-1]->state.set("name",std::string("Source ")+std::to_string(fn+1)); + video_frames_[spkt.streamID].push_back(new InternalVideoStates); + video_frames_[spkt.streamID][video_frames_[spkt.streamID].size()-1]->state.set("name",std::string("Source ")+std::to_string(fn+1)); } - auto &f = *video_frames_[fn]; + auto &f = *video_frames_[spkt.streamID][fn]; if (!f.frame.origin()) f.frame.setOrigin(&f.state); return f; } @@ -91,12 +95,12 @@ Receiver::InternalAudioStates &Receiver::_getAudioFrame(const StreamPacket &spkt uint32_t fn = spkt.frameNumber()+ix; UNIQUE_LOCK(mutex_, lk); - while (audio_frames_.size() <= fn) { + while (audio_frames_[spkt.streamID].size() <= fn) { //frames_.resize(spkt.frameNumber()+1); - audio_frames_.push_back(new InternalAudioStates); - audio_frames_[audio_frames_.size()-1]->state.set("name",std::string("Source ")+std::to_string(fn+1)); + audio_frames_[spkt.streamID].push_back(new InternalAudioStates); + audio_frames_[spkt.streamID][audio_frames_[spkt.streamID].size()-1]->state.set("name",std::string("Source ")+std::to_string(fn+1)); } - auto &f = *audio_frames_[fn]; + auto &f = *audio_frames_[spkt.streamID][fn]; if (!f.frame.origin()) f.frame.setOrigin(&f.state); return f; } @@ -173,7 +177,7 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { // Do the actual decode into the surface buffer try { if (!decoder->decode(pkt, surface)) { - LOG(ERROR) << "Decode failed"; + LOG(ERROR) << "Decode failed on channel " << (int)spkt.channel; return; } } catch (std::exception &e) { @@ -197,7 +201,7 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { // Note: Done in reverse to allocate correct number of frames first time round for (int i=pkt.frame_count-1; i>=0; --i) { InternalVideoStates &vidstate = _getVideoFrame(spkt,i); - auto &frame = builder_.get(spkt.timestamp, spkt.frame_number+i); + auto &frame = builder_[spkt.streamID].get(spkt.timestamp, spkt.frame_number+i); if (!frame.origin()) frame.setOrigin(&vidstate.state); @@ -252,7 +256,7 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { for (int i=0; i<pkt.frame_count; ++i) { InternalVideoStates &vidstate = _getVideoFrame(spkt,i); - auto &frame = builder_.get(spkt.timestamp, spkt.frame_number+i); + auto &frame = builder_[spkt.streamID].get(spkt.timestamp, spkt.frame_number+i); auto sel = stream_->selected(spkt.frameSetID()); @@ -280,12 +284,12 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { // TODO: Have multiple builders for different framesets. //builder_.push(frame.timestamp, spkt.frameNumber()+i, frame.frame); - builder_.completed(spkt.timestamp, spkt.frame_number+i); + builder_[spkt.streamID].completed(spkt.timestamp, spkt.frame_number+i); // Check for any state changes and send them back if (vidstate.state.hasChanged(Channel::Pose)) injectPose(stream_, frame, spkt.timestamp, spkt.frameNumber()+i); - if (vidstate.state.hasChanged(Channel::Calibration)) injectCalibration(stream_, frame, spkt.timestamp, spkt.frameNumber()+i); - if (vidstate.state.hasChanged(Channel::Calibration2)) injectCalibration(stream_, frame, spkt.timestamp, spkt.frameNumber()+i, true); + if (vidstate.state.hasChanged(Channel::Calibration)) injectCalibration(stream_, frame, spkt.timestamp, spkt.streamID, spkt.frameNumber()+i); + if (vidstate.state.hasChanged(Channel::Calibration2)) injectCalibration(stream_, frame, spkt.timestamp, spkt.streamID, spkt.frameNumber()+i, true); //frame.reset(); //frame.completed.clear(); @@ -310,8 +314,8 @@ void Receiver::setStream(ftl::stream::Stream *s) { //LOG(INFO) << "PACKET: " << spkt.timestamp << ", " << (int)spkt.channel << ", " << (int)pkt.codec << ", " << (int)pkt.definition; // TODO: Allow for multiple framesets - if (spkt.frameSetID() > 0) LOG(INFO) << "Frameset " << spkt.frameSetID() << " received: " << (int)spkt.channel; - if (spkt.frameSetID() > 0) return; + //if (spkt.frameSetID() > 0) LOG(INFO) << "Frameset " << spkt.frameSetID() << " received: " << (int)spkt.channel; + if (spkt.frameSetID() >= ftl::stream::kMaxStreams) return; // Too many frames, so ignore. if (spkt.frameNumber() >= value("max_frames",32)) return; @@ -333,13 +337,19 @@ void Receiver::setStream(ftl::stream::Stream *s) { } size_t Receiver::size() { - return builder_.size(); + return builder_[0].size(); } ftl::rgbd::FrameState &Receiver::state(size_t ix) { - return builder_.state(ix); + return builder_[0].state(ix); } void Receiver::onFrameSet(const ftl::rgbd::VideoCallback &cb) { - builder_.onFrameSet(cb); + for (int i=0; i<ftl::stream::kMaxStreams; ++i) + builder_[i].onFrameSet(cb); +} + +void Receiver::onFrameSet(int s, const ftl::rgbd::VideoCallback &cb) { + if (s >= 0 && s < ftl::stream::kMaxStreams) + builder_[s].onFrameSet(cb); } diff --git a/components/streams/src/stream.cpp b/components/streams/src/stream.cpp index 8175a9a30791fb1ddd99af1b5ca77dfc967c0ad6..445300cdb083aa9f35b20d5f50f1068a12bb88af 100644 --- a/components/streams/src/stream.cpp +++ b/components/streams/src/stream.cpp @@ -40,7 +40,7 @@ void Stream::reset() { // ==== Muxer ================================================================== -Muxer::Muxer(nlohmann::json &config) : Stream(config), nid_(0) { +Muxer::Muxer(nlohmann::json &config) : Stream(config), nid_{0} { } @@ -51,24 +51,26 @@ Muxer::~Muxer() { void Muxer::add(Stream *s, int fsid) { UNIQUE_LOCK(mutex_,lk); + if (fsid < 0 || fsid >= ftl::stream::kMaxStreams) return; auto &se = streams_.emplace_back(); int i = streams_.size()-1; se.stream = s; s->onPacket([this,s,i,fsid](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { - //SHARED_LOCK(mutex_, lk); + //TODO: Allow input streams to have other streamIDs + // Same fsid means same streamIDs map together in the end ftl::codecs::StreamPacket spkt2 = spkt; spkt2.streamID = fsid; if (spkt2.frame_number < 255) { - int id = _lookup(i, spkt.frame_number); + int id = _lookup(fsid, i, spkt.frame_number); spkt2.frame_number = id; } _notify(spkt2, pkt); - s->select(spkt.streamID, selected(0)); + s->select(spkt.streamID, selected(fsid)); }); } @@ -79,8 +81,8 @@ bool Muxer::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, } int Muxer::originStream(int fsid, int fid) { - if (static_cast<uint32_t>(fid) < revmap_.size()) { - return std::get<0>(revmap_[fid]); + if (fsid < ftl::stream::kMaxStreams && static_cast<uint32_t>(fid) < revmap_[fsid].size()) { + return std::get<0>(revmap_[fsid][fid]); } return -1; } @@ -89,8 +91,8 @@ bool Muxer::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packe SHARED_LOCK(mutex_, lk); available(spkt.frameSetID()) += spkt.channel; - if (spkt.frame_number < revmap_.size()) { - auto [sid, ssid] = revmap_[spkt.frame_number]; + if (spkt.streamID < ftl::stream::kMaxStreams && spkt.frame_number < revmap_[spkt.streamID].size()) { + auto [sid, ssid] = revmap_[spkt.streamID][spkt.frame_number]; auto &se = streams_[sid]; //LOG(INFO) << "POST " << spkt.frame_number; @@ -135,7 +137,7 @@ void Muxer::reset() { } } -int Muxer::_lookup(int sid, int ssid) { +int Muxer::_lookup(int fsid, int sid, int ssid) { SHARED_LOCK(mutex_, lk); auto &se = streams_[sid]; if (static_cast<uint32_t>(ssid) >= se.maps.size()) { @@ -143,9 +145,9 @@ int Muxer::_lookup(int sid, int ssid) { { UNIQUE_LOCK(mutex_, lk2); if (static_cast<uint32_t>(ssid) >= se.maps.size()) { - int nid = nid_++; + int nid = nid_[fsid]++; se.maps.push_back(nid); - revmap_.push_back({sid,ssid}); + revmap_[fsid].push_back({sid,ssid}); } } lk.lock(); diff --git a/components/streams/test/receiver_unit.cpp b/components/streams/test/receiver_unit.cpp index e6dd0c6b789fffc0f488e8463facd0bb4c85cf17..d683d4abddfc5f3e764150aa50710fa242718829 100644 --- a/components/streams/test/receiver_unit.cpp +++ b/components/streams/test/receiver_unit.cpp @@ -85,7 +85,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { state.getLeft().width = 1280; state.getLeft().height = 720; dummy.setOrigin(&state); - ftl::stream::injectCalibration(&stream, dummy, 0, 0); + ftl::stream::injectCalibration(&stream, dummy, 0, 0, 0); ftl::timer::start(false); @@ -116,9 +116,33 @@ TEST_CASE( "Receiver generating onFrameSet" ) { REQUIRE( count == 1 ); } + SECTION("multi-frameset") { + cv::cuda::GpuMat m(cv::Size(1280,720), CV_8UC4, cv::Scalar(0)); + ftl::stream::injectCalibration(&stream, dummy, 1, 0, 0); + + bool r = encoder.encode(m, pkt); + REQUIRE( r ); + + stream.post(spkt, pkt); + + std::atomic<int> mask = 0; + receiver->onFrameSet([&mask](ftl::rgbd::FrameSet &fs) { + mask |= 1 << fs.id; + return true; + }); + + spkt.streamID = 1; + stream.post(spkt, pkt); + + int i=10; + while (i-- > 0 && mask != 3) std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + REQUIRE( mask == 3 ); + } + SECTION("a tiled colour frame") { cv::cuda::GpuMat m(cv::Size(2560,720), CV_8UC4, cv::Scalar(0)); - ftl::stream::injectCalibration(&stream, dummy, 0, 1); + ftl::stream::injectCalibration(&stream, dummy, 0, 0, 1); pkt.frame_count = 2; bool r = encoder.encode(m, pkt); @@ -150,7 +174,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { SECTION("a tiled lossy depth frame") { cv::cuda::GpuMat m(cv::Size(2560,720), CV_8UC4, cv::Scalar(0)); - ftl::stream::injectCalibration(&stream, dummy, 0, 1); + ftl::stream::injectCalibration(&stream, dummy, 0, 0, 1); spkt.channel = Channel::Depth; pkt.frame_count = 2; @@ -184,7 +208,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { SECTION("a tiled lossless depth frame") { cv::cuda::GpuMat m(cv::Size(2560,720), CV_16U, cv::Scalar(0)); - ftl::stream::injectCalibration(&stream, dummy, 0, 1); + ftl::stream::injectCalibration(&stream, dummy, 0, 0, 1); spkt.channel = Channel::Depth; pkt.frame_count = 2; @@ -258,7 +282,7 @@ TEST_CASE( "Receiver sync bugs" ) { state.getLeft().width = 1280; state.getLeft().height = 720; dummy.setOrigin(&state); - ftl::stream::injectCalibration(&stream, dummy, 0, 0); + ftl::stream::injectCalibration(&stream, dummy, 0, 0, 0); ftl::timer::start(false); diff --git a/components/streams/test/stream_unit.cpp b/components/streams/test/stream_unit.cpp index 0513c555082ae4acf3e44bb8e19128be1533c573..aa2093a4045e0b70fc6615480047b9e975180d1e 100644 --- a/components/streams/test/stream_unit.cpp +++ b/components/streams/test/stream_unit.cpp @@ -101,12 +101,67 @@ TEST_CASE("ftl::stream::Muxer()::write", "[stream]") { tspkt3 = spkt; }); - REQUIRE( mux->post({4,200,1,1,ftl::codecs::Channel::Colour},{}) ); + REQUIRE( mux->post({4,200,0,1,ftl::codecs::Channel::Colour},{}) ); REQUIRE( tspkt3.timestamp == 200 ); REQUIRE( tspkt3.frame_number == 0 ); } } +TEST_CASE("ftl::stream::Muxer()::post multi-frameset", "[stream]") { + json_t global = json_t{{"$id","ftl://test"}}; + ftl::config::configure(global); + + json_t cfg = json_t{ + {"$id","ftl://test/1"} + }; + + Muxer *mux = ftl::create<Muxer>(cfg); + REQUIRE(mux); + + SECTION("write to previously read") { + json_t cfg1 = json_t{ + {"$id","ftl://test/2"} + }; + json_t cfg2 = json_t{ + {"$id","ftl://test/3"} + }; + + Stream *s1 = ftl::create<TestStream>(cfg1); + REQUIRE(s1); + Stream *s2 = ftl::create<TestStream>(cfg2); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2,1); + + ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; + mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + tspkt = spkt; + }); + + REQUIRE( s1->post({4,100,0,0,ftl::codecs::Channel::Colour},{}) ); + REQUIRE( tspkt.streamID == 0 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,ftl::codecs::Channel::Colour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.frame_number == 0 ); + + ftl::codecs::StreamPacket tspkt2 = {4,0,0,1,ftl::codecs::Channel::Colour}; + ftl::codecs::StreamPacket tspkt3 = {4,0,0,1,ftl::codecs::Channel::Colour}; + s1->onPacket([&tspkt2](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + tspkt2 = spkt; + }); + s2->onPacket([&tspkt3](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + tspkt3 = spkt; + }); + + REQUIRE( mux->post({4,200,1,0,ftl::codecs::Channel::Colour},{}) ); + REQUIRE( tspkt3.streamID == 0 ); + REQUIRE( tspkt3.frame_number == 0 ); + } +} + TEST_CASE("ftl::stream::Muxer()::read", "[stream]") { json_t global = json_t{{"$id","ftl://test"}}; ftl::config::configure(global); @@ -195,6 +250,68 @@ TEST_CASE("ftl::stream::Muxer()::read", "[stream]") { } } +TEST_CASE("ftl::stream::Muxer()::read multi-frameset", "[stream]") { + json_t global = json_t{{"$id","ftl://test"}}; + ftl::config::configure(global); + + json_t cfg = json_t{ + {"$id","ftl://test/1"} + }; + + Muxer *mux = ftl::create<Muxer>(cfg); + REQUIRE(mux); + + //SECTION("read with two writing streams") { + json_t cfg1 = json_t{ + {"$id","ftl://test/2"} + }; + json_t cfg2 = json_t{ + {"$id","ftl://test/3"} + }; + json_t cfg3 = json_t{ + {"$id","ftl://test/4"} + }; + json_t cfg4 = json_t{ + {"$id","ftl://test/5"} + }; + + Stream *s1 = ftl::create<TestStream>(cfg1); + REQUIRE(s1); + Stream *s2 = ftl::create<TestStream>(cfg2); + REQUIRE(s2); + Stream *s3 = ftl::create<TestStream>(cfg3); + REQUIRE(s3); + Stream *s4 = ftl::create<TestStream>(cfg4); + REQUIRE(s4); + + mux->add(s1,0); + mux->add(s2,1); + mux->add(s3,0); + mux->add(s4,1); + + ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; + mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + tspkt = spkt; + }); + + REQUIRE( s1->post({4,100,0,0,ftl::codecs::Channel::Colour},{}) ); + REQUIRE( tspkt.streamID == 0 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,ftl::codecs::Channel::Colour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s3->post({4,102,0,0,ftl::codecs::Channel::Colour},{}) ); + REQUIRE( tspkt.streamID == 0 ); + REQUIRE( tspkt.frame_number == 1 ); + + REQUIRE( s4->post({4,103,0,0,ftl::codecs::Channel::Colour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.frame_number == 1 ); + //} +} + TEST_CASE("ftl::stream::Broadcast()::write", "[stream]") { json_t global = json_t{{"$id","ftl://test"}}; ftl::config::configure(global);