#include <loguru.hpp>
#include <nlohmann/json.hpp>
#include <ftl/streams/feed.hpp>
#include <ftl/streams/renderer.hpp>

#include <ftl/streams/netstream.hpp>
#include <ftl/streams/filestream.hpp>

#include "ftl/operators/colours.hpp"
#include "ftl/operators/segmentation.hpp"
#include "ftl/operators/mask.hpp"
#include "ftl/operators/antialiasing.hpp"
#include <ftl/operators/smoothing.hpp>
#include <ftl/operators/disparity.hpp>
#include <ftl/operators/detectandtrack.hpp>
#include <ftl/operators/weighting.hpp>
#include <ftl/operators/mvmls.hpp>
#include <ftl/operators/clipping.hpp>
#include <ftl/operators/poser.hpp>
#include <ftl/operators/gt_analysis.hpp>

using ftl::stream::Feed;
using ftl::codecs::Channel;

//static nlohmann::json feed_config;

////////////////////////////////////////////////////////////////////////////////

Feed::Filter::Filter(Feed* feed, const std::unordered_set<uint32_t>& sources, const std::unordered_set<Channel>& channels) :
		feed_(feed), channels_(channels), channels_available_(channels), sources_(sources) {

};

Feed::Filter::~Filter() {

}

void Feed::Filter::remove() {
	return feed_->removeFilter(this);
}

void Feed::Filter::on(const ftl::data::FrameSetCallback &cb) {
	UNIQUE_LOCK(feed_->mtx_, lk);

	if (std::find(feed_->filters_.begin(), feed_->filters_.end(),this) == feed_->filters_.end()) {
		throw ftl::exception("Filter does not belong to Feed; This should never happen!");
	}

	handles_.push_back(std::move(handler_.on(cb)));
}

ftl::Handle Feed::Filter::onWithHandle(const ftl::data::FrameSetCallback &cb) {
	UNIQUE_LOCK(feed_->mtx_, lk);

	if (std::find(feed_->filters_.begin(), feed_->filters_.end(),this) == feed_->filters_.end()) {
		throw ftl::exception("Filter does not belong to Feed; This should never happen!");
	}

	return std::move(handler_.on(cb));
}

std::list<ftl::data::FrameSetPtr> Feed::Filter::getLatestFrameSets() {
	std::list<ftl::data::FrameSetPtr> results;

	SHARED_LOCK(feed_->mtx_, lk);
	if (sources_.empty()) {
		for (auto &i : feed_->latest_) {
			if (i.second) results.emplace_back(std::atomic_load(&(i.second)));
		}
	} else {
		for (auto &s : sources_) {
			auto i = feed_->latest_.find(s);
			if (i != feed_->latest_.end()) {
				if (i->second) results.emplace_back(std::atomic_load(&(i->second)));
			}
		}
	}
	return results;
}

Feed::Filter &Feed::Filter::select(const std::unordered_set<ftl::codecs::Channel> &cs) {
	UNIQUE_LOCK(feed_->mtx_, lk);
	channels_ = cs;
	feed_->select();
	return *this;
}

