diff --git a/components/streams/include/ftl/streams/stream.hpp b/components/streams/include/ftl/streams/stream.hpp index b100e83caf6235c3bc9e3b2c367d64cbe6a1b8ca..69c5b628f3e42c35a6a9f0e8734a09214207625c 100644 --- a/components/streams/include/ftl/streams/stream.hpp +++ b/components/streams/include/ftl/streams/stream.hpp @@ -131,7 +131,7 @@ class Muxer : public Stream { void reset() override; - int originStream(size_t fsid, int fid); + ftl::stream::Stream *originStream(size_t fsid, int fid); private: struct StreamEntry { @@ -140,15 +140,15 @@ class Muxer : public Stream { uint32_t original_fsid = 0; }; - std::vector<StreamEntry> streams_; - std::vector<std::pair<size_t,int>> revmap_[kMaxStreams]; + std::list<StreamEntry> streams_; + std::vector<std::pair<StreamEntry*,int>> revmap_[kMaxStreams]; std::list<ftl::Handle> handles_; int nid_[kMaxStreams]; //StreamCallback cb_; SHARED_MUTEX mutex_; void _notify(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt); - int _lookup(size_t fsid, int sid, int ssid, int count); + int _lookup(size_t fsid, StreamEntry *se, int ssid, int count); }; /** diff --git a/components/streams/src/stream.cpp b/components/streams/src/stream.cpp index b97667d2b5e77e264d84a4e17e9f1414d52dd466..f57a4fe1147a460572c5b4651e50e37f14565697 100644 --- a/components/streams/src/stream.cpp +++ b/components/streams/src/stream.cpp @@ -84,25 +84,26 @@ void Muxer::add(Stream *s, size_t fsid) { if (fsid < 0u || fsid >= ftl::stream::kMaxStreams) return; auto &se = streams_.emplace_back(); - int i = streams_.size()-1; + //int i = streams_.size()-1; se.stream = s; + ftl::stream::Muxer::StreamEntry *ptr = &se; - handles_.push_back(std::move(s->onPacket([this,s,i,fsid](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + handles_.push_back(std::move(s->onPacket([this,s,fsid,ptr](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { //TODO: Allow input streams to have other streamIDs // Same fsid means same streamIDs map together in the end - ftl::stream::Muxer::StreamEntry *ptr = nullptr; + /*ftl::stream::Muxer::StreamEntry *ptr = nullptr; { SHARED_LOCK(mutex_,lk); ptr = &streams_[i]; - } + }*/ ftl::codecs::StreamPacket spkt2 = spkt; ptr->original_fsid = spkt.streamID; spkt2.streamID = fsid; if (spkt2.frame_number < 255) { - int id = _lookup(fsid, i, spkt.frame_number, pkt.frame_count); + int id = _lookup(fsid, ptr, spkt.frame_number, pkt.frame_count); spkt2.frame_number = id; } @@ -118,11 +119,11 @@ void Muxer::remove(Stream *s) { LOG(ERROR) << "NOT IMPLEMENTED"; } -int Muxer::originStream(size_t fsid, int fid) { +ftl::stream::Stream *Muxer::originStream(size_t fsid, int fid) { if (fsid < ftl::stream::kMaxStreams && static_cast<uint32_t>(fid) < revmap_[fsid].size()) { - return std::get<0>(revmap_[fsid][fid]); + return std::get<0>(revmap_[fsid][fid])->stream; } - return -1; + return nullptr; } bool Muxer::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { @@ -130,16 +131,16 @@ bool Muxer::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packe if (pkt.data.size() > 0 || !(spkt.flags & ftl::codecs::kFlagRequest)) available(spkt.frameSetID()) += spkt.channel; if (spkt.streamID < ftl::stream::kMaxStreams && spkt.frame_number < revmap_[spkt.streamID].size()) { - auto [sid, ssid] = revmap_[spkt.streamID][spkt.frame_number]; - auto &se = streams_[sid]; + auto [se, ssid] = revmap_[spkt.streamID][spkt.frame_number]; + //auto &se = streams_[sid]; //LOG(INFO) << "POST " << spkt.frame_number; ftl::codecs::StreamPacket spkt2 = spkt; - spkt2.streamID = se.original_fsid; + spkt2.streamID = se->original_fsid; spkt2.frame_number = ssid; - se.stream->select(spkt2.streamID, selected(spkt.frameSetID())); - return se.stream->post(spkt2, pkt); + se->stream->select(spkt2.streamID, selected(spkt.frameSetID())); + return se->stream->post(spkt2, pkt); } else { return false; } @@ -175,27 +176,27 @@ void Muxer::reset() { } } -int Muxer::_lookup(size_t fsid, int sid, int ssid, int count) { +int Muxer::_lookup(size_t fsid, ftl::stream::Muxer::StreamEntry *se, int ssid, int count) { SHARED_LOCK(mutex_, lk); - auto &se = streams_[sid]; - if (static_cast<uint32_t>(ssid) >= se.maps.size()) { + //auto &se = streams_[sid]; + if (static_cast<uint32_t>(ssid) >= se->maps.size()) { lk.unlock(); { UNIQUE_LOCK(mutex_, lk2); - while (static_cast<uint32_t>(ssid) >= se.maps.size()) { + while (static_cast<uint32_t>(ssid) >= se->maps.size()) { int nid = nid_[fsid]++; - revmap_[fsid].push_back({sid, static_cast<uint32_t>(se.maps.size())}); - se.maps.push_back(nid); + revmap_[fsid].push_back({se, static_cast<uint32_t>(se->maps.size())}); + se->maps.push_back(nid); for (int i=1; i<count; ++i) { int nid = nid_[fsid]++; - revmap_[fsid].push_back({sid, static_cast<uint32_t>(se.maps.size())}); - se.maps.push_back(nid); + revmap_[fsid].push_back({se, static_cast<uint32_t>(se->maps.size())}); + se->maps.push_back(nid); } } } lk.lock(); } - return se.maps[ssid]; + return se->maps[ssid]; } void Muxer::_notify(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {