Skip to content
Snippets Groups Projects
Commit aaf9b03c authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Fix for unique_ptr destruction in RPC return code

parent ecff914e
No related branches found
No related tags found
No related merge requests found
......@@ -6,6 +6,7 @@
#include <memory>
#include <tuple>
#include <functional>
#include <iostream>
namespace ftl {
......@@ -104,6 +105,7 @@ class Dispatcher {
args_type args_real;
args.convert(args_real);
auto z = std::make_unique<msgpack::zone>();
std::cout << "CALL " << ftl::internal::call(func, args_real) << std::endl;
auto result = msgpack::object(ftl::internal::call(func, args_real), *z);
return std::make_unique<msgpack::object_handle>(result, std::move(z));
}));
......
#ifndef _FTL_NET_PROTOCOL_HPP_
#define _FTL_NET_PROTOCOL_HPP_
#define FTL_PROTOCOL_HS1 0x0001 // Handshake step 1
#define FTL_PROTOCOL_HS2 0x0002 // Handshake step 2
#define FTL_PROTOCOL_RPC 0x0100
#define FTL_PROTOCOL_RPCRETURN 0x0101
#define FTL_PROTOCOL_P2P 0x1000
#endif // _FTL_NET_PROTOCOL_HPP_
#ifndef _FTL_NET_RAW_HPP_
#define _FTL_NET_RAW_HPP_
#include <functional>
#include <sstream>
#include <string>
#endif // _FTL_NET_RAW_HPP_
......@@ -43,10 +43,10 @@ class Socket {
//friend bool ftl::net::run(bool);
int _socket() { return m_sock; };
int _socket() const { return m_sock; };
bool isConnected() { return m_sock != INVALID_SOCKET; };
bool isValid() { return m_valid; };
bool isConnected() const { return m_sock != INVALID_SOCKET; };
bool isValid() const { return m_valid; };
std::string getURI() const { return m_uri; };
/**
......@@ -62,8 +62,12 @@ class Socket {
/**
* Bind a function to a raw message type.
*/
void bind(uint32_t service, std::function<void(std::shared_ptr<Socket>,const std::string&)> func);
void bind(uint32_t service, std::function<void(Socket&,
const std::string&)> func);
/**
* Remote Procedure Call.
*/
template <typename T, typename... ARGS>
T call(const std::string &name, ARGS... args) {
bool hasreturned = false;
......@@ -92,7 +96,7 @@ class Socket {
auto rpcid = rpcid__++;
auto call_obj = std::make_tuple(0,rpcid,name,args_obj);
LOG(INFO) << "RPC call sent: " << name;
LOG(INFO) << "RPC " << name << "() -> " << m_uri;
std::stringstream buf;
msgpack::pack(buf, call_obj);
......@@ -105,9 +109,9 @@ class Socket {
void dispatch(const std::string &b) { disp_.dispatch(b); }
void onMessage(sockdatahandler_t handler) { m_handler = handler; }
//void onMessage(sockdatahandler_t handler) { m_handler = handler; }
void onError(sockerrorhandler_t handler) {}
void onConnect(sockconnecthandler_t handler) {}
void onConnect(std::function<void(Socket&)> f);
void onDisconnect(sockdisconnecthandler_t handler) {}
bool data();
......@@ -118,11 +122,15 @@ class Socket {
int m_sock;
size_t m_pos;
char *m_buffer;
sockdatahandler_t m_handlers;
std::map<uint32_t,std::function<void(Socket&,const std::string&)>> handlers_;
std::vector<std::function<void(Socket&)>> connect_handlers_;
bool m_valid;
bool m_connected;
std::map<int, std::function<void(msgpack::object&)>> callbacks_;
ftl::net::Dispatcher disp_;
void _connected();
static int rpcid__;
static const int MAX_MESSAGE = 10*1024*1024; // 10Mb currently
......
......@@ -44,17 +44,19 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) {
auto &&name = std::get<2>(the_call);
auto &&args = std::get<3>(the_call);
LOG(INFO) << "RPC call received: " << name;
LOG(INFO) << "RPC " << name << "() <- " << sock_->getURI();
auto it_func = funcs_.find(name);
if (it_func != end(funcs_)) {
try {
auto result = (it_func->second)(args)->get();
auto res_obj = std::make_tuple(1,id,msgpack::object(),result);
auto result = (it_func->second)(args); //->get();
response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get());
std::stringstream buf;
msgpack::pack(buf, res_obj);
//std::cout << " RESULT " << result.as<std::string>() << std::endl;
sock_->send(FTL_PROTOCOL_RPCRETURN, buf.str());
} catch (...) {
throw;
......
......@@ -2,6 +2,8 @@
#include <ftl/uri.hpp>
#include <ftl/net/listener.hpp>
#include <ftl/net/socket.hpp>
#include <ftl/net/protocol.hpp>
#include <iostream>
#ifndef WIN32
......@@ -109,6 +111,9 @@ Listener::~Listener() {
}
void Listener::connection(shared_ptr<Socket> &s) {
std::string hs1("HELLO");
s->send(FTL_PROTOCOL_HS1, hs1);
LOG(INFO) << "Handshake initiated with " << s->getURI();
for (auto h : handler_connect_) h(s);
}
......
......@@ -148,18 +148,6 @@ bool _run(bool blocking, bool nodelay) {
l->connection(sock);
sockets.push_back(std::move(sock));
// TODO Save the ip address
// deal with both IPv4 and IPv6:
/*if (addr.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&addr;
//port = ntohs(s->sin_port);
inet_ntop(AF_INET, &s->sin_addr, sock->m_addr, INET6_ADDRSTRLEN);
} else { // AF_INET6
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr;
//port = ntohs(s->sin6_port);
inet_ntop(AF_INET6, &s->sin6_addr, sock->m_addr, INET6_ADDRSTRLEN);
}*/
}
//}
}
......
#include <ftl/net/raw.hpp>
#include <ftl/net/listener.hpp>
#include <ftl/uri.hpp>
#include <vector>
#include <iostream>
#ifndef WIN32
#include <errno.h>
#include <fcntl.h>
#endif
#undef ERROR
using ftl::URI;
using ftl::net::raw::Socket;
//static sockaddr_in slocalAddr;
......@@ -27,6 +27,17 @@ using namespace ftl;
using ftl::net::Socket;
using namespace std;
static std::string hexStr(const std::string &s)
{
const char *data = s.data();
int len = s.size();
std::stringstream ss;
ss << std::hex;
for(int i=0;i<len;++i)
ss << std::setw(2) << std::setfill('0') << (int)data[i];
return ss.str();
}
int Socket::rpcid__ = 0;
static int tcpConnect(URI &uri) {
......@@ -112,6 +123,7 @@ static int wsConnect(URI &uri) {
Socket::Socket(int s) : m_sock(s), m_pos(0), disp_(this) {
m_valid = true;
m_buffer = new char[BUFFER_SIZE];
m_connected = false;
sockaddr_storage addr;
int rsize = sizeof(sockaddr_storage);
......@@ -144,6 +156,7 @@ Socket::Socket(const char *pUri) : m_uri(pUri), m_pos(0), disp_(this) {
URI uri(pUri);
m_valid = false;
m_connected = false;
m_sock = INVALID_SOCKET;
if (uri.getProtocol() == URI::SCHEME_TCP) {
......@@ -173,9 +186,7 @@ void Socket::error() {
int err;
uint32_t optlen = sizeof(err);
getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &err, &optlen);
LOG(ERROR) << "Socket: " << m_uri << " - error " << err;
//close();
}
bool Socket::data() {
......@@ -228,7 +239,17 @@ bool Socket::data() {
auto d = std::string(m_buffer+8, len-4);
//std::cerr << "DATA : " << service << " -> " << d << std::endl;
if (service == FTL_PROTOCOL_RPC) {
if (service == FTL_PROTOCOL_HS1 && !m_connected) {
// TODO Verify data
std::string hs2("HELLO");
send(FTL_PROTOCOL_HS2, hs2);
LOG(INFO) << "Handshake confirmed from " << m_uri;
_connected();
} else if (service == FTL_PROTOCOL_HS2 && !m_connected) {
// TODO Verify data
LOG(INFO) << "Handshake finalised for " << m_uri;
_connected();
} else if (service == FTL_PROTOCOL_RPC) {
dispatch(d);
} else if (service == FTL_PROTOCOL_RPCRETURN) {
auto unpacked = msgpack::unpack(d.data(), d.size());
......@@ -243,7 +264,9 @@ bool Socket::data() {
auto &&id = std::get<1>(the_result);
//auto &&err = std::get<2>(the_result);
auto &&res = std::get<3>(the_result);
std::cout << " ROSULT " << hexStr(d) << std::endl;
if (callbacks_.count(id) > 0) {
LOG(INFO) << "Received return RPC value";
callbacks_[id](res);
......@@ -252,7 +275,8 @@ bool Socket::data() {
LOG(ERROR) << "Missing RPC callback for result";
}
} else {
if (m_handler) m_handler(service, d);
// Lookup raw message handler
if (handlers_.count(service) > 0) handlers_[service](*this, d);
}
//}
......@@ -261,6 +285,31 @@ bool Socket::data() {
return true;
}
void Socket::onConnect(std::function<void(Socket&)> f) {
if (m_connected) {
f(*this);
} else {
connect_handlers_.push_back(f);
}
}
void Socket::_connected() {
m_connected = true;
for (auto h : connect_handlers_) {
h(*this);
}
connect_handlers_.clear();
}
void Socket::bind(uint32_t service, std::function<void(Socket&,
const std::string&)> func) {
if (handlers_.count(service) == 0) {
handlers_[service] = func;
} else {
LOG(ERROR) << "Message service " << service << " already bound";
}
}
int Socket::send(uint32_t service, const std::string &data) {
ftl::net::Header h;
h.size = data.size()+4;
......
......@@ -8,7 +8,7 @@ add_executable(rpc_test EXCLUDE_FROM_ALL
../src/socket.cpp
)
target_include_directories(rpc_test PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(rpc_test uriparser)
target_link_libraries(rpc_test uriparser glog)
add_executable(socket_test EXCLUDE_FROM_ALL
./tests.cpp
......@@ -22,7 +22,7 @@ add_executable(socket_test EXCLUDE_FROM_ALL
../src/dispatcher.cpp
)
target_include_directories(socket_test PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(socket_test uriparser)
target_link_libraries(socket_test uriparser glog)
add_custom_target(tests)
add_dependencies(tests rpc_test socket_test)
......
......@@ -287,8 +287,7 @@ TEST_CASE("Socket.onMessage()", "[net]") {
bool msg = false;
sock->onMessage([&](int service, std::string &data) {
REQUIRE(service == 1);
sock->bind(1, [&](Socket &s, const std::string &data) {
REQUIRE(data == "{message: \"Hello\"}");
msg = true;
});
......@@ -302,8 +301,7 @@ TEST_CASE("Socket.onMessage()", "[net]") {
bool msg = false;
sock->onMessage([&](int service, std::string &data) {
REQUIRE(service == 1);
sock->bind(1, [&](Socket &s, const std::string &data) {
REQUIRE(data == "");
msg = true;
});
......@@ -318,8 +316,7 @@ TEST_CASE("Socket.onMessage()", "[net]") {
int msg = 0;
sock->onMessage([&](int service, std::string &data) {
REQUIRE(service == 1);
sock->bind(1, [&](Socket &s, const std::string &data) {
if (msg == 0) REQUIRE(data == "{message: \"Hello\"}");
else REQUIRE(data == "{test: \"world\"}");
msg++;
......@@ -334,7 +331,7 @@ TEST_CASE("Socket.onMessage()", "[net]") {
bool msg = false;
sock->onMessage([&](int service, std::string &data) {
sock->bind(1, [&](Socket &s, const std::string &data) {
msg = true;
});
......
......@@ -213,13 +213,26 @@ TEST_CASE("Socket::call+bind loop", "[rpc]") {
s->data();
};
s->bind("test1", [](int a) -> int {
std::cout << "Bind test1 called" << std::endl;
return a*2;
});
int res = s->call<int>("test1", 5);
SECTION( "Loop a numeric result" ) {
s->bind("test1", [](int a) -> int {
std::cout << "Bind test1 called" << std::endl;
return a*2;
});
int res = s->call<int>("test1", 5);
REQUIRE( res == 10 );
}
REQUIRE( res == 10 );
SECTION( "Loop a string result" ) {
s->bind("test1", [](std::string a) -> std::string {
std::cout << "Test1 = " << a << std::endl;
return a + " world";
});
auto res = s->call<std::string>("test1", "hello");
REQUIRE( res == "hello world" );
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment