Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <ftl/net/universe.hpp>
using std::string;
using std::vector;
using std::thread;
using ftl::net::Peer;
using ftl::net::Listener;
using ftl::net::Universe;
// Construct a Universe and launch its network thread.
//
// base: copied into base_; the constructor does nothing else with it.
//
// The thread runs Universe::__start(this), which enters _run().
Universe::Universe(const string &base) :
		active_(true), base_(base) {
	// Launch the worker from the body rather than the initialiser list.
	// Members are initialised in declaration order, so starting the thread in
	// the initialiser list would let _run() touch any member declared after
	// thread_ (descriptor sets, peers_, listeners_) before it is constructed.
	thread_ = thread(Universe::__start, this);
}
// Stop the network thread, then close and release every peer and listener.
//
// The objects held in peers_ and listeners_ are created with `new` in this
// file (listen(), connect()), so they are deleted here after being closed.
Universe::~Universe() {
	// Ask _run() to leave its loop. join() can block until the pending
	// select() returns, which uses a 3 second timeout.
	active_ = false;
	// join() on a non-joinable thread throws, which would terminate the
	// program from inside a destructor.
	if (thread_.joinable()) {
		thread_.join();
	}

	for (auto s : peers_) {
		// Other code paths tolerate null entries, so do the same here.
		if (s == nullptr) continue;
		s->close();
		delete s;
	}
	peers_.clear();

	for (auto l : listeners_) {
		if (l == nullptr) continue;
		l->close();
		delete l;
	}
	listeners_.clear();
}
bool Universe::listen(const string &addr) {
auto l = new Listener(addr.c_str());
if (!l) return false;
listeners_.push_back(l);
return l->isListening();
}
// Create an outgoing Peer for `addr` and register it with this Universe.
//
// The peer is constructed from addr.c_str() and a pointer to disp_. It is
// appended to peers_ unless its status is Peer::kInvalid; bindings are
// installed on it in either case.
//
// Returns true only when the peer's status is Peer::kConnecting.
//
// NOTE(review): the bare integers inside this function are line-number
// residue from the page this file was captured from; confirm that no source
// line was lost at that seam.
bool Universe::connect(const string &addr) {
auto p = new Peer(addr.c_str(), &disp_);
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// NOTE(review): a plain `new` expression reports failure by throwing, so this
// null check looks unreachable — confirm.
if (!p) return false;
// NOTE(review): when the status is kInvalid the peer is neither stored in
// peers_ nor deleted here, so it appears to be leaked — confirm ownership.
if (p->status() != Peer::kInvalid) {
peers_.push_back(p);
}
_installBindings(p);
return p->status() == Peer::kConnecting;
}
int Universe::_setDescriptors() {
//Reset all file descriptors
FD_ZERO(&sfdread_);
FD_ZERO(&sfderror_);
int n = 0;
//Set file descriptor for the listening sockets.
for (auto l : listeners_) {
if (l != nullptr && l->isListening()) {
FD_SET(l->_socket(), &sfdread_);
FD_SET(l->_socket(), &sfderror_);
if (l->_socket() > n) n = l->_socket();
}
}
//Set the file descriptors for each client
for (auto s : peers_) {
if (s != nullptr && s->isValid()) {
if (s->_socket() > n) {
n = s->_socket();
}
FD_SET(s->_socket(), &sfdread_);
FD_SET(s->_socket(), &sfderror_);
}
}
return n;
}
// Install this Universe's named handlers on a peer.
//
// In the visible code this is a single "__subscribe__" binding whose handler
// receives a resource URI; the handler body is empty apart from a comment.
//
// NOTE(review): the bare integers inside this function are line-number
// residue from the page this file was captured from, and the numbering skips
// a few lines just before them; confirm that no further bindings were lost.
void Universe::_installBindings(Peer *p) {
// `this` is captured but not used by the visible handler body.
p->bind("__subscribe__", [this](const string &uri) {
// Add this peer to subscription list for uri resource
});
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
}
// Thread entry point: runs the network loop of the given Universe.
//
// u: the Universe whose _run() is invoked. It is dereferenced unconditionally,
//    so it must not be null.
void Universe::__start(Universe * u) {
	Universe &self = *u;
	self._run();
}
// Network loop executed on the worker thread; runs until active_ is false.
//
// Each iteration:
//   1. rebuilds the descriptor sets via _setDescriptors(),
//   2. waits in select() for up to 3 seconds,
//   3. accepts a pending connection on every listening socket that is
//      readable, adding the resulting peer to peers_ and installing bindings,
//   4. calls data() / socketError() on valid peers whose socket is flagged,
//      and removes non-null peers that are no longer valid from peers_.
//
// NOTE(review): the bare integers inside this function are line-number
// residue from the page this file was captured from; at least one source line
// is missing at that seam (see the note next to the first use of `p`).
void Universe::_run() {
timeval block;
while (active_) {
int n = _setDescriptors();
int selres = 1;
//Wait for a network event or timeout in 3 seconds
// The timeout is reset on every iteration before select() is called.
block.tv_sec = 3;
block.tv_usec = 0;
selres = select(n+1, &sfdread_, 0, &sfderror_, &block);
//Some kind of error occurred, it is usually possible to recover from this.
if (selres < 0) {
std::cout << "SELECT ERROR " << selres << std::endl;
//return false;
continue;
} else if (selres == 0) {
// Timeout, nothing to do...
continue;
}
//If connection request is waiting
for (auto l : listeners_) {
if (l && l->isListening()) {
if (FD_ISSET(l->_socket(), &sfdread_)) {
int rsize = sizeof(sockaddr_storage);
sockaddr_storage addr;
//int freeclient = freeSocket();
//if (freeclient >= 0) {
// TODO Limit connection rate or allow a pause in accepting
// TODO Send auto reject message under heavy load
//Finally accept this client connection.
int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
if (csock != INVALID_SOCKET) {
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// NOTE(review): `p` is not declared in the visible lines; presumably a Peer is
// created from csock on a line lost at the seam above — confirm.
peers_.push_back(p);
_installBindings(p);
}
//}
}
}
}
//Also check each clients socket to see if any messages or errors are waiting
for (auto s : peers_) {
if (s != NULL && s->isValid()) {
//If message received from this client then deal with it
if (FD_ISSET(s->_socket(), &sfdread_)) {
s->data();
}
if (FD_ISSET(s->_socket(), &sfderror_)) {
s->socketError();
}
} else if (s != NULL) {
// Erase it
// NOTE(review): this erases from peers_ while the enclosing range-for is
// still iterating peers_; the `break` only leaves the inner search loop. If
// peers_ is a std::vector, the outer loop's iterators are invalidated —
// confirm and restructure. The erased peer is also not deleted here.
for (auto i=peers_.begin(); i!=peers_.end(); i++) {
if ((*i) == s) {
LOG(INFO) << "REMOVING SOCKET";
peers_.erase(i); break;
}
}
}
}
}
}