From ec8de648fbb15ea02db84987a95265ce2a80965c Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Wed, 9 Oct 2019 22:59:24 +0300
Subject: [PATCH] Initial working reader writer

---
 .../codecs/include/ftl/codecs/reader.hpp      | 36 ++++++++-----
 .../codecs/include/ftl/codecs/writer.hpp      | 18 ++++---
 components/codecs/src/reader.cpp              | 52 +++++++++++++++++++
 components/codecs/src/writer.cpp              | 30 +++++++++++
 components/codecs/test/CMakeLists.txt         | 14 +++++
 components/codecs/test/readwrite_test.cpp     | 52 +++++++++++++++++++
 6 files changed, 183 insertions(+), 19 deletions(-)
 create mode 100644 components/codecs/src/reader.cpp
 create mode 100644 components/codecs/src/writer.cpp
 create mode 100644 components/codecs/test/readwrite_test.cpp

diff --git a/components/codecs/include/ftl/codecs/reader.hpp b/components/codecs/include/ftl/codecs/reader.hpp
index 0b34af31f..530c09890 100644
--- a/components/codecs/include/ftl/codecs/reader.hpp
+++ b/components/codecs/include/ftl/codecs/reader.hpp
@@ -1,25 +1,37 @@
 #ifndef _FTL_CODECS_READER_HPP_
 #define _FTL_CODECS_READER_HPP_
 
+#include <iostream>
+#include <msgpack.hpp>
+#include <inttypes.h>
+#include <functional>
+
+#include <ftl/codecs/packet.hpp>
+
 namespace ftl {
 namespace codecs {
 
 class Reader {
 	public:
-	Reader();
+	Reader(std::istream &);
 	~Reader();
 
-	bool open(const std::string &filename);
-
-	bool start(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &);
-	void stop();
-	void pause(bool);
-	bool paused();
-
-	void loop(bool);
-	bool looping();
-
-	
+	/**
+	 * Read packets up to and including requested timestamp. A provided callback
+	 * is called for each packet read, in order stored in file. Returns true if
+	 * there are still more packets available beyond specified timestamp, false
+	 * otherwise (end-of-file). Timestamps are in local (clock adjusted) time
+	 * and the timestamps stored in the file are aligned to the time when open
+	 * was called.
+	 */
+	bool read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &);
+
+	bool begin();
+	bool end();
+
+	private:
+	std::istream *stream_;
+	msgpack::unpacker buffer_;
 };
 
 }
diff --git a/components/codecs/include/ftl/codecs/writer.hpp b/components/codecs/include/ftl/codecs/writer.hpp
index b45be7936..6ff3c8aeb 100644
--- a/components/codecs/include/ftl/codecs/writer.hpp
+++ b/components/codecs/include/ftl/codecs/writer.hpp
@@ -1,23 +1,27 @@
 #ifndef _FTL_CODECS_WRITER_HPP_
 #define _FTL_CODECS_WRITER_HPP_
 