////////////////////////////////////////////////////////////////////////////////

Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
		ftl::Configurable(config), net_(net) {

	//feed_config = ftl::loadJSON(FTL_LOCAL_CONFIG_ROOT "/feed.json");
	restore(ftl::Configurable::getID(), {
		"recent_files",
		"recent_sources",
		"known_hosts",
		"known_groups",
		"auto_host_connect",
		"auto_host_sources",
		"uri",
		"recorder"
	});

	pool_ = std::make_unique<ftl::data::Pool>(3,5);

	stream_ = std::unique_ptr<ftl::stream::Muxer>
		(ftl::create<ftl::stream::Muxer>(this, "muxer"));

	interceptor_ = std::unique_ptr<ftl::stream::Intercept>
		(ftl::create<ftl::stream::Intercept>(this, "intercept"));

	receiver_ = std::unique_ptr<ftl::stream::Receiver>
		(ftl::create<ftl::stream::Receiver>(this, "receiver", pool_.get()));

	sender_ = std::unique_ptr<ftl::stream::Sender>
		(ftl::create<ftl::stream::Sender>(this, "sender"));

	recorder_ = std::unique_ptr<ftl::stream::Sender>
		(ftl::create<ftl::stream::Sender>(this, "recorder"));
	record_stream_ = std::unique_ptr<ftl::stream::Broadcast>
		(ftl::create<ftl::stream::Broadcast>(this, "record_stream"));
	recorder_->setStream(record_stream_.get());

	record_recv_handle_ = record_stream_->onPacket([this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
		receiver_->processPackets(spkt, pkt);
		return true;
	});

	record_filter_ = nullptr;

	//interceptor_->setStream(stream_.get());
	receiver_->setStream(stream_.get());
	sender_->setStream(stream_.get());

	handle_sender_ = pool_->onFlush([this]
			(ftl::data::Frame &f, ftl::codecs::Channel c) {

		// Send only reponse channels on a per frame basis
		if (f.mode() == ftl::data::FrameMode::RESPONSE) {
			// Remote sources need to use sender, otherwise loopback to local
			if (streams_.find(f.frameset()) != streams_.end()) {
				sender_->post(f, c);
			} else {
				receiver_->loopback(f, c);
			}
		}
		return true;
	});

	net_->onConnect([this](ftl::net::Peer *p) {
		ftl::pool.push([this,p](int id) {
			_updateNetSources(p);
		});
	});

	net_->bind("add_stream", [this](ftl::net::Peer &p, std::string uri){
		//UNIQUE_LOCK(mtx_, lk);
		_updateNetSources(&p, uri);
	});

	net_->onDisconnect([this](ftl::net::Peer *) {
		// TODO: maintain map between peer and sources, on disconnect remove all
		//       peer's source. Also map between Peers and fsids?
		//std::unique_lock<std::mutex> lk(mtx_);
	});

	handle_receiver_ = receiver_->onFrameSet(
		[this](const ftl::data::FrameSetPtr& fs) {
			if (value("drop_partial_framesets", false)) {
				if (fs->count < fs->frames.size()) {
					LOG(WARNING) << "Dropping partial frameset: " << fs->timestamp();
					return true;
				}
			}

			// FIXME: What happens if pipeline added concurrently?
			if (pre_pipelines_.count(fs->frameset()) == 1) {
				pre_pipelines_[fs->frameset()]->apply(*fs, *fs, 0);
			}

			SHARED_LOCK(mtx_, lk);

			std::atomic_store(&latest_.at(fs->frameset()), fs);

			if (fs->hasAnyChanged(Channel::Thumbnail)) {
				_saveThumbnail(fs);
			}

			for (auto* filter : filters_) {
				// TODO: smarter update (update only when changed) instead of
				// filter->channels_available_ = fs->channels();

				if (filter->sources().empty()) {
					//filter->channels_available_ = fs->channels();
					filter->handler_.trigger(fs);
				}
				else {
					// TODO: process partial/complete sets here (drop), that is
					//       intersection filter->sources() and fs->sources() is
					//       same as filter->sources().

					// TODO: reverse map source ids required here?
					for (const auto& src : filter->sources()) {
						//if (fs->hasFrame(src)) {
						if (fs->frameset() == src) {
							//filter->channels_available_ = fs->channels();
							filter->handler_.trigger(fs);
							break;
						}
					}
				}
			}

			return true;
	});

	stream_->begin();

	//if (value("auto_host_connect", true)) autoConnect();
}

Feed::~Feed() {
	UNIQUE_LOCK(mtx_, lk);
	//ftl::saveJSON(FTL_LOCAL_CONFIG_ROOT "/feed.json", feed_config);

	receiver_.reset();  // Note: Force destruction first to remove filters this way
	sender_.reset();
	recorder_.reset();

	// TODO stop everything and clean up
	// delete

	for (auto &p : pre_pipelines_) {
		delete p.second;
	}
	for (auto &d : devices_) {
		delete d.second;
	}
	for (auto &r : renderers_) {
		lk.unlock();
		delete r.second;
		lk.lock();
	}

	if (filters_.size() > 0) LOG(WARNING) << "Filters remain after feed destruct (" << filters_.size() << ")";
	for (auto* filter : filters_) {
		delete filter;
	}
	filters_.clear();

	interceptor_.reset();
	stream_.reset();
	for (auto &ls : streams_) {
		for (auto *s : ls.second) {
			delete s;
		}
	}
}

void Feed::_saveThumbnail(const ftl::data::FrameSetPtr& fs) {
	// TODO: Put thumb somewhere here...
}

uint32_t Feed::allocateFrameSetId(const std::string &group) {
	if (group.size() == 0) {
		return fs_counter_++;
	} else {
		auto i = groups_.find(group);
		if (i == groups_.end()) {
			uint32_t id = fs_counter_++;
			groups_[group] = id;
			return id;
		} else {
			return i->second;
		}
	}
}

void Feed::select() {
	std::map<uint32_t, std::unordered_set<Channel>> selected_channels;
	for (auto &filter : filters_) {
		const auto& selected = filter->channels();

		if (filter->sources().empty()) {
			// no sources: select all sources with selected channels
			for (const auto& [uri, fsid] : fsid_lookup_) {
				std::ignore = uri;
				selected_channels[fsid].insert(selected.begin(), selected.end());
			}
		}
		else {
			// sources given
			for (const auto& fsid : filter->sources()) {
				if (selected_channels.count(fsid) == 0) {
					selected_channels.try_emplace(fsid);
				}
				selected_channels[fsid].insert(selected.begin(), selected.end());
			}
		}
	}
	for (auto& [fsid, channels] : selected_channels) {
		stream_->select(fsid, channels, true);
		LOG(INFO) << "Update selections";
		for (auto c : channels) {
			LOG(INFO) << "  -- select " << (int)c;
		}
	}
}

std::vector<std::string> Feed::listSources() {
	std::vector<std::string> sources;
	SHARED_LOCK(mtx_, lk);
	sources.reserve(fsid_lookup_.size());
	for (auto& [uri, fsid] : fsid_lookup_) {
		std::ignore = fsid;
		sources.push_back(uri);
	}
	return sources;
}

Feed::Filter* Feed::filter(const std::unordered_set<uint32_t> &framesets,
		const std::unordered_set<Channel> &channels) {

	auto* filter = new Filter(this, framesets, channels);
	UNIQUE_LOCK(mtx_, lk);
	filters_.push_back(filter);
	select();
	return filter;
}

Feed::Filter* Feed::filter(const std::unordered_set<Channel> &channels) {
	return filter(std::unordered_set<uint32_t>{}, channels);
}

Feed::Filter* Feed::filter(const std::unordered_set<std::string> &sources, const std::unordered_set<Channel> &channels) {
	std::unordered_set<uint32_t> fsids;

	SHARED_LOCK(mtx_, lk);
	for (const auto &src : sources) {
		ftl::URI uri(src);

		auto i = fsid_lookup_.find(uri.getBaseURI());
		if (i != fsid_lookup_.end()) {
			fsids.emplace(i->second);
		}
	}
	return filter(fsids, channels);
}

void Feed::remove(const std::string &str) {
	uint32_t fsid;

	{
		UNIQUE_LOCK(mtx_, lk);
		auto i = fsid_lookup_.find(str);
		if (i != fsid_lookup_.end()) {
			fsid = i->second;
		} else {
			return;
		}
	}

	remove(fsid);
}

void Feed::remove(uint32_t id) {
	UNIQUE_LOCK(mtx_, lk);

	// First tell all filters
	for (auto *f : filters_) {
		if (f->sources_.empty() || f->sources_.count(id)) {
			f->remove_handler_.trigger(id);
		}
	}

	remove_sources_cb_.trigger(id);

	// TODO: Actual delete of source
	// If stream source, remove from muxer
	if (streams_.count(id)) {
		auto &streams = streams_[id];
		for (auto *s : streams) {
			stream_->remove(s);
			delete s;
		}

		streams_.erase(id);
	} else if (devices_.count(id)) {
		receiver_->removeBuilder(id);
		delete devices_[id];
		devices_.erase(id);
	} else if (renderers_.count(id)) {

	}

	if (latest_.count(id)) latest_.erase(id);

	for (auto i = fsid_lookup_.begin(); i != fsid_lookup_.end();) {
		if (i->second == id) {
			i = fsid_lookup_.erase(i);
		} else {
			++i;
		}
	}
}

ftl::operators::Graph* Feed::addPipeline(uint32_t fsid) {
	UNIQUE_LOCK(mtx_, lk);
	return _addPipeline(fsid);
}

