diff --git a/components/streams/include/ftl/streams/stream.hpp b/components/streams/include/ftl/streams/stream.hpp index 69c5b628f3e42c35a6a9f0e8734a09214207625c..f67b6fd84d0f856fe80238cf5f6ca79070736fa2 100644 --- a/components/streams/include/ftl/streams/stream.hpp +++ b/components/streams/include/ftl/streams/stream.hpp @@ -138,11 +138,12 @@ class Muxer : public Stream { Stream *stream; std::vector<int> maps; uint32_t original_fsid = 0; + ftl::Handle handle; }; std::list<StreamEntry> streams_; std::vector<std::pair<StreamEntry*,int>> revmap_[kMaxStreams]; - std::list<ftl::Handle> handles_; + //std::list<ftl::Handle> handles_; int nid_[kMaxStreams]; //StreamCallback cb_; SHARED_MUTEX mutex_; diff --git a/components/streams/src/stream.cpp b/components/streams/src/stream.cpp index f57a4fe1147a460572c5b4651e50e37f14565697..57c9c5cd91b248ebec52cd36bc00db22ad61bd89 100644 --- a/components/streams/src/stream.cpp +++ b/components/streams/src/stream.cpp @@ -75,7 +75,10 @@ Muxer::Muxer(nlohmann::json &config) : Stream(config), nid_{0} { } Muxer::~Muxer() { - handles_.clear(); + UNIQUE_LOCK(mutex_,lk); + for (auto &se : streams_) { + se.handle.cancel(); + } } @@ -88,7 +91,7 @@ void Muxer::add(Stream *s, size_t fsid) { se.stream = s; ftl::stream::Muxer::StreamEntry *ptr = &se; - handles_.push_back(std::move(s->onPacket([this,s,fsid,ptr](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + se.handle = std::move(s->onPacket([this,s,fsid,ptr](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { //TODO: Allow input streams to have other streamIDs // Same fsid means same streamIDs map together in the end @@ -110,13 +113,28 @@ void Muxer::add(Stream *s, size_t fsid) { _notify(spkt2, pkt); s->select(spkt.streamID, selected(fsid)); return true; - }))); + })); } void Muxer::remove(Stream *s) { UNIQUE_LOCK(mutex_,lk); + for (auto i = streams_.begin(); i != streams_.end(); ++i) { + if (i->stream == s) { + i->handle.cancel(); + auto *se = &(*i); + + for (size_t j=0; j<kMaxStreams; ++j) { + for (auto &k : revmap_[j]) { + if (k.first == se) { + k.first = nullptr; + } + } + } - LOG(ERROR) << "NOT IMPLEMENTED"; + streams_.erase(i); + return; + } + } } ftl::stream::Stream *Muxer::originStream(size_t fsid, int fid) { @@ -134,6 +152,8 @@ bool Muxer::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packe auto [se, ssid] = revmap_[spkt.streamID][spkt.frame_number]; //auto &se = streams_[sid]; + if (!se) return false; + //LOG(INFO) << "POST " << spkt.frame_number; ftl::codecs::StreamPacket spkt2 = spkt;