From ae9fa8edc1641a0088f06cb76e7f6df7ce4c10e7 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Wed, 5 Aug 2020 16:37:14 +0300
Subject: [PATCH] Resolve stream reset and EndFrame problems

---
 CMakeLists.txt                                |  5 ++
 applications/reconstruct2/src/main.cpp        | 50 ++++++++-------
 .../codecs/include/ftl/codecs/codecs.hpp      |  1 +
 .../include/ftl/operators/operator.hpp        |  1 +
 components/operators/src/operator.cpp         | 13 ++++
 .../streams/include/ftl/streams/netstream.hpp |  2 +-
 components/streams/src/builder.cpp            |  4 +-
 components/streams/src/feed.cpp               | 15 +++--
 components/streams/src/netstream.cpp          |  9 ++-
 components/streams/src/receiver.cpp           | 64 +++----------------
 components/streams/src/sender.cpp             |  2 +-
 components/streams/test/receiver_unit.cpp     | 33 +++++++++-
 components/structures/src/new_frame.cpp       |  2 +
 13 files changed, 108 insertions(+), 93 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index ced2238cc..ac4d7991e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -4,6 +4,10 @@ include (CheckIncludeFileCXX)
 include (CheckFunctionExists)
 include(CheckLanguage)
 
+if (WIN32)
+	set(CMAKE_GENERATOR_TOOLSET "host=x64")
+endif()
+
 project (ftl.utu.fi VERSION 0.0.4)
 
 include(GNUInstallDirs)
@@ -371,6 +375,7 @@ include(ftl_paths)
 
 if (WIN32) # TODO(nick) Should do based upon compiler (VS)
 	add_definitions(-DWIN32)
+	set(CMAKE_GENERATOR_TOOLSET "host=x64")
 	set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /arch:AVX2 /MP4 /std:c++17 /wd4996")
 	set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /DFTL_DEBUG /Wall")
 	set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /O2")