ftl::operators::Graph* Feed::_addPipeline(uint32_t fsid) {
	if (pre_pipelines_.count(fsid) != 0) {
		delete pre_pipelines_[fsid];
	}

	if (devices_.count(fsid)) {
		pre_pipelines_[fsid] = ftl::config::create<ftl::operators::Graph>
			(devices_[fsid], std::string("pipeline"));
	} else if (renderers_.count(fsid)) {
		pre_pipelines_[fsid] = ftl::config::create<ftl::operators::Graph>
			(renderers_[fsid], std::string("pipeline"));
	} else if (streams_.count(fsid)) {
		pre_pipelines_[fsid] = ftl::config::create<ftl::operators::Graph>
			(streams_[fsid].front(), std::string("pipeline"));
	}

	//pre_pipelines_[fsid] = ftl::config::create<ftl::operators::Graph>
	//	(this, std::string("pre_filters") + std::to_string(fsid));

	return pre_pipelines_[fsid];
}

void Feed::_createPipeline(uint32_t fsid) {
	// Don't recreate if already exists
	if (pre_pipelines_.count(fsid)) return;

	LOG(INFO) << "Creating pipeline";
	auto *p = _addPipeline(fsid);

	p->append<ftl::operators::DepthChannel>("depth")->value("enabled", false);
	p->append<ftl::operators::ClipScene>("clipping")->value("enabled", false);
	p->append<ftl::operators::ColourChannels>("colour");  // Convert BGR to BGRA
	p->append<ftl::operators::DetectAndTrack>("facedetection")->value("enabled", false);
	p->append<ftl::operators::ArUco>("aruco")->value("enabled", false);
	//p->append<ftl::operators::HFSmoother>("hfnoise");
	p->append<ftl::operators::CrossSupport>("cross");
	p->append<ftl::operators::PixelWeights>("weights");
	p->append<ftl::operators::CullWeight>("remove_weights")->value("enabled", false);
	p->append<ftl::operators::DegradeWeight>("degrade");
	p->append<ftl::operators::VisCrossSupport>("viscross")->set("enabled", false);
	p->append<ftl::operators::BorderMask>("border_mask");
	p->append<ftl::operators::CullDiscontinuity>("remove_discontinuity");
	p->append<ftl::operators::MultiViewMLS>("mvmls")->value("enabled", false);
	p->append<ftl::operators::Poser>("poser")->value("enabled", true);
	p->append<ftl::operators::GTAnalysis>("gtanalyse");
}

void Feed::removeFilter(Feed::Filter* filter) {
	UNIQUE_LOCK(mtx_, lk);

	if (record_filter_ == filter) {
		_stopRecording();
	}

	auto iter = std::find(filters_.begin(), filters_.end(), filter);
	if (iter != filters_.end()) {
		filters_.erase(iter);
		delete filter;
	}
}

void Feed::_updateNetSources(ftl::net::Peer *p, const std::string &s) {
	UNIQUE_LOCK(mtx_, lk);
	netcams_.push_back(s);

	// TODO: Auto add source

	ftl::URI uri(s);
	_add_recent_source(uri)["host"] = p->getURI();

	ftl::pool.push([this](int id) {
		new_sources_cb_.trigger(0);
	});
}

void Feed::_updateNetSources(ftl::net::Peer *p) {
	//auto netcams =
	//	net_->findAll<std::string>("list_streams");

	auto peerstreams = p->call<std::vector<std::string>>("list_streams");

	UNIQUE_LOCK(mtx_, lk);
	//netcams_ = std::move(netcams);
	netcams_.insert(netcams_.end(), peerstreams.begin(), peerstreams.end());

	for (const auto &s : peerstreams) {
		ftl::URI uri(s);
		_add_recent_source(uri)["host"] = p->getURI();
	}

	/*if (value("auto_host_sources", false)) {
		for (auto s : netcams_) {
			const std::string group = uri.getAttribute<std::string>("group");

			if (fsid_lookup_.count(uri.getBaseURI()) == 0) {
				auto *stream = ftl::create<ftl::stream::Net>
					(this, std::string("netstream")
					+std::to_string(fsid_lookup_.size()), net_);

				int fsid = allocateFrameSetId(group);

				stream->set("uri", s);
				add(fsid, uri.getBaseURI(), stream);

				LOG(INFO)	<< "Add Stream: "
							<< stream->value("uri", std::string("NONE"))
							<< " (" << fsid << ")";

				//cv_net_connect_.notify_one();
			}
			else {
				LOG(INFO) << "Stream exists: " << s;
			}
		}
	}*/

	ftl::pool.push([this](int id) {
		new_sources_cb_.trigger(0);
	});

	/* done by add()
	if (n > 0) {
		stream_->begin();
	}*/
}

std::vector<std::string> Feed::availableNetworkSources() {
	SHARED_LOCK(mtx_, lk);
	return netcams_;
}

std::vector<std::string> Feed::availableGroups() {
	std::vector<std::string> groups;
	auto &known = getConfig()["known_groups"];

	for (auto &f : known.items()) {
		groups.push_back(f.key());
	}

	return groups;
}

std::vector<std::string> Feed::availableFileSources() {
	std::vector<std::string> files;
	auto &recent_files = getConfig()["recent_files"];

	for (auto &f : recent_files.items()) {
		files.push_back(f.key());
	}

	return files;
}

std::vector<std::string> Feed::knownHosts() {
	std::vector<std::string> hosts;
	auto &known = getConfig()["known_hosts"];

	for (auto &f : known.items()) {
		hosts.push_back(f.key());
	}

	return hosts;
}

std::set<ftl::stream::SourceInfo> Feed::recentSources() {
	std::set<ftl::stream::SourceInfo> result;

	auto &recent = getConfig()["recent_sources"];

	for (auto &f : recent.items()) {
		ftl::stream::SourceInfo info;
		info.uri = f.key();
		if (f.value().contains("uri")) info.uri = f.value()["uri"].get<std::string>();
		info.last_used = f.value()["last_open"].get<int64_t>();
		result.insert(info);
	}

	return result;
}

std::vector<std::string> Feed::availableDeviceSources() {
	std::vector<std::string> results;

	if (ftl::rgbd::Source::supports("device:pylon")) results.emplace_back("device:pylon");
	if (ftl::rgbd::Source::supports("device:camera")) results.emplace_back("device:camera");
	if (ftl::rgbd::Source::supports("device:stereo")) results.emplace_back("device:stereo");
	if (ftl::rgbd::Source::supports("device:screen")) results.emplace_back("device:screen");
	if (ftl::rgbd::Source::supports("device:realsense")) results.emplace_back("device:realsense");
	if (ftl::render::Source::supports("device:render")) results.emplace_back("device:render");
	if (ftl::render::Source::supports("device:openvr")) results.emplace_back("device:openvr");

	return results;
}

void Feed::autoConnect() {
	ftl::pool.push([this](int id) {
		auto &known_hosts = getConfig()["known_hosts"];

		for (auto &h : known_hosts.items()) {
			net_->connect(h.key())->noReconnect();
		}
	});
}

bool Feed::sourceAvailable(const std::string &uri) {
	return false;
}

bool Feed::sourceActive(const std::string &suri) {
	ftl::URI uri(suri);

	if (uri.getScheme() == ftl::URI::SCHEME_TCP || uri.getScheme() == ftl::URI::SCHEME_WS) {
		return net_->isConnected(uri);
	} else if (uri.getScheme() == ftl::URI::SCHEME_GROUP) {
		// Check that every constituent source is active
		auto &known = getConfig()["known_groups"];
		if (known.contains(uri.getBaseURI())) {
			auto &sources = known[uri.getBaseURI()]["sources"];

			for (auto i=sources.begin(); i!=sources.end(); ++i) {
				if (!sourceActive(i.key())) return false;
			}
		}
		return true;
	} else {
		SHARED_LOCK(mtx_, lk);
		return fsid_lookup_.count(uri.getBaseURI()) > 0;
	}
}

