diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index a75f9cd64f168c00c701d6532f6f434ea2e7d8ae..4c3553a4c56bc1c4d97e2d9a9c6f89740bd51d77 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -28,6 +28,7 @@ #include <ftl/streams/netstream.hpp> #include <ftl/streams/sender.hpp> +#include <ftl/streams/receiver.hpp> #include <ftl/audio/source.hpp> @@ -155,7 +156,12 @@ static void run(ftl::Configurable *root) { ftl::data::Pool pool(2,5); //auto creator = pool.creator<ftl::data::IntervalFrameCreator>(ftl::data::FrameID(0,0), source); - ftl::streams::IntervalSourceBuilder creator(&pool, 0, source); + auto *creator = new ftl::streams::IntervalSourceBuilder(&pool, 0, source); + std::shared_ptr<ftl::streams::BaseBuilder> creatorptr(creator); + + ftl::stream::Receiver *receiver = ftl::create<ftl::stream::Receiver>(root, "receiver", &pool); + //receiver->setStream(outstream); + receiver->registerBuilder(creatorptr); // Listen for any flush events for frameset 0 /*auto flushh = pool.group(0).onFlush([](ftl::data::Frame &f, ftl::codecs::Channel c) { @@ -181,7 +187,7 @@ static void run(ftl::Configurable *root) { bool busy = false; - auto h = creator.onFrameSet([sender,&stats_count,&latency,&frames,pipeline,&busy](const ftl::data::FrameSetPtr &fs) { + auto h = creator->onFrameSet([sender,&stats_count,&latency,&frames,pipeline,&busy](const ftl::data::FrameSetPtr &fs) { if (busy) return true; busy = true; @@ -215,7 +221,7 @@ static void run(ftl::Configurable *root) { }); // Start the timed generation of frames - creator.start(); + creator->start(); /*grp->onFrameSet([sender,&stats_count](ftl::rgbd::FrameSet &fs) { fs.id = 0; diff --git a/components/streams/include/ftl/streams/builder.hpp b/components/streams/include/ftl/streams/builder.hpp index b49be8b03925ba264684805775a111059725d56d..e3755e857512f4cdd22a55b4ed99eb6b6a5041b9 100644 --- a/components/streams/include/ftl/streams/builder.hpp +++ b/components/streams/include/ftl/streams/builder.hpp @@ -41,6 +41,8 @@ class BaseBuilder { inline size_t size() const { return size_; } + inline const int id() const { return id_; } + protected: ftl::data::Pool *pool_; int id_; diff --git a/components/streams/include/ftl/streams/filestream.hpp b/components/streams/include/ftl/streams/filestream.hpp index 0a93fc8795f947b0a71f4843f8287330f85be0fe..6805554830aabf86b3ab8de96ee167c01825381a 100644 --- a/components/streams/include/ftl/streams/filestream.hpp +++ b/components/streams/include/ftl/streams/filestream.hpp @@ -21,7 +21,7 @@ class File : public Stream { File(nlohmann::json &config, std::ofstream *); ~File(); - bool onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &) override; + //bool onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &) override; bool post(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &) override; @@ -76,7 +76,7 @@ class File : public Stream { bool is_video_; bool save_data_; - StreamCallback cb_; + //StreamCallback cb_; MUTEX mutex_; MUTEX data_mutex_; std::atomic<int> jobs_; diff --git a/components/streams/include/ftl/streams/netstream.hpp b/components/streams/include/ftl/streams/netstream.hpp index 0ad54061b41ff62a4eb2c12db18d20e415afb365..1341efb711af2ec0c19e69e65f72ffb345b02709 100644 --- a/components/streams/include/ftl/streams/netstream.hpp +++ b/components/streams/include/ftl/streams/netstream.hpp @@ -48,7 +48,7 @@ class Net : public Stream { Net(nlohmann::json &config, ftl::net::Universe *net); ~Net(); - bool onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &) override; + //bool onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &) override; bool post(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &) override; @@ -90,7 +90,7 @@ class Net : public Stream { std::list<detail::StreamClient> clients_; - StreamCallback cb_; + //StreamCallback cb_; bool _processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt); void _checkDataRate(size_t tx_size, int64_t tx_latency, int64_t ts); diff --git a/components/streams/include/ftl/streams/receiver.hpp b/components/streams/include/ftl/streams/receiver.hpp index 75bb7e1bb98e18890a90012091dcfe307feead56..af7cab87e7ee7ca09821057ceb37f3dc524249c5 100644 --- a/components/streams/include/ftl/streams/receiver.hpp +++ b/components/streams/include/ftl/streams/receiver.hpp @@ -45,6 +45,8 @@ class Receiver : public ftl::Configurable, public ftl::data::Generator { ftl::streams::BaseBuilder &builder(uint32_t id); + void registerBuilder(const std::shared_ptr<ftl::streams::BaseBuilder> &b); + private: ftl::stream::Stream *stream_; ftl::data::Pool *pool_; @@ -55,6 +57,7 @@ class Receiver : public ftl::Configurable, public ftl::data::Generator { int64_t timestamp_; SHARED_MUTEX mutex_; unsigned int frame_mask_; + ftl::Handle handle_; struct InternalVideoStates { InternalVideoStates(); diff --git a/components/streams/include/ftl/streams/sender.hpp b/components/streams/include/ftl/streams/sender.hpp index d542fc748d3f2a924fc71dfa184a669fd3285b9c..5e1045d07e5aa7a5060c32e2c720a10313caad25 100644 --- a/components/streams/include/ftl/streams/sender.hpp +++ b/components/streams/include/ftl/streams/sender.hpp @@ -48,6 +48,7 @@ class Sender : public ftl::Configurable { ftl::stream::StreamCallback reqcb_; int add_iframes_; int iframe_; + ftl::Handle handle_; struct EncodingState { uint8_t bitrate; diff --git a/components/streams/include/ftl/streams/stream.hpp b/components/streams/include/ftl/streams/stream.hpp index cdc0f47fe7daa5ad7b44c37a20b4260b6a45c442..d49aab60a2657875537e032851dc302b16cf34de 100644 --- a/components/streams/include/ftl/streams/stream.hpp +++ b/components/streams/include/ftl/streams/stream.hpp @@ -6,6 +6,7 @@ //#include <ftl/rgbd/source.hpp> //#include <ftl/rgbd/group.hpp> #include <ftl/codecs/encoder.hpp> +#include <ftl/handle.hpp> #include <ftl/threads.hpp> #include <string> #include <vector> @@ -16,7 +17,7 @@ namespace ftl { namespace stream { -typedef std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> StreamCallback; +typedef std::function<bool(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> StreamCallback; /** * Base stream class to be implemented. Provides encode and decode functionality @@ -36,7 +37,7 @@ class Stream : public ftl::Configurable { * callback even after the read function returns, for example with a * NetStream. */ - virtual bool onPacket(const StreamCallback &)=0; + ftl::Handle onPacket(const StreamCallback &cb) { return cb_.on(cb); }; virtual bool post(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)=0; @@ -84,6 +85,7 @@ class Stream : public ftl::Configurable { inline size_t size() const { return state_.size(); } protected: + ftl::Handler<const ftl::codecs::StreamPacket&, const ftl::codecs::Packet&> cb_; /** * Allow modification of available channels. Calling this with an invalid @@ -117,7 +119,7 @@ class Muxer : public Stream { void add(Stream *, size_t fsid=0); void remove(Stream *); - bool onPacket(const StreamCallback &) override; + //bool onPacket(const StreamCallback &) override; bool post(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &) override; @@ -137,8 +139,9 @@ class Muxer : public Stream { std::vector<StreamEntry> streams_; std::vector<std::pair<size_t,int>> revmap_[kMaxStreams]; + std::list<ftl::Handle> handles_; int nid_[kMaxStreams]; - StreamCallback cb_; + //StreamCallback cb_; SHARED_MUTEX mutex_; void _notify(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt); @@ -159,7 +162,7 @@ class Broadcast : public Stream { void remove(Stream *); void clear(); - bool onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &) override; + //bool onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &) override; bool post(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &) override; @@ -171,7 +174,8 @@ class Broadcast : public Stream { private: std::list<Stream*> streams_; - StreamCallback cb_; + std::list<ftl::Handle> handles_; + //StreamCallback cb_; SHARED_MUTEX mutex_; }; @@ -185,7 +189,7 @@ class Intercept : public Stream { void setStream(Stream *); - bool onPacket(const StreamCallback &) override; + //bool onPacket(const StreamCallback &) override; bool onIntercept(const StreamCallback &); bool post(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &) override; @@ -198,7 +202,8 @@ class Intercept : public Stream { private: Stream *stream_; - StreamCallback cb_; + std::list<ftl::Handle> handles_; + //StreamCallback cb_; StreamCallback intercept_; SHARED_MUTEX mutex_; }; diff --git a/components/streams/src/filestream.cpp b/components/streams/src/filestream.cpp index 7ddbfdb6a0f83cb9db3c53eca92df8ae8afc6e17..6e13884f41d5e0c3eb2d248d20ea85c9376d2524 100644 --- a/components/streams/src/filestream.cpp +++ b/components/streams/src/filestream.cpp @@ -93,11 +93,6 @@ bool File::_checkFile() { return true; } -bool File::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { - cb_ = f; - return true; -} - bool File::post(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet &p) { if (!active_) return false; if (mode_ != Mode::Write) { @@ -211,7 +206,7 @@ bool File::tick(int64_t ts) { auto &pkt = std::get<1>(*i); try { - if (cb_) cb_(spkt, pkt); + cb_.trigger(spkt, pkt); } catch (const ftl::exception &e) { LOG(ERROR) << "Exception in packet callback: " << e.what() << e.trace(); } catch (std::exception &e) { @@ -253,15 +248,15 @@ bool File::tick(int64_t ts) { // above. Hence, no need to bother parallelising this bit. if (std::get<0>(data).timestamp <= timestamp_) { std::get<0>(data).timestamp = ts; - if (cb_) { + //if (cb_) { dlk.lock(); try { - cb_(std::get<0>(data),std::get<1>(data)); + cb_.trigger(std::get<0>(data),std::get<1>(data)); } catch (std::exception &e) { LOG(ERROR) << "Exception in packet callback: " << e.what(); } data_.pop_back(); - } + //} } else if (std::get<0>(data).timestamp > extended_ts) { break; } diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp index 31cc9ddc050e5054431fccbc16688514a560c2fa..361df1b92cd2421a87b07fca1010620c1c562847 100644 --- a/components/streams/src/netstream.cpp +++ b/components/streams/src/netstream.cpp @@ -52,11 +52,6 @@ Net::~Net() { end(); } -bool Net::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { - cb_ = f; - return true; -} - bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { if (!active_) return false; @@ -223,10 +218,10 @@ bool Net::begin() { //LOG(INFO) << "AVAILABLE: " << (int)spkt.channel; } - if (cb_) { - cb_(spkt, pkt); + //if (cb_) { + cb_.trigger(spkt, pkt); if (pkt.data.size() > 0) _checkDataRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp); - } + //} }); auto p = net_->findOne<ftl::UUID>("find_stream", uri_); diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index 782c42e82034455ee400ad26d748d63103ed47d9..68b32c8cf0fab992c49e1fa1881f5e5b3b9fdd8a 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -75,6 +75,12 @@ ftl::streams::BaseBuilder &Receiver::builder(uint32_t id) { } } +void Receiver::registerBuilder(const std::shared_ptr<ftl::streams::BaseBuilder> &b) { + auto i = builders_.find(b->id()); + if (i != builders_.end()) throw FTL_Error("Builder already exists"); + builders_[b->id()] = b; +} + //void Receiver::onAudio(const ftl::audio::FrameSet::Callback &cb) { // audio_cb_ = cb; //} @@ -395,33 +401,30 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { } void Receiver::setStream(ftl::stream::Stream *s) { - if (stream_) { - stream_->onPacket(nullptr); - } - + handle_.cancel(); stream_ = s; - s->onPacket([this](const StreamPacket &spkt, const Packet &pkt) { + handle_ = s->onPacket([this](const StreamPacket &spkt, const Packet &pkt) { const unsigned int channum = (unsigned int)spkt.channel; //LOG(INFO) << "PACKET: " << spkt.timestamp << ", " << (int)spkt.channel << ", " << (int)pkt.codec << ", " << (int)pkt.definition; // TODO: Allow for multiple framesets //if (spkt.frameSetID() > 0) LOG(INFO) << "Frameset " << spkt.frameSetID() << " received: " << (int)spkt.channel; - if (spkt.frameSetID() >= ftl::stream::kMaxStreams) return; + if (spkt.frameSetID() >= ftl::stream::kMaxStreams) return true; // Frameset level data channels if (spkt.frameNumber() == 255 && pkt.data.size() > 0) { _processData(spkt,pkt); - return; + return true; } // Too many frames, so ignore. //if (spkt.frameNumber() >= value("max_frames",32)) return; - if (spkt.frameNumber() >= 32 || ((1 << spkt.frameNumber()) & frame_mask_) == 0) return; + if (spkt.frameNumber() >= 32 || ((1 << spkt.frameNumber()) & frame_mask_) == 0) return true; // Dummy no data packet. - if (pkt.data.size() == 0) return; + if (pkt.data.size() == 0) return true; if (channum >= 64) { @@ -431,6 +434,7 @@ void Receiver::setStream(ftl::stream::Stream *s) { } else { _processVideo(spkt,pkt); } + return true; }); } diff --git a/components/streams/src/sender.cpp b/components/streams/src/sender.cpp index b3b0c2b31730ba763931a4b76f7c64d5230b1be4..5d9671bc1ba8a326be4d3d05ac2cc37016dbcde7 100644 --- a/components/streams/src/sender.cpp +++ b/components/streams/src/sender.cpp @@ -42,9 +42,9 @@ Sender::~Sender() { } void Sender::setStream(ftl::stream::Stream*s) { - if (stream_) stream_->onPacket(nullptr); + //if (stream_) stream_->onPacket(nullptr); stream_ = s; - stream_->onPacket([this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + handle_ = stream_->onPacket([this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { LOG(INFO) << "SENDER REQUEST : " << (int)spkt.channel; //if (state_cb_) state_cb_(spkt.channel, spkt.streamID, spkt.frame_number); @@ -53,6 +53,7 @@ void Sender::setStream(ftl::stream::Stream*s) { // Inject state packets //do_inject_ = true; do_inject_.clear(); + return true; }); } diff --git a/components/streams/src/stream.cpp b/components/streams/src/stream.cpp index e92a30183f7860df2b177c990d11c8a669933749..70e17a3d09688ad81bb686539dde1f524c81d55c 100644 --- a/components/streams/src/stream.cpp +++ b/components/streams/src/stream.cpp @@ -81,7 +81,7 @@ void Muxer::add(Stream *s, size_t fsid) { int i = streams_.size()-1; se.stream = s; - s->onPacket([this,s,i,fsid](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + handles_.push_back(std::move(s->onPacket([this,s,i,fsid](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { //TODO: Allow input streams to have other streamIDs // Same fsid means same streamIDs map together in the end @@ -95,7 +95,8 @@ void Muxer::add(Stream *s, size_t fsid) { _notify(spkt2, pkt); s->select(spkt.streamID, selected(fsid)); - }); + return true; + }))); } void Muxer::remove(Stream *s) { @@ -104,13 +105,6 @@ void Muxer::remove(Stream *s) { LOG(ERROR) << "NOT IMPLEMENTED"; } - -bool Muxer::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &cb) { - UNIQUE_LOCK(mutex_,lk); - cb_ = cb; - return true; -} - int Muxer::originStream(size_t fsid, int fid) { if (fsid < ftl::stream::kMaxStreams && static_cast<uint32_t>(fid) < revmap_[fsid].size()) { return std::get<0>(revmap_[fsid][fid]); @@ -191,7 +185,7 @@ void Muxer::_notify(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Pa available(spkt.frameSetID()) += spkt.channel; try { - if (cb_) cb_(spkt, pkt); // spkt.frame_number < 255 && + cb_.trigger(spkt, pkt); // spkt.frame_number < 255 && } catch (std::exception &e) { LOG(ERROR) << "Exception in packet handler: " << e.what(); } @@ -211,35 +205,28 @@ void Broadcast::add(Stream *s) { UNIQUE_LOCK(mutex_,lk); streams_.push_back(s); - s->onPacket([this,s](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + handles_.push_back(std::move(s->onPacket([this,s](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { LOG(INFO) << "BCAST Request: " << (int)spkt.streamID << " " << (int)spkt.channel << " " << spkt.timestamp; SHARED_LOCK(mutex_, lk); if (spkt.frameSetID() < 255) available(spkt.frameSetID()) += spkt.channel; - if (cb_) cb_(spkt, pkt); + cb_.trigger(spkt, pkt); if (spkt.streamID < 255) s->select(spkt.streamID, selected(spkt.streamID)); - }); + return true; + }))); } void Broadcast::remove(Stream *s) { UNIQUE_LOCK(mutex_,lk); - s->onPacket(nullptr); + // TODO: Find and remove handle also streams_.remove(s); } void Broadcast::clear() { UNIQUE_LOCK(mutex_,lk); - for (auto s : streams_) { - s->onPacket(nullptr); - } + handles_.clear(); streams_.clear(); } -bool Broadcast::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &cb) { - UNIQUE_LOCK(mutex_,lk); - cb_ = cb; - return true; -} - bool Broadcast::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { SHARED_LOCK(mutex_, lk); if (spkt.frameSetID() < 255) available(spkt.frameSetID()) += spkt.channel; @@ -297,22 +284,17 @@ void Intercept::setStream(Stream *s) { UNIQUE_LOCK(mutex_,lk); stream_ = s; - s->onPacket([this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + handles_.push_back(std::move(s->onPacket([this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { SHARED_LOCK(mutex_, lk); available(spkt.frameSetID()) += spkt.channel; - if (cb_) cb_(spkt, pkt); + cb_.trigger(spkt, pkt); if (intercept_) intercept_(spkt, pkt); stream_->select(spkt.streamID, selected(spkt.streamID)); - }); -} - -bool Intercept::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &cb) { - UNIQUE_LOCK(mutex_,lk); - cb_ = cb; - return true; + return true; + }))); } -bool Intercept::onIntercept(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &cb) { +bool Intercept::onIntercept(const std::function<bool(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &cb) { UNIQUE_LOCK(mutex_,lk); intercept_ = cb; return true; diff --git a/components/streams/test/filestream_unit.cpp b/components/streams/test/filestream_unit.cpp index 46b3975e055e9fbbdc590f21e7c59a3d72481a19..35cc4a09df0d2c2f577ef648958735f86b021783 100644 --- a/components/streams/test/filestream_unit.cpp +++ b/components/streams/test/filestream_unit.cpp @@ -37,9 +37,10 @@ TEST_CASE("ftl::stream::File write and read", "[stream]") { reader->set("filename", "/tmp/ftl_file_stream_test.ftl"); ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; - REQUIRE( reader->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h = reader->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt = spkt; - }) ); + return true; + }); REQUIRE( reader->begin(false) ); //reader->tick(); @@ -65,10 +66,11 @@ TEST_CASE("ftl::stream::File write and read", "[stream]") { ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; int count = 0; - REQUIRE( reader->onPacket([&tspkt,&count](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h = reader->onPacket([&tspkt,&count](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt = spkt; ++count; - }) ); + return true; + }); REQUIRE( reader->begin(false) ); //reader->tick(); @@ -96,10 +98,11 @@ TEST_CASE("ftl::stream::File write and read", "[stream]") { ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; int count = 0; - REQUIRE( reader->onPacket([&tspkt,&count](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h = reader->onPacket([&tspkt,&count](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt = spkt; ++count; - }) ); + return true; + }); REQUIRE( reader->begin(false) ); //reader->tick(); diff --git a/components/streams/test/sender_unit.cpp b/components/streams/test/sender_unit.cpp index 3a33202dfda4351f456c4b9df97fafbc3c969ee5..5241e5c15042817854707546d803c8b5aaf15221 100644 --- a/components/streams/test/sender_unit.cpp +++ b/components/streams/test/sender_unit.cpp @@ -22,11 +22,6 @@ class TestStream : public ftl::stream::Stream { explicit TestStream(nlohmann::json &config) : ftl::stream::Stream(config) {}; ~TestStream() {}; - bool onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &cb) { - cb_ = cb; - return true; - } - bool onIntercept(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &cb) { icb_ = cb; return true; @@ -42,7 +37,7 @@ class TestStream : public ftl::stream::Stream { } else { select(spkt.frameSetID(), selected(spkt.frameSetID()) + spkt.channel); } - if (cb_) cb_(spkt, pkt); + cb_.trigger(spkt, pkt); } if (icb_) icb_(spkt, pkt); return true; @@ -53,7 +48,7 @@ class TestStream : public ftl::stream::Stream { bool active() override { return true; } private: - std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> cb_; + //std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> cb_; std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> icb_; }; diff --git a/components/streams/test/stream_unit.cpp b/components/streams/test/stream_unit.cpp index aa2093a4045e0b70fc6615480047b9e975180d1e..a9d67652aa230a56a1fdda7a275972f903d9c9ca 100644 --- a/components/streams/test/stream_unit.cpp +++ b/components/streams/test/stream_unit.cpp @@ -13,14 +13,9 @@ class TestStream : public ftl::stream::Stream { TestStream(nlohmann::json &config) : ftl::stream::Stream(config) {}; ~TestStream() {}; - bool onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &cb) { - cb_ = cb; - return true; - } - bool post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { available(spkt.streamID) += spkt.channel; - if (cb_) cb_(spkt, pkt); + cb_.trigger(spkt, pkt); return true; } @@ -29,7 +24,7 @@ class TestStream : public ftl::stream::Stream { bool active() override { return true; } private: - std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> cb_; + //std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> cb_; }; TEST_CASE("ftl::stream::Muxer()::write", "[stream]") { @@ -55,8 +50,9 @@ TEST_CASE("ftl::stream::Muxer()::write", "[stream]") { ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour};; - s->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h = s->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt = spkt; + return true; }); REQUIRE( !mux->post({4,100,0,1,ftl::codecs::Channel::Colour},{}) ); @@ -80,8 +76,9 @@ TEST_CASE("ftl::stream::Muxer()::write", "[stream]") { mux->add(s2); ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; - mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h = mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt = spkt; + return true; }); REQUIRE( s1->post({4,100,0,0,ftl::codecs::Channel::Colour},{}) ); @@ -94,11 +91,13 @@ TEST_CASE("ftl::stream::Muxer()::write", "[stream]") { ftl::codecs::StreamPacket tspkt2 = {4,0,0,1,ftl::codecs::Channel::Colour}; ftl::codecs::StreamPacket tspkt3 = {4,0,0,1,ftl::codecs::Channel::Colour}; - s1->onPacket([&tspkt2](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h2 = s1->onPacket([&tspkt2](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt2 = spkt; + return true; }); - s2->onPacket([&tspkt3](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h3 = s2->onPacket([&tspkt3](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt3 = spkt; + return true; }); REQUIRE( mux->post({4,200,0,1,ftl::codecs::Channel::Colour},{}) ); @@ -135,8 +134,9 @@ TEST_CASE("ftl::stream::Muxer()::post multi-frameset", "[stream]") { mux->add(s2,1); ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; - mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h = mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt = spkt; + return true; }); REQUIRE( s1->post({4,100,0,0,ftl::codecs::Channel::Colour},{}) ); @@ -149,11 +149,13 @@ TEST_CASE("ftl::stream::Muxer()::post multi-frameset", "[stream]") { ftl::codecs::StreamPacket tspkt2 = {4,0,0,1,ftl::codecs::Channel::Colour}; ftl::codecs::StreamPacket tspkt3 = {4,0,0,1,ftl::codecs::Channel::Colour}; - s1->onPacket([&tspkt2](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h2 = s1->onPacket([&tspkt2](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt2 = spkt; + return true; }); - s2->onPacket([&tspkt3](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h3 = s2->onPacket([&tspkt3](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt3 = spkt; + return true; }); REQUIRE( mux->post({4,200,1,0,ftl::codecs::Channel::Colour},{}) ); @@ -190,8 +192,9 @@ TEST_CASE("ftl::stream::Muxer()::read", "[stream]") { mux->add(s2); ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; - mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h = mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt = spkt; + return true; }); REQUIRE( s1->post({4,100,0,0,ftl::codecs::Channel::Colour},{}) ); @@ -228,8 +231,9 @@ TEST_CASE("ftl::stream::Muxer()::read", "[stream]") { mux->add(s2); ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; - mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h = mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt = spkt; + return true; }); REQUIRE( s1->post({4,100,0,0,ftl::codecs::Channel::Colour},{}) ); @@ -290,8 +294,9 @@ TEST_CASE("ftl::stream::Muxer()::read multi-frameset", "[stream]") { mux->add(s4,1); ftl::codecs::StreamPacket tspkt = {4,0,0,1,ftl::codecs::Channel::Colour}; - mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h = mux->onPacket([&tspkt](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt = spkt; + return true; }); REQUIRE( s1->post({4,100,0,0,ftl::codecs::Channel::Colour},{}) ); @@ -342,11 +347,13 @@ TEST_CASE("ftl::stream::Broadcast()::write", "[stream]") { ftl::codecs::StreamPacket tspkt1 = {4,0,0,1,ftl::codecs::Channel::Colour}; ftl::codecs::StreamPacket tspkt2 = {4,0,0,1,ftl::codecs::Channel::Colour}; - s1->onPacket([&tspkt1](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h1 = s1->onPacket([&tspkt1](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt1 = spkt; + return true; }); - s2->onPacket([&tspkt2](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + auto h2 = s2->onPacket([&tspkt2](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { tspkt2 = spkt; + return true; }); REQUIRE( mux->post({4,100,0,1,ftl::codecs::Channel::Colour},{}) );