Skip to content
Snippets Groups Projects
net.cpp 13.9 KiB
Newer Older
#include "net.hpp"
Nicolas Pope's avatar
Nicolas Pope committed
#include <vector>
#include <thread>
#include <chrono>
#include <tuple>
#include <bitset>
Nicolas Pope's avatar
Nicolas Pope committed

#include "colour.hpp"

#include <ftl/rgbd/streamer.hpp>

using ftl::rgbd::detail::NetFrame;
using ftl::rgbd::detail::NetFrameQueue;
using ftl::rgbd::detail::NetSource;
Nicolas Pope's avatar
Nicolas Pope committed
using ftl::net::Universe;
using ftl::UUID;
Nicolas Pope's avatar
Nicolas Pope committed
using std::string;
using ftl::rgbd::Camera;
Nicolas Pope's avatar
Nicolas Pope committed
using std::vector;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
using std::tuple;
using ftl::codecs::Channel;
Nicolas Pope's avatar
Nicolas Pope committed

// ===== NetFrameQueue =========================================================

NetFrameQueue::NetFrameQueue(int size) : frames_(size) {
	for (auto &f : frames_) f.timestamp = -1;
}

NetFrameQueue::~NetFrameQueue() {

}

NetFrame &NetFrameQueue::getFrame(int64_t ts, const cv::Size &s, int c1type, int c2type) {
	UNIQUE_LOCK(mtx_, lk);

	// Find matching timestamp
	for (auto &f : frames_) {
		if (f.timestamp == ts) return f;
	}

	int64_t oldest = ts;

	// No match so find an empty slot
	for (auto &f : frames_) {
		if (f.timestamp == -1) {
			f.timestamp = ts;
Nicolas Pope's avatar
Nicolas Pope committed
			f.chunk_count[0] = 0;
			f.chunk_count[1] = 0;
			f.chunk_total[0] = 0;
			f.chunk_total[1] = 0;
			f.channel_count = 0;
			f.tx_size = 0;
Nicolas Pope's avatar
Nicolas Pope committed
			//f.channel[0].create(s, c1type);
			//f.channel[1].create(s, c2type);
			return f;
		}
		oldest = (f.timestamp < oldest) ? f.timestamp : oldest;
	}

	// No empty slot, so give a fatal error
	for (auto &f : frames_) {
		LOG(ERROR) << "Stale frame: " << f.timestamp << " - " << f.chunk_count;

		// Force release of frame!
		if (f.timestamp == oldest) {
			f.timestamp = ts;
Nicolas Pope's avatar
Nicolas Pope committed
			f.chunk_count[0] = 0;
			f.chunk_count[1] = 0;
			f.chunk_total[0] = 0;
			f.chunk_total[1] = 0;
			f.channel_count = 0;
			f.tx_size = 0;
Nicolas Pope's avatar
Nicolas Pope committed
			//f.channel[0].create(s, c1type);
			//f.channel[1].create(s, c2type);
	}
	LOG(FATAL) << "Net Frame Queue not large enough: " << ts;
	// FIXME: (Nick) Could auto resize the queue.
	return frames_[0];  // To avoid missing return error...
}

void NetFrameQueue::freeFrame(NetFrame &f) {
	UNIQUE_LOCK(mtx_, lk);
	f.timestamp = -1;
}


// ===== NetSource =============================================================

NetSource::NetSource(ftl::rgbd::Source *host)
		: ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), adaptive_(0), queue_(3) {
	gamma_ = host->value("gamma", 1.0f);
	temperature_ = host->value("temperature", 6500);
	default_quality_ = host->value("quality", 0);
	last_bitrate_ = 0;
Sebastian Hahta's avatar
Sebastian Hahta committed
	params_right_.width = 0;
	decoder_[0] = nullptr;
	decoder_[1] = nullptr;
Nicolas Pope's avatar
Nicolas Pope committed
	host->on("gamma", [this,host](const ftl::config::Event&) {
		gamma_ = host->value("gamma", 1.0f);
	});

	host->on("temperature", [this,host](const ftl::config::Event&) {
		temperature_ = host->value("temperature", 6500);
	});

	host->on("focal", [this,host](const ftl::config::Event&) {
		params_.fx = host_->value("focal", 400.0);
		params_.fy = params_.fx;
		host_->getNet()->send(peer_, "update_cfg", host_->getURI() + "/focal", host_->getConfig()["focal"].dump());
	});

Sebastian Hahta's avatar
Sebastian Hahta committed
	host->on("centre_x", [this,host](const ftl::config::Event&) {
		params_.cx = host_->value("centre_x", 0.0);
		host_->getNet()->send(peer_, "update_cfg", host_->getURI() + "/centre_x", host_->getConfig()["centre_x"].dump());
	});

	host->on("centre_y", [this,host](const ftl::config::Event&) {
		params_.cy = host_->value("centre_y", 0.0);
		host_->getNet()->send(peer_, "update_cfg", host_->getURI() + "/centre_y", host_->getConfig()["centre_y"].dump());
	});

	host->on("doffs", [this,host](const ftl::config::Event&) {
		params_.doffs = host_->value("doffs", params_.doffs);
		host_->getNet()->send(peer_, "update_cfg", host_->getURI() + "/doffs", host_->getConfig()["doffs"].dump());
	});

Nicolas Pope's avatar
Nicolas Pope committed
	host->on("baseline", [this,host](const ftl::config::Event&) {
		params_.baseline = host_->value("baseline", 400.0);
		host_->getNet()->send(peer_, "update_cfg", host_->getURI() + "/baseline", host_->getConfig()["baseline"].dump());
	});

Sebastian Hahta's avatar
Sebastian Hahta committed
	// Right parameters

	host->on("focal_right", [this,host](const ftl::config::Event&) {
		params_right_.fx = host_->value("focal_right", 0.0);
		params_right_.fy = params_right_.fx;
		host_->getNet()->send(peer_, "update_cfg", host_->getURI() + "/focal_right", host_->getConfig()["focal_right"].dump());
	});

	host->on("centre_x_right", [this,host](const ftl::config::Event&) {
		params_right_.cx = host_->value("centre_x_right", 0.0);
		host_->getNet()->send(peer_, "update_cfg", host_->getURI() + "/centre_x_right", host_->getConfig()["centre_x_right"].dump());
Sebastian Hahta's avatar
Sebastian Hahta committed
	host->on("centre_y_right", [this,host](const ftl::config::Event&) {
		params_right_.cy = host_->value("centre_y_right", 0.0);
		host_->getNet()->send(peer_, "update_cfg", host_->getURI() + "/centre_y_right", host_->getConfig()["centre_y_right"].dump());
	});


	host->on("quality", [this,host](const ftl::config::Event&) {
		default_quality_ = host->value("quality", 0);
	});

	abr_.setMaximumBitrate(host->value("max_bitrate", -1));
	abr_.setMinimumBitrate(host->value("min_bitrate", -1));

	_updateURI();
	h_ = host_->getNet()->onConnect([this](ftl::net::Peer *p) {
		if (active_) return;
		LOG(INFO) << "NetSource restart...";
		_updateURI();
	});
Nicolas Pope's avatar
Nicolas Pope committed
}

NetSource::~NetSource() {
	if (decoder_[0]) ftl::codecs::free(decoder_[0]);
	if (decoder_[1]) ftl::codecs::free(decoder_[1]);
	if (uri_.size() > 0) {
		host_->getNet()->unbind(uri_);
	host_->getNet()->removeCallback(h_);
/*void NetSource::_checkAdaptive(int64_t ts) {
	const int64_t current = ftl::timer::get_time();
	int64_t net_latency = current - ts;

	// Only change bit rate gradually
	if (current - last_br_change_ > ftl::rgbd::detail::kAdaptationRate) {
		// Was this frame late?
		if (adaptive_ < ftl::rgbd::detail::kMaxBitrateLevels && net_latency > ftl::rgbd::detail::kLatencyThreshold) {
			slow_log_ = (slow_log_ << 1) + 1;
			std::bitset<32> bs(slow_log_);

			// Enough late frames to reduce bit rate
			if (bs.count() > ftl::rgbd::detail::kSlowFramesThreshold) {
				adaptive_++;
				slow_log_ = 0;
				last_br_change_ = current;
				LOG(WARNING) << "Adjust bitrate to " << adaptive_;
			}
		// No late frames in recent history...
		} else if (adaptive_ > 0 && slow_log_ == 0) {
			// TODO: (Nick) Don't change bitrate up so quickly as down?
			// Try a higher bitrate again?
			adaptive_--;
		}
	}
}*/

void NetSource::_createDecoder(int chan, const ftl::codecs::Packet &pkt) {
	UNIQUE_LOCK(mutex_,lk);
	auto *decoder = decoder_[chan];
	if (decoder) {
		if (!decoder->accepts(pkt)) {
			ftl::codecs::free(decoder_[chan]);
	decoder_[chan] = ftl::codecs::allocateDecoder(pkt);
}

void NetSource::_processConfig(const ftl::codecs::Packet &pkt) {
	std::tuple<std::string, std::string> cfg;
	auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size());
	unpacked.get().convert(cfg);

	//LOG(INFO) << "Config Received: " << std::get<1>(cfg);
	// TODO: This needs to be put in safer / better location
	host_->set(std::get<0>(cfg), nlohmann::json::parse(std::get<1>(cfg)));
}

void NetSource::_processCalibration(const ftl::codecs::Packet &pkt) {
	std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, ftl::rgbd::capability_t> params;
	auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size());
	unpacked.get().convert(params);

	if (std::get<1>(params) == Channel::Left) {
		params_ = std::get<0>(params);
		capabilities_ = std::get<2>(params);
		has_calibration_ = true;
		LOG(INFO) << "Got Calibration channel: " << params_.width << "x" << params_.height;
		params_right_ = std::get<0>(params);
void NetSource::_processPose(const ftl::codecs::Packet &pkt) {
	LOG(INFO) << "Got POSE channel";
}

void NetSource::_checkDataRate(size_t tx_size, int64_t tx_latency) {
	float actual_mbps = (float(tx_size) * 8.0f * (1000.0f / float(tx_latency))) / 1048576.0f;
    float min_mbps = (float(tx_size) * 8.0f * (1000.0f / float(ftl::timer::getInterval()))) / 1048576.0f;
    if (actual_mbps < min_mbps) LOG(WARNING) << "Bitrate = " << actual_mbps << "Mbps, min required = " << min_mbps << "Mbps";
}

void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
	// Capture time here for better net latency estimate
	int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count();
	if (!active_) return;
	// Allow acccess to the raw data elsewhere...
	host_->notifyRaw(spkt, pkt);

	const ftl::codecs::Channel chan = host_->getChannel();
Nicolas Pope's avatar
Nicolas Pope committed
	const ftl::codecs::Channel rchan = spkt.channel;
	const int channum = (rchan == Channel::Colour) ? 0 : 1;
	// Deal with the special channels...
	switch (rchan) {
	case Channel::Configuration		: _processConfig(pkt); return;
	case Channel::Calibration		: _processCalibration(pkt); return;
	case Channel::Pose				: _processPose(pkt); return;
	}

	if (!has_calibration_) {
		LOG(WARNING) << "Missing calibration, skipping frame";
		return;
	}

	//LOG(INFO) << "PACKET: " << spkt.timestamp << ", " << (int)spkt.channel << ", " << (int)pkt.codec;
Nicolas Pope's avatar
Nicolas Pope committed
	
	const cv::Size size = cv::Size(ftl::codecs::getWidth(pkt.definition), ftl::codecs::getHeight(pkt.definition));
	NetFrame &frame = queue_.getFrame(spkt.timestamp, size, CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3));
	if (timestamp_ > 0 && frame.timestamp <= timestamp_) {
		LOG(ERROR) << "Duplicate frame - " << frame.timestamp << " received=" << int(rchan) << " uri=" << uri_;
		return;
	}

	// Calculate how many packets to expect for this channel
	if (frame.chunk_total[channum] == 0) {
		frame.chunk_total[channum] = pkt.block_total;
	}

	// Capture tx time of first received chunk
	if (frame.chunk_count[0] == 0 && frame.chunk_count[1] == 0) {
		UNIQUE_LOCK(frame.mtx, flk);
		if (frame.chunk_count[0] == 0 && frame.chunk_count[1] == 0) {
			frame.tx_latency = int64_t(ttimeoff);
		}
	}	

	++frame.chunk_count[channum];
	if (frame.chunk_count[channum] > frame.chunk_total[channum]) {
		LOG(WARNING) << "Too many channel packets received, discarding";
		return;
	}

	// Update frame statistics
	frame.tx_size += pkt.data.size();
Nicolas Pope's avatar
Nicolas Pope committed
	frame.channel[channum].create(size, (isFloatChannel(rchan) ? CV_32FC1 : CV_8UC3));

	// Only decode if this channel is wanted.
	if (rchan == Channel::Colour || rchan == chan) {
Nicolas Pope's avatar
Nicolas Pope committed
		_createDecoder(channum, pkt);
		auto *decoder = decoder_[channum];
		if (!decoder) {
			LOG(ERROR) << "No frame decoder available";
			return;
		}
		decoder->decode(pkt, frame.channel[channum]);
	} else if (chan != Channel::None && rchan != Channel::Colour) {
		// Didn't receive correct second channel so just clear the images
		if (isFloatChannel(chan)) {
			frame.channel[1].setTo(cv::Scalar(0.0f));
			frame.channel[1].setTo(cv::Scalar(0,0,0));
Nicolas Pope's avatar
Nicolas Pope committed

	// Apply colour correction to chunk
	//ftl::rgbd::colourCorrection(tmp_rgb, gamma_, temperature_);
	// TODO:(Nick) Decode directly into double buffer if no scaling
	
	_checkDataRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff));
	if (frame.chunk_count[channum] == frame.chunk_total[channum]) ++frame.channel_count;

	// Last chunk of both channels now received, so we are done.
	if (frame.channel_count == spkt.channel_count) {
		_completeFrame(frame, now-(spkt.timestamp+frame.tx_latency));
	}
}
void NetSource::_completeFrame(NetFrame &frame, int64_t latency) {
	UNIQUE_LOCK(frame.mtx, flk);

	// Frame must not have already been freed.
	if (frame.timestamp >= 0) {
		timestamp_ = frame.timestamp;
		frame.tx_latency = latency;

		// Note: Not used currently
		adaptive_ = abr_.selectBitrate(frame);

		host_->notify(frame.timestamp, frame.channel[0], frame.channel[1]);

		queue_.freeFrame(frame);
		N_--;
Nicolas Pope's avatar
Nicolas Pope committed
void NetSource::setPose(const Eigen::Matrix4d &pose) {
	if (!active_) return;

	vector<unsigned char> vec((unsigned char*)pose.data(), (unsigned char*)(pose.data()+(pose.size())));
	try {
		if (!host_->getNet()->send(peer_, "set_pose", *host_->get<string>("uri"), vec)) {
			active_ = false;
		}
	} catch (...) {

	}
	//Source::setPose(pose);
ftl::rgbd::Camera NetSource::parameters(ftl::codecs::Channel chan) {
	if (chan == ftl::codecs::Channel::Right) {
Sebastian Hahta's avatar
Sebastian Hahta committed
		return params_right_;
	} else {
		return params_;
	}
}

void NetSource::_updateURI() {
	UNIQUE_LOCK(mutex_,lk);
	active_ = false;
	prev_chan_ = ftl::codecs::Channel::None;
	auto uri = host_->get<string>("uri");

	if (uri_.size() > 0) {
		host_->getNet()->unbind(uri_);
		auto p = host_->getNet()->findOne<ftl::UUID>("find_stream", *uri);
		if (!p) {
			LOG(ERROR) << "Could not find stream: " << *uri;
			return;
		}
		peer_ = *p;

		host_->getNet()->bind(*uri, [this](short ttimeoff, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
			_recvPacket(ttimeoff, spkt, pkt);
		N_ = 0;
		uri_ = *uri;
		active_ = true;
	} else {
		uri_ = "";
		LOG(WARNING) << "NetSource URI is missing";
	}
}

bool NetSource::compute(int n, int b) {
	// Choose highest requested number of frames
	maxN_ = std::max(maxN_,(n == -1) ? ftl::rgbd::detail::kDefaultFrameCount : n);

	// Choose best requested quality
	minB_ = std::min(minB_,(b == -1) ? int(adaptive_) : b);

	// Send k frames before end to prevent unwanted pause
	// Unless only a single frame is requested
	if ((N_ <= maxN_/2 && maxN_ > 1) || N_ == 0) {
		const ftl::codecs::Channel chan = host_->getChannel();
		N_ = maxN_;

		// Verify depth destination is of required type
		if (isFloatChannel(chan) && depth_.type() != CV_32F) {
			depth_.create(cv::Size(params_.width, params_.height), CV_32FC1);  // 0.0f
		} else if (!isFloatChannel(chan) && depth_.type() != CV_8UC3) {
			depth_.create(cv::Size(params_.width, params_.height), CV_8UC3);  // cv::Scalar(0,0,0)
		}

		if (prev_chan_ != chan) {
			host_->getNet()->send(peer_, "set_channel", *host_->get<string>("uri"), chan);
			prev_chan_ = chan;
		}

		if (!host_->getNet()->send(peer_, "get_stream",
				*host_->get<string>("uri"), maxN_, minB_,
				host_->getNet()->id(), *host_->get<string>("uri"))) {
			active_ = false;
		}
		abr_.notifyChanged();

		maxN_ = 1;  // Reset to single frame
		minB_ = 9;  // Reset to worst quality
	return true;
Nicolas Pope's avatar
Nicolas Pope committed
}

bool NetSource::isReady() {
Nicolas Pope's avatar
Nicolas Pope committed
}