diff --git a/net/include/ftl/net/dispatcher.hpp b/net/include/ftl/net/dispatcher.hpp index 840cc79adf72741a8af9c63f059ec0efeb15629e..e652650f203f5bb7c9434c7f5272897c1619ca5c 100644 --- a/net/include/ftl/net/dispatcher.hpp +++ b/net/include/ftl/net/dispatcher.hpp @@ -6,6 +6,7 @@ #include <memory> #include <tuple> #include <functional> +#include <iostream> namespace ftl { @@ -104,6 +105,7 @@ class Dispatcher { args_type args_real; args.convert(args_real); auto z = std::make_unique<msgpack::zone>(); + std::cout << "CALL " << ftl::internal::call(func, args_real) << std::endl; auto result = msgpack::object(ftl::internal::call(func, args_real), *z); return std::make_unique<msgpack::object_handle>(result, std::move(z)); })); diff --git a/net/include/ftl/net/protocol.hpp b/net/include/ftl/net/protocol.hpp index 000e983de5eeb7870303f0a30c4e028fa631f2b9..6da75b48fa741b48b1bdfe3b11b743f4e4f7702a 100644 --- a/net/include/ftl/net/protocol.hpp +++ b/net/include/ftl/net/protocol.hpp @@ -1,8 +1,12 @@ #ifndef _FTL_NET_PROTOCOL_HPP_ #define _FTL_NET_PROTOCOL_HPP_ +#define FTL_PROTOCOL_HS1 0x0001 // Handshake step 1 +#define FTL_PROTOCOL_HS2 0x0002 // Handshake step 2 + #define FTL_PROTOCOL_RPC 0x0100 #define FTL_PROTOCOL_RPCRETURN 0x0101 + #define FTL_PROTOCOL_P2P 0x1000 #endif // _FTL_NET_PROTOCOL_HPP_ diff --git a/net/include/ftl/net/raw.hpp b/net/include/ftl/net/raw.hpp deleted file mode 100644 index c869a919ee1a095c819c77373e4f360229432127..0000000000000000000000000000000000000000 --- a/net/include/ftl/net/raw.hpp +++ /dev/null @@ -1,11 +0,0 @@ -#ifndef _FTL_NET_RAW_HPP_ -#define _FTL_NET_RAW_HPP_ - -#include <functional> -#include <sstream> -#include <string> - - - - -#endif // _FTL_NET_RAW_HPP_ diff --git a/net/include/ftl/net/socket.hpp b/net/include/ftl/net/socket.hpp index 630f4821a91fdb858e68f08641fdb3e6a01fc552..76fbb58c96143db1df2a6fa186a376ad732e1118 100644 --- a/net/include/ftl/net/socket.hpp +++ b/net/include/ftl/net/socket.hpp @@ -43,10 +43,10 @@ class Socket { //friend bool ftl::net::run(bool); - int _socket() { return m_sock; }; + int _socket() const { return m_sock; }; - bool isConnected() { return m_sock != INVALID_SOCKET; }; - bool isValid() { return m_valid; }; + bool isConnected() const { return m_sock != INVALID_SOCKET; }; + bool isValid() const { return m_valid; }; std::string getURI() const { return m_uri; }; /** @@ -62,8 +62,12 @@ class Socket { /** * Bind a function to a raw message type. */ - void bind(uint32_t service, std::function<void(std::shared_ptr<Socket>,const std::string&)> func); + void bind(uint32_t service, std::function<void(Socket&, + const std::string&)> func); + /** + * Remote Procedure Call. + */ template <typename T, typename... ARGS> T call(const std::string &name, ARGS... args) { bool hasreturned = false; @@ -92,7 +96,7 @@ class Socket { auto rpcid = rpcid__++; auto call_obj = std::make_tuple(0,rpcid,name,args_obj); - LOG(INFO) << "RPC call sent: " << name; + LOG(INFO) << "RPC " << name << "() -> " << m_uri; std::stringstream buf; msgpack::pack(buf, call_obj); @@ -105,9 +109,9 @@ class Socket { void dispatch(const std::string &b) { disp_.dispatch(b); } - void onMessage(sockdatahandler_t handler) { m_handler = handler; } + //void onMessage(sockdatahandler_t handler) { m_handler = handler; } void onError(sockerrorhandler_t handler) {} - void onConnect(sockconnecthandler_t handler) {} + void onConnect(std::function<void(Socket&)> f); void onDisconnect(sockdisconnecthandler_t handler) {} bool data(); @@ -118,11 +122,15 @@ class Socket { int m_sock; size_t m_pos; char *m_buffer; - sockdatahandler_t m_handlers; + std::map<uint32_t,std::function<void(Socket&,const std::string&)>> handlers_; + std::vector<std::function<void(Socket&)>> connect_handlers_; bool m_valid; + bool m_connected; std::map<int, std::function<void(msgpack::object&)>> callbacks_; ftl::net::Dispatcher disp_; + void _connected(); + static int rpcid__; static const int MAX_MESSAGE = 10*1024*1024; // 10Mb currently diff --git a/net/src/dispatcher.cpp b/net/src/dispatcher.cpp index 300a89ea2e4111db9338fadb9e4908cfa0bd5400..2514b403118f75cd7609b98cfa43c60481da4904 100644 --- a/net/src/dispatcher.cpp +++ b/net/src/dispatcher.cpp @@ -44,17 +44,19 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) { auto &&name = std::get<2>(the_call); auto &&args = std::get<3>(the_call); - LOG(INFO) << "RPC call received: " << name; + LOG(INFO) << "RPC " << name << "() <- " << sock_->getURI(); auto it_func = funcs_.find(name); if (it_func != end(funcs_)) { try { - auto result = (it_func->second)(args)->get(); - auto res_obj = std::make_tuple(1,id,msgpack::object(),result); + auto result = (it_func->second)(args); //->get(); + response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get()); std::stringstream buf; msgpack::pack(buf, res_obj); + //std::cout << " RESULT " << result.as<std::string>() << std::endl; + sock_->send(FTL_PROTOCOL_RPCRETURN, buf.str()); } catch (...) { throw; diff --git a/net/src/listener.cpp b/net/src/listener.cpp index 028e2ec55825dcefd681a46ca8d50583798912fd..3b2808fd9489a21c46d0919e080a44c064771981 100644 --- a/net/src/listener.cpp +++ b/net/src/listener.cpp @@ -2,6 +2,8 @@ #include <ftl/uri.hpp> #include <ftl/net/listener.hpp> +#include <ftl/net/socket.hpp> +#include <ftl/net/protocol.hpp> #include <iostream> #ifndef WIN32 @@ -109,6 +111,9 @@ Listener::~Listener() { } void Listener::connection(shared_ptr<Socket> &s) { + std::string hs1("HELLO"); + s->send(FTL_PROTOCOL_HS1, hs1); + LOG(INFO) << "Handshake initiated with " << s->getURI(); for (auto h : handler_connect_) h(s); } diff --git a/net/src/net.cpp b/net/src/net.cpp index d84b6ecc5649055263e24bb9c9fa9365b0ad32eb..dafb471a0e688942d0b1a75ca8405549531f57eb 100644 --- a/net/src/net.cpp +++ b/net/src/net.cpp @@ -148,18 +148,6 @@ bool _run(bool blocking, bool nodelay) { l->connection(sock); sockets.push_back(std::move(sock)); - - // TODO Save the ip address - // deal with both IPv4 and IPv6: - /*if (addr.ss_family == AF_INET) { - struct sockaddr_in *s = (struct sockaddr_in *)&addr; - //port = ntohs(s->sin_port); - inet_ntop(AF_INET, &s->sin_addr, sock->m_addr, INET6_ADDRSTRLEN); - } else { // AF_INET6 - struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; - //port = ntohs(s->sin6_port); - inet_ntop(AF_INET6, &s->sin6_addr, sock->m_addr, INET6_ADDRSTRLEN); - }*/ } //} } diff --git a/net/src/raw.cpp b/net/src/raw.cpp deleted file mode 100644 index 08052b2c5861fdb652b3365ddc21b9ec1d0f7483..0000000000000000000000000000000000000000 --- a/net/src/raw.cpp +++ /dev/null @@ -1,31 +0,0 @@ -#include <ftl/net/raw.hpp> -#include <ftl/net/listener.hpp> -#include <ftl/uri.hpp> -#include <vector> -#include <iostream> - -#ifndef WIN32 -#include <errno.h> -#include <fcntl.h> -#endif - -#undef ERROR - -using ftl::URI; -using ftl::net::raw::Socket; - - -//static sockaddr_in slocalAddr; - - - - - - - - - - - - - diff --git a/net/src/socket.cpp b/net/src/socket.cpp index 693a25cf4b3779d3a2afd1f7f81726a9fb6c5f51..cae4efde76d33af5d397d50a5f5982a3ae0d85c5 100644 --- a/net/src/socket.cpp +++ b/net/src/socket.cpp @@ -27,6 +27,17 @@ using namespace ftl; using ftl::net::Socket; using namespace std; +static std::string hexStr(const std::string &s) +{ + const char *data = s.data(); + int len = s.size(); + std::stringstream ss; + ss << std::hex; + for(int i=0;i<len;++i) + ss << std::setw(2) << std::setfill('0') << (int)data[i]; + return ss.str(); +} + int Socket::rpcid__ = 0; static int tcpConnect(URI &uri) { @@ -112,6 +123,7 @@ static int wsConnect(URI &uri) { Socket::Socket(int s) : m_sock(s), m_pos(0), disp_(this) { m_valid = true; m_buffer = new char[BUFFER_SIZE]; + m_connected = false; sockaddr_storage addr; int rsize = sizeof(sockaddr_storage); @@ -144,6 +156,7 @@ Socket::Socket(const char *pUri) : m_uri(pUri), m_pos(0), disp_(this) { URI uri(pUri); m_valid = false; + m_connected = false; m_sock = INVALID_SOCKET; if (uri.getProtocol() == URI::SCHEME_TCP) { @@ -173,9 +186,7 @@ void Socket::error() { int err; uint32_t optlen = sizeof(err); getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &err, &optlen); - LOG(ERROR) << "Socket: " << m_uri << " - error " << err; - //close(); } bool Socket::data() { @@ -228,7 +239,17 @@ bool Socket::data() { auto d = std::string(m_buffer+8, len-4); //std::cerr << "DATA : " << service << " -> " << d << std::endl; - if (service == FTL_PROTOCOL_RPC) { + if (service == FTL_PROTOCOL_HS1 && !m_connected) { + // TODO Verify data + std::string hs2("HELLO"); + send(FTL_PROTOCOL_HS2, hs2); + LOG(INFO) << "Handshake confirmed from " << m_uri; + _connected(); + } else if (service == FTL_PROTOCOL_HS2 && !m_connected) { + // TODO Verify data + LOG(INFO) << "Handshake finalised for " << m_uri; + _connected(); + } else if (service == FTL_PROTOCOL_RPC) { dispatch(d); } else if (service == FTL_PROTOCOL_RPCRETURN) { auto unpacked = msgpack::unpack(d.data(), d.size()); @@ -243,7 +264,9 @@ bool Socket::data() { auto &&id = std::get<1>(the_result); //auto &&err = std::get<2>(the_result); auto &&res = std::get<3>(the_result); - + + std::cout << " ROSULT " << hexStr(d) << std::endl; + if (callbacks_.count(id) > 0) { LOG(INFO) << "Received return RPC value"; callbacks_[id](res); @@ -252,7 +275,8 @@ bool Socket::data() { LOG(ERROR) << "Missing RPC callback for result"; } } else { - if (m_handler) m_handler(service, d); + // Lookup raw message handler + if (handlers_.count(service) > 0) handlers_[service](*this, d); } //} @@ -261,6 +285,31 @@ bool Socket::data() { return true; } +void Socket::onConnect(std::function<void(Socket&)> f) { + if (m_connected) { + f(*this); + } else { + connect_handlers_.push_back(f); + } +} + +void Socket::_connected() { + m_connected = true; + for (auto h : connect_handlers_) { + h(*this); + } + connect_handlers_.clear(); +} + +void Socket::bind(uint32_t service, std::function<void(Socket&, + const std::string&)> func) { + if (handlers_.count(service) == 0) { + handlers_[service] = func; + } else { + LOG(ERROR) << "Message service " << service << " already bound"; + } +} + int Socket::send(uint32_t service, const std::string &data) { ftl::net::Header h; h.size = data.size()+4; diff --git a/net/test/CMakeLists.txt b/net/test/CMakeLists.txt index f8c7f42cca668421b96611494e416876b914900e..2e7aff73cc0c86b3e6d3268fd82732d5897684c2 100644 --- a/net/test/CMakeLists.txt +++ b/net/test/CMakeLists.txt @@ -8,7 +8,7 @@ add_executable(rpc_test EXCLUDE_FROM_ALL ../src/socket.cpp ) target_include_directories(rpc_test PUBLIC ${PROJECT_SOURCE_DIR}/include) -target_link_libraries(rpc_test uriparser) +target_link_libraries(rpc_test uriparser glog) add_executable(socket_test EXCLUDE_FROM_ALL ./tests.cpp @@ -22,7 +22,7 @@ add_executable(socket_test EXCLUDE_FROM_ALL ../src/dispatcher.cpp ) target_include_directories(socket_test PUBLIC ${PROJECT_SOURCE_DIR}/include) -target_link_libraries(socket_test uriparser) +target_link_libraries(socket_test uriparser glog) add_custom_target(tests) add_dependencies(tests rpc_test socket_test) diff --git a/net/test/net_raw.cpp b/net/test/net_raw.cpp index c4a6fad3ebf2ec2a48acab3155795ad07bc1f38c..3f48834a0a67b29b879d1314f0c5c3f8f57cb254 100644 --- a/net/test/net_raw.cpp +++ b/net/test/net_raw.cpp @@ -287,8 +287,7 @@ TEST_CASE("Socket.onMessage()", "[net]") { bool msg = false; - sock->onMessage([&](int service, std::string &data) { - REQUIRE(service == 1); + sock->bind(1, [&](Socket &s, const std::string &data) { REQUIRE(data == "{message: \"Hello\"}"); msg = true; }); @@ -302,8 +301,7 @@ TEST_CASE("Socket.onMessage()", "[net]") { bool msg = false; - sock->onMessage([&](int service, std::string &data) { - REQUIRE(service == 1); + sock->bind(1, [&](Socket &s, const std::string &data) { REQUIRE(data == ""); msg = true; }); @@ -318,8 +316,7 @@ TEST_CASE("Socket.onMessage()", "[net]") { int msg = 0; - sock->onMessage([&](int service, std::string &data) { - REQUIRE(service == 1); + sock->bind(1, [&](Socket &s, const std::string &data) { if (msg == 0) REQUIRE(data == "{message: \"Hello\"}"); else REQUIRE(data == "{test: \"world\"}"); msg++; @@ -334,7 +331,7 @@ TEST_CASE("Socket.onMessage()", "[net]") { bool msg = false; - sock->onMessage([&](int service, std::string &data) { + sock->bind(1, [&](Socket &s, const std::string &data) { msg = true; }); diff --git a/net/test/rpc.cpp b/net/test/rpc.cpp index 1b83ce8c0650d4ff11f3e25a7cd20cb2ec376209..1f4f9d04c1428a0a4e31dfa636459eb358f76796 100644 --- a/net/test/rpc.cpp +++ b/net/test/rpc.cpp @@ -213,13 +213,26 @@ TEST_CASE("Socket::call+bind loop", "[rpc]") { s->data(); }; - s->bind("test1", [](int a) -> int { - std::cout << "Bind test1 called" << std::endl; - return a*2; - }); - - int res = s->call<int>("test1", 5); + SECTION( "Loop a numeric result" ) { + s->bind("test1", [](int a) -> int { + std::cout << "Bind test1 called" << std::endl; + return a*2; + }); + + int res = s->call<int>("test1", 5); + + REQUIRE( res == 10 ); + } - REQUIRE( res == 10 ); + SECTION( "Loop a string result" ) { + s->bind("test1", [](std::string a) -> std::string { + std::cout << "Test1 = " << a << std::endl; + return a + " world"; + }); + + auto res = s->call<std::string>("test1", "hello"); + + REQUIRE( res == "hello world" ); + } }