Skip to content
Snippets Groups Projects
Commit 6d362abe authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Resolves #55 and manages recursive log

parent ca6213d3
No related branches found
No related tags found
1 merge request!23Feature/gui implements #53
Pipeline #11222 passed
......@@ -105,6 +105,7 @@ static void run(ftl::Configurable *root) {
LOG(INFO) << "Stopping...";
stream->stop();
net->shutdown();
delete stream;
delete display;
......@@ -126,6 +127,7 @@ int main(int argc, char **argv) {
// ftl::middlebury::test(config);
//}
delete root;
return ftl::exit_code;
}
......@@ -19,7 +19,8 @@ class Slave {
private:
std::vector<ftl::UUID> log_peers_;
ftl::net::Universe *net_;
std::mutex mutex_;
std::recursive_mutex mutex_;
bool in_log_;
};
}
......
......@@ -6,13 +6,14 @@ using ftl::ctrl::Slave;
using std::string;
using std::mutex;
using std::unique_lock;
using std::recursive_mutex;
static void netLog(void* user_data, const loguru::Message& message) {
Slave *slave = static_cast<Slave*>(user_data);
slave->sendLog(message);
}
Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net) {
Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false) {
net->bind("restart", []() {
LOG(WARNING) << "Remote restart...";
//exit(1);
......@@ -43,7 +44,7 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net) {
});
net->bind("log_subscribe", [this](const ftl::UUID &peer) {
unique_lock<mutex> lk(mutex_);
unique_lock<recursive_mutex> lk(mutex_);
log_peers_.push_back(peer);
});
......@@ -55,10 +56,19 @@ Slave::~Slave() {
}
void Slave::sendLog(const loguru::Message& message) {
unique_lock<mutex> lk(mutex_);
unique_lock<recursive_mutex> lk(mutex_);
if (in_log_) return;
in_log_ = true;
for (auto &p : log_peers_) {
auto peer = net_->getPeer(p);
if (!peer || !peer->isConnected()) continue;
std::cout << "sending log to master..." << std::endl;
if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) {
// TODO(Nick) Remove peer from loggers list...
}
}
in_log_ = false;
}
......@@ -53,6 +53,13 @@ class Universe : public ftl::Configurable {
* @param addr URI giving protocol, interface and port
*/
bool listen(const std::string &addr);
/**
* Essential to call this before destroying anything that registered
* callbacks or binds for RPC. It will terminate all connections and
* stop any network activity but without deleting the net object.
*/
void shutdown();
/**
* Create a new peer connection.
......
......@@ -165,6 +165,7 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) {
bind("__disconnect__", [this]() {
_badClose(false);
LOG(INFO) << "Peer elected to disconnect: " << id().to_string();
});
send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
......@@ -257,6 +258,7 @@ void Peer::close(bool retry) {
send("__disconnect__");
_badClose(retry);
LOG(INFO) << "Deliberate disconnect of peer.";
}
}
......@@ -269,35 +271,16 @@ void Peer::_badClose(bool retry) {
#endif
sock_ = INVALID_SOCKET;
status_ = kDisconnected;
// Attempt auto reconnect?
if (retry) LOG(INFO) << "Should attempt reconnect...";
//auto i = find(sockets.begin(),sockets.end(),this);
//sockets.erase(i);
_trigger(close_handlers_);
}
}
/*void Peer::setProtocol(Protocol *p) {
if (p != NULL) {
if (proto_ == p) return;
if (proto_ && proto_->id() == p->id()) return;
if (remote_proto_ != "") {
Handshake hs1;
hs1.magic = ftl::net::MAGIC;
//hs1.name_size = 0;
hs1.proto_size = p->id().size();
send(FTL_PROTOCOL_HS1, hs1, p->id());
LOG(INFO) << "Handshake initiated with " << uri_;
}
proto_ = p;
} else {
// Attempt auto reconnect?
if (retry) LOG(INFO) << "Should attempt reconnect...";
}
}*/
}
void Peer::socketError() {
int err;
......@@ -307,6 +290,11 @@ void Peer::socketError() {
uint32_t optlen = sizeof(err);
#endif
getsockopt(sock_, SOL_SOCKET, SO_ERROR, (char*)&err, &optlen);
// Must close before log since log may try to send over net causing
// more socket errors...
_badClose();
LOG(ERROR) << "Socket: " << uri_ << " - error " << err;
}
......@@ -415,6 +403,8 @@ void Peer::_connected() {
}
int Peer::_send() {
if (sock_ == INVALID_SOCKET) return -1;
// Are we using a websocket?
if (scheme_ == ftl::URI::SCHEME_WS) {
// Create a websocket header as well.
......@@ -453,7 +443,7 @@ int Peer::_send() {
// We are blocking, so -1 should mean actual error
if (c == -1) {
socketError();
_badClose();
//_badClose();
}
return c;
......@@ -463,6 +453,7 @@ Peer::~Peer() {
std::unique_lock<std::mutex> lk1(send_mtx_);
std::unique_lock<std::mutex> lk2(recv_mtx_);
_badClose(false);
LOG(INFO) << "Deleting peer object";
delete disp_;
}
......
......@@ -51,6 +51,11 @@ Universe::Universe(nlohmann::json &config) :
}
Universe::~Universe() {
shutdown();
}
void Universe::shutdown() {
if (!active_) return;
LOG(INFO) << "Cleanup Network ...";
active_ = false;
......
......@@ -89,7 +89,7 @@ void Streamer::add(RGBDSource *src) {
unique_lock<shared_mutex> ulk(mutex_);
if (sources_.find(src->getURI()) != sources_.end()) return;
StreamSource *s = new StreamSource; // = sources_.emplace(std::make_pair<std::string,StreamSource>(src->getURI(),{}));
StreamSource *s = new StreamSource;
s->src = src;
s->state = 0;
sources_[src->getURI()] = s;
......@@ -140,6 +140,8 @@ void Streamer::poll() {
if (elapsed.count() >= wait) {
LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
} else {
// Otherwise, wait until next frame should start.
// CHECK(Nick) Is this accurate enough?
sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
}
}
......@@ -148,13 +150,13 @@ void Streamer::run(bool block) {
active_ = true;
if (block) {
while (active_) {
while (ftl::running && active_) {
poll();
}
} else {
// Create thread job for frame ticking
pool_.push([this](int id) {
while (active_) {
while (ftl::running && active_) {
poll();
}
});
......@@ -177,6 +179,7 @@ void Streamer::_schedule() {
string uri = s.first;
shared_lock<shared_mutex> slk(s.second->mutex);
// CHECK Should never be true now
if (s.second->state != 0) {
if (!late_) LOG(WARNING) << "Stream not ready to schedule on time: " << uri;
late_ = true;
......@@ -185,12 +188,14 @@ void Streamer::_schedule() {
late_ = false;
}
// No point in doing work if no clients
if (s.second->clients[0].size() == 0) {
//LOG(ERROR) << "Stream has no clients: " << uri;
continue;
}
slk.unlock();
// There will be two jobs for this source...
unique_lock<mutex> lk(job_mtx);
jobs += 2;
lk.unlock();
......@@ -211,6 +216,7 @@ void Streamer::_schedule() {
_swap(*src);
lk.unlock();
// Mark job as finished
unique_lock<mutex> ulk(job_mtx);
jobs--;
ulk.unlock();
......@@ -261,6 +267,7 @@ void Streamer::_schedule() {
_swap(*src);
lk.unlock();
// Mark job as finished
unique_lock<mutex> ulk(job_mtx);
jobs--;
ulk.unlock();
......@@ -268,7 +275,7 @@ void Streamer::_schedule() {
});
}
// TODO Wait until all jobs completed...
// Wait for all jobs to complete before finishing frame
unique_lock<mutex> lk(job_mtx);
job_cv.wait(lk, [&jobs]{ return jobs == 0; });
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment