diff --git a/src/peer.cpp b/src/peer.cpp index 8342b8f7099efefb2c60e202855cd1ef93b940f4..9da2e0769fcd5388d9cc48bc248c3b2ab38f894a 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -405,8 +405,23 @@ bool Peer::_data() { // Must handle immediately with no other thread able // to read next message before completion. // The handshake handler must not block. - //disp_->dispatch(*this, obj); - //return true; + + try { + disp_->dispatch(*this, obj); + } catch (const std::exception &e) { + net_->_notifyError(this, ftl::protocol::Error::kDispatchFailed, e.what()); + } + + ++job_count_; + ftl::pool.push([this](int id) { + try { + _data(); + } catch (const std::exception &e) { + net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what()); + } + --job_count_; + }); + return true; } } catch(...) { DLOG(WARNING) << "Bad first message format... waiting"; diff --git a/test/stream_unit.cpp b/test/stream_unit.cpp index 8c39beb4b50d84e039ddc0a5744f8fb9429f345e..fcbb1430fe399b28ae352782e7bd5b5d63c2352d 100644 --- a/test/stream_unit.cpp +++ b/test/stream_unit.cpp @@ -38,12 +38,12 @@ class TestStream : public ftl::protocol::Stream { //std::function<void(const StreamPacket &, const Packet &)> cb_; }; -TEST_CASE("ftl::stream::Muxer()::write", "[stream]") { +TEST_CASE("ftl::stream::Muxer()::post, distinct framesets", "[stream]") { std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); REQUIRE(mux); - SECTION("write with one stream") { + SECTION("write with one stream fails") { std::shared_ptr<Stream> s = std::make_shared<TestStream>(); REQUIRE(s); @@ -104,7 +104,7 @@ TEST_CASE("ftl::stream::Muxer()::write", "[stream]") { } } -TEST_CASE("ftl::stream::Muxer()::post multi-frameset", "[stream]") { +TEST_CASE("ftl::stream::Muxer()::post, single frameset", "[stream]") { std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); REQUIRE(mux); @@ -115,7 +115,7 @@ TEST_CASE("ftl::stream::Muxer()::post multi-frameset", "[stream]") { std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); REQUIRE(s2); - mux->add(s1); + mux->add(s1,1); mux->add(s2,1); StreamPacket tspkt = {4,0,0,1,Channel::kColour}; @@ -125,15 +125,15 @@ TEST_CASE("ftl::stream::Muxer()::post multi-frameset", "[stream]") { }); REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.streamID == 0 ); + REQUIRE( tspkt.streamID == 1 ); REQUIRE( tspkt.frame_number == 0 ); REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); REQUIRE( tspkt.streamID == 1 ); - REQUIRE( tspkt.frame_number == 0 ); + REQUIRE( tspkt.frame_number == 1 ); - StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; - StreamPacket tspkt3 = {4,0,0,1,Channel::kColour}; + StreamPacket tspkt2 = {4,0,4,4,Channel::kColour}; + StreamPacket tspkt3 = {4,0,4,4,Channel::kColour}; auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { tspkt2 = spkt; return true; @@ -143,9 +143,15 @@ TEST_CASE("ftl::stream::Muxer()::post multi-frameset", "[stream]") { return true; }); - REQUIRE( mux->post({4,200,1,0,Channel::kColour},{}) ); + REQUIRE( mux->post({4,200,1,1,Channel::kColour},{}) ); + REQUIRE( tspkt2.streamID == 4 ); + REQUIRE( tspkt2.frame_number == 4 ); REQUIRE( tspkt3.streamID == 0 ); REQUIRE( tspkt3.frame_number == 0 ); + + REQUIRE( mux->post({4,200,1,0,Channel::kColour},{}) ); + REQUIRE( tspkt2.streamID == 0 ); + REQUIRE( tspkt2.frame_number == 0 ); } }