From 746f49f069a116b25dfacf72500e9e3a2d57ebec Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Mon, 27 Jul 2020 10:21:39 +0300 Subject: [PATCH] Reconstruct but with wrong pipeline --- applications/gui2/src/views/addsource.cpp | 3 +-- applications/reconstruct2/src/main.cpp | 10 ++++++++++ .../streams/include/ftl/streams/feed.hpp | 5 +++-- .../streams/include/ftl/streams/netstream.hpp | 5 +++++ components/streams/src/feed.cpp | 19 ++++++++++++------- components/streams/src/netstream.cpp | 6 ++++++ 6 files changed, 37 insertions(+), 11 deletions(-) diff --git a/applications/gui2/src/views/addsource.cpp b/applications/gui2/src/views/addsource.cpp index 13b77f005..c8d62aac5 100644 --- a/applications/gui2/src/views/addsource.cpp +++ b/applications/gui2/src/views/addsource.cpp @@ -100,8 +100,7 @@ AddSourceWindow::AddSourceWindow(nanogui::Widget* parent, AddCtrl *ctrl) : rebuild(); tabs_->setActiveTab(0); - new_source_handle_ = ctrl_->feed()->onNewSources([this](int a) { - LOG(INFO) << "NEW SOURCES"; + new_source_handle_ = ctrl_->feed()->onNewSources([this](const std::vector<std::string> &srcs) { UNIQUE_LOCK(mutex_, lk); uptodate_.clear(); return true; diff --git a/applications/reconstruct2/src/main.cpp b/applications/reconstruct2/src/main.cpp index 8efa92d17..813b71727 100644 --- a/applications/reconstruct2/src/main.cpp +++ b/applications/reconstruct2/src/main.cpp @@ -61,6 +61,16 @@ static void run(ftl::Configurable *root) { } } + // Automatically add any new sources + auto nsrc_handle = feed->onNewSources([feed,group_name](const vector<string> &srcs) { + for (const auto &s : srcs) { + ftl::URI uri(s); + uri.setAttribute("group", group_name); + feed->add(uri); + } + return true; + }); + auto *filter = feed->filter({Channel::Colour, Channel::Depth}); feed->startStreaming(filter); diff --git a/components/streams/include/ftl/streams/feed.hpp b/components/streams/include/ftl/streams/feed.hpp index 70e5f8fb7..0aa28254c 100644 --- a/components/streams/include/ftl/streams/feed.hpp +++ b/components/streams/include/ftl/streams/feed.hpp @@ -126,7 +126,7 @@ public: void stopRecording(); bool isRecording(); - inline ftl::Handle onNewSources(const std::function<bool(uint32_t)> &cb) { return new_sources_cb_.on(cb); } + inline ftl::Handle onNewSources(const std::function<bool(const std::vector<std::string> &)> &cb) { return new_sources_cb_.on(cb); } inline ftl::Handle onAdd(const std::function<bool(uint32_t)> &cb) { return add_src_cb_.on(cb); } @@ -173,6 +173,7 @@ private: std::unique_ptr<ftl::stream::Broadcast> record_stream_; ftl::Handle handle_record_; ftl::Handle record_recv_handle_; + ftl::Handle record_new_client_; Filter *record_filter_; //ftl::Handler<const ftl::data::FrameSetPtr&> frameset_cb_; @@ -187,7 +188,7 @@ private: std::list<ftl::streams::ManualSourceBuilder*> render_builders_; std::vector<std::string> netcams_; - ftl::Handler<uint32_t> new_sources_cb_; + ftl::Handler<const std::vector<std::string> &> new_sources_cb_; ftl::Handler<uint32_t> add_src_cb_; ftl::Handler<uint32_t> remove_sources_cb_; diff --git a/components/streams/include/ftl/streams/netstream.hpp b/components/streams/include/ftl/streams/netstream.hpp index 341796038..36ad0d382 100644 --- a/components/streams/include/ftl/streams/netstream.hpp +++ b/components/streams/include/ftl/streams/netstream.hpp @@ -6,6 +6,7 @@ #include <ftl/threads.hpp> #include <ftl/codecs/packet.hpp> #include <ftl/streams/stream.hpp> +#include <ftl/handle.hpp> #include <string> namespace ftl { @@ -60,6 +61,8 @@ class Net : public Stream { inline const ftl::UUID &getPeer() const { return peer_; } + inline ftl::Handle onClientConnect(const std::function<bool(ftl::net::Peer*)> &cb) { return connect_cb_.on(cb); } + /** * Return the average bitrate of all streams since the last call to this * function. Units are Mbps. @@ -84,6 +87,8 @@ class Net : public Stream { std::array<std::atomic<int>,32> reqtally_; std::unordered_set<ftl::codecs::Channel> last_selected_; + ftl::Handler<ftl::net::Peer*> connect_cb_; + static float req_bitrate__; static float sample_count__; static int64_t last_msg__; diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp index af7d513d1..52ca7d87b 100644 --- a/components/streams/src/feed.cpp +++ b/components/streams/src/feed.cpp @@ -489,8 +489,9 @@ void Feed::_updateNetSources(ftl::net::Peer *p, const std::string &s, bool autoa add(uri); } - ftl::pool.push([this](int id) { - new_sources_cb_.trigger(0); + ftl::pool.push([this, s](int id) { + std::vector<std::string> srcs{s}; + new_sources_cb_.trigger(srcs); }); } @@ -509,7 +510,6 @@ void Feed::_updateNetSources(ftl::net::Peer *p, bool autoadd) { _add_recent_source(uri)["host"] = p->getURI(); if (autoadd || value("auto_host_sources", false)) { - LOG(INFO) << "AUTO ADD: " << s; ftl::pool.push([this, uri](int id) { add(uri); }); } } @@ -540,8 +540,8 @@ void Feed::_updateNetSources(ftl::net::Peer *p, bool autoadd) { } }*/ - ftl::pool.push([this](int id) { - new_sources_cb_.trigger(0); + ftl::pool.push([this, peerstreams](int id) { + new_sources_cb_.trigger(peerstreams); }); /* done by add() @@ -1070,6 +1070,12 @@ void Feed::startStreaming(Filter *f) { auto *nstream = ftl::create<ftl::stream::Net>(this, "live_stream", net_); nstream->set("uri", value("uri", std::string("ftl://vision.utu.fi/live"))); + + record_new_client_ = nstream->onClientConnect([this](ftl::net::Peer *p) { + stream_->reset(); + return true; + }); + record_stream_->add(nstream); record_stream_->begin(); recorder_->resetSender(); @@ -1081,8 +1087,6 @@ void Feed::_beginRecord(Filter *f) { handle_record_ = f->onWithHandle([this, f](const ftl::data::FrameSetPtr &fs) { record_stream_->select(fs->frameset(), f->channels(), true); - LOG(INFO) << "GOT FRAMESET TO STREAM: " << fs->timestamp(); - for (auto c : f->channels()) { if (fs->hasAnyChanged(c)) recorder_->post(*fs.get(), c); } @@ -1097,6 +1101,7 @@ void Feed::stopRecording() { void Feed::_stopRecording() { handle_record_.cancel(); + record_new_client_.cancel(); record_stream_->end(); auto garbage = record_stream_->streams(); diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp index f904d6638..b497ac3df 100644 --- a/components/streams/src/netstream.cpp +++ b/components/streams/src/netstream.cpp @@ -398,6 +398,12 @@ bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) { client.quality = 255; // TODO: Use quality given in packet client.txcount = 0; client.txmax = static_cast<int>(pkt.frame_count)*kTallyScale; + + try { + connect_cb_.trigger(&p); + } catch (const ftl::exception &e) { + LOG(ERROR) << "Exception in stream connect callback: " << e.what(); + } } // First connected peer (or reconnecting peer) becomes a time server -- GitLab