Newer
Older
#include <ftl/net/universe.hpp>
#ifdef WIN32
#include <Ws2tcpip.h>
#pragma comment(lib, "Rpcrt4.lib")
#endif
using std::string;
using std::vector;
using std::thread;
using ftl::net::Peer;
using ftl::net::Listener;
using ftl::net::Universe;
using ftl::UUID;
using std::optional;
#define TCP_SEND_BUFFER_SIZE (512*1024)
#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1)
callback_t ftl::net::Universe::cbid__ = 0;
Universe::Universe() :
Configurable(),
active_(true),
this_peer(ftl::net::this_peer),
phase_(0),
send_size_(TCP_SEND_BUFFER_SIZE),
recv_size_(TCP_RECEIVE_BUFFER_SIZE),
periodic_time_(1.0),
reconnect_attempts_(50),
thread_(Universe::__start, this) {
LOG(WARNING) << "Deprecated Universe constructor";
Configurable(config),
active_(true),
this_peer(ftl::net::this_peer),
phase_(0),
send_size_(value("tcp_send_buffer",TCP_SEND_BUFFER_SIZE)),
recv_size_(value("tcp_recv_buffer",TCP_RECEIVE_BUFFER_SIZE)),
periodic_time_(value("periodics", 1.0)),
reconnect_attempts_(value("reconnect_attempts",50)),
thread_(Universe::__start, this) {
// Add an idle timer job to garbage collect peer objects
// Note: Important to be a timer job to ensure no other timer jobs are
// using the object.
ftl::timer::add(ftl::timer::kTimerIdle10, [this](int64_t ts) {
if (garbage_.size() > 0) {
UNIQUE_LOCK(net_mutex_,lk);
if (ftl::pool.n_idle() == ftl::pool.size()) {
if (garbage_.size() > 0) LOG(INFO) << "Garbage collection";
while (garbage_.size() > 0) {
delete garbage_.front();
garbage_.pop_front();
}
}
}
return true;
});
}
Universe::~Universe() {
shutdown();
}
void Universe::start() {
/*cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(1, &cpus);
pthread_setaffinity_np(thread_.native_handle(), sizeof(cpus), &cpus);*/
auto l = get<json_t>("listen");
if (l && (*l).is_array()) {
for (auto &ll : *l) {
listen(ll);
} else if (l && (*l).is_string()) {
listen((*l).get<string>());
auto p = get<json_t>("peers");
if (p && (*p).is_array()) {
for (auto &pp : *p) {
connect(pp);
}
void Universe::shutdown() {
if (!active_) return;
LOG(INFO) << "Cleanup Network ...";
active_ = false;
thread_.join();
for (auto s : peers_) {
}
peers_.clear();
for (auto l : listeners_) {
l->close();
}
listeners_.clear();
}
bool Universe::listen(const string &addr) {
auto l = new Listener(addr.c_str());
if (!l) return false;
listeners_.push_back(l);
return l->isListening();
}
Peer *Universe::connect(const string &addr) {
auto p = new Peer(addr.c_str(), this, &disp_);
if (p->status() != Peer::kInvalid) {
peers_.push_back(p);
}
_installBindings(p);
int Universe::waitConnections() {
int count = 0;
for (auto p : peers_) {
if (p->waitConnection()) count++;
}
return count;
}
int Universe::_setDescriptors() {
//Reset all file descriptors
FD_ZERO(&sfdread_);
FD_ZERO(&sfderror_);
//Set file descriptor for the listening sockets.
for (auto l : listeners_) {
if (l != nullptr && l->isListening()) {
FD_SET(l->_socket(), &sfdread_);
FD_SET(l->_socket(), &sfderror_);
if (l->_socket() > n) n = l->_socket();
}
}
//Set the file descriptors for each client
for (auto s : peers_) {
if (s != nullptr && s->isValid()) {
if (s->_socket() > n) {
n = s->_socket();
}
FD_SET(s->_socket(), &sfderror_);
}
}
return n;
}
void Universe::_installBindings(Peer *p) {
}
void Universe::_installBindings() {
void Universe::_cleanupPeers() {
auto i = peers_.begin();
while (i != peers_.end()) {
if (!(*i)->isValid()) {
Peer *p = *i;
LOG(INFO) << "Removing disconnected peer: " << p->id().to_string();
auto ix = peer_ids_.find(p->id());
if (ix != peer_ids_.end()) peer_ids_.erase(ix);
i = peers_.erase(i);
if (p->status() == ftl::net::Peer::kReconnecting) {
reconnects_.push_back({reconnect_attempts_, 1.0f, p});
//delete p;
garbage_.push_back(p);
Peer *Universe::getPeer(const UUID &id) const {
auto ix = peer_ids_.find(id);
if (ix == peer_ids_.end()) return nullptr;
else return ix->second;
}
void Universe::_periodic() {
auto i = reconnects_.begin();
while (i != reconnects_.end()) {
if ((*i).peer->reconnect()) {
peers_.push_back((*i).peer);
i = reconnects_.erase(i);
} else if ((*i).tries > 0) {
(*i).tries--;
i++;
} else {
//delete (*i).peer;
garbage_.push_back((*i).peer);
i = reconnects_.erase(i);
LOG(WARNING) << "Reconnection to peer failed";
}
}
void Universe::__start(Universe * u) {
#ifndef WIN32
signal(SIGPIPE,SIG_IGN);
#endif // WIN32
u->_run();
}
void Universe::_run() {
timeval block;
#ifdef WIN32
WSAData wsaData;
//If Win32 then load winsock
if (WSAStartup(MAKEWORD(1, 1), &wsaData) != 0) {
LOG(ERROR) << "Could not initiate sockets";
return;
}
#endif
auto start = std::chrono::high_resolution_clock::now();
while (active_) {
int n = _setDescriptors();
int selres = 1;
// Do periodics
auto now = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = now - start;
if (elapsed.count() >= periodic_time_) {
// It is an error to use "select" with no sockets ... so just sleep
if (n == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(300));
continue;
}
//Wait for a network event or timeout in 3 seconds
selres = select(n+1, &sfdread_, 0, &sfderror_, &block);
// NOTE Nick: Is it possible that not all the recvs have been called before I
// again reach a select call!? What are the consequences of this? A double recv attempt?
//Some kind of error occured, it is usually possible to recover from this.
if (selres < 0) {
switch (errno) {
case 9 : continue; // Bad file descriptor = socket closed
case 4 : continue; // Interrupted system call ... no problem
default : LOG(WARNING) << "Unhandled select error: " << strerror(errno) << "(" << errno << ")";
continue;
} else if (selres == 0) {
// Timeout, nothing to do...
continue;
}
// TODO:(Nick) Shared lock unless connection is made
UNIQUE_LOCK(net_mutex_,lk);
//If connection request is waiting
for (auto l : listeners_) {
if (l && l->isListening()) {
if (FD_ISSET(l->_socket(), &sfdread_)) {
int rsize = sizeof(sockaddr_storage);
sockaddr_storage addr;
//Finally accept this client connection.
SOCKET csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
if (csock != INVALID_SOCKET) {
auto p = new Peer(csock, this, &disp_);
peers_.push_back(p);
{
SHARED_LOCK(net_mutex_, lk);
// Also check each clients socket to see if any messages or errors are waiting
for (size_t p=0; p<peers_.size(); ++p) {
auto s = peers_[(p+phase_)%peers_.size()];
if (s != NULL && s->isValid()) {
// Note: It is possible that the socket becomes invalid after check but before
// looking at the FD sets, therefore cache the original socket
SOCKET sock = s->_socket();
if (sock == INVALID_SOCKET) continue;
if (FD_ISSET(sock, &sfderror_)) {
s->socketError();
s->close();
continue; // No point in reading data...
}
//If message received from this client then deal with it
if (FD_ISSET(sock, &sfdread_)) {
s->data();
}
callback_t Universe::onConnect(const std::function<void(ftl::net::Peer*)> &cb) {
callback_t id = cbid__++;
on_connect_.push_back({id, cb});
return id;
callback_t Universe::onDisconnect(const std::function<void(ftl::net::Peer*)> &cb) {
callback_t id = cbid__++;
on_disconnect_.push_back({id, cb});
return id;
callback_t Universe::onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)> &cb) {
callback_t id = cbid__++;
on_error_.push_back({id, cb});
return id;
void Universe::removeCallback(callback_t cbid) {
{
auto i = on_connect_.begin();
while (i != on_connect_.end()) {
i = on_connect_.erase(i);
} else {
i++;
}
}
}
{
auto i = on_disconnect_.begin();
while (i != on_disconnect_.end()) {
i = on_disconnect_.erase(i);
} else {
i++;
}
}
}
{
auto i = on_error_.begin();
while (i != on_error_.end()) {
i = on_error_.erase(i);
} else {
i++;
}
}
}
}
void Universe::_notifyConnect(Peer *p) {
for (auto &i : on_connect_) {
try {
i.h(p);
} catch(...) {
LOG(ERROR) << "Exception inside OnConnect hander: " << i.id;
void Universe::_notifyDisconnect(Peer *p) {
// In all cases, should already be locked outside this function call
//unique_lock<mutex> lk(net_mutex_);
for (auto &i : on_disconnect_) {
try {
i.h(p);
} catch(...) {
LOG(ERROR) << "Exception inside OnDisconnect hander: " << i.id;
}
}
}
void Universe::_notifyError(Peer *p, const ftl::net::Error &e) {