diff --git a/include/ftl/protocol/broadcaster.hpp b/include/ftl/protocol/broadcaster.hpp index 081709d6ca51f7587d3b76b8751a15329bdbdff8..7a45622f3864ef897ebb234ce760041c9cf95339 100644 --- a/include/ftl/protocol/broadcaster.hpp +++ b/include/ftl/protocol/broadcaster.hpp @@ -28,13 +28,26 @@ class Broadcast : public Stream { void reset() override; - const std::list<std::shared_ptr<Stream>> &streams() const { return streams_; } + std::list<std::shared_ptr<Stream>> streams() const; + + void setProperty(ftl::protocol::StreamProperty opt, int value) override; + + int getProperty(ftl::protocol::StreamProperty opt) override; + + bool supportsProperty(ftl::protocol::StreamProperty opt) override; + + StreamType type() const override; private: - std::list<std::shared_ptr<Stream>> streams_; - std::list<ftl::Handle> handles_; - //StreamCallback cb_; - SHARED_MUTEX mutex_; + + struct StreamEntry { + std::shared_ptr<Stream> stream; + ftl::Handle handle; + ftl::Handle req_handle; + ftl::Handle avail_handle; + }; + + std::list<StreamEntry> streams_; }; } diff --git a/include/ftl/protocol/frameid.hpp b/include/ftl/protocol/frameid.hpp new file mode 100644 index 0000000000000000000000000000000000000000..39610ca3e63f7224e0d0ef27642f8c2c966c2d9e --- /dev/null +++ b/include/ftl/protocol/frameid.hpp @@ -0,0 +1,55 @@ +#pragma once + +#include <cinttypes> + +namespace ftl { +namespace protocol { +/** + * Unique identifier for a single frame. This is stored as two 16bit numbers + * packed into a 32bit int. Every frame has a FrameID, as does every frameset. + * FrameID + Timestamp together will be a unique object within the system since + * frames cannot be duplicated. + */ +struct FrameID { + uint32_t id; + + /** + * Frameset ID for this frame. + */ + inline unsigned int frameset() const { return id >> 8; } + + /** + * Frame index within the frameset. This will correspond to the vector + * index in the frameset object. + */ + inline unsigned int source() const { return id & 0xff; } + + /** + * The packed int with both frameset ID and index. + */ + operator uint32_t() const { return id; } + + inline FrameID &operator=(int v) { id = v; return *this; } + + /** + * Create a frame ID using a frameset id and a source number. + * @param fs Frameset id + * @param s Source number inside frameset + */ + FrameID(unsigned int fs, unsigned int s) : id((fs << 8) + (s & 0xff) ) {} + explicit FrameID(uint32_t x) : id(x) {} + FrameID() : id(0) {} +}; + +} +} + +// custom specialization of std::hash can be injected in namespace std +template<> +struct std::hash<ftl::protocol::FrameID> +{ + std::size_t operator()(ftl::protocol::FrameID const& s) const noexcept + { + return std::hash<unsigned int>{}(s.id); + } +}; diff --git a/include/ftl/protocol/muxer.hpp b/include/ftl/protocol/muxer.hpp index 129cb467433d666562572631d4701e8be202e2f9..44041ad71c0a0b4caae32d7dcd045db7f75213c8 100644 --- a/include/ftl/protocol/muxer.hpp +++ b/include/ftl/protocol/muxer.hpp @@ -21,7 +21,7 @@ class Muxer : public Stream { explicit Muxer(); virtual ~Muxer(); - void add(const std::shared_ptr<Stream> &, size_t fsid=0, const std::function<int()> &cb=nullptr); + void add(const std::shared_ptr<Stream> &, int fsid=-1); void remove(const std::shared_ptr<Stream> &); //bool onPacket(const StreamCallback &) override; @@ -34,27 +34,55 @@ class Muxer : public Stream { void reset() override; - std::shared_ptr<Stream> originStream(size_t fsid, int fid); + bool enable(FrameID id) override; + + bool enable(FrameID id, ftl::protocol::Channel channel) override; + + bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override; + + void setProperty(ftl::protocol::StreamProperty opt, int value) override; + + int getProperty(ftl::protocol::StreamProperty opt) override; + + bool supportsProperty(ftl::protocol::StreamProperty opt) override; + + StreamType type() const override; + + std::shared_ptr<Stream> originStream(FrameID) const; private: struct StreamEntry { std::shared_ptr<Stream> stream; - std::unordered_map<int, std::vector<int>> maps; - uint32_t original_fsid = 0; ftl::Handle handle; - std::vector<int> ids; + ftl::Handle req_handle; + ftl::Handle avail_handle; + int id = 0; + int fixed_fs = -1; }; + std::unordered_map<int, int> fsmap_; + std::unordered_map<int, int> sourcecount_; + std::unordered_map<int64_t, FrameID> imap_; + std::unordered_map<FrameID, std::pair<FrameID, Muxer::StreamEntry*>> omap_; + std::list<StreamEntry> streams_; - std::vector<std::pair<StreamEntry*,int>> revmap_[kMaxStreams]; + //std::vector<std::pair<StreamEntry*,int>> revmap_[kMaxStreams]; //std::list<ftl::Handle> handles_; - int nid_[kMaxStreams]; + //int nid_[kMaxStreams]; //StreamCallback cb_; - SHARED_MUTEX mutex_; + mutable SHARED_MUTEX mutex_; + std::atomic_int stream_ids_ = 0; + std::atomic_int framesets_ = 0; + + /* On packet receive, map to local ID */ + FrameID _mapFromInput(StreamEntry *, FrameID id); + + /* On posting, map to output ID */ + std::pair<FrameID, StreamEntry*> _mapToOutput(FrameID id) const; - void _notify(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt); - int _lookup(size_t fsid, StreamEntry *se, int ssid, int count); - void _forward(const std::string &name); + //void _notify(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt); + //int _lookup(size_t fsid, StreamEntry *se, int ssid, int count); + //void _forward(const std::string &name); }; } diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp index 21b6ef74ad14e8ff14e3169910dbff0259449179..5f24b61504eb46d4d8f96ceb2c154c2f2fe1c758 100644 --- a/include/ftl/protocol/streams.hpp +++ b/include/ftl/protocol/streams.hpp @@ -11,6 +11,7 @@ #include <ftl/protocol/channels.hpp> #include <ftl/protocol/channelSet.hpp> #include <ftl/protocol/packet.hpp> +#include <ftl/protocol/frameid.hpp> #include <string> #include <vector> #include <unordered_set> @@ -20,8 +21,7 @@ namespace protocol { /* Represents a request for data through a stream */ struct Request { - uint32_t frameset; - uint32_t frame; + FrameID id; ftl::protocol::Channel channel; int bitrate; int count; @@ -32,12 +32,15 @@ using RequestCallback = std::function<bool(const ftl::protocol::Request&)>; using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &)>; -enum struct StreamOption { +enum struct StreamProperty { kInvalid = 0, kLooping, kSpeed, kBitrate, - kAdaptiveBitrate + kMaxBitrate, + kAdaptiveBitrate, + kObservers, + kURI }; enum struct StreamType { @@ -58,17 +61,21 @@ class Stream { public: virtual ~Stream() {}; + virtual std::string name() const; + /** * Obtain all packets for next frame. The provided callback function is * called once for every packet. This function might continue to call the * callback even after the read function returns, for example with a * NetStream. */ - ftl::Handle onPacket(const StreamCallback &cb) { return cb_.on(cb); }; + ftl::Handle onPacket(const StreamCallback &cb) { return cb_.on(cb); } + + ftl::Handle onRequest(const std::function<bool(const Request &)> &cb) { return request_cb_.on(cb); } virtual bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &)=0; - // TODO: Add methods for: pause, paused, statistics, stream type, options + // TODO: Add methods for: pause, paused, statistics /** * Start the stream. Calls to the onPacket callback will only occur after @@ -86,54 +93,115 @@ class Stream { virtual bool active()=0; /** - * Perform a forced reset of the stream. This means something different - * depending on stream type, for example with a network stream it involves - * resending all stream requests as if a reconnection had occured. + * @brief Clear all state. This will remove all information about available + * and enabled frames or channels. You will then need to enable frames and + * channels again. If active the stream will remain active. + * */ virtual void reset(); /** - * Query available video channels for a frameset. + * @brief Re-request all channels and state. This will also cause video encoding + * to generate new I-frames as if a new connection is made. All persistent data + * channels would also become available. For file streams this would reset the + * stream to the beginning of the file. + * + */ + virtual void refresh(); + + /** + * @brief Check if a frame is available. + * + * @param id Frameset and frame number + * @return true if data is available for the frame + * @return false if no data has been seen */ - const ftl::protocol::ChannelSet &available(int fs) const; + bool available(FrameID id) const; + + bool available(FrameID id, ftl::protocol::Channel channel) const; + + bool available(FrameID id, const ftl::protocol::ChannelSet channels) const; + + ftl::Handle onAvailable(const std::function<bool(FrameID, ftl::protocol::Channel)> &cb) { return avail_cb_.on(cb); } /** - * Query selected channels for a frameset. Channels not selected may not - * be transmitted, received or decoded. + * @brief Get all channels seen for a frame. If the frame does not exist then + * an empty set is returned. + * + * @param id Frameset and frame number + * @return Set of all seen channels, or empty. */ - ftl::protocol::ChannelSet selected(int fs) const; + ftl::protocol::ChannelSet channels(FrameID id) const; - ftl::protocol::ChannelSet selectedNoExcept(int fs) const; + ftl::protocol::ChannelSet enabledChannels(FrameID id) const; /** - * Change the video channel selection for a frameset. + * @brief Get all available frames in the stream. + * + * @return Set of frame IDs */ - void select(int fs, const ftl::protocol::ChannelSet &, bool make=false); + std::unordered_set<FrameID> frames() const; + + /** + * @brief Get all enabled frames in the stream. + * + * @return Set of frame IDs + */ + std::unordered_set<FrameID> enabled() const; + + /** + * @brief Check if a frame is enabled. + * + * @param id Frameset and frame number + * @return true if data for this frame is enabled. + * @return false if data not enabled or frame does not exist. + */ + bool enabled(FrameID id) const; + + bool enabled(FrameID id, ftl::protocol::Channel channel) const; /** * Number of framesets in stream. */ inline size_t size() const { return state_.size(); } - virtual bool enable(uint8_t fs, uint8_t f) { return true; } + virtual bool enable(FrameID id); + + virtual bool enable(FrameID id, ftl::protocol::Channel channel); + + virtual bool enable(FrameID id, const ftl::protocol::ChannelSet &channels); + + // TODO: Disable + + virtual void setProperty(ftl::protocol::StreamProperty opt, int value)=0; + + virtual int getProperty(ftl::protocol::StreamProperty opt)=0; + + virtual bool supportsProperty(ftl::protocol::StreamProperty opt)=0; + + virtual StreamType type() const { return StreamType::kUnknown; } protected: - ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::Packet&> cb_; + void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt); - /** - * Allow modification of available channels. Calling this with an invalid - * fs number will create that frameset and increase the size. - */ - ftl::protocol::ChannelSet &available(int fs); + void seen(FrameID id, ftl::protocol::Channel channel); + + void request(const Request &req); + + mutable SHARED_MUTEX mtx_; private: struct FSState { + bool enabled = false; ftl::protocol::ChannelSet selected; ftl::protocol::ChannelSet available; + // TODO: Add a name and metadata }; - std::vector<FSState> state_; - mutable SHARED_MUTEX mtx_; + ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::Packet&> cb_; + ftl::Handler<const Request &> request_cb_; + ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_; + std::unordered_map<int, FSState> state_; }; } diff --git a/src/streams/broadcaster.cpp b/src/streams/broadcaster.cpp index c2471aa60e5f57b6d04716e02f3ffc4088996575..6315314619eb561ab8f2717c57fbea664b1cd779 100644 --- a/src/streams/broadcaster.cpp +++ b/src/streams/broadcaster.cpp @@ -3,6 +3,8 @@ using ftl::protocol::Broadcast; using ftl::protocol::StreamPacket; using ftl::protocol::Packet; +using ftl::protocol::Channel; +using ftl::protocol::FrameID; Broadcast::Broadcast() { @@ -13,39 +15,51 @@ Broadcast::~Broadcast() { } void Broadcast::add(const std::shared_ptr<Stream> &s) { - UNIQUE_LOCK(mutex_,lk); - - streams_.push_back(s); - handles_.push_back(std::move(s->onPacket([this,s](const StreamPacket &spkt, const Packet &pkt) { - //LOG(INFO) << "BCAST Request: " << (int)spkt.streamID << " " << (int)spkt.channel << " " << spkt.timestamp; - SHARED_LOCK(mutex_, lk); - if (spkt.frameSetID() < 255) available(spkt.frameSetID()) += spkt.channel; - cb_.trigger(spkt, pkt); - if (spkt.streamID < 255) s->select(spkt.streamID, selected(spkt.streamID)); + UNIQUE_LOCK(mtx_,lk); + + auto &entry = streams_.emplace_back(); + entry.stream = s; + + entry.handle = std::move(s->onPacket([this,s](const StreamPacket &spkt, const Packet &pkt) { + trigger(spkt, pkt); return true; - }))); + })); + + entry.avail_handle = std::move(s->onAvailable([this,s](FrameID id, Channel channel) { + seen(id, channel); + return true; + })); + + entry.req_handle = std::move(s->onRequest([this,s](const ftl::protocol::Request &req) { + request(req); + return true; + })); } void Broadcast::remove(const std::shared_ptr<Stream> &s) { - UNIQUE_LOCK(mutex_,lk); - // TODO: Find and remove handle also - streams_.remove(s); + UNIQUE_LOCK(mtx_,lk); + for (auto it = streams_.begin(); it != streams_.end(); ++it) { + if (it->stream == s) { + it->handle.cancel(); + it->req_handle.cancel(); + it->avail_handle.cancel(); + streams_.erase(it); + break; + } + } } void Broadcast::clear() { - UNIQUE_LOCK(mutex_,lk); - handles_.clear(); + UNIQUE_LOCK(mtx_,lk); streams_.clear(); } bool Broadcast::post(const StreamPacket &spkt, const Packet &pkt) { - SHARED_LOCK(mutex_, lk); - if (spkt.frameSetID() < 255) available(spkt.frameSetID()) += spkt.channel; + //SHARED_LOCK(mtx_, lk); bool status = true; - for (auto s : streams_) { - //s->select(spkt.frameSetID(), selected(spkt.frameSetID())); - status = status && s->post(spkt, pkt); + for (auto &s : streams_) { + status = s.stream->post(spkt, pkt) && status; } return status; } @@ -53,7 +67,7 @@ bool Broadcast::post(const StreamPacket &spkt, const Packet &pkt) { bool Broadcast::begin() { bool r = true; for (auto &s : streams_) { - r = r && s->begin(); + r = r && s.stream->begin(); } return r; } @@ -61,7 +75,7 @@ bool Broadcast::begin() { bool Broadcast::end() { bool r = true; for (auto &s : streams_) { - r = r && s->end(); + r = r && s.stream->end(); } return r; } @@ -70,13 +84,30 @@ bool Broadcast::active() { if (streams_.size() == 0) return false; bool r = true; for (auto &s : streams_) { - r = r && s->active(); + r = r && s.stream->active(); } return r; } void Broadcast::reset() { + SHARED_LOCK(mtx_, lk); for (auto &s : streams_) { - s->reset(); + s.stream->reset(); } } + +void Broadcast::setProperty(ftl::protocol::StreamProperty opt, int value) { + +} + +int Broadcast::getProperty(ftl::protocol::StreamProperty opt) { + return 0; +} + +bool Broadcast::supportsProperty(ftl::protocol::StreamProperty opt) { + return false; +} + +ftl::protocol::StreamType Broadcast::type() const { + return ftl::protocol::StreamType::kUnknown; +} diff --git a/src/streams/muxer.cpp b/src/streams/muxer.cpp index b21ea5305495a63e2030d3f65176f40f5fe98ae2..edde7d0fa8527b07ee47367b27c49f7eab6b4be1 100644 --- a/src/streams/muxer.cpp +++ b/src/streams/muxer.cpp @@ -4,80 +4,95 @@ using ftl::protocol::Muxer; using ftl::protocol::Stream; using ftl::protocol::StreamPacket; +using ftl::protocol::FrameID; +using ftl::protocol::StreamType; + +Muxer::Muxer() { -Muxer::Muxer() : nid_{0} { - //value("paused", false); - //_forward("paused"); } Muxer::~Muxer() { UNIQUE_LOCK(mutex_,lk); for (auto &se : streams_) { se.handle.cancel(); + se.req_handle.cancel(); + se.avail_handle.cancel(); } } -void Muxer::_forward(const std::string &name) { - /*on(name, [this,name]() { - auto val = getConfig()[name]; - UNIQUE_LOCK(mutex_,lk); - for (auto &se : streams_) { - se.stream->set(name, val); +FrameID Muxer::_mapFromInput(Muxer::StreamEntry *s, FrameID id) { + SHARED_LOCK(mutex_,lk); + int64_t iid = (int64_t(s->id) << 32) | id.id; + auto it = imap_.find(iid); + if (it != imap_.end()) { + return it->second; + } else { + // Otherwise allocate something. + lk.unlock(); + UNIQUE_LOCK(mutex_,ulk); + + FrameID newID; + if (s->fixed_fs >= 0) { + int source = sourcecount_[s->fixed_fs]++; + newID = FrameID(s->fixed_fs, source); + } else { + int fsiid = (s->id << 16) | id.frameset(); + if (fsmap_.count(fsiid) == 0) fsmap_[fsiid] = framesets_++; + newID = FrameID(fsmap_[fsiid], id.source()); } - });*/ + + imap_[iid] = newID; + auto &op = omap_[newID]; + op.first = id; + op.second = s; + //LOG(INFO) << "ADD MAPPING " << newID.frameset() << " - " << newID.source(); + return newID; + } } +std::pair<FrameID, Muxer::StreamEntry*> Muxer::_mapToOutput(FrameID id) const { + SHARED_LOCK(mutex_,lk); + auto it = omap_.find(id); + if (it != omap_.end()) { + return it->second; + } else { + // Bad + //LOG(ERROR) << "NO OUTPUT MAPPING " << id.frameset() << " - " << id.source(); + return {id, nullptr}; + } +} -void Muxer::add(const std::shared_ptr<Stream> &s, size_t fsid, const std::function<int()> &cb) { +void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) { UNIQUE_LOCK(mutex_,lk); - if (fsid < 0u || fsid >= ftl::protocol::kMaxStreams) return; auto &se = streams_.emplace_back(); - //int i = streams_.size()-1; + se.id = stream_ids_++; se.stream = s; - se.ids.push_back(fsid); + se.fixed_fs = fsid; Muxer::StreamEntry *ptr = &se; - se.handle = std::move(s->onPacket([this,s,ptr,cb](const StreamPacket &spkt, const Packet &pkt) { - //TODO: Allow input streams to have other streamIDs - // Same fsid means same streamIDs map together in the end - - /*ftl::stream::Muxer::StreamEntry *ptr = nullptr; - { - SHARED_LOCK(mutex_,lk); - ptr = &streams_[i]; - }*/ - - if (!cb && spkt.streamID > 0) { - LOG(WARNING) << "Multiple framesets in stream"; - return true; - } - - if (ptr->ids.size() <= spkt.streamID) { - UNIQUE_LOCK(mutex_,lk); - if (ptr->ids.size() <= spkt.streamID) { - ptr->ids.resize(spkt.streamID + 1); - ptr->ids[spkt.streamID] = cb(); - } - } - - int fsid; - { - SHARED_LOCK(mutex_, lk); - fsid = ptr->ids[spkt.streamID]; - } - + se.handle = std::move(s->onPacket([this,ptr](const StreamPacket &spkt, const Packet &pkt) { + FrameID newID = _mapFromInput(ptr, FrameID(spkt.streamID, spkt.frame_number)); + StreamPacket spkt2 = spkt; - ptr->original_fsid = spkt.streamID; // FIXME: Multiple originals needed - spkt2.streamID = fsid; + spkt2.streamID = newID.frameset(); + spkt2.frame_number = newID.source(); - if (spkt2.frame_number < 255) { - int id = _lookup(fsid, ptr, spkt.frame_number, pkt.frame_count); - spkt2.frame_number = id; - } + trigger(spkt2, pkt); + return true; + })); - _notify(spkt2, pkt); - s->select(spkt.streamID, selected(fsid), true); + se.avail_handle = std::move(s->onAvailable([this,ptr](FrameID id, Channel channel) { + FrameID newID = _mapFromInput(ptr, id); + seen(newID, channel); + return true; + })); + + se.req_handle = std::move(s->onRequest([this,ptr](const Request &req) { + FrameID newID = _mapFromInput(ptr, req.id); + Request newRequest = req; + newRequest.id = newID; + request(newRequest); return true; })); } @@ -86,15 +101,22 @@ void Muxer::remove(const std::shared_ptr<Stream> &s) { UNIQUE_LOCK(mutex_,lk); for (auto i = streams_.begin(); i != streams_.end(); ++i) { if (i->stream == s) { - i->handle.cancel(); auto *se = &(*i); - for (size_t j=0; j<kMaxStreams; ++j) { - for (auto &k : revmap_[j]) { - if (k.first == se) { - k.first = nullptr; - } - } + se->handle.cancel(); + se->req_handle.cancel(); + se->avail_handle.cancel(); + + // Cleanup imap and omap + for (auto j = imap_.begin(); j != imap_.end();) { + const auto &e = *j; + if (e.first >> 32 == se->id) j = imap_.erase(j); + else ++j; + } + for (auto j = omap_.begin(); j != omap_.end();) { + const auto &e = *j; + if (e.second.second == se) j = omap_.erase(j); + else ++j; } streams_.erase(i); @@ -103,33 +125,18 @@ void Muxer::remove(const std::shared_ptr<Stream> &s) { } } -std::shared_ptr<Stream> Muxer::originStream(size_t fsid, int fid) { - if (fsid < ftl::protocol::kMaxStreams && static_cast<uint32_t>(fid) < revmap_[fsid].size()) { - return std::get<0>(revmap_[fsid][fid])->stream; - } - return nullptr; +std::shared_ptr<Stream> Muxer::originStream(FrameID id) const { + auto p = _mapToOutput(id); + return (p.second) ? p.second->stream : nullptr; } bool Muxer::post(const StreamPacket &spkt, const Packet &pkt) { - SHARED_LOCK(mutex_, lk); - if (pkt.data.size() > 0 || !(spkt.flags & ftl::protocol::kFlagRequest)) available(spkt.frameSetID()) += spkt.channel; - - if (spkt.streamID < ftl::protocol::kMaxStreams && spkt.frame_number < revmap_[spkt.streamID].size()) { - auto [se, ssid] = revmap_[spkt.streamID][spkt.frame_number]; - //auto &se = streams_[sid]; - - if (!se) return false; - - //LOG(INFO) << "POST " << spkt.frame_number; - - StreamPacket spkt2 = spkt; - spkt2.streamID = se->original_fsid; // FIXME: Multiple possible originals - spkt2.frame_number = ssid; - se->stream->select(spkt2.streamID, selected(spkt.frameSetID())); - return se->stream->post(spkt2, pkt); - } else { - return false; - } + auto p = _mapToOutput(FrameID(spkt.streamID, spkt.frame_number)); + if (!p.second) return false; + StreamPacket spkt2 = spkt; + spkt2.streamID = p.first.frameset(); + spkt2.frame_number = p.first.source(); + return p.second->stream->post(spkt2, pkt); } bool Muxer::begin() { @@ -162,52 +169,50 @@ void Muxer::reset() { } } -int Muxer::_lookup(size_t fsid, Muxer::StreamEntry *se, int ssid, int count) { - SHARED_LOCK(mutex_, lk); - - auto i = se->maps.find(fsid); - if (i == se->maps.end()) { - lk.unlock(); - { - UNIQUE_LOCK(mutex_, lk2); - if (se->maps.count(fsid) == 0) { - se->maps[fsid] = {}; - } - i = se->maps.find(fsid); - } - lk.lock(); - } +bool Muxer::enable(FrameID id) { + auto p = _mapToOutput(id); + if (!p.second) return false; + return p.second->stream->enable(p.first); +} - auto &map = i->second; +bool Muxer::enable(FrameID id, ftl::protocol::Channel channel) { + auto p = _mapToOutput(id); + if (!p.second) return false; + return p.second->stream->enable(p.first, channel); +} - if (static_cast<uint32_t>(ssid) >= map.size()) { - lk.unlock(); - { - UNIQUE_LOCK(mutex_, lk2); - while (static_cast<uint32_t>(ssid) >= map.size()) { - int nid = nid_[fsid]++; - revmap_[fsid].push_back({se, static_cast<uint32_t>(map.size())}); - map.push_back(nid); - for (int j=1; j<count; ++j) { - int nid2 = nid_[fsid]++; - revmap_[fsid].push_back({se, static_cast<uint32_t>(map.size())}); - map.push_back(nid2); - } - } - } - lk.lock(); +bool Muxer::enable(FrameID id, const ftl::protocol::ChannelSet &channels) { + auto p = _mapToOutput(id); + if (!p.second) return false; + return p.second->stream->enable(p.first, channels); +} + +void Muxer::setProperty(ftl::protocol::StreamProperty opt, int value) { + for (auto &s : streams_) { + s.stream->setProperty(opt, value); } - return map[ssid]; } -void Muxer::_notify(const StreamPacket &spkt, const Packet &pkt) { - SHARED_LOCK(mutex_, lk); - available(spkt.frameSetID()) += spkt.channel; +int Muxer::getProperty(ftl::protocol::StreamProperty opt) { + for (auto &s : streams_) { + if (s.stream->supportsProperty(opt)) return s.stream->getProperty(opt); + } + return 0; +} + +bool Muxer::supportsProperty(ftl::protocol::StreamProperty opt) { + for (auto &s : streams_) { + if (s.stream->supportsProperty(opt)) return true; + } + return false; +} - try { - cb_.trigger(spkt, pkt); // spkt.frame_number < 255 && - } catch (std::exception &e) { - LOG(ERROR) << "Exception in packet handler (" << int(spkt.channel) << "): " << e.what(); - //reset(); // Force stream reset here to get new i-frames +StreamType Muxer::type() const { + StreamType t = StreamType::kUnknown; + for (const auto &s : streams_) { + const StreamType tt = s.stream->type(); + if (t == StreamType::kUnknown) t = tt; + else if (t != tt) t = StreamType::kMixed; } + return t; } diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index 3b35584cf8199008c976721a81b4df97600936a0..f90478aa2aef1d7c7f17efbfd0bcdd59272330ff 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -19,6 +19,7 @@ using ftl::protocol::StreamPacketMSGPACK; using ftl::protocol::Packet; using ftl::protocol::Channel; using ftl::protocol::Codec; +using ftl::protocol::FrameID; using ftl::protocol::kAllFrames; using ftl::protocol::kAllFramesets; using std::string; @@ -113,7 +114,7 @@ bool Net::post(const StreamPacket &spkt, const Packet &pkt) { // Lock to prevent clients being added / removed { SHARED_LOCK(mutex_,lk); - available(spkt.frameSetID()) += spkt.channel; + //available(spkt.frameSetID()) += spkt.channel; // Map the frameset ID from a local one to a remote one StreamPacketMSGPACK spkt_net = *((StreamPacketMSGPACK*)&spkt); @@ -225,6 +226,10 @@ bool Net::begin() { // Map remote frameset ID to a local one. spkt.streamID = _remoteToLocalFS(spkt.streamID); + FrameID localFrame(spkt.streamID, spkt.frame_number); + + seen(localFrame, spkt.channel); + // Manage recuring requests if (!host_ && last_frame_ != spkt.timestamp) { UNIQUE_LOCK(mutex_, lk); @@ -246,7 +251,7 @@ bool Net::begin() { } }*/ - if (size() > spkt.frameSetID()) { + /*if (size() > spkt.frameSetID()) { auto sel = selected(spkt.frameSetID()); // A change in channel selections, so send those requests now @@ -258,18 +263,18 @@ bool Net::begin() { _sendRequest(c, spkt.frameSetID(), kAllFrames, kFramesToRequest, 255); } } - } + }*/ // Are we close to reaching the end of our frames request? if (tally_ <= 5) { // Yes, so send new requests - for (size_t i = 0; i < size(); ++i) { - const auto &sel = selected(i); + //for (size_t i = 0; i < size(); ++i) { + const auto &sel = enabledChannels(localFrame); for (auto c : sel) { - _sendRequest(c, i, kAllFrames, kFramesToRequest, 255); + _sendRequest(c, localFrame.frameset(), kAllFrames, kFramesToRequest, 255); } - } + //} tally_ = kFramesToRequest; } else { --tally_; @@ -287,10 +292,10 @@ bool Net::begin() { _processRequest(p, spkt, pkt); } else { // FIXME: Allow availability to change... - available(spkt.frameSetID()) += spkt.channel; + //available(spkt.frameSetID()) += spkt.channel; } - cb_.trigger(spkt, pkt); + trigger(spkt, pkt); if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp); }); @@ -334,21 +339,28 @@ bool Net::begin() { return true; } -void Net::reset() { +void Net::refresh() { + Stream::refresh(); + UNIQUE_LOCK(mutex_, lk); - for (size_t i = 0; i < size(); ++i) { - auto sel = selected(i); + for (const auto &i : enabled()) { + auto sel = enabledChannels(i); for (auto c : sel) { - _sendRequest(c, i, kAllFrames, kFramesToRequest, 255, true); + _sendRequest(c, i.frameset(), kAllFrames, kFramesToRequest, 255, true); } } tally_ = kFramesToRequest; } -bool Net::enable(uint8_t fs, uint8_t f) { - if (host_) { return false; } +void Net::reset() { + Stream::reset(); +} + +bool Net::_enable(FrameID id) { + if (host_) { return false; } + if (enabled(id)) return true; // not hosting, try to find peer now // First find non-proxy version, then check for proxy version if no match @@ -368,12 +380,27 @@ bool Net::enable(uint8_t fs, uint8_t f) { } // TODO: check return value - net_->send(*peer_, "enable_stream", uri_, fs, 0); - _sendRequest(Channel::kColour, fs, kAllFrames, kFramesToRequest, 255, true); + net_->send(*peer_, "enable_stream", uri_, id.frameset(), id.source()); + return true; +} + +bool Net::enable(FrameID id) { + if (host_) { return false; } + if (!_enable(id)) return false; + if (!Stream::enable(id)) return false; + _sendRequest(Channel::kColour, id.frameset(), kAllFrames, kFramesToRequest, 255, true); return true; } +bool Net::enable(FrameID id, Channel c) { + if (host_) { return false; } + if (!_enable(id)) return false; + if (!Stream::enable(id, c)) return false; + _sendRequest(c, id.frameset(), kAllFrames, kFramesToRequest, 255, true); + return true; +} + bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate, bool doreset) { if (!active_ || host_) return false; @@ -508,3 +535,19 @@ bool Net::end() { bool Net::active() { return active_; } + +void Net::setProperty(ftl::protocol::StreamProperty opt, int value) { + +} + +int Net::getProperty(ftl::protocol::StreamProperty opt) { + return 0; +} + +bool Net::supportsProperty(ftl::protocol::StreamProperty opt) { + return false; +} + +ftl::protocol::StreamType Net::type() const { + return ftl::protocol::StreamType::kLive; +} diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index 04f97f716d309f06ac23421a80c03d472cb09f95..b41bf3f252a881402b9c070dfbaf7b142d765bb8 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -48,9 +48,16 @@ class Net : public Stream { bool end() override; bool active() override; - bool enable(uint8_t fs, uint8_t f) override; + bool enable(FrameID id) override; + bool enable(FrameID id, ftl::protocol::Channel c) override; void reset() override; + void refresh() override; + + void setProperty(ftl::protocol::StreamProperty opt, int value) override; + int getProperty(ftl::protocol::StreamProperty opt) override; + bool supportsProperty(ftl::protocol::StreamProperty opt) override; + StreamType type() const override; inline const ftl::UUID &getPeer() const { if (host_) { throw FTL_Error("Net::getPeer() not possible, hosting stream"); } @@ -105,6 +112,7 @@ private: std::list<detail::StreamClient> clients_; + bool _enable(FrameID id); bool _processRequest(ftl::net::Peer &p, ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt); void _checkRXRate(size_t rx_size, int64_t rx_latency, int64_t ts); void _checkTXRate(size_t tx_size, int64_t tx_latency, int64_t ts); diff --git a/src/streams/streams.cpp b/src/streams/streams.cpp index bdf14b5d5c3a9f9dc8dd09da1e71a01e610330b1..52967498822b371470889427c9e5ab05286b4e1a 100644 --- a/src/streams/streams.cpp +++ b/src/streams/streams.cpp @@ -3,43 +3,142 @@ using ftl::protocol::Stream; using ftl::protocol::Channel; using ftl::protocol::ChannelSet; +using ftl::protocol::FrameID; -const ChannelSet &Stream::available(int fs) const { +std::string Stream::name() const { + return "Unknown"; +} + +bool Stream::available(FrameID id) const { + SHARED_LOCK(mtx_, lk); + return state_.count(id) > 0; +} + +bool Stream::available(FrameID id, Channel channel) const { + SHARED_LOCK(mtx_, lk); + auto it = state_.find(id); + if (it != state_.end()) { + return it->second.available.count(channel) > 0; + } + return false; +} + +bool Stream::available(FrameID id, ChannelSet channels) const { SHARED_LOCK(mtx_, lk); - if (fs < 0 || static_cast<uint32_t>(fs) >= state_.size()) throw FTL_Error("Frameset index out-of-bounds: " << fs); - return state_[fs].available; + auto it = state_.find(id); + if (it != state_.end()) { + const auto &set = it->second.available; + for (auto channel : channels) { + if (set.count(channel) == 0) return false; + } + return true; + } + return false; } -ChannelSet Stream::selected(int fs) const { +ftl::protocol::ChannelSet Stream::channels(FrameID id) const { SHARED_LOCK(mtx_, lk); - if (fs < 0 || static_cast<uint32_t>(fs) >= state_.size()) throw FTL_Error("Frameset index out-of-bounds: " << fs); - return state_[fs].selected; + auto it = state_.find(id); + if (it != state_.end()) { + return it->second.available; + } + return {}; } -ChannelSet Stream::selectedNoExcept(int fs) const { - if (fs == 255) return {}; +std::unordered_set<FrameID> Stream::frames() const { + SHARED_LOCK(mtx_, lk); + std::unordered_set<FrameID> result; + for (const auto &s : state_) { + result.insert(FrameID(s.first)); + } + return result; +} + +std::unordered_set<FrameID> Stream::enabled() const { + SHARED_LOCK(mtx_, lk); + std::unordered_set<FrameID> result; + for (const auto &s : state_) { + if (s.second.enabled) { + result.emplace(s.first); + } + } + return result; +} +bool Stream::enabled(FrameID id) const { SHARED_LOCK(mtx_, lk); - if (fs < 0 || static_cast<uint32_t>(fs) >= state_.size()) return {}; - return state_[fs].selected; + auto it = state_.find(id); + if (it != state_.end()) { + return it->second.enabled; + } + return false; } -void Stream::select(int fs, const ChannelSet &s, bool make) { - if (fs == 255) return; +bool Stream::enabled(FrameID id, ftl::protocol::Channel channel) const { + SHARED_LOCK(mtx_, lk); + auto it = state_.find(id); + if (it != state_.end()) { + return it->second.selected.count(channel) > 0; + } + return false; +} +ftl::protocol::ChannelSet Stream::enabledChannels(FrameID id) const { + SHARED_LOCK(mtx_, lk); + auto it = state_.find(id); + if (it != state_.end()) { + return it->second.selected; + } + return {}; +} + +bool Stream::enable(FrameID id) { + UNIQUE_LOCK(mtx_, lk); + auto &p = state_[id]; + p.enabled = true; + return true; +} + +bool Stream::enable(FrameID id, ftl::protocol::Channel channel) { UNIQUE_LOCK(mtx_, lk); - if (fs < 0 || (!make && static_cast<uint32_t>(fs) >= state_.size())) throw FTL_Error("Frameset index out-of-bounds: " << fs); - if (static_cast<uint32_t>(fs) >= state_.size()) state_.resize(fs+1); - state_[fs].selected = s; + auto &p = state_[id]; + p.enabled = true; + p.selected.insert(channel); + return true; } -ChannelSet &Stream::available(int fs) { +bool Stream::enable(FrameID id, const ftl::protocol::ChannelSet &channels) { UNIQUE_LOCK(mtx_, lk); - if (fs < 0) throw FTL_Error("Frameset index out-of-bounds: " << fs); - if (static_cast<uint32_t>(fs) >= state_.size()) state_.resize(fs+1); - return state_[fs].available; + auto &p = state_[id]; + p.enabled = true; + p.selected.insert(channels.begin(), channels.end()); + return true; } void Stream::reset() { - // Clear available and selected? + UNIQUE_LOCK(mtx_, lk); + state_.clear(); +} + +void Stream::refresh() { + +} + +void Stream::trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { + cb_.trigger(spkt, pkt); +} + +void Stream::seen(FrameID id, ftl::protocol::Channel channel) { + if (!available(id, channel)) { + { + UNIQUE_LOCK(mtx_, lk); + auto &p = state_[id]; + p.available.insert(channel); + } + avail_cb_.trigger(id, channel); + } +} + +void Stream::request(const ftl::protocol::Request &req) { + request_cb_.trigger(req); } diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp index 7fd303cf500556b96c535a4bf2a53eac80f3d54e..3fe765116ad80180e8597534ff2e367bb3b41e90 100644 --- a/test/stream_integration.cpp +++ b/test/stream_integration.cpp @@ -6,6 +6,8 @@ #include <ftl/exception.hpp> #include <ftl/protocol/node.hpp> +using ftl::protocol::FrameID; + // --- Tests ------------------------------------------------------------------- TEST_CASE("TCP Stream", "[net]") { @@ -40,7 +42,7 @@ TEST_CASE("TCP Stream", "[net]") { s1->begin(); s2->begin(); - s2->enable(0, 0); + s2->enable(FrameID(0, 0)); // TODO: Find better option std::this_thread::sleep_for(std::chrono::milliseconds(10)); diff --git a/test/stream_unit.cpp b/test/stream_unit.cpp index fc920b6c0698289b871d92c7e8e7f9d00fd59ad2..8c39beb4b50d84e039ddc0a5744f8fb9429f345e 100644 --- a/test/stream_unit.cpp +++ b/test/stream_unit.cpp @@ -11,6 +11,7 @@ using ftl::protocol::Stream; using ftl::protocol::StreamPacket; using ftl::protocol::Packet; using ftl::protocol::Channel; +using ftl::protocol::FrameID; class TestStream : public ftl::protocol::Stream { public: @@ -18,8 +19,8 @@ class TestStream : public ftl::protocol::Stream { ~TestStream() {}; bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { - available(spkt.streamID) += spkt.channel; - cb_.trigger(spkt, pkt); + seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel); + trigger(spkt, pkt); return true; } @@ -27,6 +28,12 @@ class TestStream : public ftl::protocol::Stream { bool end() override { return true; } bool active() override { return true; } + void setProperty(ftl::protocol::StreamProperty opt, int value) override {} + + int getProperty(ftl::protocol::StreamProperty opt) override { return 0; } + + bool supportsProperty(ftl::protocol::StreamProperty opt) override { return true; } + private: //std::function<void(const StreamPacket &, const Packet &)> cb_; }; @@ -70,12 +77,14 @@ TEST_CASE("ftl::stream::Muxer()::write", "[stream]") { }); REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 0 ); REQUIRE( tspkt.timestamp == 100 ); REQUIRE( tspkt.frame_number == 0 ); REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 1 ); REQUIRE( tspkt.timestamp == 101 ); - REQUIRE( tspkt.frame_number == 1 ); + REQUIRE( tspkt.frame_number == 0 ); StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; StreamPacket tspkt3 = {4,0,0,1,Channel::kColour}; @@ -88,8 +97,9 @@ TEST_CASE("ftl::stream::Muxer()::write", "[stream]") { return true; }); - REQUIRE( mux->post({4,200,0,1,Channel::kColour},{}) ); + REQUIRE( mux->post({4,200,1,0,Channel::kColour},{}) ); REQUIRE( tspkt3.timestamp == 200 ); + REQUIRE( tspkt3.streamID == 0 ); REQUIRE( tspkt3.frame_number == 0 ); } } @@ -149,8 +159,8 @@ TEST_CASE("ftl::stream::Muxer()::read", "[stream]") { std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); REQUIRE(s2); - mux->add(s1); - mux->add(s2); + mux->add(s1, 0); + mux->add(s2, 0); StreamPacket tspkt = {4,0,0,1,Channel::kColour}; auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { @@ -181,8 +191,8 @@ TEST_CASE("ftl::stream::Muxer()::read", "[stream]") { std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); REQUIRE(s2); - mux->add(s1); - mux->add(s2); + mux->add(s1, 0); + mux->add(s2, 0); StreamPacket tspkt = {4,0,0,1,Channel::kColour}; auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) {