std::string Feed::getName(const std::string &puri) {
	ftl::URI uri(puri);

	if (uri.isValid() == false) return "Invalid";

	if (uri.getScheme() == ftl::URI::SCHEME_FTL) {
		try {
			auto *cfgble = ftl::config::find(puri);
			if (cfgble) {
				auto &j = cfgble->getConfig();
				std::string name = (j.is_structured()) ? j.value("name", j.value("uri", uri.getPathSegment(-1))) : uri.getPathSegment(-1);
				return (name.size() == 0) ? uri.getHost() : name;
			} else {
				std::string name = uri.getPathSegment(-1);
				return (name.size() == 0) ? uri.getHost() : name;
			}
			/*auto n = net_->findOne<std::string>("get_cfg", puri);
			if (n) {
				auto j = nlohmann::json::parse(*n);
				return (j.is_structured()) ? j.value("name", j.value("uri", uri.getPathSegment(-1))) : uri.getPathSegment(-1);
			}*/
		} catch (const ftl::exception &e) {
			e.ignore();
		}
		return puri;
	} else if (uri.getScheme() == ftl::URI::SCHEME_DEVICE) {
		if (uri.getPathSegment(0) == "pylon") return "Pylon";
		if (uri.getPathSegment(0) == "camera") return "Web Cam";
		if (uri.getPathSegment(0) == "stereo") return "Stereo";
		if (uri.getPathSegment(0) == "realsense") return "Realsense";
		if (uri.getPathSegment(0) == "screen") return "Screen Capture";
		if (uri.getPathSegment(0) == "render") return "3D Virtual";
		if (uri.getPathSegment(0) == "openvr") return "OpenVR";
		return "Unknown Device";
	} else if (uri.getScheme() == ftl::URI::SCHEME_FILE) {
		return getConfig()["recent_files"][uri.getBaseURI()].value("name", "FTLFile");
	} else if (uri.getScheme() == ftl::URI::SCHEME_TCP || uri.getScheme() == ftl::URI::SCHEME_WS) {
		return uri.getBaseURI();
	} else if (uri.getScheme() == ftl::URI::SCHEME_GROUP) {
		auto &groups = getConfig()["known_groups"];
		if (groups.contains(uri.getBaseURI())) {
			return uri.getPathSegment(0) + std::string(" (") + std::to_string(groups[uri.getBaseURI()]["sources"].size()) + std::string(")");
		} else {
			return uri.getPathSegment(0);
		}
	}

	return uri.getPathSegment(-1);
}

nlohmann::json &Feed::_add_recent_source(const ftl::URI &uri) {
	auto &known = getConfig()["recent_sources"];
	auto &details = known[uri.getBaseURI()];
	std::string name = uri.getPathSegment(-1);

	if (uri.hasAttribute("name")) {
		name = uri.getAttribute<std::string>("name");
	} else if (uri.getScheme() == ftl::URI::SCHEME_FILE) {
		name = name.substr(0, name.find_last_of('.'));
	}

	details["uri"] = uri.to_string();
	details["name"] = name;
	details["last_open"] = ftl::timer::get_time();

	if (uri.hasAttribute("group")) {
		std::string grpname = uri.getAttribute<std::string>("group");
		auto &groups = getConfig()["known_groups"];
		auto &grpdetail = groups[std::string("group:")+grpname];
		grpdetail["sources"][uri.getBaseURI()] = true;
	}

	return details;
}

void Feed::add(uint32_t fsid, const std::string &uri, ftl::stream::Stream* stream) {
	fsid_lookup_[uri] = fsid;
	latest_[fsid] = nullptr;
	streams_[fsid].push_back(stream);

	_createPipeline(fsid);

	stream_->add(stream, fsid);
	stream_->begin();
	stream_->select(fsid, {Channel::Colour}, true);
}


