Skip to content
Snippets Groups Projects
Commit c2344689 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/visionfreeze' into 'master'

Bug/visionfreeze

Closes #28

See merge request nicolas.pope/ftl!10
parents b8ffe0aa 6088eac1
No related branches found
No related tags found
1 merge request!10Bug/visionfreeze
Pipeline #11022 passed
...@@ -54,7 +54,11 @@ void Streamer::send(const Mat &rgb, const Mat &depth) { ...@@ -54,7 +54,11 @@ void Streamer::send(const Mat &rgb, const Mat &depth) {
cv::imencode(".png", d2, d_buf); cv::imencode(".png", d2, d_buf);
LOG(INFO) << "Depth Size = " << ((float)d_buf.size() / (1024.0f*1024.0f)); LOG(INFO) << "Depth Size = " << ((float)d_buf.size() / (1024.0f*1024.0f));
net_.publish(uri_, rgb_buf, d_buf); try {
net_.publish(uri_, rgb_buf, d_buf);
} catch (...) {
LOG(ERROR) << "Exception on net publish to " << uri_;
}
} }
...@@ -195,22 +195,6 @@ class Peer { ...@@ -195,22 +195,6 @@ class Peer {
h(*this, args...); h(*this, args...);
} }
} }
/*template <typename... ARGS>
int _send(const std::string &t, ARGS... args);
template <typename... ARGS>
int _send(const array &b, ARGS... args);
template <typename T, typename... ARGS>
int _send(const std::vector<T> &t, ARGS... args);
template <typename... Types, typename... ARGS>
int _send(const std::tuple<Types...> &t, ARGS... args);
template <typename T, typename... ARGS,
ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)>
int _send(const T &t, ARGS... args);*/
private: // Data private: // Data
Status status_; Status status_;
......
...@@ -138,6 +138,7 @@ class Universe : public ftl::Configurable { ...@@ -138,6 +138,7 @@ class Universe : public ftl::Configurable {
void _installBindings(); void _installBindings();
void _installBindings(Peer *); void _installBindings(Peer *);
bool _subscribe(const std::string &res); bool _subscribe(const std::string &res);
void _remove(Peer *);
static void __start(Universe *u); static void __start(Universe *u);
......
...@@ -51,13 +51,16 @@ namespace ftl { ...@@ -51,13 +51,16 @@ namespace ftl {
* Get a pretty string. * Get a pretty string.
*/ */
std::string to_string() const { std::string to_string() const {
char b[37];
#ifdef WIN32 #ifdef WIN32
UuidToString(&guid_, (RPC_CSTR*)b); RPC_CSTR szUuid = NULL;
if (::UuidToStringA(&guid_, &szUuid) == RPC_S_OK) {
return std::string((char*)szUuid);
}
#else #else
char b[37];
uuid_unparse(uuid_, b); uuid_unparse(uuid_, b);
#endif
return std::string(b); return std::string(b);
#endif
} }
/* Allow the UUID to be packed into an RPC message. */ /* Allow the UUID to be packed into an RPC message. */
......
...@@ -310,7 +310,7 @@ void Peer::data() { ...@@ -310,7 +310,7 @@ void Peer::data() {
} }
bool Peer::_data() { bool Peer::_data() {
// std::unique_lock<std::mutex> lk(recv_mtx_); std::unique_lock<std::mutex> lk(recv_mtx_);
recv_buf_.reserve_buffer(kMaxMessage); recv_buf_.reserve_buffer(kMaxMessage);
int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0);
...@@ -556,6 +556,8 @@ int Peer::_send() { ...@@ -556,6 +556,8 @@ int Peer::_send() {
} }
Peer::~Peer() { Peer::~Peer() {
std::unique_lock<std::mutex> lk1(send_mtx_);
std::unique_lock<std::mutex> lk2(recv_mtx_);
close(); close();
delete disp_; delete disp_;
......
...@@ -6,6 +6,10 @@ ...@@ -6,6 +6,10 @@
#pragma comment(lib, "Rpcrt4.lib") #pragma comment(lib, "Rpcrt4.lib")
#endif #endif
#ifndef WIN32
#include <signal.h>
#endif
using std::string; using std::string;
using std::vector; using std::vector;
using std::thread; using std::thread;
...@@ -122,6 +126,8 @@ int Universe::_setDescriptors() { ...@@ -122,6 +126,8 @@ int Universe::_setDescriptors() {
FD_SET(s->_socket(), &sfdread_); FD_SET(s->_socket(), &sfdread_);
} }
FD_SET(s->_socket(), &sfderror_); FD_SET(s->_socket(), &sfderror_);
} else if (s) {
_remove(s);
} }
} }
...@@ -147,6 +153,20 @@ void Universe::_installBindings() { ...@@ -147,6 +153,20 @@ void Universe::_installBindings() {
}); });
} }
// Note: should be called inside a net lock
void Universe::_remove(Peer *p) {
LOG(INFO) << "Removing disconnected peer: " << p->id().to_string();
for (auto i=peers_.begin(); i!=peers_.end(); i++) {
if ((*i) == p) {
peers_.erase(i); break;
}
}
auto ix = peer_ids_.find(p->id());
if (ix != peer_ids_.end()) peer_ids_.erase(ix);
delete p;
}
Peer *Universe::getPeer(const UUID &id) const { Peer *Universe::getPeer(const UUID &id) const {
auto ix = peer_ids_.find(id); auto ix = peer_ids_.find(id);
if (ix == peer_ids_.end()) return nullptr; if (ix == peer_ids_.end()) return nullptr;
...@@ -197,6 +217,9 @@ bool Universe::_subscribe(const std::string &res) { ...@@ -197,6 +217,9 @@ bool Universe::_subscribe(const std::string &res) {
} }
void Universe::__start(Universe * u) { void Universe::__start(Universe * u) {
#ifndef WIN32
signal(SIGPIPE,SIG_IGN);
#endif // WIN32
u->_run(); u->_run();
} }
...@@ -228,8 +251,10 @@ void Universe::_run() { ...@@ -228,8 +251,10 @@ void Universe::_run() {
//Some kind of error occured, it is usually possible to recover from this. //Some kind of error occured, it is usually possible to recover from this.
if (selres < 0) { if (selres < 0) {
std::cout << "SELECT ERROR " << selres << " - " << strerror(errno) << std::endl; switch (errno) {
//return false; case 9 : continue; // Bad file descriptor = socket closed
default : std::cout << "Unknown select error: " << strerror(errno) << std::endl;
}
continue; continue;
} else if (selres == 0) { } else if (selres == 0) {
// Timeout, nothing to do... // Timeout, nothing to do...
...@@ -245,24 +270,17 @@ void Universe::_run() { ...@@ -245,24 +270,17 @@ void Universe::_run() {
int rsize = sizeof(sockaddr_storage); int rsize = sizeof(sockaddr_storage);
sockaddr_storage addr; sockaddr_storage addr;
//int freeclient = freeSocket(); //Finally accept this client connection.
int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
//if (freeclient >= 0) {
// TODO Limit connection rate or allow a pause in accepting
// TODO Send auto reject message under heavy load
//Finally accept this client connection.
int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
if (csock != INVALID_SOCKET) { if (csock != INVALID_SOCKET) {
auto p = new Peer(csock, &disp_); auto p = new Peer(csock, &disp_);
peers_.push_back(p); peers_.push_back(p);
_installBindings(p); _installBindings(p);
p->onConnect([this](Peer &p) { p->onConnect([this](Peer &p) {
peer_ids_[p.id()] = &p; peer_ids_[p.id()] = &p;
}); });
} }
//}
} }
} }
} }
...@@ -272,25 +290,15 @@ void Universe::_run() { ...@@ -272,25 +290,15 @@ void Universe::_run() {
if (s != NULL && s->isValid()) { if (s != NULL && s->isValid()) {
//If message received from this client then deal with it //If message received from this client then deal with it
if (FD_ISSET(s->_socket(), &sfdread_)) { if (FD_ISSET(s->_socket(), &sfdread_)) {
//s->data(); s->data();
//std::cout << "QUEUE DATA PROC" << std::endl;
//p.push([](int id, Peer *s) {
// std::cout << "Processing in thread " << std::to_string(id) << std::endl;
s->data();
//}, s);
} }
if (FD_ISSET(s->_socket(), &sfderror_)) { if (FD_ISSET(s->_socket(), &sfderror_)) {
s->socketError(); s->socketError();
s->close();
} }
} else if (s != NULL) { } else if (s != NULL) {
// Erase it // Erase it
_remove(s);
for (auto i=peers_.begin(); i!=peers_.end(); i++) {
if ((*i) == s) {
LOG(INFO) << "REMOVING SOCKET";
peers_.erase(i); break;
}
}
} }
} }
} }
......
...@@ -207,7 +207,8 @@ TEST_CASE("Universe::publish()", "") { ...@@ -207,7 +207,8 @@ TEST_CASE("Universe::publish()", "") {
Universe a; Universe a;
Universe b; Universe b;
a.listen("tcp://localhost:7077"); a.listen("tcp://localhost:7077");
b.connect("tcp://localhost:7077")->waitConnection(); ftl::net::Peer *p = b.connect("tcp://localhost:7077");
p->waitConnection();
SECTION("no subscribers") { SECTION("no subscribers") {
a.createResource("ftl://test"); a.createResource("ftl://test");
...@@ -227,6 +228,23 @@ TEST_CASE("Universe::publish()", "") { ...@@ -227,6 +228,23 @@ TEST_CASE("Universe::publish()", "") {
REQUIRE( done == 56 ); REQUIRE( done == 56 );
} }
SECTION("publish to disconnected subscriber") {
int done = 0;
a.createResource("ftl://test2");
REQUIRE( b.subscribe("ftl://test2", [&done](int a) {
done = a;
}) );
sleep_for(milliseconds(50));
p->close();
sleep_for(milliseconds(100));
a.publish("ftl://test2", 56);
sleep_for(milliseconds(50));
REQUIRE( done == 0 );
}
} }
/*TEST_CASE("net::listen()", "[net]") { /*TEST_CASE("net::listen()", "[net]") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment