Skip to content
Snippets Groups Projects
Commit c2d63ce7 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/#16' into 'main'

#16 Add existing RPC methods

See merge request nicolaspope/beyond-protocol!10
parents b25d1cab 4ca1aa75
No related branches found
No related tags found
No related merge requests found
...@@ -8,7 +8,10 @@ ...@@ -8,7 +8,10 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector>
#include <ftl/uuid.hpp> #include <ftl/uuid.hpp>
#include <ftl/protocol/frameid.hpp>
#include <nlohmann/json_fwd.hpp>
namespace ftl { namespace ftl {
namespace net { namespace net {
...@@ -146,6 +149,26 @@ class Node { ...@@ -146,6 +149,26 @@ class Node {
int connectionCount() const; int connectionCount() const;
// === RPC Methods ===
void restart();
void shutdown();
bool hasStream(const std::string &uri);
void createStream(const std::string &uri, FrameID id);
nlohmann::json details();
int64_t ping();
nlohmann::json getConfig(const std::string &path);
void setConfig(const std::string &path, const nlohmann::json &value);
std::vector<std::string> listConfigs();
protected: protected:
ftl::net::PeerPtr peer_; ftl::net::PeerPtr peer_;
}; };
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include <ftl/uri.hpp> #include <ftl/uri.hpp>
#include <ftl/handle.hpp> #include <ftl/handle.hpp>
#include <ftl/protocol/error.hpp> #include <ftl/protocol/error.hpp>
#include <ftl/protocol/frameid.hpp>
namespace ftl { namespace ftl {
namespace net { namespace net {
...@@ -159,6 +160,111 @@ class Self { ...@@ -159,6 +160,111 @@ class Self {
// Used for testing // Used for testing
ftl::net::Universe *getUniverse() const { return universe_.get(); } ftl::net::Universe *getUniverse() const { return universe_.get(); }
// === The RPC methods ===
/**
* @brief Restart all locally connected nodes.
*
*/
void restartAll();
/**
* @brief Shutdown all locally connected nodes.
*
*/
void shutdownAll();
/**
* @brief Get JSON metadata for all connected nodes. This includes names, descriptions
* and hardware resources.
*
* @return std::vector<nlohmann::json>
*/
std::vector<nlohmann::json> getAllNodeDetails();
/**
* @brief Get a list of all streams available to this node. It will collate from all
* connected nodes and the web service.
*
* @return std::vector<std::string>
*/
std::vector<std::string> getStreams();
/**
* @brief Find which node provides a stream. The returned pointer is a nullptr if the
* stream is not found.
*
* @param uri The stream URI.
* @return std::shared_ptr<ftl::protocol::Node>
*/
std::shared_ptr<ftl::protocol::Node> locateStream(const std::string &uri);
/**
* @brief Handle a restart request from other machines.
*
* @param cb
* @return ftl::Handle
*/
void onRestart(const std::function<void()> &cb);
/**
* @brief Handle a shutdown request from other machines.
*
* @param cb
* @return ftl::Handle
*/
void onShutdown(const std::function<void()> &cb);
/**
* @brief Handle a stream creation request. Most likely this is being sent by the web service.
*
* @param cb
* @return ftl::Handle
*/
void onCreateStream(const std::function<void(const std::string &uri, FrameID id)> &cb);
/**
* @brief Handle a node details request.
*
* The returned JSON object should have the following keys:
* * id
* * title
* * devices
* * gpus
*
* It may also have:
* * description
* * tags
*
* @param cb
*/
void onNodeDetails(const std::function<nlohmann::json()> &cb);
/**
* @brief Handle a get configuration request. A path to the configuration property is
* provided and a JSON value, possibly an entire object, is returned. Null can also be
* returned if not found.
*
* @param cb
*/
void onGetConfig(const std::function<nlohmann::json(const std::string &)> &cb);
/**
* @brief Handle a change config request. The configuration property path and new JSON
* value is given.
*
* @param cb
*/
void onSetConfig(const std::function<void(const std::string &, const nlohmann::json &)> &cb);
/**
* @brief Handle a request for all config properties on this machine. This could return
* just the root level objects, or every property.
*
* @param cb
*/
void onListConfig(const std::function<std::vector<std::string>()> &cb);
protected: protected:
std::shared_ptr<ftl::net::Universe> universe_; std::shared_ptr<ftl::net::Universe> universe_;
}; };
......
...@@ -6,9 +6,12 @@ ...@@ -6,9 +6,12 @@
#include <ftl/protocol/node.hpp> #include <ftl/protocol/node.hpp>
#include "peer.hpp" #include "peer.hpp"
#include <ftl/lib/nlohmann/json.hpp>
#include <ftl/time.hpp>
using ftl::protocol::Node; using ftl::protocol::Node;
using ftl::net::PeerPtr; using ftl::net::PeerPtr;
using ftl::protocol::FrameID;
Node::Node(const PeerPtr &impl): peer_(impl) {} Node::Node(const PeerPtr &impl): peer_(impl) {}
...@@ -69,3 +72,40 @@ unsigned int Node::localID() { ...@@ -69,3 +72,40 @@ unsigned int Node::localID() {
int Node::connectionCount() const { int Node::connectionCount() const {
return peer_->connectionCount(); return peer_->connectionCount();
} }
void Node::restart() {
peer_->send("restart");
}
void Node::shutdown() {
peer_->send("shutdown");
}
bool Node::hasStream(const std::string &uri) {
return !!peer_->call<std::optional<std::string>>("find_stream", uri);
}
void Node::createStream(const std::string &uri, FrameID id) {
peer_->send("create_stream", uri, id.frameset(), id.source());
}
nlohmann::json Node::details() {
const std::string res = peer_->call<std::string>("node_details");
return nlohmann::json::parse(res);
}
int64_t Node::ping() {
return peer_->call<int64_t>("__ping__");
}
nlohmann::json Node::getConfig(const std::string &path) {
return nlohmann::json::parse(peer_->call<std::string>("get_cfg", path));
}
void Node::setConfig(const std::string &path, const nlohmann::json &value) {
peer_->send("update_cfg", path, value.dump());
}
std::vector<std::string> Node::listConfigs() {
return peer_->call<std::vector<std::string>>("list_configurables");
}
...@@ -8,8 +8,11 @@ ...@@ -8,8 +8,11 @@
#include <ftl/protocol/self.hpp> #include <ftl/protocol/self.hpp>
#include "./streams/netstream.hpp" #include "./streams/netstream.hpp"
#include "./streams/filestream.hpp" #include "./streams/filestream.hpp"
#include <ftl/lib/nlohmann/json.hpp>
#include <ftl/uuid.hpp>
using ftl::protocol::Self; using ftl::protocol::Self;
using ftl::protocol::FrameID;
Self::Self(const std::shared_ptr<ftl::net::Universe> &impl): universe_(impl) {} Self::Self(const std::shared_ptr<ftl::net::Universe> &impl): universe_(impl) {}
...@@ -115,3 +118,70 @@ ftl::Handle Self::onError(const ErrorCb &cb) { ...@@ -115,3 +118,70 @@ ftl::Handle Self::onError(const ErrorCb &cb) {
return cb(std::make_shared<ftl::protocol::Node>(p), e, estr); return cb(std::make_shared<ftl::protocol::Node>(p), e, estr);
}); });
} }
void Self::restartAll() {
universe_->broadcast("restart");
}
void Self::shutdownAll() {
universe_->broadcast("shutdown");
}
std::vector<nlohmann::json> Self::getAllNodeDetails() {
auto response = universe_->findAll<std::string>("node_details");
std::vector<nlohmann::json> result(response.size());
for (auto &r : response) {
result.push_back(nlohmann::json::parse(r));
}
return result;
}
std::vector<std::string> Self::getStreams() {
return universe_->findAll<std::string>("list_streams");
}
std::shared_ptr<ftl::protocol::Node> Self::locateStream(const std::string &uri) {
auto p = universe_->findOne<ftl::UUID>("find_stream", uri);
if (!p) return nullptr;
auto peer = universe_->getPeer(*p);
if (!peer) return nullptr;
return std::make_shared<ftl::protocol::Node>(peer);
}
void Self::onRestart(const std::function<void()> &cb) {
universe_->bind("restart", cb);
}
void Self::onShutdown(const std::function<void()> &cb) {
universe_->bind("shutdown", cb);
}
void Self::onCreateStream(const std::function<void(const std::string &uri, FrameID id)> &cb) {
universe_->bind("create_stream", [cb](const std::string &uri, int fsid, int fid) {
cb(uri, FrameID(fsid, fid));
});
}
void Self::onNodeDetails(const std::function<nlohmann::json()> &cb) {
universe_->bind("node_details", [cb]() {
return cb().dump();
});
}
void Self::onGetConfig(const std::function<nlohmann::json(const std::string &)> &cb) {
universe_->bind("get_cfg", [cb](const std::string &path) {
return cb(path).dump();
});
}
void Self::onSetConfig(const std::function<void(const std::string &, const nlohmann::json &)> &cb) {
universe_->bind("update_cfg", [cb](const std::string &path, const std::string &value) {
cb(path, nlohmann::json::parse(value));
});
}
void Self::onListConfig(const std::function<std::vector<std::string>()> &cb) {
universe_->bind("list_configurables", cb);
}
...@@ -29,7 +29,7 @@ target_link_libraries(handle_unit beyond-protocol ...@@ -29,7 +29,7 @@ target_link_libraries(handle_unit beyond-protocol
add_test(HandleUnitTest handle_unit) add_test(HandleUnitTest handle_unit)
### URI ######################################################################## ### Net Integ ##################################################################
add_executable(net_integration add_executable(net_integration
$<TARGET_OBJECTS:CatchTestFTL> $<TARGET_OBJECTS:CatchTestFTL>
./net_integration.cpp) ./net_integration.cpp)
...@@ -130,3 +130,14 @@ target_link_libraries(peer_unit ...@@ -130,3 +130,14 @@ target_link_libraries(peer_unit
beyond-protocol GnuTLS::GnuTLS Threads::Threads ${URIPARSER_LIBRARIES} ${UUID_LIBRARIES} ${OS_LIBS}) beyond-protocol GnuTLS::GnuTLS Threads::Threads ${URIPARSER_LIBRARIES} ${UUID_LIBRARIES} ${OS_LIBS})
add_test(PeerUnitTest peer_unit) add_test(PeerUnitTest peer_unit)
### RPC Integ ##################################################################
add_executable(rpc_integration
$<TARGET_OBJECTS:CatchTestFTL>
./rpc_integration.cpp)
target_include_directories(rpc_integration PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include")
target_link_libraries(rpc_integration beyond-protocol
Threads::Threads ${OS_LIBS}
${URIPARSER_LIBRARIES})
add_test(RPCIntegrationTest rpc_integration)
#include "catch.hpp"
#include <ftl/protocol.hpp>
#include <ftl/protocol/self.hpp>
#include <ftl/protocol/node.hpp>
#include <ftl/uri.hpp>
#include <ftl/exception.hpp>
#include <ftl/lib/nlohmann/json.hpp>
#include <ftl/protocol/streams.hpp>
#include <thread>
#include <chrono>
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
// --- Tests -------------------------------------------------------------------
TEST_CASE("RPC Node Details", "[rpc]") {
auto self = ftl::createDummySelf();
self->listen(ftl::URI("tcp://localhost:0"));
{
auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
LOG(INFO) << uri;
auto p = ftl::connectNode(uri);
REQUIRE(p);
REQUIRE(p->waitConnection(5));
REQUIRE(self->waitConnections(5) == 1);
self->onNodeDetails([]() {
nlohmann::json details;
details["title"] = "Test node";
return details;
});
auto details = p->details();
REQUIRE(details["title"] == "Test node");
}
ftl::protocol::reset();
}
TEST_CASE("RPC List Streams", "[rpc]") {
auto self = ftl::createDummySelf();
self->listen(ftl::URI("tcp://localhost:0"));
{
auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
LOG(INFO) << uri;
auto p = ftl::connectNode(uri);
REQUIRE(p);
REQUIRE(p->waitConnection(5));
REQUIRE(self->waitConnections(5) == 1);
auto s = self->createStream("ftl://mystream");
s->begin();
auto streams = ftl::getSelf()->getStreams();
REQUIRE(streams.size() == 1);
REQUIRE(streams[0] == "ftl://mystream");
s->end();
streams = ftl::getSelf()->getStreams();
REQUIRE(streams.size() == 0);
}
ftl::protocol::reset();
}
TEST_CASE("RPC get config", "[rpc]") {
auto self = ftl::createDummySelf();
self->listen(ftl::URI("tcp://localhost:0"));
{
auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
LOG(INFO) << uri;
auto p = ftl::connectNode(uri);
REQUIRE(p);
REQUIRE(p->waitConnection(5));
REQUIRE(self->waitConnections(5) == 1);
self->onGetConfig([](const std::string &path) {
nlohmann::json cfg;
cfg["path"] = path;
return cfg;
});
auto cfg = p->getConfig("path is this");
REQUIRE(cfg["path"] == "path is this");
}
ftl::protocol::reset();
}
TEST_CASE("RPC shutdown", "[rpc]") {
auto self = ftl::createDummySelf();
self->listen(ftl::URI("tcp://localhost:0"));
{
auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
LOG(INFO) << uri;
auto p = ftl::connectNode(uri);
REQUIRE(p);
REQUIRE(p->waitConnection(5));
REQUIRE(self->waitConnections(5) == 1);
bool shutdown = false;
self->onShutdown([&shutdown]() {
shutdown = true;
});
ftl::getSelf()->shutdownAll();
sleep_for(milliseconds(100));
REQUIRE(shutdown);
}
ftl::protocol::reset();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment