Newer
Older
#include <ftl/net/universe.hpp>
#ifdef WIN32
#include <Ws2tcpip.h>
#pragma comment(lib, "Rpcrt4.lib")
#endif
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
using std::string;
using std::vector;
using std::thread;
using ftl::net::Peer;
using ftl::net::Listener;
using ftl::net::Universe;
Universe::Universe(const string &base) :
active_(true), base_(base), thread_(Universe::__start, this) {
}
Universe::~Universe() {
active_ = false;
thread_.join();
for (auto s : peers_) {
s->close();
}
peers_.clear();
for (auto l : listeners_) {
l->close();
}
listeners_.clear();
}
bool Universe::listen(const string &addr) {
auto l = new Listener(addr.c_str());
if (!l) return false;
listeners_.push_back(l);
return l->isListening();
}
bool Universe::connect(const string &addr) {
auto p = new Peer(addr.c_str(), &disp_);
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
if (!p) return false;
if (p->status() != Peer::kInvalid) {
peers_.push_back(p);
}
_installBindings(p);
return p->status() == Peer::kConnecting;
}
int Universe::_setDescriptors() {
//Reset all file descriptors
FD_ZERO(&sfdread_);
FD_ZERO(&sfderror_);
int n = 0;
//Set file descriptor for the listening sockets.
for (auto l : listeners_) {
if (l != nullptr && l->isListening()) {
FD_SET(l->_socket(), &sfdread_);
FD_SET(l->_socket(), &sfderror_);
if (l->_socket() > n) n = l->_socket();
}
}
//Set the file descriptors for each client
for (auto s : peers_) {
if (s != nullptr && s->isValid()) {
if (s->_socket() > n) {
n = s->_socket();
}
if (s->isWaiting()) {
FD_SET(s->_socket(), &sfdread_);
}
FD_SET(s->_socket(), &sfderror_);
}
}
return n;
}
void Universe::_installBindings(Peer *p) {
p->bind("__subscribe__", [this](const string &uri) {
// Add this peer to subscription list for uri resource
});
}
void Universe::__start(Universe * u) {
u->_run();
}
void Universe::_run() {
timeval block;
while (active_) {
int n = _setDescriptors();
int selres = 1;
//Wait for a network event or timeout in 3 seconds
block.tv_sec = 0;
block.tv_usec = 10000;
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
selres = select(n+1, &sfdread_, 0, &sfderror_, &block);
//Some kind of error occured, it is usually possible to recover from this.
if (selres < 0) {
std::cout << "SELECT ERROR " << selres << std::endl;
//return false;
continue;
} else if (selres == 0) {
// Timeout, nothing to do...
continue;
}
//If connection request is waiting
for (auto l : listeners_) {
if (l && l->isListening()) {
if (FD_ISSET(l->_socket(), &sfdread_)) {
int rsize = sizeof(sockaddr_storage);
sockaddr_storage addr;
//int freeclient = freeSocket();
//if (freeclient >= 0) {
// TODO Limit connection rate or allow a pause in accepting
// TODO Send auto reject message under heavy load
//Finally accept this client connection.
int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
if (csock != INVALID_SOCKET) {
peers_.push_back(p);
_installBindings(p);
}
//}
}
}
}
//Also check each clients socket to see if any messages or errors are waiting
for (auto s : peers_) {
if (s != NULL && s->isValid()) {
//If message received from this client then deal with it
if (FD_ISSET(s->_socket(), &sfdread_)) {
//s->data();
//std::cout << "QUEUE DATA PROC" << std::endl;
//p.push([](int id, Peer *s) {
// std::cout << "Processing in thread " << std::to_string(id) << std::endl;
s->data();
//}, s);
}
if (FD_ISSET(s->_socket(), &sfderror_)) {
s->socketError();
}
} else if (s != NULL) {
// Erase it
for (auto i=peers_.begin(); i!=peers_.end(); i++) {
if ((*i) == s) {
LOG(INFO) << "REMOVING SOCKET";
peers_.erase(i); break;
}
}
}
}
}
}