diff --git a/include/ftl/protocol/node.hpp b/include/ftl/protocol/node.hpp index 6730023c0496b3a4626c5c8e4bbeefe496b6eb8a..b041836a304d87e385816862368d3b82a8fb67a3 100644 --- a/include/ftl/protocol/node.hpp +++ b/include/ftl/protocol/node.hpp @@ -8,7 +8,10 @@ #include <memory> #include <string> +#include <vector> #include <ftl/uuid.hpp> +#include <ftl/protocol/frameid.hpp> +#include <nlohmann/json_fwd.hpp> namespace ftl { namespace net { @@ -146,6 +149,26 @@ class Node { int connectionCount() const; + // === RPC Methods === + + void restart(); + + void shutdown(); + + bool hasStream(const std::string &uri); + + void createStream(const std::string &uri, FrameID id); + + nlohmann::json details(); + + int64_t ping(); + + nlohmann::json getConfig(const std::string &path); + + void setConfig(const std::string &path, const nlohmann::json &value); + + std::vector<std::string> listConfigs(); + protected: ftl::net::PeerPtr peer_; }; diff --git a/include/ftl/protocol/self.hpp b/include/ftl/protocol/self.hpp index d60450d3f70f6827a37945aee71a84d5bccad592..064a83e068bd14688da3f8ad20a77dd9bd7c0008 100644 --- a/include/ftl/protocol/self.hpp +++ b/include/ftl/protocol/self.hpp @@ -14,6 +14,7 @@ #include <ftl/uri.hpp> #include <ftl/handle.hpp> #include <ftl/protocol/error.hpp> +#include <ftl/protocol/frameid.hpp> namespace ftl { namespace net { @@ -159,6 +160,111 @@ class Self { // Used for testing ftl::net::Universe *getUniverse() const { return universe_.get(); } + // === The RPC methods === + + /** + * @brief Restart all locally connected nodes. + * + */ + void restartAll(); + + /** + * @brief Shutdown all locally connected nodes. + * + */ + void shutdownAll(); + + /** + * @brief Get JSON metadata for all connected nodes. This includes names, descriptions + * and hardware resources. + * + * @return std::vector<nlohmann::json> + */ + std::vector<nlohmann::json> getAllNodeDetails(); + + /** + * @brief Get a list of all streams available to this node. It will collate from all + * connected nodes and the web service. + * + * @return std::vector<std::string> + */ + std::vector<std::string> getStreams(); + + /** + * @brief Find which node provides a stream. The returned pointer is a nullptr if the + * stream is not found. + * + * @param uri The stream URI. + * @return std::shared_ptr<ftl::protocol::Node> + */ + std::shared_ptr<ftl::protocol::Node> locateStream(const std::string &uri); + + /** + * @brief Handle a restart request from other machines. + * + * @param cb + * @return ftl::Handle + */ + void onRestart(const std::function<void()> &cb); + + /** + * @brief Handle a shutdown request from other machines. + * + * @param cb + * @return ftl::Handle + */ + void onShutdown(const std::function<void()> &cb); + + /** + * @brief Handle a stream creation request. Most likely this is being sent by the web service. + * + * @param cb + * @return ftl::Handle + */ + void onCreateStream(const std::function<void(const std::string &uri, FrameID id)> &cb); + + /** + * @brief Handle a node details request. + * + * The returned JSON object should have the following keys: + * * id + * * title + * * devices + * * gpus + * + * It may also have: + * * description + * * tags + * + * @param cb + */ + void onNodeDetails(const std::function<nlohmann::json()> &cb); + + /** + * @brief Handle a get configuration request. A path to the configuration property is + * provided and a JSON value, possibly an entire object, is returned. Null can also be + * returned if not found. + * + * @param cb + */ + void onGetConfig(const std::function<nlohmann::json(const std::string &)> &cb); + + /** + * @brief Handle a change config request. The configuration property path and new JSON + * value is given. + * + * @param cb + */ + void onSetConfig(const std::function<void(const std::string &, const nlohmann::json &)> &cb); + + /** + * @brief Handle a request for all config properties on this machine. This could return + * just the root level objects, or every property. + * + * @param cb + */ + void onListConfig(const std::function<std::vector<std::string>()> &cb); + protected: std::shared_ptr<ftl::net::Universe> universe_; }; diff --git a/src/node.cpp b/src/node.cpp index 1aa9d11ded10b9e013700861f34b5224a7caa349..00ed176ba77c16e70e2483dba151adcc292226c1 100644 --- a/src/node.cpp +++ b/src/node.cpp @@ -6,9 +6,12 @@ #include <ftl/protocol/node.hpp> #include "peer.hpp" +#include <ftl/lib/nlohmann/json.hpp> +#include <ftl/time.hpp> using ftl::protocol::Node; using ftl::net::PeerPtr; +using ftl::protocol::FrameID; Node::Node(const PeerPtr &impl): peer_(impl) {} @@ -69,3 +72,40 @@ unsigned int Node::localID() { int Node::connectionCount() const { return peer_->connectionCount(); } + +void Node::restart() { + peer_->send("restart"); +} + +void Node::shutdown() { + peer_->send("shutdown"); +} + +bool Node::hasStream(const std::string &uri) { + return !!peer_->call<std::optional<std::string>>("find_stream", uri); +} + +void Node::createStream(const std::string &uri, FrameID id) { + peer_->send("create_stream", uri, id.frameset(), id.source()); +} + +nlohmann::json Node::details() { + const std::string res = peer_->call<std::string>("node_details"); + return nlohmann::json::parse(res); +} + +int64_t Node::ping() { + return peer_->call<int64_t>("__ping__"); +} + +nlohmann::json Node::getConfig(const std::string &path) { + return nlohmann::json::parse(peer_->call<std::string>("get_cfg", path)); +} + +void Node::setConfig(const std::string &path, const nlohmann::json &value) { + peer_->send("update_cfg", path, value.dump()); +} + +std::vector<std::string> Node::listConfigs() { + return peer_->call<std::vector<std::string>>("list_configurables"); +} diff --git a/src/self.cpp b/src/self.cpp index 47a339ee48f0c589962ced018e6389334b212acc..1c4692a1d31449a7f2084551dc8549ae4936b822 100644 --- a/src/self.cpp +++ b/src/self.cpp @@ -8,8 +8,11 @@ #include <ftl/protocol/self.hpp> #include "./streams/netstream.hpp" #include "./streams/filestream.hpp" +#include <ftl/lib/nlohmann/json.hpp> +#include <ftl/uuid.hpp> using ftl::protocol::Self; +using ftl::protocol::FrameID; Self::Self(const std::shared_ptr<ftl::net::Universe> &impl): universe_(impl) {} @@ -115,3 +118,70 @@ ftl::Handle Self::onError(const ErrorCb &cb) { return cb(std::make_shared<ftl::protocol::Node>(p), e, estr); }); } + +void Self::restartAll() { + universe_->broadcast("restart"); +} + +void Self::shutdownAll() { + universe_->broadcast("shutdown"); +} + +std::vector<nlohmann::json> Self::getAllNodeDetails() { + auto response = universe_->findAll<std::string>("node_details"); + std::vector<nlohmann::json> result(response.size()); + for (auto &r : response) { + result.push_back(nlohmann::json::parse(r)); + } + return result; +} + +std::vector<std::string> Self::getStreams() { + return universe_->findAll<std::string>("list_streams"); +} + +std::shared_ptr<ftl::protocol::Node> Self::locateStream(const std::string &uri) { + auto p = universe_->findOne<ftl::UUID>("find_stream", uri); + + if (!p) return nullptr; + auto peer = universe_->getPeer(*p); + if (!peer) return nullptr; + + return std::make_shared<ftl::protocol::Node>(peer); +} + +void Self::onRestart(const std::function<void()> &cb) { + universe_->bind("restart", cb); +} + +void Self::onShutdown(const std::function<void()> &cb) { + universe_->bind("shutdown", cb); +} + +void Self::onCreateStream(const std::function<void(const std::string &uri, FrameID id)> &cb) { + universe_->bind("create_stream", [cb](const std::string &uri, int fsid, int fid) { + cb(uri, FrameID(fsid, fid)); + }); +} + +void Self::onNodeDetails(const std::function<nlohmann::json()> &cb) { + universe_->bind("node_details", [cb]() { + return cb().dump(); + }); +} + +void Self::onGetConfig(const std::function<nlohmann::json(const std::string &)> &cb) { + universe_->bind("get_cfg", [cb](const std::string &path) { + return cb(path).dump(); + }); +} + +void Self::onSetConfig(const std::function<void(const std::string &, const nlohmann::json &)> &cb) { + universe_->bind("update_cfg", [cb](const std::string &path, const std::string &value) { + cb(path, nlohmann::json::parse(value)); + }); +} + +void Self::onListConfig(const std::function<std::vector<std::string>()> &cb) { + universe_->bind("list_configurables", cb); +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0cee8f73f86d40562ca9a24cdb0e0790516784a7..17aed1cce0f2837757414c26eef0b13dddebcf95 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -29,7 +29,7 @@ target_link_libraries(handle_unit beyond-protocol add_test(HandleUnitTest handle_unit) -### URI ######################################################################## +### Net Integ ################################################################## add_executable(net_integration $<TARGET_OBJECTS:CatchTestFTL> ./net_integration.cpp) @@ -129,4 +129,15 @@ target_include_directories(peer_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../incl target_link_libraries(peer_unit beyond-protocol GnuTLS::GnuTLS Threads::Threads ${URIPARSER_LIBRARIES} ${UUID_LIBRARIES} ${OS_LIBS}) -add_test(PeerUnitTest peer_unit) \ No newline at end of file +add_test(PeerUnitTest peer_unit) + +### RPC Integ ################################################################## +add_executable(rpc_integration + $<TARGET_OBJECTS:CatchTestFTL> + ./rpc_integration.cpp) +target_include_directories(rpc_integration PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(rpc_integration beyond-protocol + Threads::Threads ${OS_LIBS} + ${URIPARSER_LIBRARIES}) + +add_test(RPCIntegrationTest rpc_integration) diff --git a/test/net_integration.cpp b/test/net_integration.cpp index df9d898765c82020abdd9dcb2fc8588bf4b76d96..b1490f721326d5c6b7560e3bdcbcb23115b496eb 100644 --- a/test/net_integration.cpp +++ b/test/net_integration.cpp @@ -14,324 +14,324 @@ using std::chrono::milliseconds; // --- Support ----------------------------------------------------------------- static bool try_for(int count, const std::function<bool()> &f) { - int i=count; - while (i-- > 0) { - if (f()) return true; - sleep_for(milliseconds(10)); - } - return false; + int i=count; + while (i-- > 0) { + if (f()) return true; + sleep_for(milliseconds(10)); + } + return false; } // --- Tests ------------------------------------------------------------------- TEST_CASE("Listen and Connect", "[net]") { - auto self = ftl::createDummySelf(); - - self->listen(ftl::URI("tcp://localhost:0")); - - SECTION("valid tcp connection using ipv4") { - auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort()); - LOG(INFO) << uri; - auto p = ftl::connectNode(uri); - REQUIRE( p ); - - REQUIRE( p->waitConnection(5) ); - - REQUIRE( self->waitConnections(5) == 1 ); - REQUIRE( ftl::getSelf()->numberOfNodes() == 1); - } - - SECTION("valid tcp connection using hostname") { - auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); - auto p = ftl::connectNode(uri); - REQUIRE( p ); - - REQUIRE( p->waitConnection(5) ); - - REQUIRE( self->waitConnections(5) == 1 ); - REQUIRE( ftl::getSelf()->numberOfNodes() == 1); - } - - SECTION("invalid protocol") { - bool throws = false; - try { - auto p = ftl::connectNode("http://localhost:1234"); - } - catch (const ftl::exception& ex) { - ex.ignore(); - throws = true; - } - REQUIRE(throws); - } - - SECTION("automatic reconnect from originating connection") { - auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); - - auto p_connecting = ftl::connectNode(uri); - REQUIRE(p_connecting); - - REQUIRE(p_connecting->waitConnection(5)); - p_connecting->close(true); - - REQUIRE(p_connecting->status() != ftl::protocol::NodeStatus::kConnected); - REQUIRE(p_connecting->waitConnection(5)); - } - - SECTION("automatic reconnect from remote termination") { - auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); - - auto p_connecting = ftl::connectNode(uri); - REQUIRE(p_connecting); - - REQUIRE(p_connecting->waitConnection(5)); - REQUIRE(p_connecting->connectionCount() == 1); - - auto nodes = self->getNodes(); - REQUIRE( nodes.size() == 1 ); - for (auto &node : nodes) { - node->waitConnection(5); - node->close(); - } - - bool r = try_for(500, [p_connecting]{ return p_connecting->connectionCount() >= 2; }); - REQUIRE( r ); - } - - ftl::protocol::reset(); + auto self = ftl::createDummySelf(); + + self->listen(ftl::URI("tcp://localhost:0")); + + SECTION("valid tcp connection using ipv4") { + auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort()); + LOG(INFO) << uri; + auto p = ftl::connectNode(uri); + REQUIRE( p ); + + REQUIRE( p->waitConnection(5) ); + + REQUIRE( self->waitConnections(5) == 1 ); + REQUIRE( ftl::getSelf()->numberOfNodes() == 1); + } + + SECTION("valid tcp connection using hostname") { + auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); + auto p = ftl::connectNode(uri); + REQUIRE( p ); + + REQUIRE( p->waitConnection(5) ); + + REQUIRE( self->waitConnections(5) == 1 ); + REQUIRE( ftl::getSelf()->numberOfNodes() == 1); + } + + SECTION("invalid protocol") { + bool throws = false; + try { + auto p = ftl::connectNode("http://localhost:1234"); + } + catch (const ftl::exception& ex) { + ex.ignore(); + throws = true; + } + REQUIRE(throws); + } + + SECTION("automatic reconnect from originating connection") { + auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); + + auto p_connecting = ftl::connectNode(uri); + REQUIRE(p_connecting); + + REQUIRE(p_connecting->waitConnection(5)); + p_connecting->close(true); + + REQUIRE(p_connecting->status() != ftl::protocol::NodeStatus::kConnected); + REQUIRE(p_connecting->waitConnection(5)); + } + + SECTION("automatic reconnect from remote termination") { + auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); + + auto p_connecting = ftl::connectNode(uri); + REQUIRE(p_connecting); + + REQUIRE(p_connecting->waitConnection(5)); + REQUIRE(p_connecting->connectionCount() == 1); + + auto nodes = self->getNodes(); + REQUIRE( nodes.size() == 1 ); + for (auto &node : nodes) { + node->waitConnection(5); + node->close(); + } + + bool r = try_for(500, [p_connecting]{ return p_connecting->connectionCount() >= 2; }); + REQUIRE( r ); + } + + ftl::protocol::reset(); } TEST_CASE("Self::onConnect()", "[net]") { - auto self = ftl::createDummySelf(); - - self->listen(ftl::URI("tcp://localhost:0")); + auto self = ftl::createDummySelf(); + + self->listen(ftl::URI("tcp://localhost:0")); - auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); + auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); - SECTION("single valid remote init connection") { - bool done = false; + SECTION("single valid remote init connection") { + bool done = false; - auto h = self->onConnect([&](const std::shared_ptr<ftl::protocol::Node> &p_listening) { - done = true; - return true; - }); + auto h = self->onConnect([&](const std::shared_ptr<ftl::protocol::Node> &p_listening) { + done = true; + return true; + }); - REQUIRE( ftl::connectNode(uri) ); + REQUIRE( ftl::connectNode(uri) ); - bool result = try_for(20, [&done]{ return done; }); - REQUIRE( result ); - } + bool result = try_for(20, [&done]{ return done; }); + REQUIRE( result ); + } - SECTION("single valid init connection") { - bool done = false; + SECTION("single valid init connection") { + bool done = false; - auto h = ftl::getSelf()->onConnect([&](const std::shared_ptr<ftl::protocol::Node> &p_listening) { - done = true; - return true; - }); + auto h = ftl::getSelf()->onConnect([&](const std::shared_ptr<ftl::protocol::Node> &p_listening) { + done = true; + return true; + }); - REQUIRE( ftl::connectNode(uri)->waitConnection(5) ); + REQUIRE( ftl::connectNode(uri)->waitConnection(5) ); - REQUIRE( done ); - } + REQUIRE( done ); + } - ftl::protocol::reset(); + ftl::protocol::reset(); } /*TEST_CASE("Universe::onDisconnect()", "[net]") { - Universe a; - Universe b; + Universe a; + Universe b; - a.listen(ftl::URI("tcp://localhost:0")); - auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); + a.listen(ftl::URI("tcp://localhost:0")); + auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); - SECTION("single valid remote close") { - bool done = false; + SECTION("single valid remote close") { + bool done = false; - a.onDisconnect([&done](Peer *p) { - done = true; - }); + a.onDisconnect([&done](Peer *p) { + done = true; + }); - Peer *p = b.connect(uri); - p->waitConnection(); - sleep_for(milliseconds(20)); - p->close(); + Peer *p = b.connect(uri); + p->waitConnection(); + sleep_for(milliseconds(20)); + p->close(); - REQUIRE( try_for(20, [&done]{ return done; }) ); - } + REQUIRE( try_for(20, [&done]{ return done; }) ); + } - SECTION("single valid close") { - bool done = false; + SECTION("single valid close") { + bool done = false; - b.onDisconnect([&done](Peer *p) { - done = true; - }); + b.onDisconnect([&done](Peer *p) { + done = true; + }); - Peer *p = b.connect(uri); - p->waitConnection(); - sleep_for(milliseconds(20)); - p->close(); + Peer *p = b.connect(uri); + p->waitConnection(); + sleep_for(milliseconds(20)); + p->close(); - REQUIRE( try_for(20, [&done]{ return done; }) ); - } + REQUIRE( try_for(20, [&done]{ return done; }) ); + } } TEST_CASE("Universe::broadcast()", "[net]") { - Universe a; - Universe b; - - a.listen(ftl::URI("tcp://localhost:0")); - auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); - - SECTION("no arguments to no peers") { - bool done = false; - a.bind("hello", [&done]() { - done = true; - }); - - b.broadcast("done"); - - sleep_for(milliseconds(50)); - REQUIRE( !done ); - } - - SECTION("no arguments to one peer") { - b.connect(uri)->waitConnection(); - - bool done = false; - a.bind("hello", [&done]() { - done = true; - }); - - b.broadcast("hello"); - - REQUIRE( try_for(20, [&done]{ return done; }) ); - } - - SECTION("one argument to one peer") { - b.connect(uri)->waitConnection(); - - int done = 0; - a.bind("hello", [&done](int v) { - done = v; - }); - - b.broadcast("hello", 676); - - REQUIRE( try_for(20, [&done]{ return done == 676; }) ); - } - - SECTION("one argument to two peers") { - Universe c; - - b.connect(uri)->waitConnection(); - c.connect(uri)->waitConnection(); - - int done1 = 0; - b.bind("hello", [&done1](int v) { - done1 = v; - }); - - int done2 = 0; - c.bind("hello", [&done2](int v) { - done2 = v; - }); - - REQUIRE( a.numberOfPeers() == 2 ); - //sleep_for(milliseconds(100)); // NOTE: Binding might not be ready - - a.broadcast("hello", 676); - - REQUIRE( try_for(20, [&done1, &done2]{ return done1 == 676 && done2 == 676; }) ); - } + Universe a; + Universe b; + + a.listen(ftl::URI("tcp://localhost:0")); + auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); + + SECTION("no arguments to no peers") { + bool done = false; + a.bind("hello", [&done]() { + done = true; + }); + + b.broadcast("done"); + + sleep_for(milliseconds(50)); + REQUIRE( !done ); + } + + SECTION("no arguments to one peer") { + b.connect(uri)->waitConnection(); + + bool done = false; + a.bind("hello", [&done]() { + done = true; + }); + + b.broadcast("hello"); + + REQUIRE( try_for(20, [&done]{ return done; }) ); + } + + SECTION("one argument to one peer") { + b.connect(uri)->waitConnection(); + + int done = 0; + a.bind("hello", [&done](int v) { + done = v; + }); + + b.broadcast("hello", 676); + + REQUIRE( try_for(20, [&done]{ return done == 676; }) ); + } + + SECTION("one argument to two peers") { + Universe c; + + b.connect(uri)->waitConnection(); + c.connect(uri)->waitConnection(); + + int done1 = 0; + b.bind("hello", [&done1](int v) { + done1 = v; + }); + + int done2 = 0; + c.bind("hello", [&done2](int v) { + done2 = v; + }); + + REQUIRE( a.numberOfPeers() == 2 ); + //sleep_for(milliseconds(100)); // NOTE: Binding might not be ready + + a.broadcast("hello", 676); + + REQUIRE( try_for(20, [&done1, &done2]{ return done1 == 676 && done2 == 676; }) ); + } } TEST_CASE("Universe::findAll()", "") { - Universe a; - Universe b; - Universe c; - - a.listen(ftl::URI("tcp://localhost:0")); - auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); - - b.connect(uri)->waitConnection(); - c.connect(uri)->waitConnection(); - - SECTION("no values exist") { - REQUIRE( (c.findAll<int>("test_all").size() == 0) ); - } - - SECTION("one set exists") { - a.bind("test_all", []() -> std::vector<int> { - return {3,4,5}; - }); - - auto res = c.findAll<int>("test_all"); - REQUIRE( (res.size() == 3) ); - REQUIRE( (res[0] == 3) ); - } - - SECTION("two sets exists") { - b.bind("test_all", []() -> std::vector<int> { - return {3,4,5}; - }); - c.bind("test_all", []() -> std::vector<int> { - return {6,7,8}; - }); - - //sleep_for(milliseconds(100)); // NOTE: Binding might not be ready - - auto res = a.findAll<int>("test_all"); - REQUIRE( (res.size() == 6) ); - REQUIRE( (res[0] == 3 || res[0] == 6) ); - } + Universe a; + Universe b; + Universe c; + + a.listen(ftl::URI("tcp://localhost:0")); + auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); + + b.connect(uri)->waitConnection(); + c.connect(uri)->waitConnection(); + + SECTION("no values exist") { + REQUIRE( (c.findAll<int>("test_all").size() == 0) ); + } + + SECTION("one set exists") { + a.bind("test_all", []() -> std::vector<int> { + return {3,4,5}; + }); + + auto res = c.findAll<int>("test_all"); + REQUIRE( (res.size() == 3) ); + REQUIRE( (res[0] == 3) ); + } + + SECTION("two sets exists") { + b.bind("test_all", []() -> std::vector<int> { + return {3,4,5}; + }); + c.bind("test_all", []() -> std::vector<int> { + return {6,7,8}; + }); + + //sleep_for(milliseconds(100)); // NOTE: Binding might not be ready + + auto res = a.findAll<int>("test_all"); + REQUIRE( (res.size() == 6) ); + REQUIRE( (res[0] == 3 || res[0] == 6) ); + } } TEST_CASE("Peer::call() __ping__", "") { - Universe a; - Universe b; - Universe c; - - a.listen(ftl::URI("tcp://localhost:0")); - auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); - - auto *p = b.connect(uri); - p->waitConnection(); - - SECTION("single ping") { - int64_t res = p->call<int64_t>("__ping__"); - REQUIRE((res <= ftl::timer::get_time() && res > 0)); - } - - SECTION("large number of pings") { - for (int i=0; i<100; ++i) { - int64_t res = p->call<int64_t>("__ping__"); - REQUIRE(res > 0); - } - } - - SECTION("large number of parallel pings") { - std::atomic<int> count = 0; - for (int i=0; i<100; ++i) { - ftl::pool.push([&count, p](int id) { - int64_t res = p->call<int64_t>("__ping__"); - REQUIRE( res > 0 ); - count++; - }); - } - - while (count < 100) std::this_thread::sleep_for(milliseconds(5)); - } - - SECTION("single invalid rpc") { - bool errored = false; - try { - int64_t res = p->call<int64_t>("__ping2__"); - REQUIRE( res > 0 ); // Not called or required actually - } catch (const ftl::exception &e) { - e.ignore(); // supress log output - errored = true; - } - - REQUIRE(errored); - } + Universe a; + Universe b; + Universe c; + + a.listen(ftl::URI("tcp://localhost:0")); + auto uri = "tcp://localhost:" + std::to_string(a.getListeningURIs().front().getPort()); + + auto *p = b.connect(uri); + p->waitConnection(); + + SECTION("single ping") { + int64_t res = p->call<int64_t>("__ping__"); + REQUIRE((res <= ftl::timer::get_time() && res > 0)); + } + + SECTION("large number of pings") { + for (int i=0; i<100; ++i) { + int64_t res = p->call<int64_t>("__ping__"); + REQUIRE(res > 0); + } + } + + SECTION("large number of parallel pings") { + std::atomic<int> count = 0; + for (int i=0; i<100; ++i) { + ftl::pool.push([&count, p](int id) { + int64_t res = p->call<int64_t>("__ping__"); + REQUIRE( res > 0 ); + count++; + }); + } + + while (count < 100) std::this_thread::sleep_for(milliseconds(5)); + } + + SECTION("single invalid rpc") { + bool errored = false; + try { + int64_t res = p->call<int64_t>("__ping2__"); + REQUIRE( res > 0 ); // Not called or required actually + } catch (const ftl::exception &e) { + e.ignore(); // supress log output + errored = true; + } + + REQUIRE(errored); + } }*/ diff --git a/test/rpc_integration.cpp b/test/rpc_integration.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1ea1ad3b810663182e53f83db5683546e346bc29 --- /dev/null +++ b/test/rpc_integration.cpp @@ -0,0 +1,126 @@ +#include "catch.hpp" +#include <ftl/protocol.hpp> +#include <ftl/protocol/self.hpp> +#include <ftl/protocol/node.hpp> +#include <ftl/uri.hpp> +#include <ftl/exception.hpp> +#include <ftl/lib/nlohmann/json.hpp> +#include <ftl/protocol/streams.hpp> + +#include <thread> +#include <chrono> + +using std::this_thread::sleep_for; +using std::chrono::milliseconds; + +// --- Tests ------------------------------------------------------------------- + +TEST_CASE("RPC Node Details", "[rpc]") { + auto self = ftl::createDummySelf(); + + self->listen(ftl::URI("tcp://localhost:0")); + + { + auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort()); + LOG(INFO) << uri; + auto p = ftl::connectNode(uri); + REQUIRE(p); + REQUIRE(p->waitConnection(5)); + REQUIRE(self->waitConnections(5) == 1); + + self->onNodeDetails([]() { + nlohmann::json details; + details["title"] = "Test node"; + return details; + }); + + auto details = p->details(); + + REQUIRE(details["title"] == "Test node"); + } + + ftl::protocol::reset(); +} + +TEST_CASE("RPC List Streams", "[rpc]") { + auto self = ftl::createDummySelf(); + + self->listen(ftl::URI("tcp://localhost:0")); + + { + auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort()); + LOG(INFO) << uri; + auto p = ftl::connectNode(uri); + REQUIRE(p); + REQUIRE(p->waitConnection(5)); + REQUIRE(self->waitConnections(5) == 1); + + auto s = self->createStream("ftl://mystream"); + s->begin(); + + auto streams = ftl::getSelf()->getStreams(); + + REQUIRE(streams.size() == 1); + REQUIRE(streams[0] == "ftl://mystream"); + + s->end(); + + streams = ftl::getSelf()->getStreams(); + REQUIRE(streams.size() == 0); + } + + ftl::protocol::reset(); +} + +TEST_CASE("RPC get config", "[rpc]") { + auto self = ftl::createDummySelf(); + + self->listen(ftl::URI("tcp://localhost:0")); + + { + auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort()); + LOG(INFO) << uri; + auto p = ftl::connectNode(uri); + REQUIRE(p); + REQUIRE(p->waitConnection(5)); + REQUIRE(self->waitConnections(5) == 1); + + self->onGetConfig([](const std::string &path) { + nlohmann::json cfg; + cfg["path"] = path; + return cfg; + }); + + auto cfg = p->getConfig("path is this"); + + REQUIRE(cfg["path"] == "path is this"); + } + + ftl::protocol::reset(); +} + +TEST_CASE("RPC shutdown", "[rpc]") { + auto self = ftl::createDummySelf(); + + self->listen(ftl::URI("tcp://localhost:0")); + + { + auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort()); + LOG(INFO) << uri; + auto p = ftl::connectNode(uri); + REQUIRE(p); + REQUIRE(p->waitConnection(5)); + REQUIRE(self->waitConnections(5) == 1); + + bool shutdown = false; + self->onShutdown([&shutdown]() { + shutdown = true; + }); + + ftl::getSelf()->shutdownAll(); + sleep_for(milliseconds(100)); + REQUIRE(shutdown); + } + + ftl::protocol::reset(); +}