Skip to content
Snippets Groups Projects
Commit b4255363 authored by Sebastian Hahta's avatar Sebastian Hahta
Browse files

pre-allocate frame id on request; increase default buffer size

parent 9c82bd22
Branches
Tags
No related merge requests found
......@@ -65,16 +65,16 @@ class Muxer : public Stream {
void reset() override;
/** enable(): frameset id 255 to apply for all available framesets, source id 255 for all available frames.
* If both 255: enable all frames on all framesets.
*/
bool enable(FrameID id) override;
bool enable(FrameID id, ftl::protocol::Channel channel) override;
bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override;
/** fsid 255/sid 255 trick does not apply for disable() (TODO?) */
void disable(FrameID id) override;
void disable(FrameID id, ftl::protocol::Channel channel) override;
void disable(FrameID id, const ftl::protocol::ChannelSet &channels) override;
void setProperty(ftl::protocol::StreamProperty opt, std::any value) override;
......@@ -133,6 +133,17 @@ class Muxer : public Stream {
*/
FrameID findLocal(const std::shared_ptr<Stream> &stream, FrameID remote) const;
/**
* @brief Find the system local ID for a stream specific ID, pre-allocate one if doesn't exist.
*
* Remote framesets for given remote FrameID will have the returned local FrameID.
*
* @param stream
* @param remote
* @return FrameID
*/
FrameID findOrCreateLocal(const std::shared_ptr<Stream>& stream, FrameID remote);
/**
* @brief Given a local frame ID, get the stream specific ID.
*
......@@ -161,10 +172,17 @@ class Muxer : public Stream {
int fixed_fs = -1;
};
/** map between local and remote framsets, first 16 bits for stream id, second 16 bits for frameset */
std::unordered_map<int, int> fsmap_;
std::unordered_map<int, int> sourcecount_;
/* map between stream specific ids to local ids
* packs stream id and frame id in uint64_t (first 32 bits for stream id, last for frameid) */
std::unordered_map<int64_t, FrameID> imap_;
/** map between local FrameID and remote (FrameID, StreamEntry) pair */
std::unordered_map<FrameID, std::pair<FrameID, Muxer::StreamEntry*>> omap_;
std::list<StreamEntry> streams_;
mutable SHARED_MUTEX mutex_;
std::atomic_int stream_ids_ = 0;
......
......@@ -161,6 +161,28 @@ FrameID Muxer::findLocal(const std::shared_ptr<Stream> &stream, FrameID remote)
}
}
FrameID Muxer::findOrCreateLocal(const std::shared_ptr<Stream> &stream, FrameID remote) {
StreamEntry *entry = nullptr;
{
SHARED_LOCK(mutex_, lk);
for (auto &e : streams_) {
if (e.stream == stream) {
entry = &e;
break;
}
}
}
if (entry) {
return _mapFromInput(entry, remote);
} else {
throw FTL_Error("No stream");
}
}
FrameID Muxer::findRemote(FrameID local) const {
auto m = _mapToOutput(local);
if (m.second == nullptr) {
......@@ -181,10 +203,13 @@ std::list<std::shared_ptr<Stream>> Muxer::streams() const {
void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) {
UNIQUE_LOCK(mutex_, lk);
// TODO: should this check if stream already exists?
auto &se = streams_.emplace_back();
se.id = stream_ids_++;
se.stream = s;
se.fixed_fs = fsid;
Muxer::StreamEntry *ptr = &se;
se.handle = std::move(s->onPacket([this, ptr](const StreamPacket &spkt, const DataPacket &pkt) {
......@@ -228,6 +253,7 @@ void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) {
error(err, str);
return true;
}));
}
void Muxer::remove(const std::shared_ptr<Stream> &s) {
......
......@@ -75,10 +75,10 @@ struct NetImplDetail {
// TODO(Seb): move to ServerSocket and ClientSocket
// Defaults, should be changed in config
#define TCP_SEND_BUFFER_SIZE (1024*1024)
#define TCP_RECEIVE_BUFFER_SIZE (1024*1024) // Perhaps try 24K?
#define WS_SEND_BUFFER_SIZE (1024*1024)
#define WS_RECEIVE_BUFFER_SIZE (62*1024)
#define TCP_SEND_BUFFER_SIZE (32*1024*1024)
#define TCP_RECEIVE_BUFFER_SIZE (32*1024*1024) // Perhaps try 24K?
#define WS_SEND_BUFFER_SIZE (32*1024*1024)
#define WS_RECEIVE_BUFFER_SIZE (32*1024*1024)
std::shared_ptr<Universe> Universe::instance_ = nullptr;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment