diff --git a/.vscode/launch.json b/.vscode/launch.json index 71b0e0b5bbb24188d0e77914415e3c0a139fa0cc..c86bce66099f93f2a10bf28bb18c8a0b3be623df 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -5,14 +5,12 @@ "version": "0.2.0", "configurations": [ { - "name": "g++ - Build and debug active file", + "name": "g++ - Debug Node", "type": "cppdbg", "request": "launch", - "program": "${command:cmake.launchTargetPath}", "args": [], "stopAtEntry": false, "cwd": "${workspaceFolder}/build", - "environment": [], "externalConsole": false, "MIMode": "gdb", "setupCommands": [ @@ -22,7 +20,6 @@ "ignoreFailures": true } ], - "preLaunchTask": "C/C++: g++ build active file", "miDebuggerPath": "/usr/bin/gdb", "sourceFileMap": { "${workspaceFolder}": { diff --git a/include/ftl/handle.hpp b/include/ftl/handle.hpp index 86aeefc2e2357c8b10ad87b4f55c2c87b40ad103..7872e7e9993259e054388345a36f1c1f5491e141 100644 --- a/include/ftl/handle.hpp +++ b/include/ftl/handle.hpp @@ -126,8 +126,8 @@ struct Handler : BaseHandler { void trigger(ARGS ...args) { bool hadFault = false; std::string faultMsg; - // FIXME: This should be a shared_lock but there is a race condition elsewhere. - std::unique_lock<std::shared_mutex> lk(mutex_); + + std::shared_lock<std::shared_mutex> lk(mutex_); for (auto i = callbacks_.begin(); i != callbacks_.end(); ) { bool keep = true; try { diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 375e72f65804914d86f9b6d4913b03cd776395fb..475894c88e861e18561c7107d72878b9d58cc6d2 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -34,6 +34,11 @@ std::string object_type_to_string(const msgpack::type::object_type t) { return "UNKNOWN"; } +Dispatcher::~Dispatcher() { + UNIQUE_LOCK(mutex_, lk); + funcs_.clear(); +} + vector<string> Dispatcher::getBindings() const { SHARED_LOCK(mutex_, lk); vector<string> res; @@ -43,7 +48,22 @@ vector<string> Dispatcher::getBindings() const { return res; } +void Dispatcher::unbind(const std::string &name) { + UNIQUE_LOCK(mutex_, lk); + auto i = funcs_.find(name); + if (i != funcs_.end()) { + funcs_.erase(i); + } +} + void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) { + SHARED_LOCK(mutex_, lk); + std::shared_lock<std::shared_mutex> lk2; + + if (parent_) { + lk2 = std::move(std::shared_lock<std::shared_mutex>(parent_->mutex_)); + } + switch (msg.via.array.size) { case 3: dispatch_notification(s, msg); @@ -109,7 +129,7 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { } optional<Dispatcher::adaptor_type> ftl::net::Dispatcher::_locateHandler(const std::string &name) const { - SHARED_LOCK(mutex_, lk); + //SHARED_LOCK(mutex_, lk); auto it_func = funcs_.find(name); if (it_func == funcs_.end()) { if (parent_ != nullptr) { diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index ab0dbc2a3a2ed6424e46cac5e17de0ac8478db02..e89f86f53749df635d27f799457b2f655b5124fc 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -73,6 +73,7 @@ namespace net { class Dispatcher { public: explicit Dispatcher(Dispatcher *parent = nullptr) : parent_(parent) {} + ~Dispatcher(); /** * Primary method by which a peer dispatches a msgpack object that this @@ -261,13 +262,7 @@ class Dispatcher { /** * Remove a previous bound function by name. */ - void unbind(const std::string &name) { - UNIQUE_LOCK(mutex_, lk); - auto i = funcs_.find(name); - if (i != funcs_.end()) { - funcs_.erase(i); - } - } + void unbind(const std::string &name); /** * @return All bound function names. diff --git a/src/peer.hpp b/src/peer.hpp index 50585439f649d9e09f8c8087042098729541f5a8..de3fd9da769996dbfa7d3e22a1ea3dc30d2fa3ba 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -197,6 +197,8 @@ class Peer { */ void data(); + int jobs() const { return job_count_; } + public: static const int kMaxMessage = 4*1024*1024; // 4Mb currently static const int kDefaultMessage = 512*1024; // 0.5Mb currently diff --git a/src/protocol.cpp b/src/protocol.cpp index 32291fc22dac793c8dbd11b223990c642d853710..22c1bb62327f038a08f2425c221317cb7cca42bb 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -6,15 +6,18 @@ #include <ftl/protocol.hpp> #include <ftl/protocol/self.hpp> +#include <ftl/threads.hpp> #include "universe.hpp" #include "rpc.hpp" static std::shared_ptr<ftl::net::Universe> universe; +static std::mutex globalmtx; // ctpl::thread_pool ftl::pool(std::thread::hardware_concurrency()*2); ctpl::thread_pool ftl::pool(4); void ftl::protocol::reset() { + UNIQUE_LOCK(globalmtx, lk); universe.reset(); } @@ -22,8 +25,11 @@ ftl::UUID ftl::protocol::id; std::shared_ptr<ftl::protocol::Self> ftl::getSelf() { if (!universe) { - universe = std::make_shared<ftl::net::Universe>(); - ftl::rpc::install(universe.get()); + UNIQUE_LOCK(globalmtx, lk); + if (!universe) { + universe = std::make_shared<ftl::net::Universe>(); + ftl::rpc::install(universe.get()); + } } return std::make_shared<ftl::protocol::Self>(universe); } diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index c6a546458614fa2405bf26e9023d734764eadb98..4e873847a9766ee060b0c22fbd94367333e7060e 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -349,33 +349,43 @@ void Net::_run() { auto current = state->buffer.begin(); + std::list<PacketBuffer*> framePackets; + for (size_t i = 0; i < size; ++i) { int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_; // Should the packet be dispatched yet if (pts == ats) { - StreamPacket *spkt; - DataPacket *pkt; - - spkt = ¤t->packets.first; - pkt = ¤t->packets.second; + framePackets.push_back(&(*current)); - ftl::pool.push([this, c = std::move(ftl::Counter(&state->active)), buf = &*current, spkt, pkt, state](int ix) { - try { - _processPacket(buf->peer, 0, *spkt, *pkt); - } catch (const std::exception &e) { - LOG(ERROR) << "Packet processing error: " << e.what(); - } - buf->done = true; - }); - - if (spkt->channel == Channel::kEndFrame) { + if (current->packets.first.channel == Channel::kEndFrame) { break; } } ++current; } + + ftl::pool.push([ + this, + c = std::move(ftl::Counter(&state->active)), + c2 = std::move(ftl::Counter(&jobs_)), + framePackets](int ix) { + for (auto buf : framePackets) { + StreamPacket *spkt; + DataPacket *pkt; + + spkt = &buf->packets.first; + pkt = &buf->packets.second; + + try { + _processPacket(buf->peer, 0, *spkt, *pkt); + } catch (const std::exception &e) { + LOG(ERROR) << "Packet processing error: " << e.what(); + } + buf->done = true; + } + }); } { @@ -722,8 +732,14 @@ bool Net::end() { } active_ = false; + net_->unbind(base_uri_); if (thread_.joinable()) thread_.join(); + + while (jobs_ > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + return true; } diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index 33c7908cd10762975294b2f017d48388cc79b4de..89b22cac48a95f82ce6b2f677bd3776e2222866e 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -105,6 +105,7 @@ class Net : public Stream { int64_t buffering_ = 0; std::atomic_int underuns_ = 0; std::atomic_int drops_ = 0; + std::atomic_int jobs_ = 0; static std::atomic_size_t req_bitrate__; static std::atomic_size_t tx_bitrate__; diff --git a/src/universe.hpp b/src/universe.hpp index 30b0ace3a7a69d98cc29e251cdb13f8247d26a1c..1b0cc090bf6338d55add0cafd88f2282b447ca19 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -241,7 +241,7 @@ class Universe { template <typename F> void Universe::bind(const std::string &name, F func) { - UNIQUE_LOCK(net_mutex_, lk); + // UNIQUE_LOCK(net_mutex_, lk); disp_.bind(name, func, typename ftl::internal::func_kind_info<F>::result_kind(), typename ftl::internal::func_kind_info<F>::args_kind(), diff --git a/test/netstream_unit.cpp b/test/netstream_unit.cpp index b098049620cae2c474a30fbe5707f5d498442192..dae84f6c8ccd5314f8e75d8738541bc8603587a5 100644 --- a/test/netstream_unit.cpp +++ b/test/netstream_unit.cpp @@ -88,7 +88,7 @@ TEST_CASE("Net stream options") { fakedata[0] = ""; send_handshake(*p.get()); p->data(); - sleep_for(milliseconds(50)); + while (p->jobs() > 0) sleep_for(milliseconds(1)); auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); @@ -114,6 +114,7 @@ TEST_CASE("Net stream options") { spkt.channel = Channel::kColour; writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); + while (p->jobs() > 0) sleep_for(milliseconds(1)); while (count < 1) { sleep_for(milliseconds(10)); @@ -125,6 +126,7 @@ TEST_CASE("Net stream options") { s1->setProperty(ftl::protocol::StreamProperty::kBuffering, 0.1f); p->data(); + while (p->jobs() > 0) sleep_for(milliseconds(1)); while (count < 2) { sleep_for(milliseconds(10)); @@ -140,7 +142,7 @@ TEST_CASE("Net stream sending requests") { fakedata[0] = ""; send_handshake(*p.get()); p->data(); - sleep_for(milliseconds(50)); + while (p->jobs() > 0) sleep_for(milliseconds(1)); SECTION("cannot enable if not seen") { auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); @@ -207,9 +209,10 @@ TEST_CASE("Net stream sending requests") { spkt.timestamp = i; writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); + while (p->jobs() > 0) sleep_for(milliseconds(1)); } - while (s1->postCount < 2) sleep_for(milliseconds(10)); + while (s1->postCount < 4) sleep_for(milliseconds(10)); REQUIRE( s1->lastSpkt.channel == Channel::kColour ); REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 ); @@ -251,12 +254,14 @@ TEST_CASE("Net stream sending requests") { spkt.timestamp = i >> 1; writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); + while (p->jobs() > 0) sleep_for(milliseconds(1)); } while (s1->postCount < 3) sleep_for(milliseconds(10)); REQUIRE( s1->lastSpkt.channel == Channel::kColour ); REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 ); + s1->end(); } SECTION("responds to requests") { @@ -280,8 +285,8 @@ TEST_CASE("Net stream sending requests") { spkt.flags = ftl::protocol::kFlagRequest; writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); + while (p->jobs() > 0) sleep_for(milliseconds(1)); - sleep_for(milliseconds(50)); REQUIRE( seenReq ); } @@ -308,14 +313,14 @@ TEST_CASE("Net stream sending requests") { writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); - sleep_for(milliseconds(50)); + while (p->jobs() > 0) sleep_for(milliseconds(1)); REQUIRE( bitrate == 100 ); s1->setProperty(ftl::protocol::StreamProperty::kBitrate, 200); writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); - sleep_for(milliseconds(50)); + while (p->jobs() > 0) sleep_for(milliseconds(1)); REQUIRE( bitrate == 200 ); } @@ -341,7 +346,7 @@ TEST_CASE("Net stream sending requests") { writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); - sleep_for(milliseconds(50)); + while (p->jobs() > 0) sleep_for(milliseconds(1)); REQUIRE( seenReq ); } @@ -354,7 +359,7 @@ TEST_CASE("Net stream can see received data") { fakedata[0] = ""; send_handshake(*p.get()); p->data(); - sleep_for(milliseconds(50)); + while (p->jobs() > 0) sleep_for(milliseconds(1)); SECTION("available if packet is seen") { auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); @@ -375,11 +380,12 @@ TEST_CASE("Net stream can see received data") { spkt.channel = Channel::kColour; writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); + while (p->jobs() > 0) sleep_for(milliseconds(1)); spkt.channel = Channel::kEndFrame; writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); - sleep_for(milliseconds(50)); + while (p->jobs() > 0) sleep_for(milliseconds(1)); REQUIRE( seenReq ); REQUIRE( s1->available(FrameID(1, 1), Channel::kColour) ); }