Skip to content
Snippets Groups Projects
Commit cc6b15d2 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Rework protocol handshake

parent ea2681fb
No related branches found
No related tags found
No related merge requests found
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#define _FTL_NET_HPP_ #define _FTL_NET_HPP_
#include <memory> #include <memory>
#include <functional>
namespace ftl { namespace ftl {
namespace net { namespace net {
...@@ -39,6 +40,8 @@ bool run(bool async=false); ...@@ -39,6 +40,8 @@ bool run(bool async=false);
*/ */
bool wait(); bool wait();
void wait(std::function<bool(void)>);
/** /**
* Check and process any waiting messages, but do not block if there are none. * Check and process any waiting messages, but do not block if there are none.
*/ */
......
...@@ -28,13 +28,15 @@ struct Header { ...@@ -28,13 +28,15 @@ struct Header {
}; };
struct Handshake { struct Handshake {
uint64_t proto; // The protocol the other party is expected to use. uint64_t magic;
char peerid[16]; // GUID for the origin peer. uint32_t name_size;
char reserved_[32]; // RESERVED, must be 0. uint32_t proto_size;
}; };
#pragma pack(pop) #pragma pack(pop)
static const uint64_t MAGIC = 0x1099340053640912;
/** /**
* Each instance of this Protocol class represents a specific protocol. A * Each instance of this Protocol class represents a specific protocol. A
* protocol is a set of RPC bindings and raw message handlers. A protocol is * protocol is a set of RPC bindings and raw message handlers. A protocol is
...@@ -47,7 +49,7 @@ class Protocol { ...@@ -47,7 +49,7 @@ class Protocol {
friend class Socket; friend class Socket;
public: public:
Protocol(uint64_t id); Protocol(const std::string &id);
~Protocol(); ~Protocol();
/** /**
...@@ -63,9 +65,9 @@ class Protocol { ...@@ -63,9 +65,9 @@ class Protocol {
// broadcast? // broadcast?
uint64_t id() const { return id_; } const std::string &id() const { return id_; }
static Protocol *find(uint64_t id); static Protocol *find(const std::string &id);
protected: protected:
void dispatchRPC(Socket &, const std::string &d); void dispatchRPC(Socket &, const std::string &d);
...@@ -78,9 +80,9 @@ class Protocol { ...@@ -78,9 +80,9 @@ class Protocol {
private: private:
ftl::net::Dispatcher disp_; ftl::net::Dispatcher disp_;
std::map<uint32_t,std::function<void(uint32_t,Socket&)>> handlers_; std::map<uint32_t,std::function<void(uint32_t,Socket&)>> handlers_;
uint64_t id_; std::string id_;
static std::map<uint64_t,Protocol*> protocols__; static std::map<std::string,Protocol*> protocols__;
}; };
// --- Template Implementations ------------------------------------------------ // --- Template Implementations ------------------------------------------------
......
...@@ -126,8 +126,8 @@ class Socket { ...@@ -126,8 +126,8 @@ class Socket {
* is current here for testing purposes. * is current here for testing purposes.
* @{ * @{
*/ */
void handshake1(const std::string &d); void handshake1();
void handshake2(const std::string &d); void handshake2();
/** @} */ /** @} */
private: // Functions private: // Functions
......
...@@ -113,6 +113,8 @@ Listener::~Listener() { ...@@ -113,6 +113,8 @@ Listener::~Listener() {
void Listener::connection(shared_ptr<Socket> &s) { void Listener::connection(shared_ptr<Socket> &s) {
if (default_proto_) { if (default_proto_) {
s->setProtocol(default_proto_); s->setProtocol(default_proto_);
} else {
s->setProtocol(NULL);
} }
for (auto h : handler_connect_) h(s); for (auto h : handler_connect_) h(s);
} }
......
...@@ -4,8 +4,10 @@ ...@@ -4,8 +4,10 @@
#include <vector> #include <vector>
#include <iostream> #include <iostream>
#include <chrono>
using namespace std; using namespace std;
using namespace std::chrono;
using ftl::net::Listener; using ftl::net::Listener;
using ftl::net::Socket; using ftl::net::Socket;
...@@ -189,6 +191,12 @@ bool ftl::net::wait() { ...@@ -189,6 +191,12 @@ bool ftl::net::wait() {
return _run(false,false); return _run(false,false);
} }
void ftl::net::wait(std::function<bool(void)> f) {
auto start = steady_clock::now();
while (!f() && duration<float>(steady_clock::now() - start).count() < 3.0)
_run(false,false);
}
bool ftl::net::run(bool async) { bool ftl::net::run(bool async) {
if (async) { if (async) {
// TODO Start thread // TODO Start thread
......
...@@ -6,14 +6,14 @@ ...@@ -6,14 +6,14 @@
using ftl::net::Socket; using ftl::net::Socket;
using ftl::net::Protocol; using ftl::net::Protocol;
std::map<uint64_t,Protocol*> Protocol::protocols__; std::map<std::string,Protocol*> Protocol::protocols__;
Protocol *Protocol::find(uint64_t id) { Protocol *Protocol::find(const std::string &id) {
if (protocols__.count(id) > 0) return protocols__[id]; if (protocols__.count(id) > 0) return protocols__[id];
else return NULL; else return NULL;
} }
Protocol::Protocol(uint64_t id) : id_(id) { Protocol::Protocol(const std::string &id) : id_(id) {
protocols__[id] = this; protocols__[id] = this;
} }
......
...@@ -207,14 +207,25 @@ int Socket::close() { ...@@ -207,14 +207,25 @@ int Socket::close() {
} }
void Socket::setProtocol(Protocol *p) { void Socket::setProtocol(Protocol *p) {
if (proto_ == p) return; if (p != NULL) {
if (proto_ && proto_->id() == p->id()) return; if (proto_ == p) return;
if (proto_ && proto_->id() == p->id()) return;
proto_ = p;
ftl::net::Handshake hs1; proto_ = p;
hs1.proto = p->id(); Handshake hs1;
send(FTL_PROTOCOL_HS1, std::string((char*)&hs1, sizeof(hs1))); hs1.magic = ftl::net::MAGIC;
LOG(INFO) << "Handshake initiated with " << uri_; hs1.name_size = 0;
hs1.proto_size = p->id().size();
send(FTL_PROTOCOL_HS1, hs1, p->id());
LOG(INFO) << "Handshake initiated with " << uri_;
} else {
Handshake hs1;
hs1.magic = ftl::net::MAGIC;
hs1.name_size = 0;
hs1.proto_size = 0;
send(FTL_PROTOCOL_HS1, hs1);
LOG(INFO) << "Handshake initiated with " << uri_;
}
} }
void Socket::error() { void Socket::error() {
...@@ -278,9 +289,9 @@ bool Socket::data() { ...@@ -278,9 +289,9 @@ bool Socket::data() {
gpos_ = 0; gpos_ = 0;
if (service == FTL_PROTOCOL_HS1 && !connected_) { if (service == FTL_PROTOCOL_HS1 && !connected_) {
handshake1(d); handshake1();
} else if (service == FTL_PROTOCOL_HS2 && !connected_) { } else if (service == FTL_PROTOCOL_HS2 && !connected_) {
handshake2(d); handshake2();
} else if (service == FTL_PROTOCOL_RPC) { } else if (service == FTL_PROTOCOL_RPC) {
if (proto_) proto_->dispatchRPC(*this, d); if (proto_) proto_->dispatchRPC(*this, d);
else LOG(WARNING) << "No protocol set for socket " << uri_; else LOG(WARNING) << "No protocol set for socket " << uri_;
...@@ -309,53 +320,33 @@ int Socket::read(std::string &s, size_t count) { ...@@ -309,53 +320,33 @@ int Socket::read(std::string &s, size_t count) {
return count; return count;
} }
void Socket::handshake1(const std::string &d) { void Socket::handshake1() {
ftl::net::Handshake *hs; Handshake header;
if (d.size() != sizeof(ftl::net::Handshake)) { read(header);
LOG(ERROR) << "Handshake failed for " << uri_;
close(); std::string peer;
return; if (header.name_size > 0) read(peer,header.name_size);
}
std::string protouri;
hs = (ftl::net::Handshake*)d.data(); if (header.proto_size > 0) read(protouri,header.proto_size);
auto proto = Protocol::find(hs->proto);
if (proto == NULL) { if (protouri.size() > 0) {
LOG(ERROR) << "Protocol (" << hs->proto << ") not found during handshake for " << uri_; auto proto = Protocol::find(protouri);
close(); if (proto == NULL) {
return; LOG(ERROR) << "Protocol (" << protouri << ") not found during handshake for " << uri_;
} else { close();
proto_ = proto; return;
} else {
proto_ = proto;
}
} }
peerid_ = std::string(&hs->peerid[0],16);
send(FTL_PROTOCOL_HS2); // TODO Counterpart protocol.
ftl::net::Handshake hs2; LOG(INFO) << "Handshake (" << protouri << ") confirmed from " << uri_;
//hs2.magic = ftl::net::MAGIC;
//hs2.version = version_;
// TODO Set peerid;
send(FTL_PROTOCOL_HS2, std::string((char*)&hs2, sizeof(hs2)));
LOG(INFO) << "Handshake" << " confirmed from " << uri_;
_connected(); _connected();
} }
void Socket::handshake2(const std::string &d) { void Socket::handshake2() {
ftl::net::Handshake *hs;
if (d.size() != sizeof(ftl::net::Handshake)) {
LOG(ERROR) << "Handshake failed for " << uri_;
close();
return;
}
hs = (ftl::net::Handshake*)d.data();
/*if (hs->magic != ftl::net::MAGIC) {
LOG(ERROR) << "Handshake magic failed for " << uri_;
close();
return;
}
version_ = (hs->version > ftl::net::version()) ?
ftl::net::version() :
hs->version;*/
peerid_ = std::string(&hs->peerid[0],16);
LOG(INFO) << "Handshake finalised for " << uri_; LOG(INFO) << "Handshake finalised for " << uri_;
_connected(); _connected();
} }
......
...@@ -162,8 +162,7 @@ TEST_CASE("net::listen()", "[net]") { ...@@ -162,8 +162,7 @@ TEST_CASE("net::listen()", "[net]") {
SECTION("can connect to listening socket") { SECTION("can connect to listening socket") {
auto sock = ftl::net::connect("tcp://127.0.0.1:9001"); auto sock = ftl::net::connect("tcp://127.0.0.1:9001");
REQUIRE(sock->isValid()); REQUIRE(sock->isValid());
ftl::net::wait(); // Handshake 1 ftl::net::wait([&sock]() { return sock->isConnected(); });
ftl::net::wait(); // Handshake 2
REQUIRE(sock->isConnected()); REQUIRE(sock->isConnected());
// TODO Need way of knowing about connection // TODO Need way of knowing about connection
...@@ -194,7 +193,7 @@ TEST_CASE("net::listen()", "[net]") { ...@@ -194,7 +193,7 @@ TEST_CASE("net::listen()", "[net]") {
TEST_CASE("Net Integration", "[integrate]") { TEST_CASE("Net Integration", "[integrate]") {
std::string data; std::string data;
Protocol p(143); Protocol p("ftl://utu.fi");
p.bind("add", [](int a, int b) { p.bind("add", [](int a, int b) {
return a + b; return a + b;
......
...@@ -21,7 +21,7 @@ using ftl::net::Socket; ...@@ -21,7 +21,7 @@ using ftl::net::Socket;
class MockProtocol : public Protocol { class MockProtocol : public Protocol {
public: public:
MockProtocol() : Protocol(33) {} MockProtocol() : Protocol("ftl://utu.fi") {}
void mock_dispatchRPC(Socket &s, const std::string &d) { dispatchRPC(s,d); } void mock_dispatchRPC(Socket &s, const std::string &d) { dispatchRPC(s,d); }
void mock_dispatchReturn(Socket &s, const std::string &d) { dispatchReturn(s,d); } void mock_dispatchReturn(Socket &s, const std::string &d) { dispatchReturn(s,d); }
void mock_dispatchRaw(uint32_t msg, Socket &s) { dispatchRaw(msg,s); } void mock_dispatchRaw(uint32_t msg, Socket &s) { dispatchRaw(msg,s); }
......
...@@ -27,13 +27,13 @@ void ftl::net::Protocol::dispatchRaw(uint32_t service, Socket &s) { ...@@ -27,13 +27,13 @@ void ftl::net::Protocol::dispatchRaw(uint32_t service, Socket &s) {
} }
ftl::net::Protocol::Protocol(uint64_t id) { ftl::net::Protocol::Protocol(const std::string &id) {
} }
ftl::net::Protocol::~Protocol() { ftl::net::Protocol::~Protocol() {
} }
ftl::net::Protocol *ftl::net::Protocol::find(uint64_t p) { ftl::net::Protocol *ftl::net::Protocol::find(const std::string &p) {
return NULL; return NULL;
} }
...@@ -168,7 +168,7 @@ TEST_CASE("Socket::call()", "[rpc]") { ...@@ -168,7 +168,7 @@ TEST_CASE("Socket::call()", "[rpc]") {
TEST_CASE("Socket receive RPC", "[rpc]") { TEST_CASE("Socket receive RPC", "[rpc]") {
MockSocket s; MockSocket s;
auto p = new Protocol(444); auto p = new Protocol("ftl://utu.fi");
s.setProtocol(p); s.setProtocol(p);
SECTION("no argument call") { SECTION("no argument call") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment