Skip to content
Snippets Groups Projects
sender.cpp 5.4 KiB
Newer Older
#include <ftl/streams/sender.hpp>

#include "injectors.hpp"

using ftl::stream::Sender;
using ftl::codecs::StreamPacket;
using ftl::codecs::Packet;
using ftl::codecs::Channels;
using ftl::codecs::Channel;
using ftl::codecs::definition_t;
using ftl::codecs::device_t;
using ftl::codecs::codec_t;
using ftl::stream::injectCalibration;
using ftl::stream::injectPose;
using ftl::stream::injectConfig;

Sender::Sender(nlohmann::json &config) : ftl::Configurable(config), stream_(nullptr) {
	//do_inject_ = false;
}

Sender::~Sender() {
    // Delete all encoders
}

/*void Sender::onStateChange(const std::function<void(ftl::codecs::Channel,const ftl::rgbd::FrameState&)> &cb) {
	if (cb && state_cb_) throw ftl::exception("State change callback already set");
	state_cb_ = cb;
}*/

void Sender::setStream(ftl::stream::Stream*s) {
	if (stream_) stream_->onPacket(nullptr);
    stream_ = s;
	stream_->onPacket([this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
		LOG(INFO) << "SENDER REQUEST : " << (int)spkt.channel;

		//if (state_cb_) state_cb_(spkt.channel, spkt.streamID, spkt.frame_number);

		// Inject state packets
		//do_inject_ = true;
		do_inject_.clear();
	});
}

template <typename T>
static void writeValue(std::vector<unsigned char> &data, T value) {
	unsigned char *pvalue_start = (unsigned char*)&value;
	data.insert(data.end(), pvalue_start, pvalue_start+sizeof(T));
}

static void mergeNALUnits(const std::list<ftl::codecs::Packet> &pkts, ftl::codecs::Packet &pkt) {
	size_t size = 0;
	for (auto i=pkts.begin(); i!=pkts.end(); ++i) size += (*i).data.size();

	// TODO: Check Codec, jpg etc can just use last frame.
	// TODO: Also if iframe, can just use that instead

	const auto &first = pkts.front();
	pkt.codec = first.codec;
	pkt.definition = first.definition;
	pkt.frame_count = first.frame_count;
	pkt.bitrate = first.bitrate;
	pkt.flags = first.flags | ftl::codecs::kFlagMultiple;  // means merged packets
	pkt.data.reserve(size+pkts.size()*sizeof(int));

	for (auto i=pkts.begin(); i!=pkts.end(); ++i) {
		writeValue<int>(pkt.data, (*i).data.size());
		//LOG(INFO) << "NAL Count = " << (*i).data.size();
		pkt.data.insert(pkt.data.end(), (*i).data.begin(), (*i).data.end());
	}
}

void Sender::post(const ftl::rgbd::FrameSet &fs) {
    if (!stream_) return;

    Channels selected;
	if (stream_->size() > 0) selected = stream_->selected(0);

	Channels available;  // but not selected and actually sent.

	bool do_inject = !do_inject_.test_and_set();
	//do_inject_ = false;

    for (int i=0; i<fs.frames.size(); ++i) {
        const auto &frame = fs.frames[i];

		if (do_inject) {
			//LOG(INFO) << "Force inject calibration";
			injectCalibration(stream_, fs, i);
			injectCalibration(stream_, fs, i, true);
			injectPose(stream_, fs, i);
			injectConfig(stream_, fs, i);
		} else {
			if (frame.hasChanged(Channel::Pose)) injectPose(stream_, fs, i);
			if (frame.hasChanged(Channel::Calibration)) injectCalibration(stream_, fs, i);
			if (frame.hasChanged(Channel::Calibration2)) injectCalibration(stream_, fs, i, true);
			if (frame.hasChanged(Channel::Configuration)) injectConfig(stream_, fs, i);
		}

        for (auto c : frame.getChannels()) {
			if (selected.has(c)) {
				// FIXME: Sends high res colour, but receive end currently broken
				//auto cc = (c == Channel::Colour && frame.hasChannel(Channel::ColourHighRes)) ? Channel::ColourHighRes : Channel::Colour;
				auto cc = c;

				StreamPacket spkt;
				spkt.version = 4;
				spkt.timestamp = fs.timestamp;
				spkt.streamID = 0; //fs.id;
				spkt.frame_number = i;
				spkt.channel = c;

				// Check if there are existing encoded packets
				const auto &packets = frame.getPackets(cc);
				if (packets.size() > 0) {
					if (packets.size() > 1) {
						LOG(WARNING) << "Multi-packet send";
						ftl::codecs::Packet pkt;
						mergeNALUnits(packets, pkt);
						stream_->post(spkt, pkt);
					} else {
						// Send existing encoding instead of re-encoding
						//for (auto &pkt : packets) {
						stream_->post(spkt, packets.front());
						//}
					}
				} else  {
					auto *enc = _getEncoder(fs.id, i, cc);

					if (enc) {
						// FIXME: Timestamps may not always be aligned to interval.
						//if (do_inject || fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc->reset();
						if (do_inject) enc->reset();
						try {
							enc->encode(frame.get<cv::cuda::GpuMat>(cc), 0, [this,&spkt](const ftl::codecs::Packet &pkt){
								stream_->post(spkt, pkt);
							});
						} catch (std::exception &e) {
							LOG(ERROR) << "Exception in encoder: " << e.what();
						}
					} else {
						LOG(ERROR) << "No encoder";
					}
				}
			} else {
				available += c;
			}
        }
    }

	for (auto c : available) {
		// Not selected so send an empty packet...
		StreamPacket spkt;
		spkt.version = 4;
		spkt.timestamp = fs.timestamp;
		spkt.streamID = 0; // FIXME: fs.id;
		spkt.frame_number = 255;
		spkt.channel = c;

		Packet pkt;
		pkt.codec = codec_t::Any;
		pkt.definition = definition_t::Any;
		pkt.frame_count = 1;
		pkt.bitrate = 0;
		stream_->post(spkt, pkt);
	}

	//do_inject_ = false;
}

ftl::codecs::Encoder *Sender::_getEncoder(int fsid, int fid, Channel c) {
	int id = (fsid << 16) | (fid << 8) | (int)c;

	{
		SHARED_LOCK(mutex_, lk);
		auto i = encoders_.find(id);
		if (i != encoders_.end()) {
			return (*i).second;
		}
	}

	auto *enc = ftl::codecs::allocateEncoder(
					definition_t::HD1080, device_t::Any, codec_t::Any);
	UNIQUE_LOCK(mutex_, lk);
	encoders_[id] = enc;
	return enc;
}