diff --git a/include/ftl/protocol/muxer.hpp b/include/ftl/protocol/muxer.hpp index 4cb37d36a122e4fdbcee9198670a7802af707a0f..235d10e19abc1df18030f071de379126a717c98b 100644 --- a/include/ftl/protocol/muxer.hpp +++ b/include/ftl/protocol/muxer.hpp @@ -65,16 +65,16 @@ class Muxer : public Stream { void reset() override; + /** enable(): frameset id 255 to apply for all available framesets, source id 255 for all available frames. + * If both 255: enable all frames on all framesets. + */ bool enable(FrameID id) override; - bool enable(FrameID id, ftl::protocol::Channel channel) override; - bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override; + /** fsid 255/sid 255 trick does not apply for disable() (TODO?) */ void disable(FrameID id) override; - void disable(FrameID id, ftl::protocol::Channel channel) override; - void disable(FrameID id, const ftl::protocol::ChannelSet &channels) override; void setProperty(ftl::protocol::StreamProperty opt, std::any value) override; @@ -133,6 +133,17 @@ class Muxer : public Stream { */ FrameID findLocal(const std::shared_ptr<Stream> &stream, FrameID remote) const; + /** + * @brief Find the system local ID for a stream specific ID, pre-allocate one if doesn't exist. + * + * Remote framesets for given remote FrameID will have the returned local FrameID. + * + * @param stream + * @param remote + * @return FrameID + */ + FrameID findOrCreateLocal(const std::shared_ptr<Stream>& stream, FrameID remote); + /** * @brief Given a local frame ID, get the stream specific ID. * @@ -161,10 +172,17 @@ class Muxer : public Stream { int fixed_fs = -1; }; + /** map between local and remote framsets, first 16 bits for stream id, second 16 bits for frameset */ std::unordered_map<int, int> fsmap_; std::unordered_map<int, int> sourcecount_; + + /* map between stream specific ids to local ids + * packs stream id and frame id in uint64_t (first 32 bits for stream id, last for frameid) */ std::unordered_map<int64_t, FrameID> imap_; + + /** map between local FrameID and remote (FrameID, StreamEntry) pair */ std::unordered_map<FrameID, std::pair<FrameID, Muxer::StreamEntry*>> omap_; + std::list<StreamEntry> streams_; mutable SHARED_MUTEX mutex_; std::atomic_int stream_ids_ = 0; diff --git a/src/streams/muxer.cpp b/src/streams/muxer.cpp index 9c926b702062b9e59d8299b3be03d74728be6549..c4048f289174a1d4b5c94ea8eb9968be05d74107 100644 --- a/src/streams/muxer.cpp +++ b/src/streams/muxer.cpp @@ -132,7 +132,7 @@ FrameID Muxer::findLocal(const std::string &uri, FrameID remote) const { break; } } - } + } if (entry) { return _mapFromInput(entry, remote); @@ -161,6 +161,28 @@ FrameID Muxer::findLocal(const std::shared_ptr<Stream> &stream, FrameID remote) } } + +FrameID Muxer::findOrCreateLocal(const std::shared_ptr<Stream> &stream, FrameID remote) { + StreamEntry *entry = nullptr; + + { + SHARED_LOCK(mutex_, lk); + for (auto &e : streams_) { + if (e.stream == stream) { + entry = &e; + break; + } + } + } + + if (entry) { + return _mapFromInput(entry, remote); + } else { + throw FTL_Error("No stream"); + } +} + + FrameID Muxer::findRemote(FrameID local) const { auto m = _mapToOutput(local); if (m.second == nullptr) { @@ -181,10 +203,13 @@ std::list<std::shared_ptr<Stream>> Muxer::streams() const { void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) { UNIQUE_LOCK(mutex_, lk); + // TODO: should this check if stream already exists? + auto &se = streams_.emplace_back(); se.id = stream_ids_++; se.stream = s; se.fixed_fs = fsid; + Muxer::StreamEntry *ptr = &se; se.handle = std::move(s->onPacket([this, ptr](const StreamPacket &spkt, const DataPacket &pkt) { @@ -228,6 +253,7 @@ void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) { error(err, str); return true; })); + } void Muxer::remove(const std::shared_ptr<Stream> &s) { diff --git a/src/universe.cpp b/src/universe.cpp index 148f63f37b80ea8e89240b27fe06bce168fe8be3..57af06a1e0721d24abe2c55ac86a73addc0e3c7f 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -75,10 +75,11 @@ struct NetImplDetail { // TODO(Seb): move to ServerSocket and ClientSocket // Defaults, should be changed in config -#define TCP_SEND_BUFFER_SIZE (1024*1024*64) -#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*64) // Perhaps try 24K? -#define WS_SEND_BUFFER_SIZE (1024*1024) -#define WS_RECEIVE_BUFFER_SIZE (62*1024) + +#define TCP_SEND_BUFFER_SIZE (32*1024*1024) +#define TCP_RECEIVE_BUFFER_SIZE (32*1024*1024) // Perhaps try 24K? +#define WS_SEND_BUFFER_SIZE (32*1024*1024) +#define WS_RECEIVE_BUFFER_SIZE (32*1024*1024) std::shared_ptr<Universe> Universe::instance_ = nullptr;