Select Git revision
-
Nicolas Pope authoredNicolas Pope authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
feed.cpp 11.82 KiB
#include <memory>

#include <loguru.hpp>
#include <nlohmann/json.hpp>
#include <ftl/streams/feed.hpp>
#include <ftl/streams/renderer.hpp>
#include <ftl/streams/netstream.hpp>
#include <ftl/streams/filestream.hpp>
using ftl::stream::Feed;
using ftl::codecs::Channel;
////////////////////////////////////////////////////////////////////////////////
// Builds a filter attached to `feed` that is interested in the given frameset
// ids (`sources`) and `channels`. The available-channel set starts out as a
// copy of the requested channels.
Feed::Filter::Filter(
		Feed* feed,
		const std::unordered_set<uint32_t>& sources,
		const std::unordered_set<Channel>& channels)
	: feed_(feed),
	  channels_(channels),
	  channels_available_(channels),
	  sources_(sources) {
}
// Nothing to release explicitly; members clean up after themselves.
Feed::Filter::~Filter() = default;
// Asks the owning feed to unregister this filter. Feed::removeFilter() deletes
// the filter when it is found, so the object must not be used after this call.
void Feed::Filter::remove() {
	feed_->removeFilter(this);
}
// Registers `cb` to be invoked for framesets delivered to this filter. The
// returned handle is kept in handles_ for the lifetime of the filter.
// Throws ftl::exception if this filter is not registered with its feed.
void Feed::Filter::on(const ftl::data::FrameSetCallback &cb) {
	std::unique_lock<std::mutex> lk(feed_->mtx_);

	// Sanity check: the filter has to be one of the feed's registered filters.
	const auto first = feed_->filters_.begin();
	const auto last = feed_->filters_.end();
	const bool registered = (std::find(first, last, this) != last);
	if (!registered) {
		throw ftl::exception("Filter does not belong to Feed; This should never happen!");
	}

	handles_.push_back(std::move(handler_.on(cb)));
}
// Replaces the set of channels requested by this filter and asks the feed to
// recompute its stream selections. Returns *this to allow chaining.
Feed::Filter &Feed::Filter::select(const std::unordered_set<ftl::codecs::Channel> &cs) {
	std::lock_guard<std::mutex> guard(feed_->mtx_);
	channels_ = cs;
	// Feed::select() does not lock; it runs under the feed mutex taken above.
	feed_->select();
	return *this;
}
////////////////////////////////////////////////////////////////////////////////
// Constructs the feed: creates the frame pool, the muxer stream, the
// interceptor, the receiver and the sender, then wires up the callbacks that
// connect them (flush handling, network connect/disconnect, the "add_stream"
// binding and frameset delivery to filters) and finally starts the muxer.
//
// config: configuration node forwarded to ftl::Configurable.
// net:    network universe used for peer events and the "add_stream" binding.
Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
ftl::Configurable(config), net_(net) {
pool_ = std::make_unique<ftl::data::Pool>(3,5);
// The components below are created as named children of this configurable.
stream_ = std::unique_ptr<ftl::stream::Muxer>
(ftl::create<ftl::stream::Muxer>(this, "muxer"));
interceptor_ = std::unique_ptr<ftl::stream::Intercept>
(ftl::create<ftl::stream::Intercept>(this, "intercept"));
receiver_ = std::unique_ptr<ftl::stream::Receiver>
(ftl::create<ftl::stream::Receiver>(this, "receiver", pool_.get()));
sender_ = std::unique_ptr<ftl::stream::Sender>
(ftl::create<ftl::stream::Sender>(this, "sender"));
// Only the receiver and sender are attached to the muxer; the interceptor is
// created but currently left unconnected.
//interceptor_->setStream(stream_.get());
receiver_->setStream(stream_.get());
sender_->setStream(stream_.get());
// Flush handler: routes flushed channels of RESPONSE-mode frames.
handle_sender_ = pool_->onFlush([this]
(ftl::data::Frame &f, ftl::codecs::Channel c) {
// Send only response channels on a per frame basis
if (f.mode() == ftl::data::FrameMode::RESPONSE) {
// Remote sources need to use sender, otherwise loopback to local
// (a frameset counts as remote here when it has an entry in streams_)
if (streams_.find(f.frameset()) != streams_.end()) {
sender_->post(f, c);
} else {
receiver_->loopback(f, c);
}
}
return true;
});
// On a new peer connection, refresh the list of network sources from a pool
// job (the peer pointer itself is not used).
net_->onConnect([this](ftl::net::Peer *p) {
ftl::pool.push([this](int id) {
// FIXME: Find better option than waiting here.
// Wait to make sure streams have started properly.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lk(mtx_);
updateNetSources();
});
});
// "add_stream" binding: refreshes the network source list; the uri argument
// is not inspected.
net_->bind("add_stream", [this](std::string uri){
std::unique_lock<std::mutex> lk(mtx_);
updateNetSources();
});
// Disconnect handler: currently only takes the lock; no cleanup is done yet.
net_->onDisconnect([this](ftl::net::Peer *) {
// TODO: maintain map between peer and sources, on disconnect remove all
// peer's source. Also map between Peers and fsids?
std::unique_lock<std::mutex> lk(mtx_);
});
// Frameset handler: runs the optional per-frameset pre-pipeline, records the
// frameset as the latest for its id, then triggers every matching filter.
// NOTE(review): pre_pipelines_ and the latest_ map are accessed here before
// mtx_ is taken — confirm this is safe against addPipeline() and readers.
handle_receiver_ = receiver_->onFrameSet(
[this](const ftl::data::FrameSetPtr& fs) {
if (pre_pipelines_.count(fs->frameset()) == 1) {
pre_pipelines_[fs->frameset()]->apply(*fs, *fs, 0);
}
std::atomic_store(&latest_[fs->frameset()], fs);
// Filter handlers are triggered while mtx_ is held.
std::unique_lock<std::mutex> lk(mtx_);
for (auto* filter : filters_) {
// TODO: smarter update (update only when changed) instead of
// filter->channels_available_ = fs->channels();
// A filter with no explicit sources receives every frameset.
if (filter->sources().empty()) {
//filter->channels_available_ = fs->channels();
filter->handler_.trigger(fs);
}
else {
// TODO: process partial/complete sets here (drop), that is
// intersection filter->sources() and fs->sources() is
// same as filter->sources().
// TODO: reverse map source ids required here?
// Filter sources are compared against the frameset id; the first match
// triggers the filter once.
for (const auto& src : filter->sources()) {
//if (fs->hasFrame(src)) {
if (fs->frameset() == src) {
//filter->channels_available_ = fs->channels();
filter->handler_.trigger(fs);
break;
}
}
}
}
return true;
});
stream_->begin();
}
// Tears the feed down while holding the feed mutex: the receiver goes first,
// then every filter still registered is deleted.
Feed::~Feed() {
	std::lock_guard<std::mutex> guard(mtx_);

	// Note: Force destruction first to remove filters this way
	receiver_.reset();

	for (Filter* remaining : filters_) {
		delete remaining;
	}
	filters_.clear();

	// TODO stop everything and clean up
	// delete pre_pipelines_
	// delete
}
// Hands out the current value of the frameset counter and advances it by one.
uint32_t Feed::allocateFrameSetId() {
	const uint32_t allocated = fs_counter_++;
	return allocated;
}
void Feed::select() {
std::map<uint32_t, std::unordered_set<Channel>> selected_channels;
for (auto &filter : filters_) {
const auto& selected = filter->channels();
if (filter->sources().empty()) {
// no sources: select all sources with selected channels
for (const auto& [uri, fsid] : fsid_lookup_) {
std::ignore = uri;
selected_channels[fsid].insert(selected.begin(), selected.end());
}
}
else {
// sources given
for (const auto& fsid : filter->sources()) {
if (selected_channels.count(fsid) == 0) {
selected_channels.try_emplace(fsid);
}
selected_channels[fsid].insert(selected.begin(), selected.end());
}
}
}
for (auto& [fsid, channels] : selected_channels) {
stream_->select(fsid, channels, true);
LOG(INFO) << "Update selections";
for (auto c : channels) {
LOG(INFO) << " -- select " << (int)c;
}
}
}
// Returns the URIs of all sources currently present in fsid_lookup_.
// NOTE(review): reads fsid_lookup_ without taking mtx_ — confirm callers
// synchronise externally.
std::vector<std::string> Feed::listSources() {
	std::vector<std::string> uris;
	uris.reserve(fsid_lookup_.size());
	for (auto& entry : fsid_lookup_) {
		uris.push_back(entry.first);
	}
	return uris;
}
// Creates a filter for the given frameset ids and channels, registers it with
// the feed and refreshes the stream selections. The feed keeps ownership: the
// filter is deleted by removeFilter() or by the feed's destructor.
Feed::Filter* Feed::filter(const std::unordered_set<uint32_t> &framesets,
		const std::unordered_set<Channel> &channels) {
	Filter* const created = new Filter(this, framesets, channels);

	std::lock_guard<std::mutex> guard(mtx_);
	filters_.push_back(created);
	select();
	return created;
}
// Convenience overload: a filter with an empty frameset list. Such a filter
// is treated as matching every frameset (see Feed::select and the frameset
// handler installed by the constructor).
Feed::Filter* Feed::filter(const std::unordered_set<Channel> &channels) {
	const std::unordered_set<uint32_t> any_frameset;
	return filter(any_frameset, channels);
}
// Creates a filter from source URIs by translating each URI into its frameset
// id and delegating to the id-based overload.
//
// Throws whatever fsid_lookup_.at() throws when a URI has not been added to
// the feed.
//
// The lookup runs under mtx_ because Feed::add() mutates fsid_lookup_ under
// that mutex. The lock is released before delegating, since the id-based
// overload acquires mtx_ itself and the mutex is locked as a std::mutex.
Feed::Filter* Feed::filter(const std::unordered_set<std::string> &sources, const std::unordered_set<Channel> &channels) {
	std::unordered_set<uint32_t> fsids;
	fsids.reserve(sources.size());
	{
		std::unique_lock<std::mutex> lk(mtx_);
		for (const auto &src : sources) {
			fsids.emplace(fsid_lookup_.at(src));
		}
	}
	return filter(fsids, channels);
}
// Creates (or replaces) the pre-processing pipeline for frameset `fsid` and
// returns it. The feed stores the raw pointer in pre_pipelines_; an existing
// pipeline for the same id is deleted first.
//
// The old entry is erased together with the delete, so that if creating the
// replacement throws, the map is not left holding a dangling pointer.
ftl::operators::Graph* Feed::addPipeline(uint32_t fsid) {
	std::unique_lock<std::mutex> lk(mtx_);

	const auto existing = pre_pipelines_.find(fsid);
	if (existing != pre_pipelines_.end()) {
		delete existing->second;
		pre_pipelines_.erase(existing);
	}

	auto* graph = ftl::config::create<ftl::operators::Graph>
		(this, std::string("pre_filters") + std::to_string(fsid));
	pre_pipelines_[fsid] = graph;
	return graph;
}
// Unregisters and deletes `filter` if it belongs to this feed; a filter that
// is not registered is left untouched.
void Feed::removeFilter(Feed::Filter* filter) {
	std::lock_guard<std::mutex> guard(mtx_);

	const auto pos = std::find(filters_.begin(), filters_.end(), filter);
	if (pos == filters_.end()) {
		return;
	}

	filters_.erase(pos);
	delete filter;
}
// Refreshes netcams_ with the result of the "list_streams" query and then
// schedules a pool job that triggers the new-sources callback. This function
// takes no lock itself; the callers visible in this file hold mtx_.
void Feed::updateNetSources() {
	netcams_ = net_->findAll<std::string>("list_streams");

	// Automatic adding of discovered streams is currently disabled.
	if (false) { // TODO: Allow auto net add
		for (auto s : netcams_) {
			if (fsid_lookup_.count(s) != 0) {
				LOG(INFO) << "Stream exists: " << s;
				continue;
			}

			auto *stream = ftl::create<ftl::stream::Net>
				(this, std::string("netstream")
				+std::to_string(fsid_lookup_.size()), net_);
			int fsid = allocateFrameSetId();
			stream->set("uri", s);
			add(fsid, s, stream);
			LOG(INFO) << "Add Stream: "
				<< stream->value("uri", std::string("NONE"))
				<< " (" << fsid << ")";
			cv_net_connect_.notify_one();
		}
	}

	// The callback is triggered from a pool job rather than inline.
	ftl::pool.push([this](int id) {
		new_sources_cb_.trigger(0);
	});

	/* done by add()
	if (n > 0) {
	stream_->begin();
	}*/
}
// Returns a copy of the network stream list last stored by updateNetSources().
std::vector<std::string> Feed::availableNetworkSources() {
	std::vector<std::string> snapshot = netcams_;
	return snapshot;
}
// Placeholder: always returns an empty list.
std::vector<std::string> Feed::availableFileSources() {
	return std::vector<std::string>();
}
// Placeholder: always returns an empty list.
std::vector<std::string> Feed::availableDeviceSources() {
	return std::vector<std::string>();
}
// Placeholder: reports every URI as unavailable; `uri` is not inspected.
bool Feed::sourceAvailable(const std::string &uri) {
	const bool available = false;
	return available;
}
// True when `uri` has an entry in fsid_lookup_, i.e. it was added to the feed.
bool Feed::sourceActive(const std::string &uri) {
	const auto matches = fsid_lookup_.count(uri);
	return matches > 0;
}
// Placeholder: returns a fixed name regardless of `uri`.
std::string Feed::getName(const std::string &uri) {
	std::string name("No Name");
	return name;
}
// Registers `stream` under frameset id `fsid` and URI `uri`: records both
// mappings, attaches the stream to the muxer, (re)starts the muxer and selects
// the Colour channel for the new frameset.
//
// Takes no lock itself; the callers visible in this file invoke it while
// holding mtx_.
void Feed::add(uint32_t fsid, const std::string &uri, ftl::stream::Stream* stream) {
// Bookkeeping: URI -> frameset id and frameset id -> stream.
fsid_lookup_[uri] = fsid;
streams_[fsid] = stream;
// Attach to the muxer first, then begin, then apply the initial selection.
stream_->add(stream, fsid);
stream_->begin();
stream_->select(fsid, {Channel::Colour}, true);
}
// Adds a source identified by `path` and returns its frameset id.
//
// Behaviour by URI scheme:
//  - other / file: `path` must end in ".ftl"; a file stream is created.
//  - device:       a render or rgbd source plus an interval builder is created.
//  - tcp / ws:     only connects to the peer; no frameset is allocated and the
//                  "no id" value (uint32_t)-1 is returned.
//  - ftl:          a network stream is created.
//
// If `path` was already added, the existing frameset id is returned.
// Throws ftl::exception for a bad file extension or an unsupported scheme.
//
// Changes from the previous version:
//  - the ftl branch now returns the id it allocated (it used to return -1);
//  - a path without any '.' is rejected (previously the whole path was compared
//    to "ftl", so a path literally named "ftl" passed the check);
//  - frameset ids are kept as uint32_t rather than narrowed to int.
uint32_t Feed::add(const std::string &path) {
	// Value returned when no frameset id is associated with the call.
	constexpr uint32_t kNoFrameSet = static_cast<uint32_t>(-1);

	std::unique_lock<std::mutex> lk(mtx_);

	if (fsid_lookup_.count(path) > 0) return fsid_lookup_[path];

	ftl::URI uri(path);
	const auto scheme = uri.getScheme();

	if ((scheme == ftl::URI::SCHEME_OTHER) || // assumes relative path
		(scheme == ftl::URI::SCHEME_FILE)) {

		const auto eix = path.find_last_of('.');
		const bool is_ftl = (eix != std::string::npos) &&
			(path.compare(eix + 1, std::string::npos, "ftl") == 0);
		if (!is_ftl) {
			throw ftl::exception("bad filename (expects .ftl)");
		}

		const uint32_t fsid = allocateFrameSetId();
		auto* fstream = ftl::create<ftl::stream::File>
			(this, std::string("ftlfile-") + std::to_string(fsid));

		if (scheme == ftl::URI::SCHEME_OTHER) {
			fstream->set("filename", path);
		}
		else {
			// possible BUG: uri.getPath() might return (wrong) absolute paths
			// for relative paths (extra / at beginning)
			fstream->set("filename", uri.getPath());
		}

		// TODO: URI normalization; should happen in add(,,) or add(,,,) take
		// ftl::URI instead of std::string as argument. Note the bug above.
		// TODO: write unit test for uri parsing
		add(fsid, path, fstream);
		return fsid;
	}

	if (scheme == ftl::URI::SCHEME_DEVICE) {
		const uint32_t fsid = allocateFrameSetId();
		fsid_lookup_[path] = fsid; // Manually add mapping

		const std::string srcname = std::string("source") + std::to_string(fsid);
		uri.to_json(getConfig()[srcname]);

		// Make the source object. The feed mutex is released while the source
		// and its builder are constructed, and re-acquired before registering.
		ftl::data::DiscreteSource *source = nullptr;
		lk.unlock();

		if (uri.getBaseURI() == "device:render") {
			// TODO: Use a ManualSourceBuilder and tick in draw thread. Also
			// need to keep all such pointers to render sources to gain access
			// to the texture objects for use by Camera.
			source = ftl::create<ftl::render::Source>(this, srcname, this);
		} else {
			source = ftl::create<ftl::rgbd::Source>(this, srcname, net_);
		}

		// Create local builder instance
		auto *creator = new ftl::streams::IntervalSourceBuilder(pool_.get(), fsid, {source});
		std::shared_ptr<ftl::streams::BaseBuilder> creatorptr(creator);

		lk.lock();
		receiver_->registerBuilder(creatorptr);
		creator->start();

		// TODO: Maintain a record or list of sources for control / removal purposes
		// Perhaps just maintain a list of creators
		return fsid;
	}

	if ((scheme == ftl::URI::SCHEME_TCP) ||
		(scheme == ftl::URI::SCHEME_WS)) {
		// just connect, onConnect callback will add the stream
		// TODO: do not connect same uri twice
		// TODO: write unit test
		net_->connect(path)->waitConnection();
		return kNoFrameSet;
	}

	if (scheme == ftl::URI::SCHEME_FTL) {
		auto *stream = ftl::create<ftl::stream::Net>
			(this, std::string("netstream")
			+std::to_string(fsid_lookup_.size()), net_);

		const uint32_t fsid = allocateFrameSetId();
		stream->set("uri", path);
		add(fsid, path, stream);

		LOG(INFO) << "Add Stream: "
			<< stream->value("uri", std::string("NONE"))
			<< " (" << fsid << ")";
		return fsid;
	}

	throw ftl::exception("bad uri");
}
// Looks up the frameset id registered for `source`. Propagates the exception
// raised by fsid_lookup_.at() when the source is unknown.
uint32_t Feed::getID(const std::string &source) {
	const auto &fsid = fsid_lookup_.at(source);
	return fsid;
}
// Returns the channels available on frame `id` in the most recently received
// frameset, or an empty set when no frameset has been stored for that id or
// the frameset lacks that frame.
//
// The stored pointer is published with std::atomic_store by the frameset
// handler, so it is read here with the matching std::atomic_load (previously
// the atomic operation was applied to the local variable and the shared entry
// was read non-atomically). find() is used instead of operator[] so that a
// query for an unknown frameset does not insert a null entry into latest_.
// NOTE(review): the latest_ map itself is still accessed without a lock —
// confirm insertion by the receiver cannot race with this lookup.
const std::unordered_set<Channel> Feed::availableChannels(ftl::data::FrameID id) {
	const auto entry = latest_.find(id.frameset());
	if (entry == latest_.end()) {
		return {};
	}

	const ftl::data::FrameSetPtr fs = std::atomic_load(&entry->second);
	if (fs && fs->hasFrame(id.source())) {
		return (*fs)[id.source()].available();
	}
	return {};
}
std::vector<ftl::data::FrameID> Feed::listFrames() {
std::vector<ftl::data::FrameID> result;
result.reserve(fsid_lookup_.size());
for (const auto [k, fs] : latest_) {
for (unsigned i = 0; i < fs->frames.size(); i++) {
result.push_back(ftl::data::FrameID(fs->frameset(), i));
}
}
return result;
}
// Reverse lookup: returns the first URI in fsid_lookup_ that maps to `fsid`,
// or an empty string when no entry matches.
std::string Feed::getURI(uint32_t fsid) {
	for (const auto& entry : fsid_lookup_) {
		if (entry.second != fsid) {
			continue;
		}
		return entry.first;
	}
	return "";
}