Skip to content
Snippets Groups Projects
receiver.cpp 5.98 KiB
Newer Older
#include <ftl/streams/receiver.hpp>

#include "parsers.hpp"
#include "injectors.hpp"

using ftl::stream::Receiver;
using ftl::stream::Stream;
using ftl::codecs::StreamPacket;
using ftl::codecs::Packet;
using ftl::codecs::Channel;
using std::string;
using ftl::stream::parseCalibration;
using ftl::stream::parsePose;
using ftl::stream::parseConfig;
using ftl::stream::injectCalibration;
using ftl::stream::injectPose;

Receiver::Receiver(nlohmann::json &config) : ftl::Configurable(config), stream_(nullptr) {
	timestamp_ = 0;
	second_channel_ = Channel::Depth;
}

Receiver::~Receiver() {

}

/*void Receiver::_processConfig(InternalStates &frame, const ftl::codecs::Packet &pkt) {
	std::string cfg;
	auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size());
	unpacked.get().convert(cfg);

	LOG(INFO) << "Config Received: " << cfg;
	// TODO: This needs to be put in safer / better location
	//host_->set(std::get<0>(cfg), nlohmann::json::parse(std::get<1>(cfg)));
}*/

void Receiver::_createDecoder(InternalStates &frame, int chan, const ftl::codecs::Packet &pkt) {
	//UNIQUE_LOCK(mutex_,lk);
	auto *decoder = frame.decoders[chan];
	if (decoder) {
		if (!decoder->accepts(pkt)) {
			//UNIQUE_LOCK(mutex_,lk);
			ftl::codecs::free(frame.decoders[chan]);
		} else {
			return;
		}
	}

	//UNIQUE_LOCK(mutex_,lk);
	frame.decoders[chan] = ftl::codecs::allocateDecoder(pkt);
}

Receiver::InternalStates::InternalStates() {
	for (int i=0; i<32; ++i) decoders[i] = nullptr;
}

Receiver::InternalStates &Receiver::_getFrame(const StreamPacket &spkt) {
	UNIQUE_LOCK(mutex_, lk);
	while (frames_.size() <= spkt.frameNumber()) {
		//frames_.resize(spkt.frameNumber()+1);
		frames_.push_back(new InternalStates);
		frames_[frames_.size()-1]->state.set("name",std::string("Source ")+std::to_string(spkt.frameNumber()+1));
	}
	auto &f = *frames_[spkt.frameNumber()];
	if (!f.frame.origin()) f.frame.setOrigin(&f.state);
	return f;
}

void Receiver::setStream(ftl::stream::Stream *s) {
	if (stream_) {
		stream_->onPacket(nullptr);
	}

	stream_ = s;

	s->onPacket([this](const StreamPacket &spkt, const Packet &pkt) {	
		//const ftl::codecs::Channel chan = second_channel_;
		const ftl::codecs::Channel rchan = spkt.channel;
		const unsigned int channum = (unsigned int)spkt.channel;

		//LOG(INFO) << "PACKET: " << spkt.timestamp << ", " << (int)spkt.channel << ", " << (int)pkt.codec;

		// TODO: Allow for multiple framesets
		if (spkt.frameSetID() > 0) return;

		// Too many frames, so ignore.
		if (spkt.frameNumber() >= value("max_frames",32)) return;

		// Dummy no data packet.
		if (pkt.data.size() == 0) return;

		InternalStates &frame = _getFrame(spkt);

		//if (spkt.timestamp > frame.timestamp && !frame.completed.empty()) {
		//	LOG(WARNING) << "Next frame received";
			//return;
		//}

		// Deal with the special channels...
		switch (rchan) {
		case Channel::Configuration		: frame.state.getConfig() = nlohmann::json::parse(parseConfig(pkt)); return;
		case Channel::Calibration		: frame.state.getLeft() = parseCalibration(pkt); return;
		case Channel::Calibration2		: frame.state.getRight() = parseCalibration(pkt); return;
		case Channel::Pose				: frame.state.getPose() = parsePose(pkt); return;
		default: break;
		}

		// Packets are for unwanted channel.
		//if (rchan != Channel::Colour && rchan != chan) return;

		if (frame.frame.hasChannel(spkt.channel)) {
			// FIXME: Is this a corruption in recording or in playback?
			// Seems to occur in same place in ftl file, one channel is missing
			LOG(ERROR) << "Previous frame not complete: " << frame.timestamp;
			//LOG(ERROR) << " --- " << (string)spkt;
			UNIQUE_LOCK(frame.mutex, lk);
			frame.frame.reset();
			frame.completed.clear();
		}
		frame.timestamp = spkt.timestamp;

		// Add channel to frame and allocate memory if required
		const cv::Size size = cv::Size(ftl::codecs::getWidth(pkt.definition), ftl::codecs::getHeight(pkt.definition));
		frame.frame.create<cv::cuda::GpuMat>(spkt.channel).create(size, (isFloatChannel(rchan) ? CV_32FC1 : CV_8UC4));

		Packet tmppkt = pkt;
		frame.frame.pushPacket(spkt.channel, tmppkt);

		//LOG(INFO) << " CODEC = " << (int)pkt.codec << " " << (int)pkt.flags << " " << (int)spkt.channel;

		// Do the actual decode
		_createDecoder(frame, channum, pkt);
		auto *decoder = frame.decoders[channum];
		if (!decoder) {
			LOG(ERROR) << "No frame decoder available";
			return;
		}

		try {
			//LOG(INFO) << "TYPE = " << frame.frame.get<cv::cuda::GpuMat>(spkt.channel).type();
			decoder->decode(pkt, frame.frame.get<cv::cuda::GpuMat>(spkt.channel));
		} catch (std::exception &e) {
			LOG(ERROR) << "Decode failed for " << spkt.timestamp << ": " << e.what();
		}

		frame.completed += spkt.channel;
		
		// Complete if all requested frames are found
		auto sel = stream_->selected(spkt.frameSetID());

		if ((frame.completed & sel) == sel) {
			UNIQUE_LOCK(frame.mutex, lk);
			if ((frame.completed & sel) == sel) {
				timestamp_ = frame.timestamp;

				//LOG(INFO) << "BUILDER PUSH: " << timestamp_ << ", " << spkt.frameNumber();

				if (frame.state.getLeft().width == 0) {
					LOG(WARNING) << "Missing calibration, skipping frame";
					//frame.frame.reset();
					//frame.completed.clear();
					//return;
				}

				// TODO: Have multiple builders for different framesets.
				builder_.push(frame.timestamp, spkt.frameNumber(), frame.frame);

				// Check for any state changes and send them back
				if (frame.state.hasChanged(Channel::Pose)) injectPose(stream_, frame.frame, frame.timestamp, spkt.frameNumber());
				if (frame.state.hasChanged(Channel::Calibration)) injectCalibration(stream_, frame.frame, frame.timestamp, spkt.frameNumber());
				if (frame.state.hasChanged(Channel::Calibration2)) injectCalibration(stream_, frame.frame, frame.timestamp, spkt.frameNumber(), true);

				frame.frame.reset();
				frame.completed.clear();
			}
		}
	});
}

size_t Receiver::size() {
	return builder_.size();
}

ftl::rgbd::FrameState &Receiver::state(int ix) {
	return builder_.state(ix);
}

void Receiver::onFrameSet(const ftl::rgbd::VideoCallback &cb) {
	builder_.onFrameSet(cb);
}