diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp index 0eeadf3d309dc3ee01cc47c1ea28704046a89074..8a2eb4a2840a95f726b0b6db77d2534da943cb90 100644 --- a/components/streams/src/feed.cpp +++ b/components/streams/src/feed.cpp @@ -65,13 +65,13 @@ std::list<ftl::data::FrameSetPtr> Feed::Filter::getLatestFrameSets() { SHARED_LOCK(feed_->mtx_, lk); if (sources_.empty()) { for (auto &i : feed_->latest_) { - results.emplace_back(std::atomic_load(&(i.second))); + if (i.second) results.emplace_back(std::atomic_load(&(i.second))); } } else { for (auto &s : sources_) { auto i = feed_->latest_.find(s); if (i != feed_->latest_.end()) { - results.emplace_back(std::atomic_load(&(i->second))); + if (i->second) results.emplace_back(std::atomic_load(&(i->second))); } } } @@ -181,10 +181,10 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) : pre_pipelines_[fs->frameset()]->apply(*fs, *fs, 0); } - // FIXME: Adding new to latest_ will modify data structure in non-threadsafe way! - std::atomic_store(&latest_[fs->frameset()], fs); - SHARED_LOCK(mtx_, lk); + + std::atomic_store(&latest_.at(fs->frameset()), fs); + for (auto* filter : filters_) { // TODO: smarter update (update only when changed) instead of // filter->channels_available_ = fs->channels(); @@ -603,6 +603,7 @@ std::string Feed::getName(const std::string &puri) { void Feed::add(uint32_t fsid, const std::string &uri, ftl::stream::Stream* stream) { fsid_lookup_[uri] = fsid; + latest_[fsid] = nullptr; streams_[fsid].push_back(stream); _createPipeline(fsid); @@ -677,6 +678,7 @@ uint32_t Feed::add(const std::string &path) { // Make the source object ftl::data::DiscreteSource *source; + latest_[fsid] = nullptr; lk.unlock(); if (uri.getBaseURI() == "device:render" || uri.getBaseURI() == "device:openvr") { @@ -768,7 +770,8 @@ uint32_t Feed::getID(const std::string &source) { const std::unordered_set<Channel> Feed::availableChannels(ftl::data::FrameID id) { ftl::data::FrameSetPtr fs; - std::atomic_store(&fs, latest_[id.frameset()]); + // FIXME: Should this be locked? + std::atomic_store(&fs, latest_.at(id.frameset())); if (fs && fs->hasFrame(id.source())) { return (*fs.get())[id.source()].allChannels(); } @@ -780,8 +783,10 @@ std::vector<ftl::data::FrameID> Feed::listFrames() { SHARED_LOCK(mtx_, lk); result.reserve(fsid_lookup_.size()); for (const auto [k, fs] : latest_) { - for (unsigned i = 0; i < fs->frames.size(); i++) { - result.push_back(ftl::data::FrameID(k, i)); + if (fs) { + for (unsigned i = 0; i < fs->frames.size(); i++) { + result.push_back(ftl::data::FrameID(k, i)); + } } } return result; @@ -817,8 +822,9 @@ std::vector<unsigned int> Feed::listFrameSets() { std::vector<unsigned int> result; result.reserve(fsid_lookup_.size()); for (const auto [k, fs] : latest_) { - std::ignore = fs; - result.push_back(k); + if (fs) { + result.push_back(k); + } } return result; }