Skip to content
Snippets Groups Projects
Commit 6088eac1 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Bug/visionfreeze

parent b8ffe0aa
No related branches found
No related tags found
No related merge requests found
......@@ -54,7 +54,11 @@ void Streamer::send(const Mat &rgb, const Mat &depth) {
cv::imencode(".png", d2, d_buf);
LOG(INFO) << "Depth Size = " << ((float)d_buf.size() / (1024.0f*1024.0f));
net_.publish(uri_, rgb_buf, d_buf);
try {
net_.publish(uri_, rgb_buf, d_buf);
} catch (...) {
LOG(ERROR) << "Exception on net publish to " << uri_;
}
}
......@@ -195,22 +195,6 @@ class Peer {
h(*this, args...);
}
}
/*template <typename... ARGS>
int _send(const std::string &t, ARGS... args);
template <typename... ARGS>
int _send(const array &b, ARGS... args);
template <typename T, typename... ARGS>
int _send(const std::vector<T> &t, ARGS... args);
template <typename... Types, typename... ARGS>
int _send(const std::tuple<Types...> &t, ARGS... args);
template <typename T, typename... ARGS,
ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)>
int _send(const T &t, ARGS... args);*/
private: // Data
Status status_;
......
......@@ -138,6 +138,7 @@ class Universe : public ftl::Configurable {
void _installBindings();
void _installBindings(Peer *);
bool _subscribe(const std::string &res);
void _remove(Peer *);
static void __start(Universe *u);
......
......@@ -51,13 +51,16 @@ namespace ftl {
* Get a pretty string.
*/
std::string to_string() const {
char b[37];
#ifdef WIN32
UuidToString(&guid_, (RPC_CSTR*)b);
RPC_CSTR szUuid = NULL;
if (::UuidToStringA(&guid_, &szUuid) == RPC_S_OK) {
return std::string((char*)szUuid);
}
#else
char b[37];
uuid_unparse(uuid_, b);
#endif
return std::string(b);
#endif
}
/* Allow the UUID to be packed into an RPC message. */
......
......@@ -310,7 +310,7 @@ void Peer::data() {
}
bool Peer::_data() {
// std::unique_lock<std::mutex> lk(recv_mtx_);
std::unique_lock<std::mutex> lk(recv_mtx_);
recv_buf_.reserve_buffer(kMaxMessage);
int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0);
......@@ -556,6 +556,8 @@ int Peer::_send() {
}
Peer::~Peer() {
std::unique_lock<std::mutex> lk1(send_mtx_);
std::unique_lock<std::mutex> lk2(recv_mtx_);
close();
delete disp_;
......
......@@ -6,6 +6,10 @@
#pragma comment(lib, "Rpcrt4.lib")
#endif
#ifndef WIN32
#include <signal.h>
#endif
using std::string;
using std::vector;
using std::thread;
......@@ -122,6 +126,8 @@ int Universe::_setDescriptors() {
FD_SET(s->_socket(), &sfdread_);
}
FD_SET(s->_socket(), &sfderror_);
} else if (s) {
_remove(s);
}
}
......@@ -147,6 +153,20 @@ void Universe::_installBindings() {
});
}
// Note: should be called inside a net lock
void Universe::_remove(Peer *p) {
LOG(INFO) << "Removing disconnected peer: " << p->id().to_string();
for (auto i=peers_.begin(); i!=peers_.end(); i++) {
if ((*i) == p) {
peers_.erase(i); break;
}
}
auto ix = peer_ids_.find(p->id());
if (ix != peer_ids_.end()) peer_ids_.erase(ix);
delete p;
}
Peer *Universe::getPeer(const UUID &id) const {
auto ix = peer_ids_.find(id);
if (ix == peer_ids_.end()) return nullptr;
......@@ -197,6 +217,9 @@ bool Universe::_subscribe(const std::string &res) {
}
void Universe::__start(Universe * u) {
#ifndef WIN32
signal(SIGPIPE,SIG_IGN);
#endif // WIN32
u->_run();
}
......@@ -228,8 +251,10 @@ void Universe::_run() {
//Some kind of error occured, it is usually possible to recover from this.
if (selres < 0) {
std::cout << "SELECT ERROR " << selres << " - " << strerror(errno) << std::endl;
//return false;
switch (errno) {
case 9 : continue; // Bad file descriptor = socket closed
default : std::cout << "Unknown select error: " << strerror(errno) << std::endl;
}
continue;
} else if (selres == 0) {
// Timeout, nothing to do...
......@@ -245,24 +270,17 @@ void Universe::_run() {
int rsize = sizeof(sockaddr_storage);
sockaddr_storage addr;
//int freeclient = freeSocket();
//if (freeclient >= 0) {
// TODO Limit connection rate or allow a pause in accepting
// TODO Send auto reject message under heavy load
//Finally accept this client connection.
int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
//Finally accept this client connection.
int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
if (csock != INVALID_SOCKET) {
auto p = new Peer(csock, &disp_);
peers_.push_back(p);
_installBindings(p);
p->onConnect([this](Peer &p) {
peer_ids_[p.id()] = &p;
});
}
//}
if (csock != INVALID_SOCKET) {
auto p = new Peer(csock, &disp_);
peers_.push_back(p);
_installBindings(p);
p->onConnect([this](Peer &p) {
peer_ids_[p.id()] = &p;
});
}
}
}
}
......@@ -272,25 +290,15 @@ void Universe::_run() {
if (s != NULL && s->isValid()) {
//If message received from this client then deal with it
if (FD_ISSET(s->_socket(), &sfdread_)) {
//s->data();
//std::cout << "QUEUE DATA PROC" << std::endl;
//p.push([](int id, Peer *s) {
// std::cout << "Processing in thread " << std::to_string(id) << std::endl;
s->data();
//}, s);
s->data();
}
if (FD_ISSET(s->_socket(), &sfderror_)) {
s->socketError();
s->close();
}
} else if (s != NULL) {
// Erase it
for (auto i=peers_.begin(); i!=peers_.end(); i++) {
if ((*i) == s) {
LOG(INFO) << "REMOVING SOCKET";
peers_.erase(i); break;
}
}
_remove(s);
}
}
}
......
......@@ -207,7 +207,8 @@ TEST_CASE("Universe::publish()", "") {
Universe a;
Universe b;
a.listen("tcp://localhost:7077");
b.connect("tcp://localhost:7077")->waitConnection();
ftl::net::Peer *p = b.connect("tcp://localhost:7077");
p->waitConnection();
SECTION("no subscribers") {
a.createResource("ftl://test");
......@@ -227,6 +228,23 @@ TEST_CASE("Universe::publish()", "") {
REQUIRE( done == 56 );
}
SECTION("publish to disconnected subscriber") {
int done = 0;
a.createResource("ftl://test2");
REQUIRE( b.subscribe("ftl://test2", [&done](int a) {
done = a;
}) );
sleep_for(milliseconds(50));
p->close();
sleep_for(milliseconds(100));
a.publish("ftl://test2", 56);
sleep_for(milliseconds(50));
REQUIRE( done == 0 );
}
}
/*TEST_CASE("net::listen()", "[net]") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment