Skip to content
Snippets Groups Projects
Select Git revision
  • feature/SKR
  • master default protected
  • censuseval
  • exp/labcolours
  • feature/disconflow
  • exp/multires-sgm
  • chromatest
  • bug/feedrecord
  • feature/375/fullres-fstream
  • feature/poses
  • feature/stereocalib
  • feature/gui2-pch
  • feature/gui2-nanogui-mitsuba
  • feature/use-new-frame
  • feature/depth-touch
  • feature/use10bit
  • bug/optflow-disparity
  • feature/websocket-pose
  • exp/candidatemask
  • feature/sgm-experimental
  • v0.0.6
  • v0.0.5
  • v0.0.4
  • v0.0.3
  • v0.0.2
  • v0.0.1
26 results

feed.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    feed.cpp 11.82 KiB
    #include <loguru.hpp>
    #include <nlohmann/json.hpp>
    #include <ftl/streams/feed.hpp>
    #include <ftl/streams/renderer.hpp>
    
    #include <ftl/streams/netstream.hpp>
    #include <ftl/streams/filestream.hpp>
    
    using ftl::stream::Feed;
    using ftl::codecs::Channel;
    
    ////////////////////////////////////////////////////////////////////////////////
    
    /**
     * Create a filter attached to a feed.
     *
     * @param feed      Feed this filter reports to; only the pointer is stored.
     * @param sources   Frameset ids of interest.
     * @param channels  Requested channels; also used as the initial value of
     *                  the available-channel set.
     */
    Feed::Filter::Filter(
    		Feed* feed,
    		const std::unordered_set<uint32_t>& sources,
    		const std::unordered_set<Channel>& channels)
    	: feed_(feed),
    	  channels_(channels),
    	  channels_available_(channels),
    	  sources_(sources) {}
    
    /** Nothing to release explicitly; members clean up after themselves. */
    Feed::Filter::~Filter() = default;
    
    /**
     * Detach this filter from its feed.
     *
     * Feed::removeFilter() deletes the filter when it is registered, so the
     * object must not be touched after this call returns.
     */
    void Feed::Filter::remove() {
    	feed_->removeFilter(this);
    }
    
    /**
     * Register a frameset callback on this filter.
     *
     * The handle returned by the internal handler is stored in handles_.
     *
     * @param cb Callback to register with handler_.
     * @throws ftl::exception if this filter is not in the owning feed's
     *         filter list.
     */
    void Feed::Filter::on(const ftl::data::FrameSetCallback &cb) {
    	std::unique_lock<std::mutex> lk(feed_->mtx_);
    
    	const auto &registered = feed_->filters_;
    	const bool is_member =
    		std::find(registered.begin(), registered.end(), this) != registered.end();
    
    	if (!is_member) {
    		throw ftl::exception("Filter does not belong to Feed; This should never happen!");
    	}
    
    	handles_.push_back(handler_.on(cb));
    }
    
    /**
     * Replace the set of channels requested by this filter and ask the feed
     * to recompute its stream selections.
     *
     * @param cs New requested channel set.
     * @return Reference to this filter, to allow chaining.
     */
    Feed::Filter &Feed::Filter::select(const std::unordered_set<ftl::codecs::Channel> &cs) {
    	{
    		// Feed::select() is invoked with the feed mutex held.
    		std::lock_guard<std::mutex> guard(feed_->mtx_);
    		channels_ = cs;
    		feed_->select();
    	}
    	return *this;
    }
    
    ////////////////////////////////////////////////////////////////////////////////
    
    /**
     * Build a feed on top of the given network universe.
     *
     * Creates the frame pool and the child components ("muxer", "intercept",
     * "receiver", "sender"), attaches receiver and sender to the muxer stream,
     * registers the pool-flush, network and frameset callbacks, and finally
     * starts the muxer stream.
     *
     * @param config JSON configuration forwarded to ftl::Configurable.
     * @param net    Network universe used for peer callbacks and the
     *               "add_stream" binding; only the pointer is stored.
     */
    Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
    		ftl::Configurable(config), net_(net) {
    
    	pool_ = std::make_unique<ftl::data::Pool>(3,5);
    
    	stream_ = std::unique_ptr<ftl::stream::Muxer>
    		(ftl::create<ftl::stream::Muxer>(this, "muxer"));
    
    	interceptor_ = std::unique_ptr<ftl::stream::Intercept>
    		(ftl::create<ftl::stream::Intercept>(this, "intercept"));
    
    	receiver_ = std::unique_ptr<ftl::stream::Receiver>
    		(ftl::create<ftl::stream::Receiver>(this, "receiver", pool_.get()));
    
    	sender_ = std::unique_ptr<ftl::stream::Sender>
    		(ftl::create<ftl::stream::Sender>(this, "sender"));
    
    	// The interceptor is created but currently not attached to the stream.
    	//interceptor_->setStream(stream_.get());
    	receiver_->setStream(stream_.get());
    	sender_->setStream(stream_.get());
    
    	// Route flushed response channels either to the sender or back to the
    	// local receiver, depending on whether the frameset has a stream entry.
    	handle_sender_ = pool_->onFlush([this]
    			(ftl::data::Frame &f, ftl::codecs::Channel c) {
    
    		// Send only response channels on a per frame basis
    		if (f.mode() == ftl::data::FrameMode::RESPONSE) {
    			// Remote sources need to use sender, otherwise loopback to local
    			if (streams_.find(f.frameset()) != streams_.end()) {
    				sender_->post(f, c);
    			} else {
    				receiver_->loopback(f, c);
    			}
    		}
    		return true;
    	});
    
    	// On a new peer connection, refresh the list of network sources from a
    	// pool job (the peer argument itself is not used).
    	net_->onConnect([this](ftl::net::Peer *p) {
    		ftl::pool.push([this](int id) {
    			// FIXME: Find better option than waiting here.
    			// Wait to make sure streams have started properly.
    			std::this_thread::sleep_for(std::chrono::milliseconds(100));
    			std::unique_lock<std::mutex> lk(mtx_);
    			updateNetSources();
    		});
    	});
    
    	// "add_stream" binding: refresh the network source list; the uri
    	// argument is ignored.
    	net_->bind("add_stream", [this](std::string uri){
    		std::unique_lock<std::mutex> lk(mtx_);
    		updateNetSources();
    	});
    
    	// Disconnect handling is a placeholder: it only takes and releases the
    	// lock.
    	net_->onDisconnect([this](ftl::net::Peer *) {
    		// TODO: maintain map between peer and sources, on disconnect remove all
    		//       peer's source. Also map between Peers and fsids?
    		std::unique_lock<std::mutex> lk(mtx_);
    	});
    
    	// Frameset arrival: run the optional pre-pipeline, publish the frameset
    	// as the latest one for its id, then notify matching filters.
    	handle_receiver_ = receiver_->onFrameSet(
    		[this](const ftl::data::FrameSetPtr& fs) {
    			// NOTE(review): pre_pipelines_ and the latest_ map are accessed
    			// here before mtx_ is taken, while addPipeline() modifies
    			// pre_pipelines_ under the lock - confirm this is safe.
    			if (pre_pipelines_.count(fs->frameset()) == 1) {
    				pre_pipelines_[fs->frameset()]->apply(*fs, *fs, 0);
    			}
    
    			std::atomic_store(&latest_[fs->frameset()], fs);
    
    			// NOTE(review): handlers are triggered while mtx_ is held, so a
    			// callback calling Filter::select()/remove() (which lock the
    			// same std::mutex) would presumably deadlock - verify.
    			std::unique_lock<std::mutex> lk(mtx_);
    			for (auto* filter : filters_) {
    				// TODO: smarter update (update only when changed) instead of
    				// filter->channels_available_ = fs->channels();
    
    				// A filter without sources receives every frameset.
    				if (filter->sources().empty()) {
    					//filter->channels_available_ = fs->channels();
    					filter->handler_.trigger(fs);
    				}
    				else {
    					// TODO: process partial/complete sets here (drop), that is
    					//       intersection filter->sources() and fs->sources() is
    					//       same as filter->sources().
    
    					// TODO: reverse map source ids required here?
    					// Trigger at most once when the frameset id is listed.
    					for (const auto& src : filter->sources()) {
    						//if (fs->hasFrame(src)) {
    						if (fs->frameset() == src) {
    							//filter->channels_available_ = fs->channels();
    							filter->handler_.trigger(fs);
    							break;
    						}
    					}
    				}
    			}
    
    			return true;
    	});
    
    	stream_->begin();
    }
    
    /**
     * Tear down the feed.
     *
     * The receiver is destroyed first, then every remaining filter and every
     * pre-processing pipeline owned by the feed is deleted.
     */
    Feed::~Feed() {
    	std::unique_lock<std::mutex> lk(mtx_);
    	receiver_.reset();  // Note: Force destruction first to remove filters this way
    
    	for (auto* filter : filters_) {
    		delete filter;
    	}
    	filters_.clear();
    
    	// Pipelines are allocated by addPipeline(), which also deletes replaced
    	// entries, so the feed owns them and must release the remaining ones.
    	for (auto& entry : pre_pipelines_) {
    		delete entry.second;
    	}
    	pre_pipelines_.clear();
    
    	// TODO stop everything and clean up
    }
    
    
    /**
     * Hand out the next frameset id.
     *
     * @return The counter value before it was incremented.
     */
    uint32_t Feed::allocateFrameSetId() {
    	const uint32_t allocated = fs_counter_++;
    	return allocated;
    }
    
    void Feed::select() {
    	std::map<uint32_t, std::unordered_set<Channel>> selected_channels;
    	for (auto &filter : filters_) {
    		const auto& selected = filter->channels();
    
    		if (filter->sources().empty()) {
    			// no sources: select all sources with selected channels
    			for (const auto& [uri, fsid] : fsid_lookup_) {
    				std::ignore = uri;
    				selected_channels[fsid].insert(selected.begin(), selected.end());
    			}
    		}
    		else {
    			// sources given
    			for (const auto& fsid : filter->sources()) {
    				if (selected_channels.count(fsid) == 0) {
    					selected_channels.try_emplace(fsid);
    				}
    				selected_channels[fsid].insert(selected.begin(), selected.end());
    			}
    		}
    	}
    	for (auto& [fsid, channels] : selected_channels) {
    		stream_->select(fsid, channels, true);
    		LOG(INFO) << "Update selections";
    		for (auto c : channels) {
    			LOG(INFO) << "  -- select " << (int)c;
    		}
    	}
    }
    
    /**
     * List the URIs of all sources that have a frameset id assigned.
     *
     * @return One entry per key of fsid_lookup_, in the map's iteration order.
     */
    std::vector<std::string> Feed::listSources() {
    	std::vector<std::string> uris;
    	uris.reserve(fsid_lookup_.size());
    	for (auto it = fsid_lookup_.begin(); it != fsid_lookup_.end(); ++it) {
    		uris.push_back(it->first);
    	}
    	return uris;
    }
    
    /**
     * Create a filter for the given framesets and channels, register it with
     * the feed and update the stream selections.
     *
     * @param framesets Frameset ids of interest (empty means all framesets).
     * @param channels  Channels to request.
     * @return Pointer to the new filter; it stays in the feed's filter list
     *         until removeFilter() or the feed destructor deletes it.
     */
    Feed::Filter* Feed::filter(const std::unordered_set<uint32_t> &framesets,
    		const std::unordered_set<Channel> &channels) {
    
    	Filter* const created = new Filter(this, framesets, channels);
    	{
    		std::lock_guard<std::mutex> guard(mtx_);
    		filters_.push_back(created);
    		select();
    	}
    	return created;
    }
    
    /**
     * Create a filter over all framesets for the given channels.
     *
     * @param channels Channels to request.
     * @return Pointer to the new filter (see the frameset overload).
     */
    Feed::Filter* Feed::filter(const std::unordered_set<Channel> &channels) {
    	const std::unordered_set<uint32_t> every_frameset;
    	return filter(every_frameset, channels);
    }
    
    /**
     * Create a filter for sources given by URI.
     *
     * Each URI is translated to its frameset id through fsid_lookup_.at(),
     * which throws for a URI that has no id.
     *
     * @param sources  Source URIs.
     * @param channels Channels to request.
     * @return Pointer to the new filter (see the frameset overload).
     */
    Feed::Filter* Feed::filter(const std::unordered_set<std::string> &sources, const std::unordered_set<Channel> &channels) {
    	std::unordered_set<uint32_t> resolved;
    	for (auto it = sources.begin(); it != sources.end(); ++it) {
    		resolved.insert(fsid_lookup_.at(*it));
    	}
    	return filter(resolved, channels);
    }
    
    /**
     * Create the pre-processing pipeline for a frameset, replacing (and
     * deleting) any pipeline previously registered for the same id.
     *
     * @param fsid Frameset id the pipeline applies to.
     * @return The newly created graph, also stored in pre_pipelines_.
     */
    ftl::operators::Graph* Feed::addPipeline(uint32_t fsid) {
    	std::lock_guard<std::mutex> guard(mtx_);
    
    	const auto previous = pre_pipelines_.find(fsid);
    	if (previous != pre_pipelines_.end()) {
    		delete previous->second;
    	}
    
    	const std::string name = std::string("pre_filters") + std::to_string(fsid);
    	auto *graph = ftl::config::create<ftl::operators::Graph>(this, name);
    	pre_pipelines_[fsid] = graph;
    
    	return graph;
    }
    
    /**
     * Unregister and delete a filter.
     *
     * A pointer that is not in the filter list is ignored (nothing is
     * deleted).
     *
     * @param filter Filter to remove.
     */
    void Feed::removeFilter(Feed::Filter* filter) {
    	std::lock_guard<std::mutex> guard(mtx_);
    
    	const auto pos = std::find(filters_.begin(), filters_.end(), filter);
    	if (pos == filters_.end()) {
    		return;
    	}
    
    	filters_.erase(pos);
    	delete filter;
    }
    
    /**
     * Refresh the cached list of network streams and notify listeners.
     *
     * Queries all peers for "list_streams", stores the result in netcams_ and
     * triggers new_sources_cb_ from a pool job. Automatic creation of net
     * streams is present but disabled. Does not lock; the callers in this
     * file hold mtx_.
     */
    void Feed::updateNetSources() {
    	netcams_ = net_->findAll<std::string>("list_streams");
    
    	// TODO: Allow auto net add
    	constexpr bool auto_add_net_streams = false;
    
    	if (auto_add_net_streams) {
    		for (const auto &s : netcams_) {
    			if (fsid_lookup_.count(s) != 0) {
    				LOG(INFO) << "Stream exists: " << s;
    				continue;
    			}
    
    			const std::string name =
    				std::string("netstream") + std::to_string(fsid_lookup_.size());
    			auto *stream = ftl::create<ftl::stream::Net>(this, name, net_);
    
    			int fsid = allocateFrameSetId();
    
    			stream->set("uri", s);
    			add(fsid, s, stream);
    
    			LOG(INFO)	<< "Add Stream: "
    						<< stream->value("uri", std::string("NONE"))
    						<< " (" << fsid << ")";
    
    			cv_net_connect_.notify_one();
    		}
    	}
    
    	// Listeners are notified asynchronously; stream_->begin() is done by add().
    	ftl::pool.push([this](int id) {
    		new_sources_cb_.trigger(0);
    	});
    }
    
    /**
     * @return A copy of the network stream list cached by updateNetSources().
     */
    std::vector<std::string> Feed::availableNetworkSources() {
    	std::vector<std::string> snapshot = netcams_;
    	return snapshot;
    }
    
    /**
     * Placeholder: file source discovery is not implemented.
     *
     * @return Always an empty list.
     */
    std::vector<std::string> Feed::availableFileSources() {
    	return std::vector<std::string>{};
    }
    
    /**
     * Placeholder: device source discovery is not implemented.
     *
     * @return Always an empty list.
     */
    std::vector<std::string> Feed::availableDeviceSources() {
    	return std::vector<std::string>{};
    }
    
    /**
     * Placeholder: availability checks are not implemented.
     *
     * @param uri Source URI (currently ignored).
     * @return Always false.
     */
    bool Feed::sourceAvailable(const std::string &uri) {
    	constexpr bool available = false;
    	return available;
    }
    
    /**
     * Check whether a source URI has a frameset id assigned.
     *
     * @param uri Source URI, compared exactly against the keys of fsid_lookup_.
     * @return true when the URI is present in fsid_lookup_.
     */
    bool Feed::sourceActive(const std::string &uri) {
    	return fsid_lookup_.find(uri) != fsid_lookup_.end();
    }
    
    /**
     * Placeholder: source names are not implemented.
     *
     * @param uri Source URI (currently ignored).
     * @return Always the fixed string "No Name".
     */
    std::string Feed::getName(const std::string &uri) {
    	return std::string("No Name");
    }
    
    /**
     * Register a stream under a frameset id and attach it to the muxer.
     *
     * Records the uri -> fsid and fsid -> stream mappings, adds the stream to
     * the muxer, (re)starts the muxer and selects the Colour channel for the
     * new frameset. Does not lock; the callers in this file hold mtx_.
     *
     * @param fsid   Frameset id to assign.
     * @param uri    Key under which the id is recorded in fsid_lookup_.
     * @param stream Stream to attach; the pointer is stored in streams_.
     */
    void Feed::add(uint32_t fsid, const std::string &uri, ftl::stream::Stream* stream) {
    	// Bookkeeping first (the two maps are independent of each other).
    	streams_[fsid] = stream;
    	fsid_lookup_[uri] = fsid;
    
    	auto &muxer = *stream_;
    	muxer.add(stream, fsid);
    	muxer.begin();
    	muxer.select(fsid, {Channel::Colour}, true);
    }
    
    /**
     * Add a source identified by a path or URI.
     *
     * Behaviour depends on the URI scheme:
     *  - file / no scheme: opens a ".ftl" file stream.
     *  - device:           creates a local source and an interval builder.
     *  - tcp / ws:         only connects to the peer; no frameset is allocated.
     *  - ftl:              creates a network stream.
     *
     * A path that is already registered returns its existing id.
     *
     * @param path Path or URI of the source.
     * @return The frameset id assigned to the source, or
     *         static_cast<uint32_t>(-1) for tcp/ws URIs, where no frameset is
     *         allocated by this call.
     * @throws ftl::exception for a file without a ".ftl" extension or an
     *         unsupported URI scheme.
     */
    uint32_t Feed::add(const std::string &path) {
    	std::unique_lock<std::mutex> lk(mtx_);
    
    	if (fsid_lookup_.count(path) > 0) return fsid_lookup_[path];
    
    	ftl::URI uri(path);
    	const auto scheme = uri.getScheme();
    
    	if ((scheme == ftl::URI::SCHEME_OTHER) || // assumes relative path
    		(scheme == ftl::URI::SCHEME_FILE)) {
    
    		// Require an actual ".ftl" suffix. Without the npos check a path
    		// with no '.' would be compared as a whole (npos + 1 wraps to 0),
    		// so a file literally named "ftl" would be accepted.
    		const auto eix = path.find_last_of('.');
    		if (eix == std::string::npos ||
    				path.compare(eix + 1, std::string::npos, "ftl") != 0) {
    			throw ftl::exception("bad filename (expects .ftl)");
    		}
    
    		const int fsid = allocateFrameSetId();
    		auto* fstream = ftl::create<ftl::stream::File>
    			(this, std::string("ftlfile-") + std::to_string(fsid));
    
    		if (scheme == ftl::URI::SCHEME_OTHER) {
    			fstream->set("filename", path);
    		}
    		else {
    			// possible BUG: uri.getPath() might return (wrong) absolute paths
    			// for relative paths (extra / at beginning)
    			fstream->set("filename", uri.getPath());
    		}
    
    		// TODO: URI normalization; should happen in add(,,) or add(,,,) take
    		// ftl::URI instead of std::string as argument. Note the bug above.
    		// TODO: write unit test for uri parsing
    		add(fsid, path, fstream);
    		return fsid;
    	}
    	else if (scheme == ftl::URI::SCHEME_DEVICE) {
    		int fsid = allocateFrameSetId();
    		fsid_lookup_[path] = fsid; // Manually add mapping
    
    		std::string srcname = std::string("source") + std::to_string(fsid);
    		uri.to_json(getConfig()[srcname]);
    
    		// Make the source object
    		ftl::data::DiscreteSource *source;
    
    		// The lock is released while the source and its builder are created.
    		lk.unlock();
    
    		if (uri.getBaseURI() == "device:render") {
    			// TODO: Use a ManualSourceBuilder and tick in draw thread. Also
    			// need to keep all such pointers to render sources to gain access
    			// to the texture objects for use by Camera.
    			source = ftl::create<ftl::render::Source>(this, srcname, this);
    		} else {
    			source = ftl::create<ftl::rgbd::Source>(this, srcname, net_);
    		}
    
    		// Create local builder instance
    		auto *creator = new ftl::streams::IntervalSourceBuilder(pool_.get(), fsid, {source});
    		std::shared_ptr<ftl::streams::BaseBuilder> creatorptr(creator);
    
    		lk.lock();
    		receiver_->registerBuilder(creatorptr);
    
    		creator->start();
    
    		// TODO: Maintain a record or list of sources for control / removal purposes
    		// Perhaps just maintain a list of creators
    		return fsid;
    	}
    	else if ((scheme == ftl::URI::SCHEME_TCP) ||
    			 (scheme == ftl::URI::SCHEME_WS)) {
    
    		// just connect, onConnect callback will add the stream
    		// TODO: do not connect same uri twice
    		// TODO: write unit test
    
    		net_->connect(path)->waitConnection();
    
    		// No frameset is allocated for a plain peer connection.
    		return static_cast<uint32_t>(-1);
    	}
    	else if (scheme == ftl::URI::SCHEME_FTL) {
    		auto *stream = ftl::create<ftl::stream::Net>
    			(this, std::string("netstream")
    			+std::to_string(fsid_lookup_.size()), net_);
    
    		int fsid = allocateFrameSetId();
    
    		stream->set("uri", path);
    		add(fsid, path, stream);
    
    		LOG(INFO)	<< "Add Stream: "
    					<< stream->value("uri", std::string("NONE"))
    					<< " (" << fsid << ")";
    
    		// Report the id that was just registered (previously fell through
    		// to the "no frameset" value even though one had been allocated).
    		return fsid;
    	}
    
    	throw ftl::exception("bad uri");
    }
    
    /**
     * Look up the frameset id assigned to a source.
     *
     * @param source Source URI as stored in fsid_lookup_.
     * @return The frameset id.
     * @throws Whatever fsid_lookup_.at() throws for an unknown key.
     */
    uint32_t Feed::getID(const std::string &source) {
    	const uint32_t fsid = fsid_lookup_.at(source);
    	return fsid;
    }
    
    /**
     * Channels available in the most recent frameset for a given frame.
     *
     * @param id Frame identifier (frameset + source index).
     * @return The frame's available channels, or an empty set when no frameset
     *         has been received for that id yet or the frame is missing.
     */
    const std::unordered_set<Channel> Feed::availableChannels(ftl::data::FrameID id) {
    	// Look up without operator[] so that querying an unknown frameset does
    	// not insert an empty entry into latest_.
    	const auto entry = latest_.find(id.frameset());
    	if (entry == latest_.end()) {
    		return {};
    	}
    
    	// The receiver callback publishes framesets with std::atomic_store on
    	// the map slot, so the matching read must be an atomic_load of that
    	// slot (an atomic_store into a local reads the slot non-atomically).
    	// NOTE(review): the map lookup itself is still unsynchronised - confirm.
    	const ftl::data::FrameSetPtr fs = std::atomic_load(&entry->second);
    
    	if (fs && fs->hasFrame(id.source())) {
    		return (*fs)[id.source()].available();
    	}
    	return {};
    }
    
    /**
     * Enumerate every frame of the most recently received framesets.
     *
     * @return One FrameID (frameset id, frame index) per frame in each entry
     *         of latest_.
     */
    std::vector<ftl::data::FrameID> Feed::listFrames() {
    	std::vector<ftl::data::FrameID> result;
    	result.reserve(fsid_lookup_.size());
    
    	for (const auto &entry : latest_) {
    		// Slots are written with std::atomic_store by the receiver
    		// callback, so take the snapshot with the matching atomic_load.
    		const auto fs = std::atomic_load(&entry.second);
    
    		// A slot can hold an empty pointer if it was default-inserted via
    		// operator[]; skip it instead of dereferencing null.
    		if (!fs) {
    			continue;
    		}
    
    		for (unsigned i = 0; i < fs->frames.size(); i++) {
    			result.push_back(ftl::data::FrameID(fs->frameset(), i));
    		}
    	}
    	return result;
    }
    
    /**
     * Reverse lookup: find the URI registered for a frameset id.
     *
     * @param fsid Frameset id to search for.
     * @return The first matching URI in fsid_lookup_'s iteration order, or an
     *         empty string when no source uses that id.
     */
    std::string Feed::getURI(uint32_t fsid) {
    	const auto match = std::find_if(
    		fsid_lookup_.begin(), fsid_lookup_.end(),
    		[fsid](const auto &entry) { return entry.second == fsid; });
    
    	if (match == fsid_lookup_.end()) {
    		return "";
    	}
    	return match->first;
    }