From 7edc578a02f15b791cca7a29b0366a9364d3b023 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Mon, 28 Oct 2019 09:44:20 +0200 Subject: [PATCH] Fix for write race condition --- .../codecs/include/ftl/codecs/writer.hpp | 2 ++ components/codecs/src/reader.cpp | 21 ++++++++++++++++++- components/codecs/src/writer.cpp | 3 ++- 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/components/codecs/include/ftl/codecs/writer.hpp b/components/codecs/include/ftl/codecs/writer.hpp index 044624ba8..94d82f1ec 100644 --- a/components/codecs/include/ftl/codecs/writer.hpp +++ b/components/codecs/include/ftl/codecs/writer.hpp @@ -5,6 +5,7 @@ #include <msgpack.hpp> //#include <Eigen/Eigen> +#include <ftl/threads.hpp> #include <ftl/codecs/packet.hpp> namespace ftl { @@ -24,6 +25,7 @@ class Writer { msgpack::sbuffer buffer_; int64_t timestart_; bool active_; + MUTEX mutex_; }; } diff --git a/components/codecs/src/reader.cpp b/components/codecs/src/reader.cpp index 97a74acd1..ba68f26c9 100644 --- a/components/codecs/src/reader.cpp +++ b/components/codecs/src/reader.cpp @@ -37,6 +37,22 @@ bool Reader::begin() { return true; } +/*static void printMsgpack(msgpack::object &obj) { + switch (obj.type) { + case msgpack::type::NIL: return; + case msgpack::type::BOOLEAN: LOG(INFO) << "BOOL " << obj.as<bool>(); return; + case msgpack::type::POSITIVE_INTEGER: + case msgpack::type::NEGATIVE_INTEGER: LOG(INFO) << "INT " << obj.as<int>(); return; + case msgpack::type::FLOAT32: LOG(INFO) << "FLOAT " << obj.as<float>(); return; + case msgpack::type::FLOAT64: LOG(INFO) << "DOUBLE " << obj.as<double>(); return; + case msgpack::type::STR: LOG(INFO) << "STRING " << obj.as<std::string>(); return; + case msgpack::type::BIN: return; + case msgpack::type::EXT: return; + case msgpack::type::ARRAY: LOG(INFO) << "ARRAY: "; return; + case msgpack::type::MAP: LOG(INFO) << "MAP: "; return; + } +}*/ + bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &f) { //UNIQUE_LOCK(mtx_, lk); std::unique_lock<std::mutex> lk(mtx_, std::defer_lock); @@ -75,10 +91,13 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream //std::tuple<StreamPacket,Packet> data; msgpack::object obj = msg.get(); + + //printMsgpack(obj); + try { obj.convert(data_.emplace_back()); } catch (std::exception &e) { - LOG(INFO) << "Corrupt message: " << buffer_.nonparsed_size(); + LOG(INFO) << "Corrupt message: " << buffer_.nonparsed_size() << " - " << e.what(); //partial = true; //continue; return false; diff --git a/components/codecs/src/writer.cpp b/components/codecs/src/writer.cpp index d7582f118..1e3841a8e 100644 --- a/components/codecs/src/writer.cpp +++ b/components/codecs/src/writer.cpp @@ -41,7 +41,8 @@ bool Writer::write(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet auto data = std::make_tuple(s2,p); msgpack::sbuffer buffer; msgpack::pack(buffer, data); + + UNIQUE_LOCK(mutex_, lk); (*stream_).write(buffer.data(), buffer.size()); - //buffer_.clear(); return true; } -- GitLab