From b990b97ab1c2a54f91ad0b0e6ed834ea28c3a63a Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Fri, 11 Oct 2019 13:00:04 +0300
Subject: [PATCH] Fix corrupted write

---
 applications/player/src/main.cpp       | 26 +++++++++++++++-----------
 applications/reconstruct/src/main.cpp  |  2 +-
 components/codecs/src/reader.cpp       | 13 ++++++++++---
 components/codecs/src/writer.cpp       |  7 ++++---
 components/rgbd-sources/src/source.cpp |  1 +
 5 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/applications/player/src/main.cpp b/applications/player/src/main.cpp
index 78d66a8db..78a919a63 100644
--- a/applications/player/src/main.cpp
+++ b/applications/player/src/main.cpp
@@ -35,22 +35,26 @@ int main(int argc, char **argv) {
 
     LOG(INFO) << "Playing...";
 
-    bool res = r.read(90000000000000, [](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
+    int current_stream = 0;
+
+    bool res = r.read(90000000000000, [&current_stream](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
         if (spkt.channel & 0x1 > 0) return;
+        if (spkt.streamID == current_stream) {
 
-        LOG(INFO) << "Reading packet: (" << (int)spkt.streamID << "," << (int)spkt.channel << ") " << (int)pkt.codec << ", " << (int)pkt.definition;
+            LOG(INFO) << "Reading packet: (" << (int)spkt.streamID << "," << (int)spkt.channel << ") " << (int)pkt.codec << ", " << (int)pkt.definition;
 
-        cv::Mat frame(cv::Size(ftl::codecs::getWidth(pkt.definition),ftl::codecs::getHeight(pkt.definition)), CV_8UC3);
-        createDecoder(pkt);
+            cv::Mat frame(cv::Size(ftl::codecs::getWidth(pkt.definition),ftl::codecs::getHeight(pkt.definition)), CV_8UC3);
+            createDecoder(pkt);
 
-        try {
-            decoder->decode(pkt, frame);
-        } catch (std::exception &e) {
-            LOG(INFO) << "Decoder exception: " << e.what();
-        }
+            try {
+                decoder->decode(pkt, frame);
+            } catch (std::exception &e) {
+                LOG(INFO) << "Decoder exception: " << e.what();
+            }
 
-        if (!frame.empty()) {
-            cv::imshow("Player", frame);
+            if (!frame.empty()) {
+                cv::imshow("Player", frame);
+            }
             cv::waitKey(20);
         }
     });
diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp
index 3ee32ab4a..7a391be6d 100644
--- a/applications/reconstruct/src/main.cpp
+++ b/applications/reconstruct/src/main.cpp
@@ -197,7 +197,7 @@ static void run(ftl::Configurable *root) {
 	// -------------------------------------------------------------------------
 
 	stream->setLatency(5);  // FIXME: This depends on source!?
-	stream->add(&group);
+	//stream->add(&group);
 	stream->run();
 
 	bool busy = false;
diff --git a/components/codecs/src/reader.cpp b/components/codecs/src/reader.cpp
index 571feec84..5c0d46909 100644
--- a/components/codecs/src/reader.cpp
+++ b/components/codecs/src/reader.cpp
@@ -39,9 +39,9 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream
 	bool partial = false;
 
 	while (stream_->good() || buffer_.nonparsed_size() > 0) {
-		if (buffer_.nonparsed_size() == 0 || partial) {
+		if (buffer_.nonparsed_size() == 0 || (partial && buffer_.nonparsed_size() < 10000000)) {
 			buffer_.reserve_buffer(10000000);
-			stream_->read(buffer_.buffer(), 10000000);
+			stream_->read(buffer_.buffer(), buffer_.buffer_capacity());
 			//if (stream_->bad()) return false;
 
 			int bytes = stream_->gcount();
@@ -59,7 +59,14 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream
 
 		std::tuple<StreamPacket,Packet> data;
 		msgpack::object obj = msg.get();
-		obj.convert(data);
+		try {
+			obj.convert(data);
+		} catch (std::exception &e) {
+			LOG(INFO) << "Corrupt message: " << buffer_.nonparsed_size();
+			//partial = true;
+			//continue;
+			return false;
+		}
 
 		// Adjust timestamp
 		get<0>(data).timestamp += timestart_;
diff --git a/components/codecs/src/writer.cpp b/components/codecs/src/writer.cpp
index 70c864707..2c19a01fd 100644
--- a/components/codecs/src/writer.cpp
+++ b/components/codecs/src/writer.cpp
@@ -32,8 +32,9 @@ bool Writer::write(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet
 	s2.timestamp -= timestart_;
 
 	auto data = std::make_tuple(s2,p);
-	msgpack::pack(buffer_, data);
-	(*stream_).write(buffer_.data(), buffer_.size());
-	buffer_.clear();
+	msgpack::sbuffer buffer;
+	msgpack::pack(buffer, data);
+	(*stream_).write(buffer.data(), buffer.size());
+	//buffer_.clear();
 	return true;
 }
diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp
index db2dfa108..96335342e 100644
--- a/components/rgbd-sources/src/source.cpp
+++ b/components/rgbd-sources/src/source.cpp
@@ -157,6 +157,7 @@ ftl::codecs::Reader *Source::__createReader(const std::string &path) {
 
 	auto *r = new ftl::codecs::Reader(*file);
 	readers__[path] = r;
+	r->begin();
 	return r;
 }
 
-- 
GitLab