+#include <iostream>
+#include <msgpack.hpp>
+//#include <Eigen/Eigen>
+
+#include <ftl/codecs/packet.hpp>
+
 namespace ftl {
 namespace codecs {
 
 class Writer {
 	public:
-	Writer();
+	Writer(std::ostream &);
 	~Writer();
 
-	bool open(const std::string &filename);
-
+	bool begin();
 	bool write(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &);
-
-	bool write(const ftl::codecs::StreamPacket &, const std::string &json);
-	bool write(const ftl::codecs::StreamPacket &, const Eigen::Matrix4d &m);
+	bool end();
 
 	private:
-	std::ofstream file_;
+	std::ostream *stream_;
+	msgpack::sbuffer buffer_;
 };
 
 }
diff --git a/components/codecs/src/reader.cpp b/components/codecs/src/reader.cpp
new file mode 100644
index 000000000..1f5b7f2aa
--- /dev/null
+++ b/components/codecs/src/reader.cpp
@@ -0,0 +1,52 @@
+#include <ftl/codecs/reader.hpp>
+
+#include <tuple>
+
+using ftl::codecs::Reader;
+using ftl::codecs::StreamPacket;
+using ftl::codecs::Packet;
+using std::get;
+
+Reader::Reader(std::istream &s) : stream_(&s) {
+
+}
+
+Reader::~Reader() {
+
+}
+
+bool Reader::begin() {
+	ftl::codecs::Header h;
+	(*stream_).read((char*)&h, sizeof(h));
+	if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false;
+	return true;
+}
+
+bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) {
+	while (stream_->good()) {
+		if (buffer_.nonparsed_size() == 0) {
+			buffer_.reserve_buffer(100000);
+			stream_->read(buffer_.buffer(), 100000);
+			//if (stream_->bad()) return false;
+
+			int bytes = stream_->gcount();
+			if (bytes == 0) return false;
+			buffer_.buffer_consumed(bytes);
+		}
+
+		msgpack::object_handle msg;
+		if (!buffer_.next(msg)) continue;
+
+		msgpack::object obj = msg.get();
+		std::tuple<StreamPacket,Packet> data;
+		obj.convert(data);
+
+		f(get<0>(data),get<1>(data));
+	}
+
+	return false;
+}
+
+bool Reader::end() {
+	return true;
+}
diff --git a/components/codecs/src/writer.cpp b/components/codecs/src/writer.cpp
new file mode 100644
index 000000000..2022265e0
--- /dev/null
+++ b/components/codecs/src/writer.cpp
@@ -0,0 +1,30 @@
+#include <ftl/codecs/writer.hpp>
+
+#include <tuple>
+
+using ftl::codecs::Writer;
+
+Writer::Writer(std::ostream &s) : stream_(&s) {}
+
+Writer::~Writer() {
+
+}
+
+bool Writer::begin() {
+	ftl::codecs::Header h;
+	h.version = 0;
+	(*stream_).write((const char*)&h, sizeof(h));
+	return true;
+}
+
+bool Writer::end() {
+	return true;
+}
+
+bool Writer::write(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet &p) {
+	auto data = std::make_tuple(s,p);
+	msgpack::pack(buffer_, data);
+	(*stream_).write(buffer_.data(), buffer_.size());
+	buffer_.clear();
+	return true;
+}
diff --git a/components/codecs/test/CMakeLists.txt b/components/codecs/test/CMakeLists.txt
index 534336fed..89b92059d 100644
--- a/components/codecs/test/CMakeLists.txt
+++ b/components/codecs/test/CMakeLists.txt
@@ -30,3 +30,17 @@ target_link_libraries(nvpipe_codec_unit
 
 
 add_test(NvPipeCodecUnitTest nvpipe_codec_unit)
+
+### Reader Writer Unit ################################################################
+add_executable(rw_unit
+	./tests.cpp
+	../src/writer.cpp
+	../src/reader.cpp
+	./readwrite_test.cpp
+)
+target_include_directories(rw_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include")
+target_link_libraries(rw_unit
+	Threads::Threads ${OS_LIBS} ${OpenCV_LIBS} ${CUDA_LIBRARIES} ftlcommon Eigen3::Eigen)
+
+
+add_test(RWUnitTest rw_unit)
diff --git a/components/codecs/test/readwrite_test.cpp b/components/codecs/test/readwrite_test.cpp
new file mode 100644
index 000000000..e929ebaf9
--- /dev/null
+++ b/components/codecs/test/readwrite_test.cpp
@@ -0,0 +1,52 @@
+#include "catch.hpp"
+
+#include <ftl/codecs/writer.hpp>
+#include <ftl/codecs/reader.hpp>
+
+using ftl::codecs::Writer;
+using ftl::codecs::Reader;
+using ftl::codecs::StreamPacket;
+using ftl::codecs::Packet;
+using ftl::codecs::codec_t;
+using ftl::codecs::definition_t;
+
+TEST_CASE( "Write and read - Single frame" ) {
+	std::stringstream s;
+	Writer w(s);
+
+	StreamPacket spkt;
+	Packet pkt;
+
+	spkt.channel = 0;
+	spkt.timestamp = 0;
+	spkt.streamID = 0;
+
+	pkt.codec = codec_t::JSON;
+	pkt.definition = definition_t::Any;
+	pkt.block_number = 0;
+	pkt.block_total = 1;
+	pkt.flags = 0;
+	pkt.data = {44,44,44};
+
+	w.begin();
+	w.write(spkt, pkt);
+	w.end();
+
+	REQUIRE( s.str().size() > 0 );
+
+	int n = 0;
+
+	Reader r(s);
+	r.begin();
+	bool res = r.read(10, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
+		++n;
+		REQUIRE(rpkt.codec == codec_t::JSON);
+		REQUIRE(rpkt.data.size() == 3);
+		REQUIRE(rpkt.data[0] == 44);
+		REQUIRE(rspkt.channel == 0);
+	});
+	r.end();
+
+	REQUIRE( n == 1 );
+	REQUIRE( !res );
+}
-- 
GitLab