From cf9f6f15a1e84659f691522e2d7ead36ef6aab3a Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Thu, 3 Dec 2020 09:08:57 +0200 Subject: [PATCH] Finish net component --- components/net/cpp/src/dispatcher.cpp | 33 +----- components/net/cpp/src/ice.cpp | 8 +- components/net/cpp/src/listener.cpp | 24 ++-- components/net/cpp/src/main.cpp | 147 ------------------------ components/net/cpp/src/net_internal.hpp | 6 + components/net/cpp/src/peer.cpp | 43 +------ components/net/cpp/src/universe.cpp | 12 +- components/net/cpp/src/ws_internal.cpp | 7 +- 8 files changed, 43 insertions(+), 237 deletions(-) delete mode 100644 components/net/cpp/src/main.cpp diff --git a/components/net/cpp/src/dispatcher.cpp b/components/net/cpp/src/dispatcher.cpp index 2be0055ae..32ceeda5a 100644 --- a/components/net/cpp/src/dispatcher.cpp +++ b/components/net/cpp/src/dispatcher.cpp @@ -1,4 +1,9 @@ -//#define GLOG_NO_ABBREVIATED_SEVERITIES +/** + * @file dispatcher.cpp + * @copyright Copyright (c) 2020 University of Turku, MIT License + * @author Nicolas Pope + */ + #include <loguru.hpp> #include <ftl/net/dispatcher.hpp> #include <ftl/net/peer.hpp> @@ -11,23 +16,6 @@ using std::vector; using std::string; using std::optional; -/*static std::string hexStr(const std::string &s) -{ - const char *data = s.data(); - int len = s.size(); - std::stringstream ss; - ss << std::hex; - for(int i=0;i<len;++i) - ss << std::setw(2) << std::setfill('0') << (int)data[i]; - return ss.str(); -}*/ - -//void ftl::net::Dispatcher::dispatch(Peer &s, const std::string &msg) { - //std::cout << "Received dispatch : " << hexStr(msg) << std::endl; -// auto unpacked = msgpack::unpack(msg.data(), msg.size()); -// dispatch(s, unpacked.get()); -//} - vector<string> Dispatcher::getBindings() const { vector<string> res; for (auto x : funcs_) { @@ -78,17 +66,9 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { try { auto result = (*func)(s, args); //->get(); s._sendResponse(id, name, result->get()); - /*response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get()); - std::stringstream buf; - msgpack::pack(buf, res_obj); - s.send("__return__", buf.str());*/ } catch (const std::exception &e) { //throw; LOG(ERROR) << "Exception when attempting to call RPC (" << e.what() << ")"; - /*response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object()); - std::stringstream buf; - msgpack::pack(buf, res_obj); - s.send("__return__", buf.str());*/ } } else { LOG(WARNING) << "No binding found for " << name; @@ -128,7 +108,6 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const auto &&args = std::get<2>(the_call); auto binding = _locateHandler(name); - //LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI(); if (binding) { try { diff --git a/components/net/cpp/src/ice.cpp b/components/net/cpp/src/ice.cpp index 0929eaba6..b2a5b49ba 100644 --- a/components/net/cpp/src/ice.cpp +++ b/components/net/cpp/src/ice.cpp @@ -1,4 +1,10 @@ -//#define _GNU_SOURCE +/** + * @file ice.cpp + * @copyright Copyright (c) 2020 University of Turku, MIT License + * @author Nicolas Pope + */ + + #include <ftl/net/ice.hpp> #include <ftl/net/stun.hpp> #include <ftl/uri.hpp> diff --git a/components/net/cpp/src/listener.cpp b/components/net/cpp/src/listener.cpp index 90ec540f4..2a407b921 100644 --- a/components/net/cpp/src/listener.cpp +++ b/components/net/cpp/src/listener.cpp @@ -1,4 +1,9 @@ -//#define GLOG_NO_ABBREVIATED_SEVERITIES +/** + * @file listener.cpp + * @copyright Copyright (c) 2020 University of Turku, MIT License + * @author Nicolas Pope + */ + #include <loguru.hpp> #include <ftl/uri.hpp> @@ -116,22 +121,7 @@ Listener::~Listener() { } void Listener::connection(shared_ptr<Peer> &s) { - /*Handshake hs1; - hs1.magic = ftl::net::MAGIC; - memset(hs1.id, 0, 16); - - if (default_proto_) { - s->setProtocol(default_proto_); - hs1.proto_size = default_proto_->id().size(); - s->send(FTL_PROTOCOL_HS1, hs1, default_proto_->id()); - } else { - s->setProtocol(NULL); - hs1.proto_size = 0; - s->send(FTL_PROTOCOL_HS1, hs1); - } - - LOG(INFO) << "Handshake initiated with " << s->getURI(); - for (auto h : handler_connect_) h(s);*/ + } void Listener::close() { diff --git a/components/net/cpp/src/main.cpp b/components/net/cpp/src/main.cpp deleted file mode 100644 index 7999e37b7..000000000 --- a/components/net/cpp/src/main.cpp +++ /dev/null @@ -1,147 +0,0 @@ -#include <string> -#include <iostream> -#include <map> -//#include <vector> -#include <fstream> -#include <ftl/net.hpp> - -#ifndef WIN32 -#include <readline/readline.h> -#endif - -#ifdef WIN32 -#pragma comment(lib, "Ws2_32.lib") -#pragma comment(lib, "Rpcrt4.lib") -#endif - -using std::string; -using ftl::net::Universe; -using json = nlohmann::json; -using std::ifstream; -using std::map; - -static Universe *universe; -static volatile bool stop = false; - -// Store loaded configuration -static json config; - -/** - * Find and load a JSON configuration file - */ -static bool findConfiguration(const string &file) { - ifstream i; - - if (file != "") i.open(file); - if (!i.is_open()) i.open("./config.json"); - if (!i.is_open()) i.open(FTL_LOCAL_CONFIG_ROOT "/config.json"); - if (!i.is_open()) i.open(FTL_GLOBAL_CONFIG_ROOT "/config.json"); - if (!i.is_open()) return false; - i >> config; - return true; -} - -/** - * Generate a map from command line option to value - */ -map<string, string> read_options(char ***argv, int *argc) { - map<string, string> opts; - - while (*argc > 0) { - string cmd((*argv)[0]); - if (cmd[0] != '-') break; - - size_t p; - if ((p = cmd.find("=")) == string::npos) { - opts[cmd.substr(2)] = "true"; - } else { - opts[cmd.substr(2, p-2)] = cmd.substr(p+1); - } - - (*argc)--; - (*argv)++; - } - - return opts; -} - -/** - * Put command line options into json config. If config element does not exist - * or is of a different type then report an error. - */ -static void process_options(const map<string, string> &opts) { - for (auto opt : opts) { - if (opt.first == "config") continue; - - if (opt.first == "version") { - std::cout << "FTL Vision Node - v" << FTL_VERSION << std::endl; - std::cout << FTL_VERSION_LONG << std::endl; - exit(0); - } - - try { - auto ptr = json::json_pointer("/"+opt.first); - // TODO(nick) Allow strings without quotes - auto v = json::parse(opt.second); - if (v.type() != config.at(ptr).type()) { - LOG(ERROR) << "Incorrect type for argument " << opt.first; - continue; - } - config.at(ptr) = v; - } catch(...) { - LOG(ERROR) << "Unrecognised option: " << opt.first; - } - } -} - -void handle_command(const char *l) { - string cmd = string(l); - - if (cmd == "exit") { - stop = true; - } else if (cmd.find("peer ") == 0) { - cmd = cmd.substr(cmd.find(" ")+1); - universe->connect(cmd); - } else if (cmd.find("list ") == 0) { - cmd = cmd.substr(cmd.find(" ")+1); - if (cmd == "peers") { - //auto res = p2p->getPeers(); - //for (auto r : res) std::cout << " " << r->to_string() << std::endl; - } - } -} - -int main(int argc, char **argv) { - argc--; - argv++; - - // Process Arguments - auto options = read_options(&argv, &argc); - if (!findConfiguration(options["config"])) { - LOG(FATAL) << "Could not find any configuration!"; - } - process_options(options); - - universe = new Universe(config); - - while (!stop) { -#ifndef WIN32 - char *line = readline("> "); -#else - char line[300]; - fgets(line, 299, stdin); -#endif - if (!line) break; - - handle_command(line); - -#ifndef WIN32 - free(line); -#endif - } - stop = true; - - delete universe; - return 0; -} - diff --git a/components/net/cpp/src/net_internal.hpp b/components/net/cpp/src/net_internal.hpp index fa116a675..7ba305ade 100644 --- a/components/net/cpp/src/net_internal.hpp +++ b/components/net/cpp/src/net_internal.hpp @@ -1,3 +1,9 @@ +/** + * @file net_internal.hpp + * @copyright Copyright (c) 2020 University of Turku, MIT License + * @author Nicolas Pope + */ + #ifndef _FTL_NET_INTERNAL_HPP_ #define _FTL_NET_INTERNAL_HPP_ diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 1bf4b3d7e..0dad3b02b 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -1,4 +1,9 @@ -//#define GLOG_NO_ABBREVIATED_SEVERITIES +/** + * @file peer.cpp + * @copyright Copyright (c) 2020 University of Turku, MIT License + * @author Nicolas Pope + */ + #include <loguru.hpp> #include <ctpl_stl.h> @@ -222,8 +227,6 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals }); bind("__ping__", [this]() { - //auto now = std::chrono::high_resolution_clock::now(); - //return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count(); return ftl::timer::get_time(); }); @@ -300,8 +303,6 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), }); bind("__ping__", [this]() { - //auto now = std::chrono::high_resolution_clock::now(); - //return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count(); return ftl::timer::get_time(); }); } @@ -389,9 +390,6 @@ void Peer::_badClose(bool retry) { closesocket(sock_); #endif sock_ = INVALID_SOCKET; - - //auto i = find(sockets.begin(),sockets.end(),this); - //sockets.erase(i); universe_->_notifyDisconnect(this); status_ = kDisconnected; @@ -428,12 +426,8 @@ void Peer::error(int e) { void Peer::data() { { - //auto start = std::chrono::high_resolution_clock::now(); - //int64_t startts = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count(); UNIQUE_LOCK(recv_mtx_,lk); - //LOG(INFO) << "Pool size: " << ftl::pool.q_size(); - int rc=0; recv_buf_.reserve_buffer(kMaxMessage); @@ -447,15 +441,6 @@ void Peer::data() { auto buf = recv_buf_.buffer(); lk.unlock(); - /*#ifndef WIN32 - int n; - unsigned int m = sizeof(n); - getsockopt(sock_,SOL_SOCKET,SO_RCVBUF,(void *)&n, &m); - - int pending; - ioctl(sock_, SIOCINQ, &pending); - if (pending > 100000) LOG(INFO) << "Buffer usage: " << float(pending) / float(n); - #endif*/ rc = ftl::net::internal::recv(sock_, buf, cap, 0); if (rc >= cap-1) { @@ -474,10 +459,6 @@ void Peer::data() { lk.lock(); recv_buf_.buffer_consumed(rc); - //auto end = std::chrono::high_resolution_clock::now(); - //int64_t endts = std::chrono::time_point_cast<std::chrono::milliseconds>(end).time_since_epoch().count(); - //if (endts - startts > 50) LOG(ERROR) << "Excessive delay"; - if (is_waiting_) { is_waiting_ = false; lk.unlock(); @@ -649,14 +630,6 @@ bool Peer::waitConnection() { return status_ == kConnected; } -/*void Peer::onConnect(const std::function<void(Peer&)> &f) { - if (status_ == kConnected) { - f(*this); - } else { - open_handlers_.push_back(f); - } -}*/ - void Peer::_connected() { status_ = kConnected; @@ -683,8 +656,6 @@ int Peer::_send() { if (sendvec[0].iov_len != 0) { LOG(FATAL) << "CORRUPTION in websocket header buffer"; } - - //LOG(INFO) << "SEND SIZE = " << len; // Pack correct websocket header into buffer int rc = ws_prepare(wsheader_type::BINARY_FRAME, false, len, buf, 20); @@ -706,7 +677,6 @@ int Peer::_send() { } DWORD bytessent; - //c = WSASend(sock_, wsabuf.data(), static_cast<DWORD>(send_size), (LPDWORD)&bytessent, 0, NULL, NULL); c = ftl::net::internal::writev(sock_, wsabuf.data(), static_cast<DWORD>(send_size), (LPDWORD)&bytessent); #else c = ftl::net::internal::writev(sock_, send_buf_.vector(), (int)send_buf_.vector_size()); @@ -725,7 +695,6 @@ int Peer::_send() { } DWORD bytessent; - //c = WSASend(sock_, wsabuf.data(), static_cast<DWORD>(send_size), (LPDWORD)&bytessent, 0, NULL, NULL); c = ftl::net::internal::writev(sock_, wsabuf.data(), static_cast<DWORD>(send_size), (LPDWORD)&bytessent); #else c = ftl::net::internal::writev(sock_, send_buf_.vector(), (int)send_buf_.vector_size()); diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 3dbcd5887..bb2a612c0 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -1,3 +1,9 @@ +/** + * @file universe.cpp + * @copyright Copyright (c) 2020 University of Turku, MIT License + * @author Nicolas Pope + */ + #include <ftl/net/universe.hpp> #include <ftl/timer.hpp> #include <chrono> @@ -38,9 +44,7 @@ struct NetImplDetail { } } -//#define TCP_SEND_BUFFER_SIZE (512*1024) -//#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1) - +// Defaults, should be changed in config #define TCP_SEND_BUFFER_SIZE (512*1024) #define TCP_RECEIVE_BUFFER_SIZE (1024*1024) // Perhaps try 24K? #define WS_SEND_BUFFER_SIZE (512*1024) @@ -71,8 +75,6 @@ Universe::Universe(nlohmann::json &config) : this_peer(ftl::net::this_peer), impl_(new ftl::net::NetImplDetail), phase_(0), - //send_size_(value("tcp_send_buffer",TCP_SEND_BUFFER_SIZE)), - //recv_size_(value("tcp_recv_buffer",TCP_RECEIVE_BUFFER_SIZE)), periodic_time_(value("periodics", 1.0)), reconnect_attempts_(value("reconnect_attempts",50)), thread_(Universe::__start, this) { diff --git a/components/net/cpp/src/ws_internal.cpp b/components/net/cpp/src/ws_internal.cpp index 28318ab5b..064dca24e 100644 --- a/components/net/cpp/src/ws_internal.cpp +++ b/components/net/cpp/src/ws_internal.cpp @@ -1,8 +1,9 @@ -/* - * Copyright 2019 Nicolas Pope. +/** + * @file ws_internal.cpp + * @copyright Copyright (c) 2020 University of Turku, MIT License + * @author Nicolas Pope */ -//#define GLOG_NO_ABBREVIATED_SEVERITIES #include <loguru.hpp> #include <cstring> -- GitLab