diff --git a/applications/gui2/src/modules/camera.cpp b/applications/gui2/src/modules/camera.cpp index aec6695fdaa9aaf429681437e165a7f191fa8193..a47d35f1a8fbfb28410add25b5b5140ecd8d28e9 100644 --- a/applications/gui2/src/modules/camera.cpp +++ b/applications/gui2/src/modules/camera.cpp @@ -39,7 +39,7 @@ void Camera::update(double delta) { auto *mod = screen->getModule<ftl::gui2::Statistics>(); mod->getJSON(StatisticsPanel::PERFORMANCE_INFO)["FPS"] = n/diff; - mod->getJSON(StatisticsPanel::PERFORMANCE_INFO)["Latency"] = l; + mod->getJSON(StatisticsPanel::PERFORMANCE_INFO)["Latency"] = std::to_string(int(l))+std::string("ms"); if (live_) mod->getJSON(StatisticsPanel::MEDIA_STATUS)["LIVE"] = nlohmann::json{{"icon", ENTYPO_ICON_VIDEO_CAMERA},{"value", true},{"colour","#0000FF"},{"size",28}}; auto ptr = std::atomic_load(&latest_); diff --git a/applications/reconstruct2/src/main.cpp b/applications/reconstruct2/src/main.cpp index a7e94f5012f4abced96a65933ee868bbecbd11e4..1a0bb7c59ccb5af1c94657262bc5dc8b5f1848fc 100644 --- a/applications/reconstruct2/src/main.cpp +++ b/applications/reconstruct2/src/main.cpp @@ -128,6 +128,7 @@ static void run(ftl::Configurable *root) { net->shutdown(); LOG(INFO) << "Stopping..."; ftl::timer::stop(true); + LOG(INFO) << "Timer stopped..."; ftl::pool.stop(true); LOG(INFO) << "All threads stopped."; diff --git a/components/common/cpp/src/configuration.cpp b/components/common/cpp/src/configuration.cpp index f6300e67c93f5505e6f1d91a1a11d0f92dfacf48..f756d1d571927d9c8eb9e070e8b39761b007b962 100644 --- a/components/common/cpp/src/configuration.cpp +++ b/components/common/cpp/src/configuration.cpp @@ -341,7 +341,7 @@ void ftl::config::registerConfigurable(ftl::Configurable *cfg) { LOG(ERROR) << "Attempting to create a duplicate object: " << *uri; } else { config_instance[*uri] = cfg; - LOG(INFO) << "Registering instance: " << *uri; + //LOG(INFO) << "Registering instance: " << *uri; lk.unlock(); auto tags = cfg->get<vector<string>>("tags"); diff --git a/components/control/cpp/src/master.cpp b/components/control/cpp/src/master.cpp index 159fc403bca1d26f76a9a94afb77d8daa11d5184..d6a2f1cac2b0a0508436a6f69a5bcbc76b23f5e0 100644 --- a/components/control/cpp/src/master.cpp +++ b/components/control/cpp/src/master.cpp @@ -36,6 +36,18 @@ Master::Master(Configurable *root, Universe *net) state_.paused = !state_.paused; }); + net->bind("list_streams", []() { + return std::list<std::string>(); + }); + + net->bind("find_stream", [](const std::string &uri, bool proxy) { + return std::optional<ftl::UUID>{}; + }); + + net->bind("add_stream", [](const std::string &uri) { + + }); + net->bind("update_cfg", [](const std::string &uri, const std::string &value) { ftl::config::update(uri, nlohmann::json::parse(value)); }); diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index c7bef74e572837da789a6e37b95cd5252a53f7b6..3d3a4b3ffe175c048958219e1fa1477e0043e73b 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -311,102 +311,74 @@ void Universe::broadcast(const std::string &name, ARGS... args) { template <typename R, typename... ARGS> std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { - std::atomic_flag hasreturned; - std::mutex m; - std::condition_variable cv; - std::atomic<int> count = 0; - std::optional<R> result; - - hasreturned.clear(); - - auto handler = [&](const std::optional<R> &r) { - std::unique_lock<std::mutex> lk(m); - count--; - //if (hasreturned || !r) return; - if (r && !hasreturned.test_and_set()) { - result = r; - } - lk.unlock(); - cv.notify_one(); + struct SharedData { + std::atomic_bool hasreturned = false; + std::mutex m; + std::condition_variable cv; + std::optional<R> result; }; - std::map<Peer*, int> record; - SHARED_LOCK(net_mutex_,lk); + auto sdata = std::make_shared<SharedData>(); - for (auto p : peers_) { - if (!p->waitConnection()) continue; - count++; - record[p] = p->asyncCall<std::optional<R>>(name, handler, args...); - } - lk.unlock(); - - { // Block thread until async callback notifies us - std::unique_lock<std::mutex> llk(m); - // FIXME: what happens if one clients does not return (count != 0)? - cv.wait_for(llk, std::chrono::seconds(1), [&count] { - return count == 0; //hasreturned || count == 0; - }); - - // Cancel any further results - lk.lock(); - if (count > 0) { - throw FTL_Error("Find one failed with timeout"); + auto handler = [sdata](const std::optional<R> &r) { + std::unique_lock<std::mutex> lk(sdata->m); + if (r && !sdata->hasreturned) { + sdata->hasreturned = true; + sdata->result = r; } + lk.unlock(); + sdata->cv.notify_one(); + }; + { + SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { - auto mm = record.find(p); - if (mm != record.end()) { - p->cancelCall(mm->second); - } + if (!p->waitConnection()) continue; + p->asyncCall<std::optional<R>>(name, handler, args...); } } + + // Block thread until async callback notifies us + std::unique_lock<std::mutex> llk(sdata->m); + sdata->cv.wait_for(llk, std::chrono::seconds(1), [sdata] { + return (bool)sdata->hasreturned; + }); - return result; + return sdata->result; } template <typename R, typename... ARGS> std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { - int returncount = 0; - int sentcount = 0; - std::mutex m; - std::condition_variable cv; - - std::vector<R> results; + struct SharedData { + std::atomic_int returncount = 0; + std::atomic_int sentcount = 0; + std::mutex m; + std::condition_variable cv; + std::vector<R> results; + }; + + auto sdata = std::make_shared<SharedData>(); - auto handler = [&](const std::vector<R> &r) { - //UNIQUE_LOCK(m,lk); - std::unique_lock<std::mutex> lk(m); - returncount++; - results.insert(results.end(), r.begin(), r.end()); + auto handler = [sdata](const std::vector<R> &r) { + std::unique_lock<std::mutex> lk(sdata->m); + ++sdata->returncount; + sdata->results.insert(sdata->results.end(), r.begin(), r.end()); lk.unlock(); - cv.notify_one(); + sdata->cv.notify_one(); }; - std::map<Peer*, int> record; - SHARED_LOCK(net_mutex_,lk); - for (auto p : peers_) { - if (!p->waitConnection()) continue; - sentcount++; - record[p] = p->asyncCall<std::vector<R>>(name, handler, args...); - } - lk.unlock(); - - { // Block thread until async callback notifies us - //UNIQUE_LOCK(m,llk); - std::unique_lock<std::mutex> llk(m); - cv.wait_for(llk, std::chrono::seconds(1), [&returncount,sentcount]{return returncount == sentcount;}); - - // Cancel any further results - lk.lock(); + { + SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { - auto mm = record.find(p); - if (mm != record.end()) { - p->cancelCall(mm->second); - } + if (!p->waitConnection()) continue; + ++sdata->sentcount; + p->asyncCall<std::vector<R>>(name, handler, args...); } } - - return results; + + std::unique_lock<std::mutex> llk(sdata->m); + sdata->cv.wait_for(llk, std::chrono::seconds(1), [sdata]{return sdata->returncount == sdata->sentcount; }); + return sdata->results; } template <typename R, typename... ARGS> diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 4e99f67b5b1123bf1a3c0c9eb5d5da71d7ec325c..368a5bbb2364f57c008a1f1071dff8ef603b7794 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -420,7 +420,7 @@ void Peer::socketError() { // more socket errors... _badClose(); - LOG(ERROR) << "Socket: " << uri_ << " - error " << err; + if (err != 0) LOG(ERROR) << "Socket: " << uri_ << " - error " << err; } void Peer::error(int e) { diff --git a/components/streams/include/ftl/streams/feed.hpp b/components/streams/include/ftl/streams/feed.hpp index 2b2d22ed0e2e65ddeafbe8147eec6ace3f9e6ad6..72075b74fe786a3803ce6dc2d9616cba3ed10013 100644 --- a/components/streams/include/ftl/streams/feed.hpp +++ b/components/streams/include/ftl/streams/feed.hpp @@ -193,7 +193,7 @@ private: std::list<ftl::streams::ManualSourceBuilder*> render_builders_; std::function<void(ftl::operators::Graph*)> pipe_creator_; - std::vector<std::string> netcams_; + std::unordered_set<std::string> netcams_; ftl::Handler<const std::vector<std::string> &> new_sources_cb_; ftl::Handler<uint32_t> add_src_cb_; ftl::Handler<uint32_t> remove_sources_cb_; diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp index 873f94f3840efab03c9f2d54ac63c2114aea2f1b..defe19720ee5cfb0204477123b7bb0c8c911f733 100644 --- a/components/streams/src/feed.cpp +++ b/components/streams/src/feed.cpp @@ -154,6 +154,7 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) : }); }); + if (net_->isBound("add_stream")) net_->unbind("add_stream"); net_->bind("add_stream", [this](ftl::net::Peer &p, std::string uri){ //UNIQUE_LOCK(mtx_, lk); _updateNetSources(&p, uri); @@ -487,7 +488,7 @@ void Feed::removeFilter(Feed::Filter* filter) { void Feed::_updateNetSources(ftl::net::Peer *p, const std::string &s, bool autoadd) { UNIQUE_LOCK(mtx_, lk); - netcams_.push_back(s); + netcams_.insert(s); // TODO: Auto add source @@ -508,50 +509,31 @@ void Feed::_updateNetSources(ftl::net::Peer *p, bool autoadd) { //auto netcams = // net_->findAll<std::string>("list_streams"); - auto peerstreams = p->call<std::vector<std::string>>("list_streams"); - - UNIQUE_LOCK(mtx_, lk); - //netcams_ = std::move(netcams); - netcams_.insert(netcams_.end(), peerstreams.begin(), peerstreams.end()); - - for (const auto &s : peerstreams) { - ftl::URI uri(s); - _add_recent_source(uri)["host"] = p->getURI(); - - if (autoadd || value("auto_host_sources", false)) { - ftl::pool.push([this, uri](int id) { add(uri); }); - } - } - - /*if (value("auto_host_sources", false)) { - for (auto s : netcams_) { - const std::string group = uri.getAttribute<std::string>("group"); - - if (fsid_lookup_.count(uri.getBaseURI()) == 0) { - auto *stream = ftl::create<ftl::stream::Net> - (this, std::string("netstream") - +std::to_string(fsid_lookup_.size()), net_); - - int fsid = allocateFrameSetId(group); + // Peer may not have a list_streams binding yet + try { + auto peerstreams = p->call<std::vector<std::string>>("list_streams"); + - stream->set("uri", s); - add(fsid, uri.getBaseURI(), stream); + UNIQUE_LOCK(mtx_, lk); + //netcams_ = std::move(netcams); + netcams_.insert(peerstreams.begin(), peerstreams.end()); - LOG(INFO) << "Add Stream: " - << stream->value("uri", std::string("NONE")) - << " (" << fsid << ")"; + for (const auto &s : peerstreams) { + ftl::URI uri(s); + _add_recent_source(uri)["host"] = p->getURI(); - //cv_net_connect_.notify_one(); - } - else { - LOG(INFO) << "Stream exists: " << s; + if (autoadd || value("auto_host_sources", false)) { + ftl::pool.push([this, uri](int id) { add(uri); }); } } - }*/ - ftl::pool.push([this, peerstreams](int id) { - new_sources_cb_.trigger(peerstreams); - }); + ftl::pool.push([this, peerstreams](int id) { + new_sources_cb_.trigger(peerstreams); + }); + + } catch (const ftl::exception &e) { + + } /* done by add() if (n > 0) { @@ -561,7 +543,8 @@ void Feed::_updateNetSources(ftl::net::Peer *p, bool autoadd) { std::vector<std::string> Feed::availableNetworkSources() { SHARED_LOCK(mtx_, lk); - return netcams_; + std::vector<std::string> result(netcams_.begin(), netcams_.end()); + return result;; } std::vector<std::string> Feed::availableGroups() { diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp index b497ac3dfa67c949ccba37aa37eddc271d727692..0d6fba6570be8bec847ac9d49994ce3079e115fb 100644 --- a/components/streams/src/netstream.cpp +++ b/components/streams/src/netstream.cpp @@ -28,11 +28,13 @@ int64_t Net::last_msg__ = 0; MUTEX Net::msg_mtx__; static std::list<std::string> net_streams; +static std::atomic_flag has_bindings = ATOMIC_FLAG_INIT; static SHARED_MUTEX stream_mutex; Net::Net(nlohmann::json &config, ftl::net::Universe *net) : Stream(config), active_(false), net_(net), clock_adjust_(0), last_ping_(0) { - if (!net_->isBound("find_stream")) { - net_->bind("find_stream", [net = net_](const std::string &uri) -> optional<ftl::UUID> { + if (!has_bindings.test_and_set()) { + if (net_->isBound("find_stream")) net_->unbind("find_stream"); + net_->bind("find_stream", [net = net_](const std::string &uri, bool proxy) -> optional<ftl::UUID> { LOG(INFO) << "REQUEST FIND STREAM: " << uri; ftl::URI u1(uri); @@ -48,9 +50,8 @@ Net::Net(nlohmann::json &config, ftl::net::Universe *net) : Stream(config), acti } return {}; }); - } - if (!net_->isBound("list_streams")) { + if (net_->isBound("list_streams")) net_->unbind("list_streams"); net_->bind("list_streams", [this]() { LOG(INFO) << "REQUEST LIST STREAMS"; SHARED_LOCK(stream_mutex, lk); @@ -103,7 +104,7 @@ bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet try { // FIXME: This doesn't work for file sources with file relative timestamps... - short pre_transmit_latency = short(ftl::timer::get_time() - spkt.timestamp); + short pre_transmit_latency = short(ftl::timer::get_time() - spkt.localTimestamp); if (!net_->send(client.peerid, base_uri_, @@ -124,7 +125,7 @@ bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet } } else { try { - short pre_transmit_latency = short(ftl::timer::get_time() - spkt.timestamp); + short pre_transmit_latency = short(ftl::timer::get_time() - spkt.localTimestamp); if (!net_->send(peer_, base_uri_, pre_transmit_latency, // Time since timestamp for tx @@ -169,7 +170,7 @@ bool Net::begin() { StreamPacket spkt = spkt_raw; // FIXME: see #335 //spkt.timestamp -= clock_adjust_; - spkt.localTimestamp = now - ttimeoff; + spkt.localTimestamp = now - int64_t(ttimeoff); spkt.hint_capability = 0; spkt.hint_source_total = 0; //LOG(INFO) << "LATENCY: " << ftl::timer::get_time() - spkt.localTimestamp() << " : " << spkt.timestamp << " - " << clock_adjust_; @@ -242,7 +243,10 @@ bool Net::begin() { //} }); - auto p = net_->findOne<ftl::UUID>("find_stream", uri_); + // First find non-proxy version, then check for proxy version if no match + auto p = net_->findOne<ftl::UUID>("find_stream", uri_, false); + if (!p) p = net_->findOne<ftl::UUID>("find_stream", uri_, true); + if (!p) { LOG(INFO) << "Hosting stream: " << uri_; // TODO: Register URI as available. diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index 8b59bf6ed1676486d4aa0435e1e3b4fdcd4e7f37..35b6675f7d2aa1959ca91c350c7cd496c02e7208 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -209,9 +209,10 @@ void Receiver::_processData(const StreamPacket &spkt, const Packet &pkt) { //UNIQUE_LOCK(vidstate.mutex, lk); timestamp_ = spkt.timestamp; fs->completed(spkt.frame_number); - fs->localTimestamp = spkt.localTimestamp; } + fs->localTimestamp = spkt.localTimestamp; + /*const auto *cs = stream_; const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID()); @@ -259,9 +260,10 @@ void Receiver::_processAudio(const StreamPacket &spkt, const Packet &pkt) { //UNIQUE_LOCK(vidstate.mutex, lk); timestamp_ = spkt.timestamp; fs->completed(spkt.frame_number); - fs->localTimestamp = spkt.localTimestamp; } + fs->localTimestamp = spkt.localTimestamp; + // Generate settings from packet data /*ftl::audio::AudioSettings settings; settings.channels = (spkt.channel == Channel::AudioStereo) ? 2 : 1; @@ -437,9 +439,10 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { UNIQUE_LOCK(vidstate.mutex, lk); timestamp_ = spkt.timestamp; fs->completed(spkt.frame_number+i); - fs->localTimestamp = spkt.localTimestamp; } } + + fs->localTimestamp = spkt.localTimestamp; } void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) { @@ -461,7 +464,6 @@ void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) { //UNIQUE_LOCK(vidstate.mutex, lk); // FIXME: Should have a lock here... timestamp_ = spkt.timestamp; fs->completed(frame.source()); - fs->localTimestamp = spkt.localTimestamp; } //if (frame.availableAll(sel)) { @@ -469,6 +471,8 @@ void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) { // fs->completed(frame.source()); //} } + + fs->localTimestamp = spkt.localTimestamp; } return; } diff --git a/web-service/server/src/index.js b/web-service/server/src/index.js index 2510db14a53f4d09a0104b3b5afcd94f2b647583..22eb16270b00bc9a57ed09e986267adbbc0b31e7 100644 --- a/web-service/server/src/index.js +++ b/web-service/server/src/index.js @@ -380,7 +380,9 @@ app.ws('/', (ws, req) => { } }); - p.bind("find_stream", (uri) => { + p.bind("find_stream", (uri, proxy) => { + if (!proxy) return null; + const parsedURI = stringSplitter(uri) if (uri_to_peer.hasOwnProperty(parsedURI)) { console.log("Stream found: ", uri, parsedURI);