diff --git a/components/streams/include/ftl/streams/netstream.hpp b/components/streams/include/ftl/streams/netstream.hpp index 1341efb711af2ec0c19e69e65f72ffb345b02709..341796038c1e240797d4085ebb3c8aa4ab634fae 100644 --- a/components/streams/include/ftl/streams/netstream.hpp +++ b/components/streams/include/ftl/streams/netstream.hpp @@ -78,6 +78,7 @@ class Net : public Stream { int64_t frame_no_; int64_t last_ping_; std::string uri_; + std::string base_uri_; bool host_; int tally_; std::array<std::atomic<int>,32> reqtally_; diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp index b74385fe99624eda92e776ae8af124cecbc9bd54..b085072f7a93bd6938c57b60f259558ddfd3daf8 100644 --- a/components/streams/src/feed.cpp +++ b/components/streams/src/feed.cpp @@ -644,9 +644,6 @@ uint32_t Feed::add(const std::string &path) { lk.unlock(); if (uri.getBaseURI() == "device:render" || uri.getBaseURI() == "device:openvr") { - // TODO: Use a ManualSourceBuilder and tick in draw thread. Also - // need to keep all such pointers to render sources to gain access - // to the texture objects for use by Camera. auto *rsource = ftl::create<ftl::render::Source>(this, srcname, this); renderers_[fsid] = rsource; source = rsource; diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp index fe9b7f99f6112e3f0ef392620e9a0c92999295d9..d183d43d35d359f8ecef7b0e34b0a33d82246942 100644 --- a/components/streams/src/netstream.cpp +++ b/components/streams/src/netstream.cpp @@ -27,25 +27,36 @@ float Net::sample_count__ = 0.0f; int64_t Net::last_msg__ = 0; MUTEX Net::msg_mtx__; +static std::list<std::string> net_streams; +static SHARED_MUTEX stream_mutex; + Net::Net(nlohmann::json &config, ftl::net::Universe *net) : Stream(config), active_(false), net_(net), clock_adjust_(0), last_ping_(0) { - // TODO: Install "find_stream" binding if not installed... if (!net_->isBound("find_stream")) { - net_->bind("find_stream", [this](const std::string &uri) -> optional<ftl::UUID> { + // FIXME: This only allows for a single net stream!!! + net_->bind("find_stream", [net = net_](const std::string &uri) -> optional<ftl::UUID> { LOG(INFO) << "REQUEST FIND STREAM: " << uri; - if (uri_ == uri) { - return net_->id(); - } else { - return {}; + + ftl::URI u(uri); + std::string base = u.getBaseURI(); + + SHARED_LOCK(stream_mutex, lk); + for (const auto &s : net_streams) { + // Don't compare query string components. + if (base == s) { + return net->id(); + } } + return {}; }); } if (!net_->isBound("list_streams")) { net_->bind("list_streams", [this]() { LOG(INFO) << "REQUEST LIST STREAMS"; - vector<string> streams; - streams.push_back(uri_); - return streams; + SHARED_LOCK(stream_mutex, lk); + //vector<string> streams; + //streams.push_back(uri_); // Send full original URI + return net_streams; }); } @@ -95,7 +106,7 @@ bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet short pre_transmit_latency = short(ftl::timer::get_time() - spkt.timestamp); if (!net_->send(client.peerid, - uri_, + base_uri_, pre_transmit_latency, // Time since timestamp for tx spkt, pkt)) { @@ -115,7 +126,7 @@ bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet try { short pre_transmit_latency = short(ftl::timer::get_time() - spkt.timestamp); if (!net_->send(peer_, - uri_, + base_uri_, pre_transmit_latency, // Time since timestamp for tx spkt, pkt)) { @@ -140,13 +151,17 @@ bool Net::begin() { uri_ = *get<string>("uri"); - if (net_->isBound(uri_)) { + ftl::URI u(uri_); + if (!u.isValid() || !(u.getScheme() == ftl::URI::SCHEME_FTL)) return false; + base_uri_ = u.getBaseURI(); + + if (net_->isBound(base_uri_)) { LOG(ERROR) << "Stream already exists! - " << uri_; active_ = false; return false; } - net_->bind(uri_, [this](ftl::net::Peer &p, short ttimeoff, const ftl::codecs::StreamPacket &spkt_raw, const ftl::codecs::Packet &pkt) { + net_->bind(base_uri_, [this](ftl::net::Peer &p, short ttimeoff, const ftl::codecs::StreamPacket &spkt_raw, const ftl::codecs::Packet &pkt) { int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count(); if (!active_) return; @@ -213,7 +228,7 @@ bool Net::begin() { } else { select(spkt.frameSetID(), selected(spkt.frameSetID()) + spkt.channel); } - LOG(INFO) << "GOT REQUEST"; + _processRequest(p, pkt); } else { // FIXME: Allow availability to change... @@ -239,6 +254,11 @@ bool Net::begin() { ftl::config::alias(uri_, this); } + { + UNIQUE_LOCK(stream_mutex, lk); + net_streams.push_back(base_uri_); + } + // Automatically set name if missing if (!get<std::string>("name")) { char hostname[1024] = {0}; @@ -289,7 +309,7 @@ void Net::reset() { bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate) { if (!active_ || host_) return false; - LOG(INFO) << "SENDING REQUEST FOR " << (int)c; + //LOG(INFO) << "SENDING REQUEST FOR " << (int)c; Packet pkt = { codec_t::Any, // TODO: Allow specific codec requests @@ -311,7 +331,7 @@ bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t coun 0 }; - net_->send(peer_, uri_, (short)0, spkt, pkt); + net_->send(peer_, base_uri_, (short)0, spkt, pkt); // FIXME: Find a way to use this for correct stream latency info if (false) { //if (c == Channel::Colour) { // TODO: Not every time @@ -419,6 +439,13 @@ float Net::getRequiredBitrate() { bool Net::end() { if (!active_) return false; + + { + UNIQUE_LOCK(stream_mutex, lk); + auto i = std::find(net_streams.begin(), net_streams.end(), base_uri_); + if (i != net_streams.end()) net_streams.erase(i); + } + active_ = false; net_->unbind(uri_); return true;