diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000000000000000000000000000000000000..23e9642684878e488e3b0352ed441ebfa372a0f6 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "files.associations": { + "deque": "cpp", + "string": "cpp", + "vector": "cpp" + } +} \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index d42fc20fc164ed22ed0d79b2e4c26d73c386ae4c..2b0f6b92aebab4de40975588bd5e6ca205f9b62b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -160,6 +160,16 @@ add_library(beyond-protocol STATIC src/uri.cpp src/config.cpp src/time.cpp + src/node.cpp + src/self.cpp + src/socket/socket.cpp + src/protocol/connection.cpp + src/protocol/factory.cpp + src/protocol/tcp.cpp + src/protocol/tls.cpp + src/protocol/websocket.cpp + src/base64.cpp + src/protocol.cpp ) target_include_directories(beyond-protocol PUBLIC diff --git a/include/ftl/protocol.hpp b/include/ftl/protocol.hpp index 6b5e515bb22bc3309ead65b07b98b1bdfcd011b1..c735e0da4efdd47b19ae0853144e23941733a2cf 100644 --- a/include/ftl/protocol.hpp +++ b/include/ftl/protocol.hpp @@ -13,7 +13,7 @@ namespace ftl { namespace protocol { class Node; class Stream; -class Listener; +class Self; class Service; /** Reset network and streams. Used by tests. */ @@ -22,9 +22,10 @@ void reset(); extern ftl::UUID id; } -std::shared_ptr<ftl::protocol::Listener> createListener(const std::string &uri); +std::shared_ptr<ftl::protocol::Self> getSelf(); +std::shared_ptr<ftl::protocol::Self> createDummySelf(); std::shared_ptr<ftl::protocol::Service> setServiceProvider(const std::string &uri); -std::shared_ptr<ftl::protocol::Node> createPeer(const std::string &uri); +std::shared_ptr<ftl::protocol::Node> createNode(const std::string &uri); std::shared_ptr<ftl::protocol::Stream> createStream(const std::string &uri); } diff --git a/include/ftl/protocol/listener.hpp b/include/ftl/protocol/listener.hpp deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/include/ftl/protocol/node.hpp b/include/ftl/protocol/node.hpp index 9c4345c36672f2a1e9d4dd33374bec6a8e55a80e..061de515125c6ffb93ba3fb7c263ee77bd7c8636 100644 --- a/include/ftl/protocol/node.hpp +++ b/include/ftl/protocol/node.hpp @@ -35,8 +35,9 @@ enum struct NodeStatus { * created directly. */ class Node { + public: /** Peer for outgoing connection: resolve address and connect */ - explicit Node(const std::unique_ptr<ftl::net::Peer> &impl); + explicit Node(const std::shared_ptr<ftl::net::Peer> &impl); virtual ~Node(); /** @@ -108,7 +109,7 @@ class Node { unsigned int localID(); protected: - std::unique_ptr<ftl::net::Peer> peer_; + std::shared_ptr<ftl::net::Peer> peer_; }; } diff --git a/include/ftl/protocol/self.hpp b/include/ftl/protocol/self.hpp new file mode 100644 index 0000000000000000000000000000000000000000..0837878df271ab26758979b7c49110ebe4409fcf --- /dev/null +++ b/include/ftl/protocol/self.hpp @@ -0,0 +1,66 @@ +/** + * @file node.hpp + * @copyright Copyright (c) 2022 University of Turku, MIT License + * @author Nicolas Pope + */ + +#pragma once + +#include <ftl/uuid.hpp> +#include <ftl/uri.hpp> + +#include <memory> + +namespace ftl { +namespace net { +class Universe; +} + +namespace protocol { + +class Self { + public: + /** Peer for outgoing connection: resolve address and connect */ + explicit Self(const std::shared_ptr<ftl::net::Universe> &impl); + virtual ~Self(); + + void start(); + + /** + * Open a new listening port on a given interfaces. + * eg. "tcp://localhost:9000" + * @param addr URI giving protocol, interface and port + */ + bool listen(const ftl::URI &addr); + + std::vector<ftl::URI> getListeningURIs(); + + /** + * Essential to call this before destroying anything that registered + * callbacks or binds for RPC. It will terminate all connections and + * stop any network activity but without deleting the net object. + */ + void shutdown(); + + bool isConnected(const ftl::URI &uri); + bool isConnected(const std::string &s); + + size_t numberOfNodes() const; + + /** + * Will block until all currently registered connnections have completed. + * You should not use this, but rather use onConnect. + */ + int waitConnections(); + + /** get peer pointer by peer UUID, returns nullptr if not found */ + std::shared_ptr<ftl::protocol::Node> getNode(const ftl::UUID &pid) const; + /** get webservice peer pointer, returns nullptr if not connected to webservice */ + std::shared_ptr<ftl::protocol::Node> getWebService() const; + + protected: + std::shared_ptr<ftl::net::Universe> universe_; +}; + +} +} diff --git a/include/ftl/utility/base64.hpp b/include/ftl/utility/base64.hpp new file mode 100644 index 0000000000000000000000000000000000000000..197bd7df333629c5e8316fd36a7e654977ecd30f --- /dev/null +++ b/include/ftl/utility/base64.hpp @@ -0,0 +1,35 @@ +// +// base64 encoding and decoding with C++. +// Version: 2.rc.04 (release candidate) +// + +#ifndef BASE64_H_C0CE2A47_D10E_42C9_A27C_C883944E704A +#define BASE64_H_C0CE2A47_D10E_42C9_A27C_C883944E704A + +#include <string> + +#if __cplusplus >= 201703L +#include <string_view> +#endif // __cplusplus >= 201703L + +std::string base64_encode (std::string const& s, bool url = false); +std::string base64_encode_pem (std::string const& s); +std::string base64_encode_mime(std::string const& s); + +std::string base64_decode(std::string const& s, bool remove_linebreaks = false); +std::string base64_encode(unsigned char const*, size_t len, bool url = false); + +#if __cplusplus >= 201703L +// +// Interface with std::string_view rather than const std::string& +// Requires C++17 +// Provided by Yannic Bonenberger (https://github.com/Yannic) +// +std::string base64_encode (std::string_view s, bool url = false); +std::string base64_encode_pem (std::string_view s); +std::string base64_encode_mime(std::string_view s); + +std::string base64_decode(std::string_view s, bool remove_linebreaks = false); +#endif // __cplusplus >= 201703L + +#endif /* BASE64_H_C0CE2A47_D10E_42C9_A27C_C883944E704A */ diff --git a/src/base64.cpp b/src/base64.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a1be53b84b72f87ba937cd0f12ec5b0df8ab8c96 --- /dev/null +++ b/src/base64.cpp @@ -0,0 +1,256 @@ +/* + base64.cpp and base64.h + + base64 encoding and decoding with C++. + More information at + https://renenyffenegger.ch/notes/development/Base64/Encoding-and-decoding-base-64-with-cpp + + Version: 2.rc.04 (release candidate) + + Copyright (C) 2004-2017, 2020 René Nyffenegger + + This source code is provided 'as-is', without any express or implied + warranty. In no event will the author be held liable for any damages + arising from the use of this software. + + Permission is granted to anyone to use this software for any purpose, + including commercial applications, and to alter it and redistribute it + freely, subject to the following restrictions: + + 1. The origin of this source code must not be misrepresented; you must not + claim that you wrote the original source code. If you use this source code + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original source code. + + 3. This notice may not be removed or altered from any source distribution. + + René Nyffenegger rene.nyffenegger@adp-gmbh.ch + +*/ + +#include <ftl/utility/base64.hpp> + + // + // Depending on the url parameter in base64_chars, one of + // two sets of base64 characters needs to be chosen. + // They differ in their last two characters. + // +const char* base64_chars[2] = { + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789" + "+/", + + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789" + "-_"}; + +static unsigned int pos_of_char(const unsigned char chr) { + // + // Return the position of chr within base64_encode() + // + + if (chr >= 'A' && chr <= 'Z') return chr - 'A'; + else if (chr >= 'a' && chr <= 'z') return chr - 'a' + ('Z' - 'A') + 1; + else if (chr >= '0' && chr <= '9') return chr - '0' + ('Z' - 'A') + ('z' - 'a') + 2; + else if (chr == '+' || chr == '-') return 62; // Be liberal with input and accept both url ('-') and non-url ('+') base 64 characters ( + else if (chr == '/' || chr == '_') return 63; // Ditto for '/' and '_' + + throw "If input is correct, this line should never be reached."; +} + +static std::string insert_linebreaks(std::string str, size_t distance) { + // + // Provided by https://github.com/JomaCorpFX, adapted by me. + // + if (!str.length()) { + return ""; + } + + size_t pos = distance; + + while (pos < str.size()) { + str.insert(pos, "\n"); + pos += distance + 1; + } + + return str; +} + +template <typename String, unsigned int line_length> +static std::string encode_with_line_breaks(String s) { + return insert_linebreaks(base64_encode(s, false), line_length); +} + +template <typename String> +static std::string encode_pem(String s) { + return encode_with_line_breaks<String, 64>(s); +} + +template <typename String> +static std::string encode_mime(String s) { + return encode_with_line_breaks<String, 76>(s); +} + +template <typename String> +static std::string encode(String s, bool url) { + return base64_encode(reinterpret_cast<const unsigned char*>(s.data()), s.length(), url); +} + +std::string base64_encode(unsigned char const* bytes_to_encode, size_t in_len, bool url) { + + size_t len_encoded = (in_len +2) / 3 * 4; + + unsigned char trailing_char = url ? '.' : '='; + + // + // Choose set of base64 characters. They differ + // for the last two positions, depending on the url + // parameter. + // A bool (as is the parameter url) is guaranteed + // to evaluate to either 0 or 1 in C++ therfore, + // the correct character set is chosen by subscripting + // base64_chars with url. + // + const char* base64_chars_ = base64_chars[url]; + + std::string ret; + ret.reserve(len_encoded); + + unsigned int pos = 0; + + while (pos < in_len) { + ret.push_back(base64_chars_[(bytes_to_encode[pos + 0] & 0xfc) >> 2]); + + if (pos+1 < in_len) { + ret.push_back(base64_chars_[((bytes_to_encode[pos + 0] & 0x03) << 4) + ((bytes_to_encode[pos + 1] & 0xf0) >> 4)]); + + if (pos+2 < in_len) { + ret.push_back(base64_chars_[((bytes_to_encode[pos + 1] & 0x0f) << 2) + ((bytes_to_encode[pos + 2] & 0xc0) >> 6)]); + ret.push_back(base64_chars_[ bytes_to_encode[pos + 2] & 0x3f]); + } + else { + ret.push_back(base64_chars_[(bytes_to_encode[pos + 1] & 0x0f) << 2]); + ret.push_back(trailing_char); + } + } + else { + + ret.push_back(base64_chars_[(bytes_to_encode[pos + 0] & 0x03) << 4]); + ret.push_back(trailing_char); + ret.push_back(trailing_char); + } + + pos += 3; + } + + + return ret; +} + +template <typename String> +static std::string decode(String encoded_string, bool remove_linebreaks) { + // + // decode(…) is templated so that it can be used with String = const std::string& + // or std::string_view (requires at least C++17) + // + + if (remove_linebreaks) { + + if (! encoded_string.length() ) { + return ""; + } + + std::string copy(encoded_string); + + size_t pos=0; + while ((pos = copy.find("\n", pos)) != std::string::npos) { + copy.erase(pos, 1); + } + + return base64_decode(copy, false); + + } + + size_t length_of_string = encoded_string.length(); + if (!length_of_string) return std::string(""); + + size_t in_len = length_of_string; + size_t pos = 0; + + // + // The approximate length (bytes) of the decoded string might be one ore + // two bytes smaller, depending on the amount of trailing equal signs + // in the encoded string. This approximation is needed to reserve + // enough space in the string to be returned. + // + size_t approx_length_of_decoded_string = length_of_string / 4 * 3; + std::string ret; + ret.reserve(approx_length_of_decoded_string); + + while (pos < in_len) { + + unsigned int pos_of_char_1 = pos_of_char(encoded_string[pos+1] ); + + ret.push_back(static_cast<std::string::value_type>( ( (pos_of_char(encoded_string[pos+0]) ) << 2 ) + ( (pos_of_char_1 & 0x30 ) >> 4))); + + if (encoded_string[pos+2] != '=' && encoded_string[pos+2] != '.') { // accept URL-safe base 64 strings, too, so check for '.' also. + + unsigned int pos_of_char_2 = pos_of_char(encoded_string[pos+2] ); + ret.push_back(static_cast<std::string::value_type>( (( pos_of_char_1 & 0x0f) << 4) + (( pos_of_char_2 & 0x3c) >> 2))); + + if (encoded_string[pos+3] != '=' && encoded_string[pos+3] != '.') { + ret.push_back(static_cast<std::string::value_type>( ( (pos_of_char_2 & 0x03 ) << 6 ) + pos_of_char(encoded_string[pos+3]) )); + } + } + + pos += 4; + } + + return ret; +} + +std::string base64_decode(std::string const& s, bool remove_linebreaks) { + return decode(s, remove_linebreaks); +} + +std::string base64_encode(std::string const& s, bool url) { + return encode(s, url); +} + +std::string base64_encode_pem (std::string const& s) { + return encode_pem(s); +} + +std::string base64_encode_mime(std::string const& s) { + return encode_mime(s); +} + +#if __cplusplus >= 201703L +// +// Interface with std::string_view rather than const std::string& +// Requires C++17 +// Provided by Yannic Bonenberger (https://github.com/Yannic) +// + +std::string base64_encode(std::string_view s, bool url) { + return encode(s, url); +} + +std::string base64_encode_pem(std::string_view s) { + return encode_pem(s); +} + +std::string base64_encode_mime(std::string_view s) { + return encode_mime(s); +} + +std::string base64_decode(std::string_view s, bool remove_linebreaks) { + return decode(s, remove_linebreaks); +} + +#endif // __cplusplus >= 201703L diff --git a/src/node.cpp b/src/node.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ceb4bb520e1acb796e70a1334e564e50cd90156e --- /dev/null +++ b/src/node.cpp @@ -0,0 +1,65 @@ +#include <ftl/protocol/node.hpp> +#include "peer.hpp" + +using ftl::protocol::Node; +using ftl::net::Peer; + +Node::Node(const std::shared_ptr<Peer> &impl): peer_(impl) {} + +Node::~Node() {} + +void Node::close(bool retry) { + peer_->close(retry); +} + +bool Node::isConnected() const { + return peer_->isConnected(); +} + +bool Node::waitConnection() { + return peer_->waitConnection(); +} + +bool Node::reconnect() { + return peer_->reconnect(); +} + +bool Node::isOutgoing() const { + return peer_->isOutgoing(); +} + +bool Node::isValid() const { + return peer_->isValid(); +} + +ftl::protocol::NodeStatus Node::status() const { + return peer_->status(); +} + +uint32_t Node::getFTLVersion() const { + return peer_->getFTLVersion(); +} + +std::string Node::getURI() const { + return peer_->getURI(); +} + +const ftl::UUID &Node::id() const { + return peer_->id(); +} + +std::string Node::to_string() const { + return peer_->to_string(); +} + +bool Node::isWaiting() const { + return peer_->isWaiting(); +} + +void Node::noReconnect() { + peer_->noReconnect(); +} + +unsigned int Node::localID() { + return peer_->localID(); +} diff --git a/src/peer.cpp b/src/peer.cpp index 0755c4fc5177c10d2712dc16713e2375cc40a89c..0dfa85c26d52e15bbb29689358337c31e3a2c716 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -493,8 +493,8 @@ bool Peer::waitConnection() { std::unique_lock<std::mutex> lk(m); std::condition_variable cv; - Callback h = net_->onConnect([this, &cv](Peer *p) { - if (p == this) { + Callback h = net_->onConnect([this, &cv](const std::shared_ptr<Peer> &p) { + if (p.get() == this) { cv.notify_one(); } }); diff --git a/src/protocol.cpp b/src/protocol.cpp new file mode 100644 index 0000000000000000000000000000000000000000..26e7436f261a6cb5cd2501aec5cd9d223e28494d --- /dev/null +++ b/src/protocol.cpp @@ -0,0 +1,36 @@ +#include <ftl/protocol.hpp> +#include <ftl/protocol/self.hpp> +#include "universe.hpp" + +static std::shared_ptr<ftl::net::Universe> universe; + +ctpl::thread_pool ftl::pool(std::thread::hardware_concurrency()*2); + +/** Reset network and streams. Used by tests. */ +void ftl::protocol::reset() { + universe.reset(); +} + +ftl::UUID ftl::protocol::id; + +std::shared_ptr<ftl::protocol::Self> ftl::getSelf() { + if (!universe) universe = std::make_shared<ftl::net::Universe>(); + return std::make_shared<ftl::protocol::Self>(universe); +} + +std::shared_ptr<ftl::protocol::Self> ftl::createDummySelf() { + return std::make_shared<ftl::protocol::Self>(std::make_shared<ftl::net::Universe>()); +} + +/*std::shared_ptr<ftl::protocol::Service> ftl::setServiceProvider(const std::string &uri) { + +}*/ + +std::shared_ptr<ftl::protocol::Node> ftl::createNode(const std::string &uri) { + if (!universe) universe = std::make_shared<ftl::net::Universe>(); + return std::make_shared<ftl::protocol::Node>(universe->connect(uri)); +} + +/*std::shared_ptr<ftl::protocol::Stream> ftl::createStream(const std::string &uri) { + +}*/ diff --git a/src/protocol/tcp.cpp b/src/protocol/tcp.cpp index 440566afbf7c9ac8d6c19d5363f26446eb0fd7d1..f8ffdeec79f7df4b6bdd1695340b6d3bb540e680 100644 --- a/src/protocol/tcp.cpp +++ b/src/protocol/tcp.cpp @@ -2,6 +2,7 @@ #include <ftl/exception.hpp> #include "tcp.hpp" +#include <ftl/lib/loguru.hpp> using namespace ftl::net::internal; diff --git a/src/protocol/websocket.cpp b/src/protocol/websocket.cpp index 5af2c19009ebea428f6c5126d0356ae40102d824..9955e809f8017a54135e591a9842b35a3823a223 100644 --- a/src/protocol/websocket.cpp +++ b/src/protocol/websocket.cpp @@ -1,7 +1,10 @@ #include "websocket.hpp" +#include <ftl/lib/loguru.hpp> #include <ftl/utility/base64.hpp> +using uchar = unsigned char; + #ifdef HAVE_GNUTLS #include <gnutls/crypto.h> diff --git a/src/self.cpp b/src/self.cpp new file mode 100644 index 0000000000000000000000000000000000000000..603c9363251fdac07e7f12975c32c8b6f03c9945 --- /dev/null +++ b/src/self.cpp @@ -0,0 +1,48 @@ +#include "universe.hpp" +#include <ftl/protocol/self.hpp> + +using ftl::protocol::Self; + +Self::Self(const std::shared_ptr<ftl::net::Universe> &impl): universe_(impl) {} + +Self::~Self() {} + +void Self::start() { + universe_->start(); +} + +bool Self::listen(const ftl::URI &addr) { + return universe_->listen(addr); +} + +std::vector<ftl::URI> Self::getListeningURIs() { + return universe_->getListeningURIs(); +} + +void Self::shutdown() { + universe_->shutdown(); +} + +bool Self::isConnected(const ftl::URI &uri) { + return universe_->isConnected(uri); +} + +bool Self::isConnected(const std::string &s) { + return universe_->isConnected(s); +} + +size_t Self::numberOfNodes() const { + return universe_->numberOfPeers(); +} + +int Self::waitConnections() { + return universe_->waitConnections(); +} + +std::shared_ptr<ftl::protocol::Node> Self::getNode(const ftl::UUID &pid) const { + return std::make_shared<ftl::protocol::Node>(universe_->getPeer(pid)); +} + +std::shared_ptr<ftl::protocol::Node> Self::getWebService() const { + return std::make_shared<ftl::protocol::Node>(universe_->getWebService()); +} diff --git a/src/socket/socket.cpp b/src/socket/socket.cpp index 78257a95a22239bac314fceab46e8f3573feef30..511724ed3e45d55a33826e33718b7e6606d193e2 100644 --- a/src/socket/socket.cpp +++ b/src/socket/socket.cpp @@ -1,6 +1,6 @@ // OS specific implementations for TCP sockets -#include "../socket.hpp" +#include "../socketImpl.hpp" #ifdef WIN32 #include "socket_windows.cpp" diff --git a/src/universe.cpp b/src/universe.cpp index 79bb91401f140b271ac81224c4bbfb831a96a483..c91b3b301ef627b6dda96aba1ae4faf75e5b1c9b 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -70,7 +70,7 @@ struct NetImplDetail { #define WS_RECEIVE_BUFFER_SIZE (62*1024) Callback ftl::net::Universe::cbid__ = 0; -Universe *Universe::instance_ = nullptr; +std::shared_ptr<Universe> Universe::instance_ = nullptr; Universe::Universe() : active_(true), @@ -102,7 +102,7 @@ Universe::Universe() : });*/ if (instance_ != nullptr) LOG(FATAL) << "Multiple net instances"; - instance_ = this; + //instance_ = this; } Universe::~Universe() { @@ -212,32 +212,30 @@ bool Universe::isConnected(const std::string &s) { return isConnected(uri); } -Peer *Universe::connect(const ftl::URI &u) { +std::shared_ptr<Peer> Universe::connect(const ftl::URI &u) { // Check if already connected or if self (when could this happen?) { UNIQUE_LOCK(net_mutex_,lk); if (peer_by_uri_.find(u.getBaseURI()) != peer_by_uri_.end()) { - return peer_by_uri_.at(u.getBaseURI()); + return peers_[peer_by_uri_.at(u.getBaseURI())]; } - if (u.getHost() == "localhost" || u.getHost() == "127.0.0.1") { - for (const auto &l : listeners_) { - if (l->port() == u.getPort()) { - throw FTL_Error("Cannot connect to self"); - } // TODO extend api - } - } + //if (u.getHost() == "localhost" || u.getHost() == "127.0.0.1") { + //for (const auto &l : listeners_) { + //if (l->port() == u.getPort()) { + // throw FTL_Error("Cannot connect to self"); + //} // TODO extend api + //} + //} } - - Peer* p; - p = new Peer(u, this, &disp_); + auto p = std::make_shared<Peer>(u, this, &disp_); if (p->status() != NodeStatus::kInvalid) { UNIQUE_LOCK(net_mutex_,lk); peers_.push_back(p); - peer_by_uri_[u.getBaseURI()] = p; + peer_by_uri_[u.getBaseURI()] = peers_.size() - 1; } else { LOG(ERROR) << "Peer in invalid state"; @@ -247,7 +245,7 @@ Peer *Universe::connect(const ftl::URI &u) { return p; } -Peer* Universe::connect(const std::string& addr) { +std::shared_ptr<Peer> Universe::connect(const std::string& addr) { return connect(ftl::URI(addr)); } @@ -300,7 +298,7 @@ socket_t Universe::_setDescriptors() { return n; } -void Universe::_installBindings(Peer *p) { +void Universe::_installBindings(const std::shared_ptr<Peer> &p) { } @@ -316,15 +314,15 @@ void Universe::_cleanupPeers() { (*i)->status() == NodeStatus::kReconnecting || (*i)->status() == NodeStatus::kDisconnected) { - Peer *p = *i; + const auto &p = *i; LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); - _notifyDisconnect(p); + _notifyDisconnect(p.get()); auto ix = peer_ids_.find(p->id()); if (ix != peer_ids_.end()) peer_ids_.erase(ix); for (auto i=peer_by_uri_.begin(); i != peer_by_uri_.end(); ++i) { - if (i->second == p) { + if (peers_[i->second] == p) { peer_by_uri_.erase(i); break; } @@ -343,16 +341,16 @@ void Universe::_cleanupPeers() { } } -Peer *Universe::getPeer(const UUID &id) const { +std::shared_ptr<Peer> Universe::getPeer(const UUID &id) const { SHARED_LOCK(net_mutex_,lk); auto ix = peer_ids_.find(id); if (ix == peer_ids_.end()) return nullptr; - else return ix->second; + else return peers_[ix->second]; } -Peer *Universe::getWebService() const { +std::shared_ptr<Peer> Universe::getWebService() const { SHARED_LOCK(net_mutex_,lk); - for (auto* p : peers_) { + for (const auto &p : peers_) { if (p->getType() == NodeType::kWebService) { return p; } @@ -404,7 +402,7 @@ void Universe::_periodic() { } } -void Universe::__start(Universe * u) { +void Universe::__start(Universe *u) { #ifndef WIN32 // TODO: move somewhere else (common initialization file?) signal(SIGPIPE,SIG_IGN); @@ -467,7 +465,7 @@ void Universe::_run() { try { auto csock = l->accept(); - auto p = new Peer(std::move(csock), this, &disp_); + auto p = std::make_shared<Peer>(std::move(csock), this, &disp_); peers_.push_back(p); } catch (const std::exception &ex) { @@ -508,21 +506,21 @@ void Universe::_run() { } } -Callback Universe::onConnect(const std::function<void(ftl::net::Peer*)> &cb) { +Callback Universe::onConnect(const std::function<void(const std::shared_ptr<Peer>&)> &cb) { UNIQUE_LOCK(handler_mutex_,lk); Callback id = cbid__++; on_connect_.push_back({id, cb}); return id; } -Callback Universe::onDisconnect(const std::function<void(ftl::net::Peer*)> &cb) { +Callback Universe::onDisconnect(const std::function<void(const std::shared_ptr<Peer>&)> &cb) { UNIQUE_LOCK(handler_mutex_,lk); Callback id = cbid__++; on_disconnect_.push_back({id, cb}); return id; } -Callback Universe::onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)> &cb) { +Callback Universe::onError(const std::function<void(const std::shared_ptr<Peer>&, const ftl::net::Error &)> &cb) { UNIQUE_LOCK(handler_mutex_,lk); Callback id = cbid__++; on_error_.push_back({id, cb}); @@ -565,13 +563,22 @@ void Universe::removeCallback(Callback cbid) { } } +static std::shared_ptr<Peer> findPeer(const std::vector<std::shared_ptr<Peer>> &peers, const Peer *p) { + for (const auto &pp : peers) { + if (pp.get() == p) return pp; + } + return nullptr; +} + void Universe::_notifyConnect(Peer *p) { UNIQUE_LOCK(handler_mutex_,lk); - peer_ids_[p->id()] = p; + const auto ptr = findPeer(peers_, p); + + peer_ids_[ptr->id()] = ptr->localID(); for (auto &i : on_connect_) { try { - i.h(p); + i.h(ptr); } catch(...) { LOG(ERROR) << "Exception inside OnConnect hander: " << i.id; } @@ -582,9 +589,11 @@ void Universe::_notifyDisconnect(Peer *p) { // In all cases, should already be locked outside this function call //unique_lock<mutex> lk(net_mutex_); UNIQUE_LOCK(handler_mutex_,lk); + const auto ptr = findPeer(peers_, p); + for (auto &i : on_disconnect_) { try { - i.h(p); + i.h(ptr); } catch(...) { LOG(ERROR) << "Exception inside OnDisconnect hander: " << i.id; } diff --git a/src/universe.hpp b/src/universe.hpp index 51a57ffc0c37f50543d4c8aafc4687239e46d0ed..c688238b07ebf04b3d7293ac8bfb92d9fbf99464 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -34,7 +34,7 @@ struct Error { struct ReconnectInfo { int tries; float delay; - Peer *peer; + std::shared_ptr<Peer> peer; }; struct NetImplDetail; @@ -86,8 +86,8 @@ public: * * @param addr URI giving protocol, interface and port */ - Peer *connect(const std::string &addr); - Peer *connect(const ftl::URI &addr); + std::shared_ptr<Peer> connect(const std::string &addr); + std::shared_ptr<Peer> connect(const ftl::URI &addr); bool isConnected(const ftl::URI &uri); bool isConnected(const std::string &s); @@ -101,9 +101,9 @@ public: int waitConnections(); /** get peer pointer by peer UUID, returns nullptr if not found */ - Peer *getPeer(const ftl::UUID &pid) const; + std::shared_ptr<Peer> getPeer(const ftl::UUID &pid) const; /** get webservice peer pointer, returns nullptr if not connected to webservice */ - Peer *getWebService() const; + std::shared_ptr<Peer> getWebService() const; /** * Bind a function to an RPC or service call name. This will implicitely @@ -161,27 +161,27 @@ public: // --- Event Handlers ------------------------------------------------------ - Callback onConnect(const std::function<void(ftl::net::Peer*)>&); - Callback onDisconnect(const std::function<void(ftl::net::Peer*)>&); - Callback onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)>&); + Callback onConnect(const std::function<void(const std::shared_ptr<ftl::net::Peer>&)>&); + Callback onDisconnect(const std::function<void(const std::shared_ptr<ftl::net::Peer>&)>&); + Callback onError(const std::function<void(const std::shared_ptr<Peer>&, const ftl::net::Error &)>&); void removeCallback(Callback cbid); size_t getSendBufferSize(ftl::URI::scheme_t s); size_t getRecvBufferSize(ftl::URI::scheme_t s); - static inline Universe *getInstance() { return instance_; } + static inline std::shared_ptr<Universe> getInstance() { return instance_; } private: void _run(); SOCKET _setDescriptors(); // TODO: move to implementation void _installBindings(); - void _installBindings(Peer *); + void _installBindings(const std::shared_ptr<ftl::net::Peer>&); //bool _subscribe(const std::string &res); void _cleanupPeers(); - void _notifyConnect(Peer *); - void _notifyDisconnect(Peer *); - void _notifyError(Peer *, const ftl::net::Error &); + void _notifyConnect(ftl::net::Peer *); + void _notifyDisconnect(ftl::net::Peer *); + void _notifyError(ftl::net::Peer *, const ftl::net::Error &); void _periodic(); static void __start(Universe *u); @@ -194,14 +194,14 @@ private: std::unique_ptr<NetImplDetail> impl_; std::vector<std::unique_ptr<ftl::net::internal::SocketServer>> listeners_; - std::vector<ftl::net::Peer*> peers_; - std::unordered_map<std::string, ftl::net::Peer*> peer_by_uri_; - std::map<ftl::UUID, ftl::net::Peer*> peer_ids_; + std::vector<std::shared_ptr<ftl::net::Peer>> peers_; + std::unordered_map<std::string, size_t> peer_by_uri_; + std::map<ftl::UUID, size_t> peer_ids_; ftl::net::Dispatcher disp_; std::list<ReconnectInfo> reconnects_; size_t phase_; - std::list<ftl::net::Peer*> garbage_; + std::list<std::shared_ptr<ftl::net::Peer>> garbage_; ftl::Handle garbage_timer_; size_t send_size_; @@ -211,12 +211,12 @@ private: struct ConnHandler { Callback id; - std::function<void(ftl::net::Peer*)> h; + std::function<void(const std::shared_ptr<ftl::net::Peer>&)> h; }; struct ErrHandler { Callback id; - std::function<void(ftl::net::Peer*, const ftl::net::Error &)> h; + std::function<void(const std::shared_ptr<ftl::net::Peer>&, const ftl::net::Error &)> h; }; // Handlers @@ -225,7 +225,7 @@ private: std::list<ErrHandler> on_error_; static Callback cbid__; - static Universe *instance_; + static std::shared_ptr<Universe> instance_; // NOTE: Must always be last member std::thread thread_; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b83b6df0bfb16a6d3ad2fcc6d322e66a396b932a..3a52a8252683064ee70d9e36a53d3ef7de12414a 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -27,3 +27,14 @@ target_link_libraries(handle_unit beyond-protocol Threads::Threads ${OS_LIBS}) add_test(HandleUnitTest handle_unit) + +### URI ######################################################################## +add_executable(net_integration + $<TARGET_OBJECTS:CatchTest> + ./net_integration.cpp) +target_include_directories(net_integration PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(net_integration beyond-protocol + Threads::Threads ${OS_LIBS} + ${URIPARSER_LIBRARIES}) + +add_test(NetIntegrationTest net_integration) diff --git a/test/net_integration.cpp b/test/net_integration.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c33a79c8a00f17aa32ec24e636136032888faba6 --- /dev/null +++ b/test/net_integration.cpp @@ -0,0 +1,349 @@ +#include "catch.hpp" +#include <ftl/protocol.hpp> +#include <ftl/protocol/self.hpp> +#include <ftl/protocol/node.hpp> +#include <ftl/uri.hpp> + +#include <thread> +#include <chrono> + +using std::this_thread::sleep_for; +using std::chrono::milliseconds; + +// --- Support ----------------------------------------------------------------- + +/*static bool try_for(int count, const std::function<bool()> &f) { + int i=count; + while (i-- > 0) { + if (f()) return true; + sleep_for(milliseconds(10)); + } + return false; +}*/ + +// --- Tests ------------------------------------------------------------------- + +TEST_CASE("Listen and Connect", "[net]") { + ftl::protocol::reset(); + + auto self = ftl::createDummySelf(); + + self->listen(ftl::URI("tcp://localhost:0")); + auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); + + SECTION("valid tcp connection using ipv4") { + LOG(INFO) << uri; + auto p = ftl::createNode(uri); + REQUIRE( p ); + + p->waitConnection(); + + REQUIRE( self->numberOfNodes() == 1 ); + } + + /*SECTION("valid tcp connection using hostname") { + auto p = b.connect(uri); + REQUIRE( p ); + + p->waitConnection(); + + REQUIRE( a.numberOfPeers() == 1 ); + REQUIRE( b.numberOfPeers() == 1 ); + } + + SECTION("invalid protocol") { + bool throws = false; + try { + auto p = b.connect("http://localhost:1234"); + } + catch (const ftl::exception& ex) { + ex.ignore(); + throws = true; + } + REQUIRE(throws); + } + + SECTION("automatic reconnect, after clean disconnect") { + std::mutex mtx; + std::condition_variable cv; + std::unique_lock<std::mutex> lk(mtx); + + auto p_connecting = b.connect(uri); + REQUIRE(p_connecting); + + bool disconnected_once = false; + + a.onConnect([&](ftl::net::Peer* p_listening) { + if (!disconnected_once) { + // remote closes on first connection + disconnected_once = true; + p_listening->close(); + LOG(INFO) << "disconnected"; + } else { + // notify on second + cv.notify_one(); + } + }); + + REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout); + REQUIRE(p_connecting->isConnected()); + } + + SECTION("automatic reconnect, socket close") { + std::mutex mtx; + std::condition_variable cv; + std::unique_lock<std::mutex> lk(mtx); + + auto p_connecting = b.connect(uri); + REQUIRE(p_connecting); + + bool disconnected_once = false; + + a.onConnect([&](ftl::net::Peer* p_listening) { + if (!disconnected_once) { + // disconnect on first connection + disconnected_once = true; + p_listening->rawClose(); + LOG(INFO) << "disconnected"; + } + else { + // notify on second + cv.notify_one(); + } + }); + + REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout); + REQUIRE(p_connecting->isConnected()); + }*/ +} + +/*TEST_CASE("Universe::onConnect()", "[net]") { + Universe a; + Universe b; + + a.listen(ftl::URI("tcp://localhost:0")); + auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); + + SECTION("single valid remote init connection") { + bool done = false; + + a.onConnect([&done](Peer *p) { + done = true; + }); + + b.connect(uri)->waitConnection(); + + REQUIRE( try_for(20, [&done]{ return done; }) ); + } + + SECTION("single valid init connection") { + bool done = false; + + b.onConnect([&done](Peer *p) { + done = true; + }); + + b.connect(uri)->waitConnection(); + //sleep_for(milliseconds(100)); + REQUIRE( done ); + } +} + +TEST_CASE("Universe::onDisconnect()", "[net]") { + Universe a; + Universe b; + + a.listen(ftl::URI("tcp://localhost:0")); + auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); + + SECTION("single valid remote close") { + bool done = false; + + a.onDisconnect([&done](Peer *p) { + done = true; + }); + + Peer *p = b.connect(uri); + p->waitConnection(); + sleep_for(milliseconds(20)); + p->close(); + + REQUIRE( try_for(20, [&done]{ return done; }) ); + } + + SECTION("single valid close") { + bool done = false; + + b.onDisconnect([&done](Peer *p) { + done = true; + }); + + Peer *p = b.connect(uri); + p->waitConnection(); + sleep_for(milliseconds(20)); + p->close(); + + REQUIRE( try_for(20, [&done]{ return done; }) ); + } +} + +TEST_CASE("Universe::broadcast()", "[net]") { + Universe a; + Universe b; + + a.listen(ftl::URI("tcp://localhost:0")); + auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); + + SECTION("no arguments to no peers") { + bool done = false; + a.bind("hello", [&done]() { + done = true; + }); + + b.broadcast("done"); + + sleep_for(milliseconds(50)); + REQUIRE( !done ); + } + + SECTION("no arguments to one peer") { + b.connect(uri)->waitConnection(); + + bool done = false; + a.bind("hello", [&done]() { + done = true; + }); + + b.broadcast("hello"); + + REQUIRE( try_for(20, [&done]{ return done; }) ); + } + + SECTION("one argument to one peer") { + b.connect(uri)->waitConnection(); + + int done = 0; + a.bind("hello", [&done](int v) { + done = v; + }); + + b.broadcast("hello", 676); + + REQUIRE( try_for(20, [&done]{ return done == 676; }) ); + } + + SECTION("one argument to two peers") { + Universe c; + + b.connect(uri)->waitConnection(); + c.connect(uri)->waitConnection(); + + int done1 = 0; + b.bind("hello", [&done1](int v) { + done1 = v; + }); + + int done2 = 0; + c.bind("hello", [&done2](int v) { + done2 = v; + }); + + REQUIRE( a.numberOfPeers() == 2 ); + //sleep_for(milliseconds(100)); // NOTE: Binding might not be ready + + a.broadcast("hello", 676); + + REQUIRE( try_for(20, [&done1, &done2]{ return done1 == 676 && done2 == 676; }) ); + } +} + +TEST_CASE("Universe::findAll()", "") { + Universe a; + Universe b; + Universe c; + + a.listen(ftl::URI("tcp://localhost:0")); + auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); + + b.connect(uri)->waitConnection(); + c.connect(uri)->waitConnection(); + + SECTION("no values exist") { + REQUIRE( (c.findAll<int>("test_all").size() == 0) ); + } + + SECTION("one set exists") { + a.bind("test_all", []() -> std::vector<int> { + return {3,4,5}; + }); + + auto res = c.findAll<int>("test_all"); + REQUIRE( (res.size() == 3) ); + REQUIRE( (res[0] == 3) ); + } + + SECTION("two sets exists") { + b.bind("test_all", []() -> std::vector<int> { + return {3,4,5}; + }); + c.bind("test_all", []() -> std::vector<int> { + return {6,7,8}; + }); + + //sleep_for(milliseconds(100)); // NOTE: Binding might not be ready + + auto res = a.findAll<int>("test_all"); + REQUIRE( (res.size() == 6) ); + REQUIRE( (res[0] == 3 || res[0] == 6) ); + } +} + +TEST_CASE("Peer::call() __ping__", "") { + Universe a; + Universe b; + Universe c; + + a.listen(ftl::URI("tcp://localhost:0")); + auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); + + auto *p = b.connect(uri); + p->waitConnection(); + + SECTION("single ping") { + int64_t res = p->call<int64_t>("__ping__"); + REQUIRE((res <= ftl::timer::get_time() && res > 0)); + } + + SECTION("large number of pings") { + for (int i=0; i<100; ++i) { + int64_t res = p->call<int64_t>("__ping__"); + REQUIRE(res > 0); + } + } + + SECTION("large number of parallel pings") { + std::atomic<int> count = 0; + for (int i=0; i<100; ++i) { + ftl::pool.push([&count, p](int id) { + int64_t res = p->call<int64_t>("__ping__"); + REQUIRE( res > 0 ); + count++; + }); + } + + while (count < 100) std::this_thread::sleep_for(milliseconds(5)); + } + + SECTION("single invalid rpc") { + bool errored = false; + try { + int64_t res = p->call<int64_t>("__ping2__"); + REQUIRE( res > 0 ); // Not called or required actually + } catch (const ftl::exception &e) { + e.ignore(); // supress log output + errored = true; + } + + REQUIRE(errored); + } +}*/