uint32_t Feed::add(const std::string &path) {
	UNIQUE_LOCK(mtx_, lk);
	ftl::URI uri(path);

	//if (!uri.isValid()) throw FTL_Error("Invalid URI: " << path);

	if (fsid_lookup_.count(uri.getBaseURI()) > 0) return fsid_lookup_[uri.getBaseURI()];

	const auto scheme = uri.getScheme();
	const std::string group = uri.getAttribute<std::string>("group");

	if ((scheme == ftl::URI::SCHEME_OTHER) || // assumes relative path
		(scheme == ftl::URI::SCHEME_FILE)) {

		auto eix = ((scheme == ftl::URI::SCHEME_OTHER) ? path : uri.getPath()).find_last_of('.');
		auto ext = ((scheme == ftl::URI::SCHEME_OTHER) ? path : uri.getPath()).substr(eix+1);

		if (ext != "ftl") {
			throw FTL_Error("Bad filename (expects .ftl) : " << path);
		}

		const int fsid = allocateFrameSetId(group);
		auto* fstream = ftl::create<ftl::stream::File>
			(this, std::string("ftlfile-") + std::to_string(file_counter_++));

		if (scheme == ftl::URI::SCHEME_OTHER) {
			fstream->set("filename", path);
		}
		else {
			// possible BUG: uri.getPath() might return (wrong) absolute paths
			// for relative paths (extra / at beginning)
#ifdef WIN32
			fstream->set("filename", uri.getPath().substr(1));
#else
			fstream->set("filename", uri.getPath());
#endif
		}

		fstream->set("uri", path);

		auto &recent_files = getConfig()["recent_files"];
		auto &file_details = recent_files[uri.getBaseURI()];
		std::string fname = uri.getPathSegment(-1);
		file_details["name"] = fname.substr(0, fname.find_last_of('.'));
		file_details["last_open"] = ftl::timer::get_time();

		_add_recent_source(uri);

		// TODO: URI normalization; should happen in add(,,) or add(,,,) take
		// ftl::URI instead of std::string as argument. Note the bug above.
		// TODO: write unit test for uri parsing
		add(fsid, uri.getBaseURI(), fstream);

		add_src_cb_.trigger(fsid);
		return fsid;
	}
	else if (scheme == ftl::URI::SCHEME_DEVICE) {
		int fsid = allocateFrameSetId("");  // TODO: Support groups with devices?
		fsid_lookup_[uri.getBaseURI()] = fsid; // Manually add mapping

		std::string srcname = std::string("source") + std::to_string(fsid);
		uri.to_json(getConfig()[srcname]);

		// Make the source object
		ftl::data::DiscreteSource *source;

		latest_[fsid] = nullptr;
		lk.unlock();

		if (uri.getBaseURI() == "device:render" || uri.getBaseURI() == "device:openvr") {
			auto *rsource = ftl::create<ftl::render::Source>(this, srcname, this);
			renderers_[fsid] = rsource;
			source = rsource;

			// Create local builder instance
			auto *creator = new ftl::streams::ManualSourceBuilder(pool_.get(), fsid, source);
			if (uri.getBaseURI() == "device:openvr") creator->setFrameRate(10000);
			else creator->setFrameRate(30);

			std::shared_ptr<ftl::streams::BaseBuilder> creatorptr(creator);
			lk.lock();
			receiver_->registerBuilder(creatorptr);

			// FIXME: pointer is deleted when removed from receiver
			render_builders_.push_back(creator);
		} else {
			auto *dsource = ftl::create<ftl::rgbd::Source>(this, srcname);
			devices_[fsid] = dsource;
			source = dsource;
			_createPipeline(fsid);

			// Create local builder instance
			auto *creator = new ftl::streams::IntervalSourceBuilder(pool_.get(), fsid, {source});
			std::shared_ptr<ftl::streams::BaseBuilder> creatorptr(creator);

			lk.lock();
			receiver_->registerBuilder(creatorptr);

			creator->start();
		}

		_add_recent_source(uri);

		add_src_cb_.trigger(fsid);
		return fsid;
	}

	else if ((scheme == ftl::URI::SCHEME_TCP) ||
			 (scheme == ftl::URI::SCHEME_WS)) {

		// just connect, onConnect callback will add the stream
		// TODO: do not connect same uri twice
		// TODO: write unit test

		auto &known_hosts = getConfig()["known_hosts"];
		auto &host_details = known_hosts[uri.getBaseURIWithUser()];
		host_details["last_open"] = ftl::timer::get_time();

		ftl::pool.push([this,path](int id) { net_->connect(path); });

	}
	else if (scheme == ftl::URI::SCHEME_FTL) {
		// Attempt to ensure connection first
		auto &known = getConfig()["recent_sources"];
		auto &details = known[uri.getBaseURI()];
		if (details.contains("host")) {
			auto *p = net_->connect(details["host"].get<std::string>());
			p->noReconnect();
			if (!p->waitConnection()) {
				throw FTL_Error("Could not connect to host " << details["host"].get<std::string>() << " for stream " << path);
			}
		} else {
			// See if it can otherwise be found?
		}

		auto *stream = ftl::create<ftl::stream::Net>
			(this, std::string("netstream")
			+std::to_string(fsid_lookup_.size()), net_);

		int fsid = allocateFrameSetId(group);

		stream->set("uri", path);
		add(fsid, uri.getBaseURI(), stream);

		LOG(INFO)	<< "Add Stream: "
					<< stream->value("uri", std::string("NONE"))
					<< " (" << fsid << ")";

		add_src_cb_.trigger(fsid);
		return fsid;
	} else if (scheme == ftl::URI::SCHEME_GROUP) {
		auto &known = getConfig()["known_groups"];
		if (known.contains(uri.getBaseURI())) {
			auto &sources = known[uri.getBaseURI()]["sources"];

			lk.unlock();
			for (auto i=sources.begin(); i!=sources.end(); ++i) {
				add(i.key());
			}

			lk.lock();
			_add_recent_source(uri);
		}
	}
	else{
		throw ftl::exception("bad uri");
	}
	return -1;
}

