From 26a45584f57e1a138dd3ebb16bf61a36fb404e08 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Mon, 13 Jul 2020 11:22:37 +0300
Subject: [PATCH] WIP Change FTL format to include complete flag

---
 applications/vision/src/main.cpp              |  2 +-
 .../codecs/include/ftl/codecs/codecs.hpp      |  1 +
 .../codecs/include/ftl/codecs/packet.hpp      | 27 +++++-
 .../streams/include/ftl/streams/sender.hpp    | 13 ++-
 components/streams/src/filestream.cpp         | 29 ++++++-
 components/streams/src/receiver.cpp           | 70 ++++++---------
 components/streams/src/sender.cpp             | 85 +++++++++----------
 components/streams/test/receiver_unit.cpp     | 13 +++
 components/streams/test/sender_unit.cpp       | 28 +++---
 .../include/ftl/data/new_frameset.hpp         |  2 +-
 components/structures/src/frameset.cpp        |  4 +-
 11 files changed, 160 insertions(+), 114 deletions(-)

diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp
index 7a8d79b1d..320d3781b 100644
--- a/applications/vision/src/main.cpp
+++ b/applications/vision/src/main.cpp
@@ -172,7 +172,7 @@ static void run(ftl::Configurable *root) {
 				case Channel::Colour		:
 				case Channel::Colour2		:
 				case Channel::Depth			: sender->post(fs, c, true); break;
-				default: break;
+				default						: sender->fakePost(fs, c);
 				}
 			}
 		}
diff --git a/components/codecs/include/ftl/codecs/codecs.hpp b/components/codecs/include/ftl/codecs/codecs.hpp
index 53ae24568..ad49ef6e0 100644
--- a/components/codecs/include/ftl/codecs/codecs.hpp
+++ b/components/codecs/include/ftl/codecs/codecs.hpp
@@ -23,6 +23,7 @@ static constexpr uint8_t kFlagStereo = 0x20;		// Left-Right stereo in single cha
 static constexpr uint8_t kFlagMultiple = 0x80;		// Multiple video frames in single packet
 
 static constexpr uint8_t kFlagRequest = 0x01;		// Used for empty data packets to mark a request for data
+static constexpr uint8_t kFlagCompleted = 0x02;		// Last packet for timestamp
 
 /**
  * Compression format used.
diff --git a/components/codecs/include/ftl/codecs/packet.hpp b/components/codecs/include/ftl/codecs/packet.hpp
index f9aea4ca6..8685ac511 100644
--- a/components/codecs/include/ftl/codecs/packet.hpp
+++ b/components/codecs/include/ftl/codecs/packet.hpp
@@ -18,7 +18,7 @@ static constexpr uint8_t kAllFramesets = 255;
  */
 struct Header {
 	const char magic[4] = {'F','T','L','F'};
-	uint8_t version = 4;
+	uint8_t version = 5;
 };
 
 /**
@@ -47,6 +47,28 @@ struct Packet {
 
 static constexpr unsigned int kStreamCap_Static = 0x01;
 
+/** V4 packets have no stream flags field */
+struct StreamPacketV4 {
+	int version;			// FTL version, Not encoded into stream
+
+	int64_t timestamp;
+	uint8_t streamID;  		// Source number [or v4 frameset id]
+	uint8_t frame_number;	// v4+ First frame number (packet may include multiple frames)
+	ftl::codecs::Channel channel;		// Actual channel of this current set of packets
+
+	inline int frameNumber() const { return (version >= 4) ? frame_number : streamID; }
+	inline size_t frameSetID() const { return (version >= 4) ? streamID : 0; }
+	inline int64_t localTimestamp() const { return timestamp + originClockDelta; }
+
+	int64_t originClockDelta;  		// Not message packet / saved
+	unsigned int hint_capability;	// Is this a video stream, for example
+	size_t hint_source_total;		// Number of tracks per frame to expect
+
+	MSGPACK_DEFINE(timestamp, streamID, frame_number, channel);
+
+	operator std::string() const;
+};
+
 /**
  * Add timestamp and channel information to a raw encoded frame packet. This
  * allows the packet to be located within a larger stream and should be sent
@@ -59,6 +81,7 @@ struct StreamPacket {
 	uint8_t streamID;  		// Source number [or v4 frameset id]
 	uint8_t frame_number;	// v4+ First frame number (packet may include multiple frames)
 	ftl::codecs::Channel channel;		// Actual channel of this current set of packets
+	uint8_t flags=0;
 
 	inline int frameNumber() const { return (version >= 4) ? frame_number : streamID; }
 	inline size_t frameSetID() const { return (version >= 4) ? streamID : 0; }
@@ -68,7 +91,7 @@ struct StreamPacket {
 	unsigned int hint_capability;	// Is this a video stream, for example
 	size_t hint_source_total;		// Number of tracks per frame to expect
 
-	MSGPACK_DEFINE(timestamp, streamID, frame_number, channel);
+	MSGPACK_DEFINE(timestamp, streamID, frame_number, channel, flags);
 
 	operator std::string() const;
 };
diff --git a/components/streams/include/ftl/streams/sender.hpp b/components/streams/include/ftl/streams/sender.hpp
index 06ecda858..76a75a2bc 100644
--- a/components/streams/include/ftl/streams/sender.hpp
+++ b/components/streams/include/ftl/streams/sender.hpp
@@ -31,6 +31,11 @@ class Sender : public ftl::Configurable {
 	 */
 	void post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode=false);
 
