diff --git a/include/ftl/protocol/node.hpp b/include/ftl/protocol/node.hpp index 97887e92b5f9c1387c797acef29ae18d2b76abf6..1df000e566e9179b940631010e7b2025c8c13c90 100644 --- a/include/ftl/protocol/node.hpp +++ b/include/ftl/protocol/node.hpp @@ -13,6 +13,7 @@ namespace ftl { namespace net { class Peer; +using PeerPtr = std::shared_ptr<Peer>; } namespace protocol { @@ -37,7 +38,7 @@ enum struct NodeStatus { class Node { public: /** Peer for outgoing connection: resolve address and connect */ - explicit Node(const std::shared_ptr<ftl::net::Peer> &impl); + explicit Node(const ftl::net::PeerPtr &impl); virtual ~Node(); /** @@ -107,7 +108,7 @@ class Node { unsigned int localID(); protected: - std::shared_ptr<ftl::net::Peer> peer_; + ftl::net::PeerPtr peer_; }; } diff --git a/src/node.cpp b/src/node.cpp index cac2db8bb5a1c60901792a86688cb4f7320332e1..60f5c64dd29d9d47489156a8c7838066d62b18a4 100644 --- a/src/node.cpp +++ b/src/node.cpp @@ -2,9 +2,9 @@ #include "peer.hpp" using ftl::protocol::Node; -using ftl::net::Peer; +using ftl::net::PeerPtr; -Node::Node(const std::shared_ptr<Peer> &impl): peer_(impl) {} +Node::Node(const PeerPtr &impl): peer_(impl) {} Node::~Node() {} diff --git a/src/peer.cpp b/src/peer.cpp index bbca9dde5ee54fa547d08d4ab5472e3326fb4d3d..b6725875660b99895aa3ec5a704cbdad5426b87a 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -30,6 +30,7 @@ using ftl::net::internal::SocketConnection; using std::tuple; using std::get; using ftl::net::Peer; +using ftl::net::PeerPtr; using ftl::URI; using ftl::net::Dispatcher; using std::chrono::seconds; @@ -397,13 +398,13 @@ bool Peer::_data() { // Allow a small delay in case another thread is doing the handshake //lk.unlock(); - //std::this_thread::sleep_for(std::chrono::milliseconds(10)); - //if (status_ == NodeStatus::kConnecting) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + if (status_ == NodeStatus::kConnecting) { LOG(ERROR) << "failed to get handshake"; close(reconnect_on_protocol_error_); //lk.lock(); return false; - //} + } } else { // Must handle immediately with no other thread able // to read next message before completion. @@ -514,7 +515,7 @@ bool Peer::waitConnection(int s) { std::unique_lock<std::mutex> lk(m); std::condition_variable cv; - auto h = net_->onConnect([this, &cv](const std::shared_ptr<Peer> &p) { + auto h = net_->onConnect([this, &cv](const PeerPtr &p) { if (p.get() == this) { cv.notify_one(); } diff --git a/src/peer.hpp b/src/peer.hpp index 01c7cd67c065d3215b17c3e170d1a2e8699175bd..db1566846b2392ddc27119700f5ab95905efebf1 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -346,5 +346,7 @@ int Peer::asyncCall( return rpcid; } +using PeerPtr = std::shared_ptr<ftl::net::Peer>; + }; }; diff --git a/src/self.cpp b/src/self.cpp index c027e1d0788ab94ac5df008dc04aafb12792daed..bab56a0f73361d1c586247d1aaf3a0c6109703a2 100644 --- a/src/self.cpp +++ b/src/self.cpp @@ -79,19 +79,19 @@ std::shared_ptr<ftl::protocol::Node> Self::getWebService() const { } ftl::Handle Self::onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)> &cb) { - return universe_->onConnect([cb](const std::shared_ptr<ftl::net::Peer> &p) { + return universe_->onConnect([cb](const ftl::net::PeerPtr &p) { return cb(std::make_shared<ftl::protocol::Node>(p)); }); } ftl::Handle Self::onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)> &cb) { - return universe_->onDisconnect([cb](const std::shared_ptr<ftl::net::Peer> &p) { + return universe_->onDisconnect([cb](const ftl::net::PeerPtr &p) { return cb(std::make_shared<ftl::protocol::Node>(p)); }); } ftl::Handle Self::onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, const ftl::protocol::Error &)> &cb) { - return universe_->onError([cb](const std::shared_ptr<ftl::net::Peer> &p, const ftl::net::Error &err) { + return universe_->onError([cb](const ftl::net::PeerPtr &p, const ftl::net::Error &err) { ftl::protocol::Error perr = {}; return cb(std::make_shared<ftl::protocol::Node>(p), perr); }); diff --git a/src/universe.cpp b/src/universe.cpp index a5d907d0b7f030f89cbc0ede21ddc5d65ef85b2a..dcf5f01d12fb779dc99b826b70889a3f89c60261 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -31,6 +31,7 @@ using std::string; using std::vector; using std::thread; using ftl::net::Peer; +using ftl::net::PeerPtr; using ftl::net::Universe; using nlohmann::json; using ftl::UUID; @@ -219,7 +220,7 @@ bool Universe::isConnected(const std::string &s) { return isConnected(uri); } -void Universe::_insertPeer(const std::shared_ptr<Peer> &ptr) { +void Universe::_insertPeer(const PeerPtr &ptr) { UNIQUE_LOCK(net_mutex_,lk); for (size_t i=0; i<peers_.size(); ++i) { if (!peers_[i]) { @@ -237,7 +238,7 @@ void Universe::_insertPeer(const std::shared_ptr<Peer> &ptr) { throw FTL_Error("Too many connections"); } -std::shared_ptr<Peer> Universe::connect(const ftl::URI &u) { +PeerPtr Universe::connect(const ftl::URI &u) { // Check if already connected or if self (when could this happen?) { @@ -266,7 +267,7 @@ std::shared_ptr<Peer> Universe::connect(const ftl::URI &u) { return p; } -std::shared_ptr<Peer> Universe::connect(const std::string& addr) { +PeerPtr Universe::connect(const std::string& addr) { return connect(ftl::URI(addr)); } @@ -343,7 +344,7 @@ socket_t Universe::_setDescriptors() { return n; } -void Universe::_installBindings(const std::shared_ptr<Peer> &p) { +void Universe::_installBindings(const PeerPtr &p) { } @@ -351,7 +352,7 @@ void Universe::_installBindings() { } -void Universe::_removePeer(std::shared_ptr<Peer> &p) { +void Universe::_removePeer(PeerPtr &p) { UNIQUE_LOCK(net_mutex_, ulk); if (p && (!p->isValid() || @@ -399,14 +400,14 @@ void Universe::_cleanupPeers() { } } -std::shared_ptr<Peer> Universe::getPeer(const UUID &id) const { +PeerPtr Universe::getPeer(const UUID &id) const { SHARED_LOCK(net_mutex_,lk); auto ix = peer_ids_.find(id); if (ix == peer_ids_.end()) return nullptr; else return peers_[ix->second]; } -std::shared_ptr<Peer> Universe::getWebService() const { +PeerPtr Universe::getWebService() const { SHARED_LOCK(net_mutex_,lk); auto it = std::find_if(peers_.begin(), peers_.end(), [](const auto &p) { return p && p->getType() == NodeType::kWebService; @@ -591,19 +592,19 @@ void Universe::_run() { garbage_.clear(); } -ftl::Handle Universe::onConnect(const std::function<bool(const std::shared_ptr<Peer>&)> &cb) { +ftl::Handle Universe::onConnect(const std::function<bool(const PeerPtr&)> &cb) { return on_connect_.on(cb); } -ftl::Handle Universe::onDisconnect(const std::function<bool(const std::shared_ptr<Peer>&)> &cb) { +ftl::Handle Universe::onDisconnect(const std::function<bool(const PeerPtr&)> &cb) { return on_disconnect_.on(cb); } -ftl::Handle Universe::onError(const std::function<bool(const std::shared_ptr<Peer>&, const ftl::net::Error &)> &cb) { +ftl::Handle Universe::onError(const std::function<bool(const PeerPtr&, const ftl::net::Error &)> &cb) { return on_error_.on(cb); } -std::shared_ptr<Peer> Universe::_findPeer(const Peer *p) { +PeerPtr Universe::_findPeer(const Peer *p) { SHARED_LOCK(net_mutex_,lk); for (const auto &pp : peers_) { if (pp.get() == p) return pp; diff --git a/src/universe.hpp b/src/universe.hpp index 4d932089e2a98275e181ac899af8ba9b15a7ebd0..688292fa69755870e39ef21aaf791963f920d018 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -34,7 +34,7 @@ struct Error { struct ReconnectInfo { int tries; float delay; - std::shared_ptr<Peer> peer; + PeerPtr peer; }; struct NetImplDetail; @@ -86,8 +86,8 @@ public: * * @param addr URI giving protocol, interface and port */ - std::shared_ptr<Peer> connect(const std::string &addr); - std::shared_ptr<Peer> connect(const ftl::URI &addr); + PeerPtr connect(const std::string &addr); + PeerPtr connect(const ftl::URI &addr); bool isConnected(const ftl::URI &uri); bool isConnected(const std::string &s); @@ -101,9 +101,9 @@ public: int waitConnections(); /** get peer pointer by peer UUID, returns nullptr if not found */ - std::shared_ptr<Peer> getPeer(const ftl::UUID &pid) const; + PeerPtr getPeer(const ftl::UUID &pid) const; /** get webservice peer pointer, returns nullptr if not connected to webservice */ - std::shared_ptr<Peer> getWebService() const; + PeerPtr getWebService() const; /** * Bind a function to an RPC or service call name. This will implicitely @@ -161,9 +161,9 @@ public: // --- Event Handlers ------------------------------------------------------ - ftl::Handle onConnect(const std::function<bool(const std::shared_ptr<ftl::net::Peer>&)>&); - ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::net::Peer>&)>&); - ftl::Handle onError(const std::function<bool(const std::shared_ptr<ftl::net::Peer>&, const ftl::net::Error &)>&); + ftl::Handle onConnect(const std::function<bool(const ftl::net::PeerPtr&)>&); + ftl::Handle onDisconnect(const std::function<bool(const ftl::net::PeerPtr&)>&); + ftl::Handle onError(const std::function<bool(const ftl::net::PeerPtr&, const ftl::net::Error &)>&); size_t getSendBufferSize(ftl::URI::scheme_t s); size_t getRecvBufferSize(ftl::URI::scheme_t s); @@ -174,16 +174,16 @@ private: void _run(); SOCKET _setDescriptors(); // TODO: move to implementation void _installBindings(); - void _installBindings(const std::shared_ptr<ftl::net::Peer>&); + void _installBindings(const ftl::net::PeerPtr&); //bool _subscribe(const std::string &res); void _cleanupPeers(); void _notifyConnect(ftl::net::Peer *); void _notifyDisconnect(ftl::net::Peer *); void _notifyError(ftl::net::Peer *, const ftl::net::Error &); void _periodic(); - std::shared_ptr<ftl::net::Peer> _findPeer(const ftl::net::Peer *p); - void _removePeer(std::shared_ptr<Peer> &p); - void _insertPeer(const std::shared_ptr<ftl::net::Peer> &ptr); + ftl::net::PeerPtr _findPeer(const ftl::net::Peer *p); + void _removePeer(PeerPtr &p); + void _insertPeer(const ftl::net::PeerPtr &ptr); static void __start(Universe *u); @@ -195,14 +195,14 @@ private: std::unique_ptr<NetImplDetail> impl_; std::vector<std::unique_ptr<ftl::net::internal::SocketServer>> listeners_; - std::vector<std::shared_ptr<ftl::net::Peer>> peers_; + std::vector<ftl::net::PeerPtr> peers_; std::unordered_map<std::string, size_t> peer_by_uri_; std::map<ftl::UUID, size_t> peer_ids_; ftl::net::Dispatcher disp_; std::list<ReconnectInfo> reconnects_; size_t phase_; - std::list<std::shared_ptr<ftl::net::Peer>> garbage_; + std::list<ftl::net::PeerPtr> garbage_; ftl::Handle garbage_timer_; // size_t send_size_; @@ -212,9 +212,9 @@ private: std::atomic_int connection_count_ = 0; // Active connections std::atomic_int peer_instances_ = 0; // Actual peers dependent on Universe - ftl::Handler<const std::shared_ptr<ftl::net::Peer>&> on_connect_; - ftl::Handler<const std::shared_ptr<ftl::net::Peer>&> on_disconnect_; - ftl::Handler<const std::shared_ptr<ftl::net::Peer>&, const ftl::net::Error &> on_error_; + ftl::Handler<const ftl::net::PeerPtr&> on_connect_; + ftl::Handler<const ftl::net::PeerPtr&> on_disconnect_; + ftl::Handler<const ftl::net::PeerPtr&, const ftl::net::Error &> on_error_; static std::shared_ptr<Universe> instance_; @@ -316,7 +316,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { template <typename R, typename... ARGS> R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { - std::shared_ptr<Peer> p = getPeer(pid); + PeerPtr p = getPeer(pid); if (p == nullptr || !p->isConnected()) { if (p == nullptr) throw FTL_Error("Attempting to call an unknown peer : " << pid.to_string()); else throw FTL_Error("Attempting to call an disconnected peer : " << pid.to_string()); @@ -326,7 +326,7 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { template <typename R, typename... ARGS> int Universe::asyncCall(const ftl::UUID &pid, const std::string &name, std::function<void(const R&)> cb, ARGS... args) { - std::shared_ptr<Peer> p = getPeer(pid); + PeerPtr p = getPeer(pid); if (p == nullptr || !p->isConnected()) { if (p == nullptr) throw FTL_Error("Attempting to call an unknown peer : " << pid.to_string()); else throw FTL_Error("Attempting to call an disconnected peer : " << pid.to_string()); @@ -336,7 +336,7 @@ int Universe::asyncCall(const ftl::UUID &pid, const std::string &name, std::func template <typename... ARGS> bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) { - std::shared_ptr<Peer> p = getPeer(pid); + PeerPtr p = getPeer(pid); if (p == nullptr) { LOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); return false; @@ -347,7 +347,7 @@ bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) template <typename... ARGS> int Universe::try_send(const ftl::UUID &pid, const std::string &name, ARGS... args) { - std::shared_ptr<Peer> p = getPeer(pid); + PeerPtr p = getPeer(pid); if (p == nullptr) { //DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); return false; diff --git a/test/net_integration.cpp b/test/net_integration.cpp index 68f0fb9b2178ec769d58ad2950bc49d1de024fc5..217ba2874686bd19f0bd992f536380e418ef98d3 100644 --- a/test/net_integration.cpp +++ b/test/net_integration.cpp @@ -25,8 +25,6 @@ static bool try_for(int count, const std::function<bool()> &f) { // --- Tests ------------------------------------------------------------------- TEST_CASE("Listen and Connect", "[net]") { - ftl::protocol::reset(); - auto self = ftl::createDummySelf(); self->listen(ftl::URI("tcp://localhost:0")); @@ -124,11 +122,11 @@ TEST_CASE("Listen and Connect", "[net]") { REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout); REQUIRE(p_connecting->isConnected()); } -} -TEST_CASE("Self::onConnect()", "[net]") { ftl::protocol::reset(); +} +TEST_CASE("Self::onConnect()", "[net]") { auto self = ftl::createDummySelf(); self->listen(ftl::URI("tcp://localhost:0")); @@ -161,6 +159,8 @@ TEST_CASE("Self::onConnect()", "[net]") { REQUIRE( done ); } + + ftl::protocol::reset(); } /*TEST_CASE("Universe::onDisconnect()", "[net]") { diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp index bb9dd983f92f0b295c1182570fe46a5cb480ea40..7fd303cf500556b96c535a4bf2a53eac80f3d54e 100644 --- a/test/stream_integration.cpp +++ b/test/stream_integration.cpp @@ -11,8 +11,6 @@ TEST_CASE("TCP Stream", "[net]") { std::mutex mtx; - ftl::protocol::reset(); - auto self = ftl::createDummySelf(); self->listen(ftl::URI("tcp://localhost:0")); @@ -62,4 +60,7 @@ TEST_CASE("TCP Stream", "[net]") { REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG ); REQUIRE( rpkt.frame_count == 1 ); } + + p.reset(); + ftl::protocol::reset(); }