diff --git a/components/streams/include/ftl/streams/builder.hpp b/components/streams/include/ftl/streams/builder.hpp index 466399b4afb261562c4b6274eee1acf642b5b94f..34572770485e9b0cafe3d79609bc234078469d1a 100644 --- a/components/streams/include/ftl/streams/builder.hpp +++ b/components/streams/include/ftl/streams/builder.hpp @@ -33,6 +33,8 @@ class BaseBuilder : public ftl::data::Generator { void setID(uint32_t id) { id_ = id; } void setPool(ftl::data::Pool *p) { pool_ = p; } void setBufferSize(size_t s) { bufferSize_ = s; } + void setMaxBufferSize(size_t s) { max_buffer_size_ = s; } + void setCompletionSize(size_t s) { completion_size_ = s; } inline ftl::Handle onFrameSet(const ftl::data::FrameSetCallback &cb) override { return cb_.on(cb); } @@ -53,6 +55,8 @@ class BaseBuilder : public ftl::data::Generator { int id_; size_t size_; size_t bufferSize_ = 1; + size_t max_buffer_size_ = 16; + size_t completion_size_ = 8; ftl::Handler<const ftl::data::FrameSetPtr&> cb_; ftl::data::ChangeType ctype_ = ftl::data::ChangeType::COMPLETED; }; @@ -153,8 +157,6 @@ class ForeignBuilder : public BaseBuilder { std::atomic<int> jobs_; volatile bool skip_; ftl::Handle main_id_; - size_t max_buffer_size_ = 16; - size_t completion_size_ = 8; std::string name_; diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index 67945bbf463720c6b7dd53617f454fa2a49d3b07..58171e8bef3cff4618f9b1fc252dfc224191e54f 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -36,6 +36,20 @@ Receiver::Receiver(nlohmann::json &config, ftl::data::Pool *p) : ftl::Configurab i.second->setBufferSize(bsize); } }); + + on("max_buffer_size", [this]() { + size_t bsize = value("max_buffer_size", 16); + for (auto &i : builders_) { + i.second->setMaxBufferSize(bsize); + } + }); + + on("completion_size", [this]() { + size_t bsize = value("completion_size", 8); + for (auto &i : builders_) { + i.second->setCompletionSize(bsize); + } + }); } Receiver::~Receiver() { @@ -56,6 +70,8 @@ ftl::streams::BaseBuilder &Receiver::builder(uint32_t id) { b->setID(id); b->setPool(pool_); fb->setBufferSize(value("frameset_buffer_size", 0)); + fb->setBufferSize(value("max_buffer_size", 16)); + fb->setBufferSize(value("completion_size", 8)); handles_[id] = std::move(fb->onFrameSet([this](const ftl::data::FrameSetPtr& fs) { callback_.trigger(fs); return true;