+	/**
+	 * Mark channel as posted without sending anything.
+	 */
+	void fakePost(ftl::data::FrameSet &fs, ftl::codecs::Channel c);
+
 	/**
 	 * Make the channel available in the stream even if not available locally.
 	 */
@@ -74,11 +79,11 @@ class Sender : public ftl::Configurable {
 	std::unordered_map<int, AudioState> audio_state_;
 
 	//ftl::codecs::Encoder *_getEncoder(int fsid, int fid, ftl::codecs::Channel c);
-	void _encodeChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset);
+	void _encodeChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset, bool last_flush);
 	void _encodeChannel(ftl::data::Frame &f, ftl::codecs::Channel c, bool reset);
-	void _encodeVideoChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset);
-	void _encodeAudioChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset);
-	void _encodeDataChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset);
+	void _encodeVideoChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset, bool last_flush);
+	void _encodeAudioChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset, bool last_flush);
+	void _encodeDataChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset, bool last_flush);
 	void _encodeDataChannel(ftl::data::Frame &fs, ftl::codecs::Channel c, bool reset);
 
 	int _generateTiles(const ftl::rgbd::FrameSet &fs, int offset, ftl::codecs::Channel c, cv::cuda::Stream &stream, bool, bool);
diff --git a/components/streams/src/filestream.cpp b/components/streams/src/filestream.cpp
index acadd13ef..95a15f668 100644
--- a/components/streams/src/filestream.cpp
+++ b/components/streams/src/filestream.cpp
@@ -148,7 +148,23 @@ bool File::readPacket(std::tuple<ftl::codecs::StreamPacket,ftl::codecs::Packet>
 		msgpack::object obj = msg.get();
 
 		try {
-			obj.convert(data);
+			// Older versions have a different SPKT structure.
+			if (version_ < 5) {
+				std::tuple<ftl::codecs::StreamPacketV4, ftl::codecs::Packet> datav4;
+				obj.convert(datav4);
+
+				auto &spkt = std::get<0>(data);
+				auto &spktv4 = std::get<0>(datav4);
+				spkt.streamID = spktv4.streamID;
+				spkt.channel = spktv4.channel;
+				spkt.frame_number = spktv4.frame_number;
+				spkt.timestamp = spktv4.timestamp;
+				spkt.flags = 0;
+
+				std::get<1>(data) = std::move(std::get<1>(datav4));
+			} else {
+				obj.convert(data);
+			}
 		} catch (std::exception &e) {
 			LOG(INFO) << "Corrupt message: " << buffer_in_.nonparsed_size() << " - " << e.what();
 			//active_ = false;
@@ -177,7 +193,8 @@ void File::_patchPackets(ftl::codecs::StreamPacket &spkt, ftl::codecs::Packet &p
 		auto codec = pkt.codec;
 		if (codec == ftl::codecs::codec_t::HEVC) pkt.codec = ftl::codecs::codec_t::HEVC_LOSSLESS;
 	}
-	spkt.version = 4;
+
+	spkt.version = 5;
 
 	// Fix for flags corruption
 	if (pkt.data.size() == 0) {
@@ -235,6 +252,7 @@ bool File::tick(int64_t ts) {
 
 	while ((active_ && istream_->good()) || buffer_in_.nonparsed_size() > 0u) {
 		UNIQUE_LOCK(data_mutex_, dlk);
+		auto *lastData = (data_.size() > 0) ? &data_.back() : nullptr;
 		auto &data = data_.emplace_back();
 		dlk.unlock();
 
@@ -267,7 +285,12 @@ bool File::tick(int64_t ts) {
 				}
 				data_.pop_back();
 			//}
-		} else if (std::get<0>(data).timestamp > extended_ts) {
+		} else if (version_ < 5 && lastData) {
+			// For versions < 5, add completed flag to previous data
+			std::get<0>(*lastData).flags |= ftl::codecs::kFlagCompleted;
+		}
+
+		if (std::get<0>(data).timestamp > extended_ts) {
 			break;
 		}
 	}
diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp
index 9ed4242d0..b29654052 100644
--- a/components/streams/src/receiver.cpp
+++ b/components/streams/src/receiver.cpp
@@ -165,13 +165,19 @@ void Receiver::_processData(const StreamPacket &spkt, const Packet &pkt) {
 	auto &f = (spkt.frame_number == 255) ? **fs : fs->frames[spkt.frame_number];
 	f.informChange(spkt.channel, build.changeType(), pkt);
 
-	const auto *cs = stream_;
+	if (spkt.flags & ftl::codecs::kFlagCompleted) {
+		//UNIQUE_LOCK(vidstate.mutex, lk);
+		timestamp_ = spkt.timestamp;
+		fs->completed(spkt.frame_number);
+	}
+
+	/*const auto *cs = stream_;
 	const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID());
 
 	if (f.hasAll(sel)) {
 		timestamp_ = spkt.timestamp;
 		fs->completed(spkt.frame_number);
-	}
+	}*/
 }
 
 ftl::audio::Decoder *Receiver::_createAudioDecoder(InternalAudioStates &frame, const ftl::codecs::Packet &pkt) {
@@ -349,50 +355,24 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) {
 	for (int i=0; i<pkt.frame_count; ++i) {
 		InternalVideoStates &vidstate = _getVideoFrame(spkt,i);
 		//auto &frame = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+i);
-		auto &frame = fs->frames[spkt.frame_number+i];
-
-		const auto *cs = stream_;
-		const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID());
-
-		//frame.create<cv::cuda::GpuMat>(spkt.channel);
-
-		if (i == 0) {
-			Packet tmppkt = pkt;
-			//frame.pushPacket(spkt.channel, tmppkt);
-		}
+		//auto &frame = fs->frames[spkt.frame_number+i];
 
-		UNIQUE_LOCK(vidstate.mutex, lk);
-		//if (frame.timestamp == spkt.timestamp) {
-			//frame.completed += spkt.channel;
+		/*if (spkt.version < 5) {
+			const auto *cs = stream_;
+			const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID());
 
-			// Complete if all requested channels are found
-			//if ((frame.getChannels() & sel) == sel) {
+			UNIQUE_LOCK(vidstate.mutex, lk);
 			if (frame.availableAll(sel)) {
 				timestamp_ = spkt.timestamp;
-				//frame.reset.clear();
-
-				//LOG(INFO) << "BUILDER PUSH: " << timestamp_ << ", " << spkt.frameNumber() << ", " << (int)pkt.frame_count;
-
-				//if (vidstate.state.getLeft().width == 0) {
-				//	LOG(WARNING) << "Missing calibration for frame";
-				//}
-
-				// TODO: Have multiple builders for different framesets.
-				//builder_.push(frame.timestamp, spkt.frameNumber()+i, frame.frame);
 				fs->completed(spkt.frame_number+i);
-
-				// Check for any state changes and send them back
-				//if (vidstate.state.hasChanged(Channel::Pose)) injectPose(stream_, frame, spkt.timestamp, spkt.frameNumber()+i);
-				//if (vidstate.state.hasChanged(Channel::Calibration)) injectCalibration(stream_, frame, spkt.timestamp, spkt.streamID, spkt.frameNumber()+i);
-				//if (vidstate.state.hasChanged(Channel::Calibration2)) injectCalibration(stream_, frame, spkt.timestamp, spkt.streamID, spkt.frameNumber()+i, true);
-
-				//frame.reset();
-				//frame.completed.clear();
-				//frame.timestamp = -1;
 			}
-		//} else {
-		//	LOG(ERROR) << "Frame timestamps mistmatch";
-		//}
+		}*/
+
+		if (spkt.flags & ftl::codecs::kFlagCompleted) {
+			UNIQUE_LOCK(vidstate.mutex, lk);
+			timestamp_ = spkt.timestamp;
+			fs->completed(spkt.frame_number+i);
+		}
 	}
 }
 
@@ -415,10 +395,16 @@ void Receiver::setStream(ftl::stream::Stream *s) {
 					//LOG(INFO) << "MARK " << frame.source() << " " << (int)spkt.channel;
 					frame.markAvailable(spkt.channel);
 
-					if (frame.availableAll(sel)) {
-						//LOG(INFO) << "FRAME COMPLETED " << frame.source();
+					if (spkt.flags & ftl::codecs::kFlagCompleted) {
+						//UNIQUE_LOCK(vidstate.mutex, lk);  // FIXME: Should have a lock here...
+						timestamp_ = spkt.timestamp;
 						fs->completed(frame.source());
 					}
+
+					//if (frame.availableAll(sel)) {
+						//LOG(INFO) << "FRAME COMPLETED " << frame.source();
+					//	fs->completed(frame.source());
+					//}
 				}
 			}
 			return true;
diff --git a/components/streams/src/sender.cpp b/components/streams/src/sender.cpp
index 9035cbbdd..29de82429 100644
--- a/components/streams/src/sender.cpp
+++ b/components/streams/src/sender.cpp
@@ -115,6 +115,16 @@ void Sender::_sendPersistent(ftl::data::Frame &frame) {
 	}
 }
 
+void Sender::fakePost(ftl::data::FrameSet &fs, ftl::codecs::Channel c) {
+	if (!stream_) return;
+
+	for (size_t i=0; i<fs.frames.size(); ++i) {
+		auto &frame = fs.frames[i];
+		if (frame.hasOwn(c)) ++fs.flush_count;
+		
+	}
+}
+
 void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode) {
 	if (!stream_) return;
 
@@ -136,23 +146,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode
 
 	FTL_Profile("SenderPost", 0.02);
 
-	// Send any frameset data channels
-	/*for (auto c : fs.getDataChannels()) {
-		StreamPacket spkt;
-		spkt.version = 4;
-		spkt.timestamp = fs.timestamp;
-		spkt.streamID = 0; //fs.id;
-		spkt.frame_number = 255;
-		spkt.channel = c;
-
-		ftl::codecs::Packet pkt;
-		pkt.codec = ftl::codecs::codec_t::MSGPACK;
-		pkt.frame_count = 1;
-		pkt.flags = 0;
-		pkt.bitrate = 0;
-		pkt.data = fs.getRawData(c);
-		stream_->post(spkt, pkt);
-	}*/
+	int ccount = 0;
 
 	bool available = false;
 	bool needs_encoding = true;
@@ -161,26 +155,18 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode
 
 	for (size_t i=0; i<fs.frames.size(); ++i) {
 		auto &frame = fs.frames[i];
-		if (!frame.hasOwn(c)) continue;
+		if (!frame.has(c)) continue;
 
 		++valid_frames;
+		++fs.flush_count;
 
 		// TODO: Send entire persistent session on inject
 		if (do_inject) {
 			_sendPersistent(frame);
-
-			//LOG(INFO) << "Force inject calibration";
-			//if (frame.has(Channel::Calibration)) injectCalibration(stream_, fs, i);
-			//if (frame.has(Channel::Calibration2)) injectCalibration(stream_, fs, i, true);
-			//if (frame.has(Channel::Pose)) injectPose(stream_, fs, i);
-			//injectConfig(stream_, fs, i);
-		} else {
-			//if (frame.changed(Channel::Pose)) injectPose(stream_, fs, i);
-			//if (frame.changed(Channel::Calibration)) injectCalibration(stream_, fs, i);
-			//if (frame.changed(Channel::Calibration2)) injectCalibration(stream_, fs, i, true);
-			//if (frame.changed(Channel::Configuration)) injectConfig(stream_, fs, i);
 		}
 
+		ccount += frame.changed().size();
+
 		// FIXME: Allow data channel selection rather than always send
 		/*for (auto c : frame.getDataChannels()) {
 			StreamPacket spkt;
@@ -206,8 +192,9 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode
 				//auto cc = (c == Channel::Colour && frame.hasChannel(Channel::ColourHighRes)) ? Channel::ColourHighRes : c;
 				auto cc = c;
 
+				// FIXME: If last completed channel has encoding, it doesn't get correct flag
 				StreamPacket spkt;
-				spkt.version = 4;
+				spkt.version = 5;
 				spkt.timestamp = fs.timestamp();
 				spkt.streamID = 0; //fs.id;
 				spkt.frame_number = i;
@@ -237,6 +224,8 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode
 
 	}
 
+	bool last_flush = ccount == fs.flush_count;
+
 	// Don't do anything if channel not in any frames.
 	if (valid_frames == 0) return;
 
@@ -249,11 +238,12 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode
 	if (available) {
 		// Not selected so send an empty packet...
 		StreamPacket spkt;
-		spkt.version = 4;
+		spkt.version = 5;
 		spkt.timestamp = fs.timestamp();
 		spkt.streamID = fs.frameset();
 		spkt.frame_number = 255;
 		spkt.channel = c;
+		spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0;
 
 		Packet pkt;
 		pkt.codec = codec_t::Any;
@@ -264,13 +254,13 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode
 	}
 
 	if (needs_encoding) {
-		_encodeChannel(fs, c, do_iframe);
+		_encodeChannel(fs, c, do_iframe, last_flush);
 	}
 }
 
 void Sender::forceAvailable(ftl::data::FrameSet &fs, ftl::codecs::Channel c) {
 	StreamPacket spkt;
-	spkt.version = 4;
+	spkt.version = 5;
 	spkt.timestamp = fs.timestamp();
 	spkt.streamID = fs.frameset();
 	spkt.frame_number = 255;
@@ -318,7 +308,7 @@ void Sender::post(ftl::data::Frame &frame, ftl::codecs::Channel c) {
 				auto cc = c;
 
 				StreamPacket spkt;
-				spkt.version = 4;
+				spkt.version = 5;
 				spkt.timestamp = frame.timestamp();
 				spkt.streamID = 0; //fs.id;
 				spkt.frame_number = frame.source();
@@ -348,7 +338,7 @@ void Sender::post(ftl::data::Frame &frame, ftl::codecs::Channel c) {
 	if (available) {
 		// Not selected so send an empty packet...
 		StreamPacket spkt;
-		spkt.version = 4;
+		spkt.version = 5;
 		spkt.timestamp = frame.timestamp();
 		spkt.streamID = frame.frameset();
 		spkt.frame_number = 255;
@@ -369,7 +359,7 @@ void Sender::post(ftl::data::Frame &frame, ftl::codecs::Channel c) {
 	//do_inject_ = false;
 }
 
-void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset) {
+void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset, bool last_flush) {
 	bool lossless = value("lossless", false);
 	int max_bitrate = std::max(0, std::min(255, value("max_bitrate", 255)));
 	//int min_bitrate = std::max(0, std::min(255, value("min_bitrate", 0)));  // TODO: Use this
@@ -397,11 +387,12 @@ void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset)
 		}
 
 		StreamPacket spkt;
-		spkt.version = 4;
+		spkt.version = 5;
 		spkt.timestamp = fs.timestamp();
 		spkt.streamID = 0; // FIXME: fs.id;
 		spkt.frame_number = offset;
 		spkt.channel = c;
+		spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0;
 
 		auto &tile = _getTile(fs.id(), cc);
 
@@ -476,7 +467,7 @@ void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset)
 	}
 }
 
-void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset) {
+void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset, bool last_flush) {
 	
 	// TODO: combine into multiple opus streams
 	for (size_t i=0; i<fs.frames.size(); ++i) {
@@ -487,11 +478,12 @@ void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset)
 		//auto &settings = fs.frames[i].getSettings();
 
 		StreamPacket spkt;
-		spkt.version = 4;
+		spkt.version = 5;
 		spkt.timestamp = fs.timestamp();
 		spkt.streamID = fs.frameset();
 		spkt.frame_number = i;
 		spkt.channel = c;
+		spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0;
 
 		ftl::codecs::Packet pkt;
 		pkt.codec = ftl::codecs::codec_t::OPUS;
@@ -519,17 +511,18 @@ void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset)
 	}
 }
 
-void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset) {
+void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset, bool last_flush) {
 	int i=0;
 
 	// TODO: Pack all frames into a single packet
 	for (auto &f : fs.frames) {
 		StreamPacket spkt;
-		spkt.version = 4;
+		spkt.version = 5;
 		spkt.timestamp = fs.timestamp();
 		spkt.streamID = fs.frameset();
 		spkt.frame_number = i++;
 		spkt.channel = c;
+		spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0;
 
 		ftl::codecs::Packet pkt;
 		pkt.frame_count = 1;
@@ -550,7 +543,7 @@ void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset)
 
 void Sender::_encodeDataChannel(ftl::data::Frame &f, Channel c, bool reset) {
 	StreamPacket spkt;
-	spkt.version = 4;
+	spkt.version = 5;
 	spkt.timestamp = f.timestamp();
 	spkt.streamID = f.frameset();
 	spkt.frame_number = f.source();
@@ -572,15 +565,15 @@ void Sender::_encodeDataChannel(ftl::data::Frame &f, Channel c, bool reset) {
 	}
 }
 
-void Sender::_encodeChannel(ftl::data::FrameSet &fs, Channel c, bool reset) {
+void Sender::_encodeChannel(ftl::data::FrameSet &fs, Channel c, bool reset, bool last_flush) {
 	int ic = int(c);
 
 	if (ic < 32) {
-		_encodeVideoChannel(fs, c, reset);
+		_encodeVideoChannel(fs, c, reset, last_flush);
 	} else if (ic < 64) {
-		_encodeAudioChannel(fs, c, reset);
+		_encodeAudioChannel(fs, c, reset, last_flush);
 	} else {
-		_encodeDataChannel(fs, c, reset);
+		_encodeDataChannel(fs, c, reset, last_flush);
 	}
 }
 
diff --git a/components/streams/test/receiver_unit.cpp b/components/streams/test/receiver_unit.cpp
index de9dd9918..a647d6ac8 100644
--- a/components/streams/test/receiver_unit.cpp
+++ b/components/streams/test/receiver_unit.cpp
@@ -100,6 +100,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) {
 		bool r = encoder.encode(m, pkt);
 		REQUIRE( r );
 
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		stream.post(spkt, pkt);
 
 		int count = 0;
@@ -130,6 +131,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) {
 		bool r = encoder.encode(m, pkt);
 		REQUIRE( r );
 
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		stream.post(spkt, pkt);
 
 		std::atomic<int> mask = 0;
@@ -155,6 +157,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) {
 		bool r = encoder.encode(m, pkt);
 		REQUIRE( r );
 
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		stream.post(spkt, pkt);
 
 		int count = 0;
@@ -191,6 +194,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) {
 		bool r = encoder.encode(m, pkt);
 		REQUIRE( r );
 
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		stream.post(spkt, pkt);
 
 		int count = 0;
@@ -228,6 +232,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) {
 		bool r = encoder.encode(m, pkt);
 		REQUIRE( r );
 
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		stream.post(spkt, pkt);
 
 		int count = 0;
@@ -325,12 +330,14 @@ TEST_CASE( "Receiver sync bugs" ) {
 		try { stream.post(spkt, pkt); } catch(...) {}
 		spkt.timestamp = 10;
 		spkt.channel = Channel::ColourHighRes;
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		try { stream.post(spkt, pkt); } catch(...) {}
 		spkt.timestamp = 20;
 		spkt.channel = Channel::Colour2;
 		try { stream.post(spkt, pkt); } catch(...) {}
 		spkt.timestamp = 20;
 		spkt.channel = Channel::Colour;
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		try { stream.post(spkt, pkt); } catch(...) {}
 
 		int i=10;
@@ -410,7 +417,9 @@ TEST_CASE( "Receiver non zero buffer" ) {
 		});
 
 		stream.post(spkt, pkt);
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		spkt.timestamp += 10;
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		stream.post(spkt, pkt);
 
 		int i=10;
@@ -465,6 +474,7 @@ TEST_CASE( "Receiver for data channels" ) {
 		ftl::util::FTLVectorBuffer buf(pkt.data);
 		msgpack::pack(buf, 5.0f);
 
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		stream.post(spkt, pkt);
 
 		int count = 0;
@@ -496,6 +506,7 @@ TEST_CASE( "Receiver for data channels" ) {
 
 		// Need to have at least one frame for this to work
 		spkt.frame_number = 0;
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		stream.post(spkt, pkt);
 
 		int count = 0;
@@ -524,6 +535,7 @@ TEST_CASE( "Receiver for data channels" ) {
 		calib.width = 1024;
 		msgpack::pack(buf, calib);
 
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		stream.post(spkt, pkt);
 
 		int count = 0;
@@ -551,6 +563,7 @@ TEST_CASE( "Receiver for data channels" ) {
 		Eigen::Matrix4d pose;
 		msgpack::pack(buf, pose);
 
+		spkt.flags |= ftl::codecs::kFlagCompleted;
 		stream.post(spkt, pkt);
 
 		int count = 0;
diff --git a/components/streams/test/sender_unit.cpp b/components/streams/test/sender_unit.cpp
index 0564992d0..5277df2db 100644
--- a/components/streams/test/sender_unit.cpp
+++ b/components/streams/test/sender_unit.cpp
@@ -95,7 +95,7 @@ TEST_CASE( "Sender::post() video frames" ) {
 		sender->post(fs, Channel::Colour);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -122,7 +122,7 @@ TEST_CASE( "Sender::post() video frames" ) {
 		sender->post(fs, Channel::Colour);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -149,7 +149,7 @@ TEST_CASE( "Sender::post() video frames" ) {
 		sender->post(fs, Channel::Depth);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -180,7 +180,7 @@ TEST_CASE( "Sender::post() video frames" ) {
 		sender->post(fs, Channel::Depth);
 
 		REQUIRE( count == 2 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 9 );
 		REQUIRE( spkt.streamID == 0 );
@@ -209,7 +209,7 @@ TEST_CASE( "Sender::post() video frames" ) {
 		sender->post(fs, Channel::Depth);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -235,7 +235,7 @@ TEST_CASE( "Sender::post() video frames" ) {
 		sender->post(fs, Channel::Depth);
 
 		REQUIRE( count == 2 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -302,7 +302,7 @@ TEST_CASE( "Sender request to control encoding" ) {
 		sender->post(fs, Channel::Colour);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -357,7 +357,7 @@ TEST_CASE( "Sender::post() data channels" ) {
 		sender->post(fs, Channel::Calibration);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -378,7 +378,7 @@ TEST_CASE( "Sender::post() data channels" ) {
 		sender->post(fs, Channel::Pose);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -401,7 +401,7 @@ TEST_CASE( "Sender::post() data channels" ) {
 		sender->post(fs, Channel::Data);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -425,7 +425,7 @@ TEST_CASE( "Sender::post() data channels" ) {
 		sender->post(fs, Channel::Data);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -486,7 +486,7 @@ TEST_CASE( "Sender::post() audio channels" ) {
 		sender->post(fs, Channel::AudioMono);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -512,7 +512,7 @@ TEST_CASE( "Sender::post() audio channels" ) {
 
 		sender->post(fs, Channel::AudioMono);
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
@@ -553,7 +553,7 @@ TEST_CASE( "Sender::post() audio channels" ) {
 		sender->post(fs, Channel::AudioStereo);
 
 		REQUIRE( count == 1 );
-		REQUIRE( spkt.version == 4 );
+		REQUIRE( spkt.version == 5 );
 		REQUIRE( spkt.timestamp == 1000 );
 		REQUIRE( (int)spkt.frame_number == 0 );
 		REQUIRE( spkt.streamID == 0 );
diff --git a/components/structures/include/ftl/data/new_frameset.hpp b/components/structures/include/ftl/data/new_frameset.hpp
index 8a89f882f..229bf974e 100644
--- a/components/structures/include/ftl/data/new_frameset.hpp
+++ b/components/structures/include/ftl/data/new_frameset.hpp
@@ -42,7 +42,7 @@ class FrameSet : public ftl::data::Frame {
 	std::vector<Frame> frames;
 	std::atomic<int> count;				// Number of valid frames
 	std::atomic<unsigned int> mask;		// Mask of all sources that contributed
-	//bool stale;						// True if buffers have been invalidated
+	std::atomic<int> flush_count;		// How many channels have been flushed
 	SHARED_MUTEX smtx;
 
 	Eigen::Matrix4d pose;  // Set to identity by default.
diff --git a/components/structures/src/frameset.cpp b/components/structures/src/frameset.cpp
index 9baaf02a5..3a876c368 100644
--- a/components/structures/src/frameset.cpp
+++ b/components/structures/src/frameset.cpp
@@ -6,7 +6,8 @@ using ftl::data::FrameSet;
 
 FrameSet::FrameSet(Pool *ppool, FrameID pid, int64_t ts) :
 	Frame(ppool->allocate(FrameID(pid.frameset(),255), ts)), mask(0) {
-
+	
+	flush_count = 0; // Reset flush on store...
 }
 
 FrameSet::~FrameSet() {
@@ -121,6 +122,7 @@ void FrameSet::flush(ftl::codecs::Channel c) {
 		UNIQUE_LOCK(smtx, lk);
 		for (auto &f : frames) if (f.hasOwn(c)) f.flush(c);
 	}
+	
 	pool()->flush_fs_.trigger(*this, c);
 }
 
-- 
GitLab