diff --git a/applications/reconstruct2/src/main.cpp b/applications/reconstruct2/src/main.cpp
index 1a0bb7c59..7aa75b51f 100644
--- a/applications/reconstruct2/src/main.cpp
+++ b/applications/reconstruct2/src/main.cpp
@@ -53,6 +53,32 @@ static void run(ftl::Configurable *root) {
 	Feed *feed = ftl::create<Feed>(root, "feed", net);
 	std::string group_name = root->value("group", std::string("Reconstruction"));
 
+	feed->set("uri", root->value("uri", std::string("ftl://ftlab.utu.fi/reconstruction")));
+	feed->setPipelineCreator([](ftl::operators::Graph *pipeline) {
+		LOG(INFO) << "Using reconstruction pipeline creator";
+
+		pipeline->append<ftl::operators::DepthChannel>("depth")->value("enabled", false);  // Ensure there is a depth channel
+		pipeline->append<ftl::operators::DisparityBilateralFilter>("bilateral_filter")->value("enabled", false);
+		pipeline->append<ftl::operators::DisparityToDepth>("calculate_depth")->value("enabled", false);
+		pipeline->append<ftl::operators::ColourChannels>("colour");  // Convert BGR to BGRA
+		pipeline->append<ftl::operators::ClipScene>("clipping")->value("enabled", false);
+		pipeline->append<ftl::operators::DetectAndTrack>("facedetection")->value("enabled", false);
+		pipeline->append<ftl::operators::ArUco>("aruco")->value("enabled", false);
+		//pipeline_->append<ftl::operators::HFSmoother>("hfnoise");  // Remove high-frequency noise
+		pipeline->append<ftl::operators::Normals>("normals");  // Estimate surface normals
+		//pipeline_->append<ftl::operators::SmoothChannel>("smoothing");  // Generate a smoothing channel
+		//pipeline_->append<ftl::operators::ScanFieldFill>("filling");  // Generate a smoothing channel
+		pipeline->append<ftl::operators::CrossSupport>("cross");
+		pipeline->append<ftl::operators::DiscontinuityMask>("discontinuity");
+		pipeline->append<ftl::operators::CrossSupport>("cross2")->value("discon_support", true);
+		pipeline->append<ftl::operators::BorderMask>("border_mask")->value("enabled", false);
+		pipeline->append<ftl::operators::CullDiscontinuity>("remove_discontinuity")->set("enabled", false);
+		//pipeline_->append<ftl::operators::AggreMLS>("mls");  // Perform MLS (using smoothing channel)
+		pipeline->append<ftl::operators::VisCrossSupport>("viscross")->value("enabled", false);
+		pipeline->append<ftl::operators::MultiViewMLS>("mvmls");
+		pipeline->append<ftl::operators::Poser>("poser")->value("enabled", false);
+	});
+
 	// Add sources here
 	if (root->getConfig().contains("sources")) {
 		for (const auto &s : root->getConfig()["sources"]) {
@@ -85,29 +111,7 @@ static void run(ftl::Configurable *root) {
 	});
 
 	auto *filter = feed->filter({Channel::Colour, Channel::Depth});
-	feed->set("uri", root->value("uri", std::string("ftl://ftlab.utu.fi/reconstruction")));
-	feed->setPipelineCreator([](ftl::operators::Graph *pipeline) {
-		pipeline->append<ftl::operators::DepthChannel>("depth");  // Ensure there is a depth channel
-		pipeline->append<ftl::operators::DisparityBilateralFilter>("bilateral_filter")->value("enabled", false);
-		pipeline->append<ftl::operators::DisparityToDepth>("calculate_depth")->value("enabled", false);
-		pipeline->append<ftl::operators::ColourChannels>("colour");  // Convert BGR to BGRA
-		pipeline->append<ftl::operators::ClipScene>("clipping")->value("enabled", false);
-		pipeline->append<ftl::operators::DetectAndTrack>("facedetection")->value("enabled", false);
-		pipeline->append<ftl::operators::ArUco>("aruco")->value("enabled", false);
-		//pipeline_->append<ftl::operators::HFSmoother>("hfnoise");  // Remove high-frequency noise
-		pipeline->append<ftl::operators::Normals>("normals");  // Estimate surface normals
-		//pipeline_->append<ftl::operators::SmoothChannel>("smoothing");  // Generate a smoothing channel
-		//pipeline_->append<ftl::operators::ScanFieldFill>("filling");  // Generate a smoothing channel
-		pipeline->append<ftl::operators::CrossSupport>("cross");
-		pipeline->append<ftl::operators::DiscontinuityMask>("discontinuity");
-		pipeline->append<ftl::operators::CrossSupport>("cross2")->value("discon_support", true);
-		pipeline->append<ftl::operators::BorderMask>("border_mask")->value("enabled", false);
-		pipeline->append<ftl::operators::CullDiscontinuity>("remove_discontinuity")->set("enabled", false);
-		//pipeline_->append<ftl::operators::AggreMLS>("mls");  // Perform MLS (using smoothing channel)
-		pipeline->append<ftl::operators::VisCrossSupport>("viscross")->value("enabled", false);
-		pipeline->append<ftl::operators::MultiViewMLS>("mvmls");
-		pipeline->append<ftl::operators::Poser>("poser")->value("enabled", false);
-	});
+	
 	//feed->lowLatencyMode();
 	feed->startStreaming(filter);
 
diff --git a/components/codecs/include/ftl/codecs/codecs.hpp b/components/codecs/include/ftl/codecs/codecs.hpp
index 68d4591e6..ddd418b27 100644
--- a/components/codecs/include/ftl/codecs/codecs.hpp
+++ b/components/codecs/include/ftl/codecs/codecs.hpp
@@ -24,6 +24,7 @@ static constexpr uint8_t kFlagMultiple = 0x80;		// Multiple video frames in sing
 
 static constexpr uint8_t kFlagRequest = 0x01;		// Used for empty data packets to mark a request for data
 static constexpr uint8_t kFlagCompleted = 0x02;		// Last packet for timestamp
+static constexpr uint8_t kFlagReset = 0x04;
 
 /**
  * Compression format used.
diff --git a/components/operators/include/ftl/operators/operator.hpp b/components/operators/include/ftl/operators/operator.hpp
index 399adddd4..566e2715b 100644
--- a/components/operators/include/ftl/operators/operator.hpp
+++ b/components/operators/include/ftl/operators/operator.hpp
@@ -138,6 +138,7 @@ class Graph : public ftl::Configurable {
 	std::list<ftl::operators::detail::OperatorNode> operators_;
 	std::map<std::string, ftl::Configurable*> configs_;
 	cudaStream_t stream_;
+	std::atomic_flag busy_;
 
 	ftl::Configurable *_append(ftl::operators::detail::ConstructionHelperBase*);
 };
diff --git a/components/operators/src/operator.cpp b/components/operators/src/operator.cpp
index 07928ffa3..7eabcdc70 100644
--- a/components/operators/src/operator.cpp
+++ b/components/operators/src/operator.cpp
@@ -35,6 +35,7 @@ bool Operator::apply(FrameSet &in, Frame &out, cudaStream_t stream) {
 
 Graph::Graph(nlohmann::json &config) : ftl::Configurable(config) {
 	cudaSafeCall( cudaStreamCreate(&stream_) );
+	busy_.clear();
 }
 
 Graph::~Graph() {
@@ -58,6 +59,11 @@ bool Graph::apply(FrameSet &in, FrameSet &out, cudaStream_t stream) {
 
 	if (in.frames.size() != out.frames.size()) return false;
 
+	if (busy_.test_and_set()) {
+		LOG(ERROR) << "Pipeline already in use: " << in.timestamp();
+		return false;
+	}
+
 	for (auto &i : operators_) {
 		if (i.instances.size() < 1) {
 			i.instances.push_back(i.maker->make());
@@ -112,6 +118,7 @@ bool Graph::apply(FrameSet &in, FrameSet &out, cudaStream_t stream) {
 		cudaSafeCall(cudaStreamSynchronize(stream_actual));
 	}
 
+	busy_.clear();
 	return success;
 }
 
@@ -135,6 +142,11 @@ bool Graph::apply(Frame &in, Frame &out, cudaStream_t stream) {
 	auto stream_actual = (stream == 0) ? stream_ : stream;
 	bool success = true;
 
+	if (busy_.test_and_set()) {
+		LOG(ERROR) << "Pipeline already in use: " << in.timestamp();
+		return false;
+	}
+
 	for (auto &i : operators_) {
 		// Make sure there are enough instances
 		if (i.instances.size() < 1) {
@@ -161,6 +173,7 @@ bool Graph::apply(Frame &in, Frame &out, cudaStream_t stream) {
 		cudaSafeCall(cudaStreamSynchronize(stream_actual));
 	}
 
+	busy_.clear();
 	return success;
 }
 
diff --git a/components/streams/include/ftl/streams/netstream.hpp b/components/streams/include/ftl/streams/netstream.hpp
index 65d0d1952..1b7368779 100644
--- a/components/streams/include/ftl/streams/netstream.hpp
+++ b/components/streams/include/ftl/streams/netstream.hpp
@@ -91,7 +91,7 @@ class Net : public Stream {
 
 	bool _processRequest(ftl::net::Peer &p, ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt);
 	void _checkDataRate(size_t tx_size, int64_t tx_latency, int64_t ts);
-	bool _sendRequest(ftl::codecs::Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate);
+	bool _sendRequest(ftl::codecs::Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate, bool doreset=false);
 	void _cleanUp();
 };
 
diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp
index a209a44e6..33d19b946 100644
--- a/components/streams/src/builder.cpp
+++ b/components/streams/src/builder.cpp
@@ -191,7 +191,7 @@ void ManualSourceBuilder::tick() {
 ForeignBuilder::ForeignBuilder(ftl::data::Pool *pool, int id) : BaseBuilder(pool, id), head_(0) {
 	jobs_ = 0;
 	skip_ = false;
-	bufferSize_ = 1;
+	bufferSize_ = 0;
 	last_frame_ = 0;
 
 	mspf_ = ftl::timer::getInterval();
@@ -200,7 +200,7 @@ ForeignBuilder::ForeignBuilder(ftl::data::Pool *pool, int id) : BaseBuilder(pool
 ForeignBuilder::ForeignBuilder() : BaseBuilder(), head_(0) {
 	jobs_ = 0;
 	skip_ = false;
-	bufferSize_ = 1;
+	bufferSize_ = 0;
 	last_frame_ = 0;
 
 	mspf_ = ftl::timer::getInterval();
diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp
index 20fe0c827..e78e892d8 100644
--- a/components/streams/src/feed.cpp
+++ b/components/streams/src/feed.cpp
@@ -175,12 +175,18 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
 				}
 			}
 
-			// FIXME: What happens if pipeline added concurrently?
+			ftl::operators::Graph *pipeline = nullptr;
+
+			SHARED_LOCK(mtx_, lk);
 			if (pre_pipelines_.count(fs->frameset()) == 1) {
-				pre_pipelines_[fs->frameset()]->apply(*fs, *fs, 0);
+				pipeline = pre_pipelines_[fs->frameset()]; //->apply(*fs, *fs, 0);
 			}
 
-			SHARED_LOCK(mtx_, lk);
+			lk.unlock();
+
+			if (pipeline) pipeline->apply(*fs, *fs, 0);
+
+			lk.lock();
 
 			std::atomic_store(&latest_.at(fs->frameset()), fs);
 
@@ -442,7 +448,7 @@ void Feed::_createPipeline(uint32_t fsid) {
 	// Don't recreate if already exists
 	if (pre_pipelines_.count(fsid)) return;
 
-	LOG(INFO) << "Creating pipeline";
+	LOG(INFO) << "Creating pipeline: " << fsid;
 	auto *p = _addPipeline(fsid);
 
 	if (pipe_creator_) {
@@ -1075,6 +1081,7 @@ void Feed::startStreaming(Filter *f) {
 		nstream->set("uri", value("uri", std::string("ftl://vision.utu.fi/live")));
 
 		record_new_client_ = nstream->onClientConnect([this](ftl::net::Peer *p) {
+			LOG(INFO) << "Client connect, resetting streams";
 			stream_->reset();
 			return true;
 		});
diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp
index 2823bb2e3..19ca6f083 100644
--- a/components/streams/src/netstream.cpp
+++ b/components/streams/src/netstream.cpp
@@ -336,13 +336,13 @@ void Net::reset() {
 		auto sel = selected(0);
 		
 		for (auto c : sel) {
-			_sendRequest(c, kAllFramesets, kAllFrames, 30, 255);
+			_sendRequest(c, kAllFramesets, kAllFrames, 30, 255, true);
 		}
 	}
 	tally_ = 30*kTallyScale;
 }
 
-bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate) {
+bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate, bool doreset) {
 	if (!active_ || host_) return false;
 
 	//LOG(INFO) << "SENDING REQUEST FOR " << (int)c;
@@ -356,13 +356,16 @@ bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t coun
 		0
 	};
 
+	uint8_t sflags = ftl::codecs::kFlagRequest;
+	if (doreset) sflags |= ftl::codecs::kFlagReset;
+
 	StreamPacket spkt = {
 		5,
 		ftl::timer::get_time(),
 		frameset,
 		frames,
 		c,
-		ftl::codecs::kFlagRequest,
+		sflags,
 		0,
 		0,
 		0
diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp
index efc4ffba2..9db6ee792 100644
--- a/components/streams/src/receiver.cpp
+++ b/components/streams/src/receiver.cpp
@@ -29,39 +29,22 @@ using ftl::rgbd::Capability;
 Receiver::Receiver(nlohmann::json &config, ftl::data::Pool *p) : ftl::Configurable(config), stream_(nullptr), pool_(p) {
 	timestamp_ = 0;
 	second_channel_ = Channel::Depth;
-	frame_mask_ = value("frame_mask", 0xFFFFFFFFu);
-
-	//size_t bsize = value("frameset_buffer_size", 3);
-	/*for (size_t i=0; i<ftl::stream::kMaxStreams; ++i) {
-		builder_[i].setID(i);
-		builder_[i].setBufferSize(bsize);
-	}*/
 
 	on("frameset_buffer_size", [this]() {
-		size_t bsize = value("frameset_buffer_size", 3);
+		size_t bsize = value("frameset_buffer_size", 0);
 		for (auto &i : builders_) {
 			i.second->setBufferSize(bsize);
 		}
 	});
-
-	on("frame_mask", [this]() {
-		frame_mask_ = value("frame_mask", 0xFFFFFFFFu);
-	});
 }
 
 Receiver::~Receiver() {
-	//if (stream_) {
-	//	stream_->onPacket(nullptr);
-	//}
-
-	//builder_[0].onFrameSet(nullptr);
 }
 
 void Receiver::loopback(ftl::data::Frame &f, ftl::codecs::Channel c) {
 	auto &build = builder(f.frameset());
 	auto fs = build.get(f.timestamp(), f.source());
 	fs->frames[f.source()].informChange(c, build.changeType(), f.getAnyMutable(c));
-	//f.remove(c);
 }
 
 ftl::streams::BaseBuilder &Receiver::builder(uint32_t id) {
@@ -72,7 +55,7 @@ ftl::streams::BaseBuilder &Receiver::builder(uint32_t id) {
 		auto &b = builders_[id];
 		b->setID(id);
 		b->setPool(pool_);
-		fb->setBufferSize(value("frameset_buffer_size", 3));
+		fb->setBufferSize(value("frameset_buffer_size", 0));
 		handles_[id] = std::move(fb->onFrameSet([this](const ftl::data::FrameSetPtr& fs) {
 			callback_.trigger(fs);
 			return true;
@@ -107,14 +90,12 @@ void Receiver::_createDecoder(InternalVideoStates &frame, int chan, const ftl::c
 	auto *decoder = frame.decoders[chan];
 	if (decoder) {
 		if (!decoder->accepts(pkt)) {
-			//UNIQUE_LOCK(mutex_,lk);
 			ftl::codecs::free(frame.decoders[chan]);
 		} else {
 			return;
 		}
 	}
 
-	//UNIQUE_LOCK(mutex_,lk);
 	frame.decoders[chan] = ftl::codecs::allocateDecoder(pkt);
 }
 
@@ -128,12 +109,10 @@ Receiver::InternalVideoStates &Receiver::_getVideoFrame(const StreamPacket &spkt
 
 	UNIQUE_LOCK(mutex_, lk);
 	while (video_frames_[spkt.streamID].size() <= fn) {
-		//frames_.resize(spkt.frameNumber()+1);
 		video_frames_[spkt.streamID].push_back(new InternalVideoStates);
-		//video_frames_[spkt.streamID][video_frames_[spkt.streamID].size()-1]->state.set("name",std::string("Source ")+std::to_string(fn+1));
 	}
+
 	auto &f = *video_frames_[spkt.streamID][fn];
-	//if (!f.frame.origin()) f.frame.setOrigin(&f.state);
 	return f;
 }
 
@@ -146,12 +125,10 @@ Receiver::InternalAudioStates &Receiver::_getAudioFrame(const StreamPacket &spkt
 
 	UNIQUE_LOCK(mutex_, lk);
 	while (audio_frames_[spkt.streamID].size() <= fn) {
-		//frames_.resize(spkt.frameNumber()+1);
 		audio_frames_[spkt.streamID].push_back(new InternalAudioStates);
-		//audio_frames_[spkt.streamID][audio_frames_[spkt.streamID].size()-1]->state.set("name",std::string("Source ")+std::to_string(fn+1));
 	}
+
 	auto &f = *audio_frames_[spkt.streamID][fn];
-	//if (!f.frame.origin()) f.frame.setOrigin(&f.state);
 	return f;
 }
 
@@ -204,7 +181,6 @@ void Receiver::_processAudio(const StreamPacket &spkt, const Packet &pkt) {
 	// Audio Data
 	InternalAudioStates &state = _getAudioFrame(spkt);
 
-	//frame.frame.reset();
 	state.timestamp = spkt.timestamp;
 
 	auto &build = builder(spkt.streamID);
@@ -237,7 +213,6 @@ namespace sgm {
 void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) {
 	FTL_Profile("VideoPacket", 0.02);
 
-	//const ftl::codecs::Channel rchan = spkt.channel;
 	const unsigned int channum = (unsigned int)spkt.channel;
 	InternalVideoStates &ividstate = _getVideoFrame(spkt);
 
@@ -245,14 +220,8 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) {
 
 	// Get the frameset
 	auto &build = builder(spkt.streamID);
-	auto fs = build.get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1);  // TODO: This is a hack
-
-	//if (!fs->frames[spkt.frame_number].has(Channel::Calibration)) {
-	//	LOG(WARNING) << "No calibration, skipping frame: " << spkt.timestamp;
-	//	return;
-	//}
+	auto fs = build.get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1);
 
-	//const auto &calibration = std::get<0>(fs->frames[spkt.frame_number].get<std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, int>>(Channel::Calibration));
 	int width = ividstate.width; //calibration.width;
 	int height = ividstate.height; //calibration.height;
 
@@ -273,19 +242,10 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) {
 		return;
 	}
 
-	//LOG(INFO) << " CODEC = " << (int)pkt.codec << " " << (int)pkt.flags << " " << (int)spkt.channel;
-	//LOG(INFO) << "Decode surface: " << (width*tx) << "x" << (height*ty);
-
 	auto &surface = ividstate.surface[static_cast<int>(spkt.channel)];
 
 	// Allocate a decode surface, this is a tiled image to be split later
 	int cvtype = ftl::codecs::type(spkt.channel);
-	if (cvtype == CV_32F) {
-		//cvtype = CV_16U; //(pkt.flags & 0x2) ? CV_16UC4 : CV_16U;
-		//if (pkt.flags & 0x2) sheight += sheight/2;
-	}
-
-	//surface.create(height*ty, width*tx, ((isFloatChannel(spkt.channel)) ? ((pkt.flags & 0x2) ? CV_16UC4 : CV_16U) : CV_8UC4));
 	surface.create(height*ty, width*tx, cvtype);
 
 	bool is_static = ividstate.decoders[channum] && (spkt.hint_capability & ftl::codecs::kStreamCap_Static);
@@ -322,6 +282,7 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) {
 	// Now split the tiles from surface into frames, doing colour conversions
 	// at the same time.
 	// Note: Done in reverse to allocate correct number of frames first time round
+	// FIXME: Don't do this copy for single tiles
 	for (int i=pkt.frame_count-1; i>=0; --i) {
 		//InternalVideoStates &vidstate = _getVideoFrame(spkt,i);
 		auto &frame = fs->frames[spkt.frame_number+i];
@@ -347,9 +308,7 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) {
 
 	fs->localTimestamp = spkt.localTimestamp;
 
-	for (int i=pkt.frame_count-1; i>=0; --i) {
-		_finishPacket(fs, spkt.frame_number+i);
-	}
+	_finishPacket(fs, spkt.frame_number);
 }
 
 void Receiver::_finishPacket(ftl::streams::LockedFrameSet &fs, size_t fix) {
@@ -394,10 +353,6 @@ void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) {
 		return;
 	}
 
-	//LOG(INFO) << "PACKET: " << spkt.timestamp << ", " << (int)spkt.channel << ", " << (int)pkt.codec << ", " << (int)pkt.definition;
-
-	// TODO: Allow for multiple framesets
-	//if (spkt.frameSetID() > 0) LOG(INFO) << "Frameset " << spkt.frameSetID() << " received: " << (int)spkt.channel;
 	if (spkt.frameSetID() >= ftl::stream::kMaxStreams) return;
 
 	// Frameset level data channels
@@ -407,8 +362,7 @@ void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) {
 	}
 
 	// Too many frames, so ignore.
-	//if (spkt.frameNumber() >= value("max_frames",32)) return;
-	if (spkt.frameNumber() >= 32 || ((1 << spkt.frameNumber()) & frame_mask_) == 0) return;
+	if (spkt.frameNumber() >= 32) return;
 
 
 	if (channum >= 64) {
@@ -431,8 +385,6 @@ void Receiver::setStream(ftl::stream::Stream *s) {
 }
 
 ftl::Handle Receiver::onFrameSet(const std::function<bool(const ftl::data::FrameSetPtr&)> &cb) {
-	//for (auto &b : builders_)
-	//	b.second.onFrameSet(cb);
 	return callback_.on(cb);
 }
 
diff --git a/components/streams/src/sender.cpp b/components/streams/src/sender.cpp
index 2035bcca5..fba9c922a 100644
--- a/components/streams/src/sender.cpp
+++ b/components/streams/src/sender.cpp
@@ -68,7 +68,7 @@ void Sender::setStream(ftl::stream::Stream*s) {
 		if (reqcb_) reqcb_(spkt,pkt);
 
 		// Inject state packets
-		if (spkt.hint_capability & ftl::codecs::kStreamCap_NewConnection) do_inject_.clear();
+		if ((spkt.hint_capability & ftl::codecs::kStreamCap_NewConnection) || (spkt.flags & ftl::codecs::kFlagReset)) do_inject_.clear();
 
 		return true;
 	});
diff --git a/components/streams/test/receiver_unit.cpp b/components/streams/test/receiver_unit.cpp
index 55ea7c837..fcdeafc1a 100644
--- a/components/streams/test/receiver_unit.cpp
+++ b/components/streams/test/receiver_unit.cpp
@@ -53,13 +53,37 @@ class TestStream : public ftl::stream::Stream {
 		spkt2.channel = Channel::EndFrame;
 		spkt2.streamID = spkt.streamID;
 
-		for (int i=0; i<pkt.frame_count; ++i) {
-			spkt2.frame_number = i;
+		//for (int i=pkt.frame_count-1; i>=0; --i) {
+		//	spkt2.frame_number = spkt.frame_number+i;
+		//	if (i > 0) pkt2.frame_count = 1;
+		//	else pkt2.frame_count = count+1;
 			post(spkt2, pkt2);
-		}
+		//}
 		return post(spkt, pkt);
 	}
 
+	bool postEnd(int64_t ts, int frame, int count) {
+		ftl::codecs::Packet pkt2;
+		pkt2.codec = codec_t::Invalid;
+		pkt2.bitrate = 255;
+		pkt2.packet_count = count+1;
+		pkt2.frame_count = 1;
+
+		ftl::codecs::StreamPacket spkt2;
+		spkt2.version = 4;
+		spkt2.timestamp = ts;
+		spkt2.frame_number = frame;
+		spkt2.channel = Channel::EndFrame;
+		spkt2.streamID = 0;
+
+		//for (int i=pkt.frame_count-1; i>=0; --i) {
+		//	spkt2.frame_number = spkt.frame_number+i;
+		//	if (i > 0) pkt2.frame_count = 1;
+		//	else pkt2.frame_count = count+1;
+			return post(spkt2, pkt2);
+		//}
+	}
+
 	bool begin() override { return true; }
 	bool end() override { return true; }
 	bool active() override { return true; }
@@ -176,6 +200,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) {
 		bool r = encoder.encode(m, pkt);
 		REQUIRE( r );
 
+		stream.postEnd(spkt.timestamp, 1, 1);
 		stream.postEnd(spkt, pkt, 2);
 
 		int count = 0;
@@ -212,6 +237,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) {
 		bool r = encoder.encode(m, pkt);
 		REQUIRE( r );
 
+		stream.postEnd(spkt.timestamp, 1, 1);
 		stream.postEnd(spkt, pkt, 2);
 
 		int count = 0;
@@ -249,6 +275,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) {
 		bool r = encoder.encode(m, pkt);
 		REQUIRE( r );
 
+		stream.postEnd(spkt.timestamp, 1, 1);
 		stream.postEnd(spkt, pkt, 2);
 
 		int count = 0;
diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp
index 4181712fd..576896702 100644
--- a/components/structures/src/new_frame.cpp
+++ b/components/structures/src/new_frame.cpp
@@ -377,6 +377,8 @@ void Frame::swapChannels(ftl::codecs::Channel c1, ftl::codecs::Channel c2) {
 		d1.status = d2.status;
 		d2.status = status;
 
+		std::swap(d1.encoded, d2.encoded);
+
 		changed_[c1] = (mode_ == FrameMode::PRIMARY) ? ChangeType::PRIMARY : ChangeType::RESPONSE;
 		changed_[c2] = (mode_ == FrameMode::PRIMARY) ? ChangeType::PRIMARY : ChangeType::RESPONSE;
 	}
-- 
GitLab