diff --git a/applications/gui/src/src_window.cpp b/applications/gui/src/src_window.cpp index 98b760676a06099b7994ccbb0dc2db5d67b48783..62fbb7265f84bf03ffbe536aa30a2d9d8131828a 100644 --- a/applications/gui/src/src_window.cpp +++ b/applications/gui/src/src_window.cpp @@ -187,7 +187,7 @@ SourceWindow::SourceWindow(nanogui::Widget *parent, ftl::ctrl::Master *ctrl) }); #ifdef HAVE_LIBARCHIVE - auto snapshot = new Button(tools, "Snapshot"); + auto snapshot = new Button(this, "Snapshot"); snapshot->setCallback([this] { try { char timestamp[18]; diff --git a/components/common/cpp/include/ftl/configuration.hpp b/components/common/cpp/include/ftl/configuration.hpp index 9a01b56b3b72704a5b56be0635a5618e5c02129e..9b832753eac89b737562bcee24b3b58eba8b8bd4 100644 --- a/components/common/cpp/include/ftl/configuration.hpp +++ b/components/common/cpp/include/ftl/configuration.hpp @@ -181,14 +181,15 @@ std::vector<T*> ftl::config::createArray(ftl::Configurable *parent, const std::s std::vector<T*> result; if (base.is_array()) { + int i=0; for (auto &entity : base) { if (entity.is_object()) { if (!entity["$id"].is_string()) { std::string id_str = *parent->get<std::string>("$id"); if (id_str.find('#') != std::string::npos) { - entity["$id"] = id_str + std::string("/") + name; + entity["$id"] = id_str + std::string("/") + name + std::string("/") + std::to_string(i); } else { - entity["$id"] = id_str + std::string("#") + name; + entity["$id"] = id_str + std::string("#") + name + std::string("/") + std::to_string(i); } } @@ -197,9 +198,9 @@ std::vector<T*> ftl::config::createArray(ftl::Configurable *parent, const std::s // Must create the object from scratch... std::string id_str = *parent->get<std::string>("$id"); if (id_str.find('#') != std::string::npos) { - id_str = id_str + std::string("/") + name; + id_str = id_str + std::string("/") + name + std::string("/") + std::to_string(i); } else { - id_str = id_str + std::string("#") + name; + id_str = id_str + std::string("#") + name + std::string("/") + std::to_string(i); } parent->getConfig()[name] = { // cppcheck-suppress constStatement @@ -209,6 +210,7 @@ std::vector<T*> ftl::config::createArray(ftl::Configurable *parent, const std::s nlohmann::json &entity2 = parent->getConfig()[name]; result.push_back(create<T>(entity2, args...)); } + i++; } } else { LOG(WARNING) << "Expected an array for '" << name << "' in " << parent->getID(); diff --git a/components/control/cpp/src/master.cpp b/components/control/cpp/src/master.cpp index 56c8948d6ccaa068bf557622ffd3e6c1b3fc80da..bd4cadfe66fb4330eeec854e8a274f805e238666 100644 --- a/components/control/cpp/src/master.cpp +++ b/components/control/cpp/src/master.cpp @@ -28,9 +28,9 @@ Master::Master(Configurable *root, Universe *net) net->broadcast("log_subscribe", net->id()); - //net->onConnect([this](ftl::net::Peer*) { - // net_->broadcast("log_subscribe", net_->id()); - //}); + net->onConnect([this](ftl::net::Peer*) { + net_->broadcast("log_subscribe", net_->id()); + }); } Master::~Master() { diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index b086f8d3573fd4aea40de97d5615f5209ed9344d..62142e17afd98764d34f891604a7e3598e616076 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -248,6 +248,8 @@ class Peer { // Send buffers msgpack::vrefbuffer send_buf_; std::recursive_mutex send_mtx_; + + std::recursive_mutex cb_mtx_; std::string uri_; // Original connection URI, or assumed URI ftl::UUID peerid_; // Received in handshake or allocated @@ -321,7 +323,8 @@ int Peer::asyncCall( LOG(INFO) << "RPC " << name << "() -> " << uri_; { - std::unique_lock<std::recursive_mutex> lk(recv_mtx_); + // Could this be the problem???? + std::unique_lock<std::recursive_mutex> lk(cb_mtx_); // Register the CB rpcid = rpcid__++; callbacks_[rpcid] = std::make_unique<caller<T>>(cb); diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index c9cf014fa66a973d0d6a683d035c32035470569e..8b109651125e5db091de14ea03b2312e397829ae 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -161,6 +161,7 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals disp_ = new Dispatcher(d); is_waiting_ = true; + scheme_ = ftl::URI::SCHEME_TCP; // Send the initiating handshake if valid if (status_ == kConnecting) { @@ -204,7 +205,7 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), disp_ = new Dispatcher(d); - // Must to to prevent receiving message before handlers are installed + // Must do to prevent receiving message before handlers are installed unique_lock<recursive_mutex> lk(recv_mtx_); scheme_ = uri.getProtocol(); @@ -225,11 +226,13 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), } } else { LOG(ERROR) << "Connection refused to " << uri.getHost() << ":" << uri.getPort(); + status_ = kReconnecting; } - status_ = kConnecting; + //status_ = kConnecting; } else { LOG(ERROR) << "Unrecognised connection protocol: " << pUri; + return; } is_waiting_ = true; @@ -282,6 +285,19 @@ bool Peer::reconnect() { } else { return false; } + } else if (scheme_ == URI::SCHEME_WS) { + sock_ = tcpConnect(uri); + if (sock_ != INVALID_SOCKET) { + if (!ws_connect(sock_, uri)) { + return false; + } else { + status_ = kConnecting; + LOG(INFO) << "WEB SOCK CONNECTED"; + return true; + } + } else { + return false; + } } // TODO(Nick) allow for other protocols in reconnect @@ -291,6 +307,7 @@ bool Peer::reconnect() { void Peer::_updateURI() { sockaddr_storage addr; + // TODO(Nick) Get actual protocol... scheme_ = ftl::URI::SCHEME_TCP; int rsize = sizeof(sockaddr_storage); @@ -320,7 +337,6 @@ void Peer::_updateURI() { void Peer::close(bool retry) { if (sock_ != INVALID_SOCKET) { - // Attempt to inform about disconnect send("__disconnect__"); @@ -380,7 +396,7 @@ void Peer::data() { }, this); } -inline std::ostream& hex_dump(std::ostream& o, std::string const& v) { +/*inline std::ostream& hex_dump(std::ostream& o, std::string const& v) { std::ios::fmtflags f(o.flags()); o << std::hex; for (auto c : v) { @@ -388,7 +404,7 @@ inline std::ostream& hex_dump(std::ostream& o, std::string const& v) { } o.flags(f); return o; -} +}*/ bool Peer::_data() { std::unique_lock<std::recursive_mutex> lk(recv_mtx_); @@ -396,7 +412,7 @@ bool Peer::_data() { recv_buf_.reserve_buffer(kMaxMessage); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); - if (rc < 0) { + if (rc <= 0) { return false; } @@ -451,19 +467,24 @@ bool Peer::_data() { void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { // TODO Handle error reporting... - + unique_lock<recursive_mutex> lk(cb_mtx_); if (callbacks_.count(id) > 0) { DLOG(1) << "Received return RPC value"; - // Call the callback with unpacked return value - (*callbacks_[id])(res); + // Allow for unlock before callback + auto cb = std::move(callbacks_[id]); callbacks_.erase(id); + lk.unlock(); + + // Call the callback with unpacked return value + (*cb)(res); } else { LOG(WARNING) << "Missing RPC callback for result - discarding"; } } void Peer::cancelCall(int id) { + unique_lock<recursive_mutex> lk(cb_mtx_); if (callbacks_.count(id) > 0) { callbacks_.erase(id); } @@ -524,6 +545,10 @@ int Peer::_send() { len += sendvec[i].iov_len; } + if (sendvec[0].iov_len != 0) { + LOG(FATAL) << "CORRUPTION in websocket header buffer"; + } + //LOG(INFO) << "SEND SIZE = " << len; // Pack correct websocket header into buffer diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index bc19571c3b1080ca5b726c2d56e7d28072cfa819..0acd8a97db7b82bb176491e5c8c2bcfd0dba77e0 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -30,6 +30,8 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) net_ = net; net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> { + shared_lock<shared_mutex> slk(mutex_); + if (sources_.find(uri) != sources_.end()) { LOG(INFO) << "Valid source request received: " << uri; return net_->id(); @@ -100,16 +102,17 @@ void Streamer::add(Source *src) { sources_[src->getID()] = s; LOG(INFO) << "Streaming: " << src->getID(); + net_->broadcast("add_stream", src->getID()); } void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) { - shared_lock<shared_mutex> slk(mutex_); + unique_lock<shared_mutex> slk(mutex_); if (sources_.find(source) == sources_.end()) return; if (rate < 0 || rate >= 10) return; if (N < 0 || N > ftl::rgbd::kMaxFrames) return; - //LOG(INFO) << "Adding Stream Peer: " << peer.to_string(); + LOG(INFO) << "Adding Stream Peer: " << peer.to_string(); StreamClient c; c.peerid = peer; @@ -118,7 +121,7 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID c.txmax = N; StreamSource *s = sources_[source]; - unique_lock<shared_mutex> ulk(s->mutex); + //unique_lock<shared_mutex> ulk(s->mutex); s->clients[rate].push_back(c); } @@ -148,7 +151,9 @@ void Streamer::poll() { LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count()); } else { // Otherwise, wait until next frame should start. - // CHECK(Nick) Is this accurate enough? + // CHECK(Nick) Is this accurate enough? Almost certainly not + // TODO(Nick) Synchronise by time corrections and use of fixed time points + // but this only works if framerate can be achieved. sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); } } @@ -170,6 +175,7 @@ void Streamer::run(bool block) { } } +// Must be called in source locked state or src.state must be atomic void Streamer::_swap(StreamSource &src) { if (src.state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kTransmitted)) { src.src->getFrames(src.rgb, src.depth); @@ -182,25 +188,28 @@ void Streamer::_schedule() { std::condition_variable job_cv; int jobs = 0; + // Prevent new clients during processing. + shared_lock<shared_mutex> slk(mutex_); + for (auto s : sources_) { string uri = s.first; - shared_lock<shared_mutex> slk(s.second->mutex); + //shared_lock<shared_mutex> slk(s.second->mutex); // CHECK Should never be true now - if (s.second->state != 0) { + /*if (s.second->state != 0) { if (!late_) LOG(WARNING) << "Stream not ready to schedule on time: " << uri; late_ = true; continue; } else { late_ = false; - } + }*/ // No point in doing work if no clients if (s.second->clients[0].size() == 0) { //LOG(ERROR) << "Stream has no clients: " << uri; continue; } - slk.unlock(); + //slk.unlock(); // There will be two jobs for this source... unique_lock<mutex> lk(job_mtx); @@ -221,8 +230,8 @@ void Streamer::_schedule() { std::chrono::high_resolution_clock::now() - start; LOG(INFO) << "GRAB Elapsed: " << elapsed.count();*/ + // CHECK (Nick) Can state be an atomic instead? unique_lock<shared_mutex> lk(src->mutex); - //LOG(INFO) << "Grab frame"; src->state |= ftl::rgbd::detail::kGrabbed; _swap(*src); lk.unlock(); @@ -235,6 +244,9 @@ void Streamer::_schedule() { }); // Transmit job + // For any single source and bitrate there is only one thread + // meaning that no lock is required here since outer shared_lock + // prevents addition of new clients. // TODO, could do one for each bitrate... pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) { StreamSource *src = sources_[uri]; @@ -252,6 +264,7 @@ void Streamer::_schedule() { auto i = src->clients[0].begin(); while (i != src->clients[0].end()) { try { + // TODO(Nick) Send pose and timestamp if (!net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf)) { (*i).txcount = (*i).txmax; } @@ -260,8 +273,8 @@ void Streamer::_schedule() { } (*i).txcount++; if ((*i).txcount >= (*i).txmax) { - LOG(INFO) << "Remove client"; - unique_lock<shared_mutex> lk(src->mutex); + LOG(INFO) << "Remove client: " << (*i).uri; + //unique_lock<shared_mutex> lk(src->mutex); i = src->clients[0].erase(i); } else { i++; @@ -276,6 +289,7 @@ void Streamer::_schedule() { std::chrono::high_resolution_clock::now() - start; LOG(INFO) << "Stream Elapsed: " << elapsed.count();*/ + // CHECK (Nick) Could state be an atomic? unique_lock<shared_mutex> lk(src->mutex); DLOG(2) << "Tx Frame: " << uri; src->state |= ftl::rgbd::detail::kTransmitted; diff --git a/web-service/src/index.js b/web-service/src/index.js index bbf582d7591608b7c73cb5014504f6bd128a7406..8fbba0ad66f458b6ad98e91c17364ae607553e6b 100644 --- a/web-service/src/index.js +++ b/web-service/src/index.js @@ -61,6 +61,14 @@ function checkStreams(peer) { } } +function broadcastExcept(exc, name, ...args) { + for (let p in peer_by_id) { + let peer = peer_by_id[p]; + if (peer === exc) continue; + peer.sendB(name, args); + } +} + app.ws('/', (ws, req) => { console.log("New web socket request"); @@ -68,6 +76,9 @@ app.ws('/', (ws, req) => { p.on("connect", (peer) => { console.log("Node connected..."); + peer_uris[peer.string_id] = []; + peer_by_id[peer.string_id] = peer; + peer.rpc("node_details", (details) => { let obj = JSON.parse(details[0]); @@ -76,9 +87,6 @@ app.ws('/', (ws, req) => { peer.master = (obj.kind == "master"); console.log("Peer name = ", peer.name); - peer_uris[peer.string_id] = []; - peer_by_id[peer.string_id] = peer; - checkStreams(peer); }); }); @@ -136,6 +144,7 @@ app.ws('/', (ws, req) => { p.bind("get_stream", (uri, N, rate, pid, dest) => { let peer = uri_data[uri].peer; if (peer) { + // FIXME (NICK) BUG HERE, can't have multiple peers listening to same stream... peer.bind(uri, (rgb, depth) => { uri_data[uri].rgb = rgb; uri_data[uri].depth = depth; @@ -144,8 +153,24 @@ app.ws('/', (ws, req) => { peer.send("get_stream", uri, N, rate, [Peer.uuid], dest); } }); + + p.bind("add_stream", (uri) => { + console.log("Adding stream: ", uri); + //uri_to_peer[streams[i]] = peer; + peer_uris[p.string_id].push(uri); + + uri_data[uri] = { + peer: p, + title: "", + rgb: null, + depth: null, + pose: null + }; + + broadcastExcept(p, "add_stream", uri); + }); }); console.log("Listening or port 8080"); -app.listen(80); +app.listen(8080); diff --git a/web-service/src/peer.js b/web-service/src/peer.js index b90426ed1979e7f5a024f56c53e438b9ad010c3b..b1fc40dd8e6f7198a5857fcaec92d08bc9628307 100644 --- a/web-service/src/peer.js +++ b/web-service/src/peer.js @@ -150,6 +150,14 @@ Peer.prototype.rpc = function(name, cb, ...args) { } } +Peer.prototype.sendB = function(name, args) { + try { + this.sock.send(encode([0, name, args])); + } catch(e) { + this.close(); + } +} + Peer.prototype.send = function(name, ...args) { try { this.sock.send(encode([0, name, args]));