void Feed::render() {
	SHARED_LOCK(mtx_, lk);
	auto builders = render_builders_;
	lk.unlock();

	for (auto *r : builders) {
		r->tick();
	}
}

uint32_t Feed::getID(const std::string &source) {
	return fsid_lookup_.at(source);
}

const std::unordered_set<Channel> Feed::availableChannels(ftl::data::FrameID id) {
	ftl::data::FrameSetPtr fs;
	// FIXME: Should this be locked?
	std::atomic_store(&fs, latest_.at(id.frameset()));
	if (fs && fs->hasFrame(id.source())) {
		return (*fs.get())[id.source()].allChannels();
	}
	return {};
}

std::vector<ftl::data::FrameID> Feed::listFrames() {
	std::vector<ftl::data::FrameID> result;
	SHARED_LOCK(mtx_, lk);
	result.reserve(fsid_lookup_.size());
	for (const auto [k, fs] : latest_) {
		if (fs) {
			for (unsigned i = 0; i < fs->frames.size(); i++) {
				result.push_back(ftl::data::FrameID(k, i));
			}
		}
	}
	return result;
}

std::string Feed::getURI(uint32_t fsid) {
	SHARED_LOCK(mtx_, lk);
	for (const auto& [k, v] : fsid_lookup_) {
		if (v == fsid) {
			return k;
		}
	}
	return "";
}

std::string Feed::getSourceURI(ftl::data::FrameID id) {
	/*if (streams_.count(id.frameset())) {
		auto i = streams_.find(id.frameset());
		return i->second->getID();
	} else if (devices_.count(id.frameset())) {
		auto i = devices_.find(id.frameset());
		return i->second->getID();
	} else if (renderers_.count(id.frameset())) {
		auto i = renderers_.find(id.frameset());
		return i->second->getID();
	}*/

	return "";
}

std::vector<unsigned int> Feed::listFrameSets() {
	SHARED_LOCK(mtx_, lk);
	std::vector<unsigned int> result;
	result.reserve(fsid_lookup_.size());
	for (const auto [k, fs] : latest_) {
		if (fs) {
			result.push_back(k);
		}
	}
	return result;
}

// ==== Record =================================================================

void Feed::startRecording(Filter *f, const std::string &filename) {
	{
		UNIQUE_LOCK(mtx_, lk);
		if (_isRecording()) throw FTL_Error("Already recording, cannot record " << filename);

		record_filter_ = f;

		auto *fstream = ftl::create<ftl::stream::File>(this, "record_file");
		fstream->setMode(ftl::stream::File::Mode::Write);
		fstream->set("filename", filename);
		record_stream_->add(fstream);
		record_stream_->begin();
		recorder_->resetSender();
	}
	_beginRecord(f);
}

void Feed::startStreaming(Filter *f, const std::string &filename) {
	if (_isRecording()) throw FTL_Error("Already recording, cannot live stream: " << filename);

	// TODO: Allow net streaming
}

void Feed::startStreaming(Filter *f) {
	{
		UNIQUE_LOCK(mtx_, lk);
		if (_isRecording()) throw FTL_Error("Already recording, cannot live stream");

		record_filter_ = f;

		auto *nstream = ftl::create<ftl::stream::Net>(this, "live_stream", net_);
		nstream->set("uri", value("uri", std::string("ftl://vision.utu.fi/live")));
		record_stream_->add(nstream);
		record_stream_->begin();
		recorder_->resetSender();
	}
	_beginRecord(f);
}

void Feed::_beginRecord(Filter *f) {
	handle_record_ = f->onWithHandle([this, f](const ftl::data::FrameSetPtr &fs) {
		record_stream_->select(fs->frameset(), f->channels(), true);

		for (auto c : f->channels()) {
			if (fs->hasAnyChanged(c)) recorder_->post(*fs.get(), c);
		}
		return true;
	});
}

void Feed::stopRecording() {
	UNIQUE_LOCK(mtx_, lk);
	_stopRecording();
}

void Feed::_stopRecording() {
	handle_record_.cancel();
	record_stream_->end();

	auto garbage = record_stream_->streams();

	record_stream_->clear();

	for (auto *s : garbage) {
		delete s;
	}

	record_filter_ = nullptr;
}

bool Feed::isRecording() {
	SHARED_LOCK(mtx_, lk);
	return _isRecording();
}

bool Feed::_isRecording() {
	return record_stream_->streams().size() != 0;
}