diff --git a/components/control/cpp/src/master.cpp b/components/control/cpp/src/master.cpp index 36108610f0ef46828f541cfc9497c340b6af09ba..bd4cadfe66fb4330eeec854e8a274f805e238666 100644 --- a/components/control/cpp/src/master.cpp +++ b/components/control/cpp/src/master.cpp @@ -62,7 +62,7 @@ void Master::set(const ftl::UUID &peer, const string &uri, json_t &value) { } vector<json_t> Master::getSlaves() { - auto response = net_->findAll<string>("slave_details"); + auto response = net_->findAll<string>("node_details"); vector<json_t> result; for (auto &r : response) { result.push_back(json_t::parse(r)); diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp index 118a68265d6217d6fcfb2dc8db263a2f21a83157..afb5c9f07c31e06db022cba9cd8a80636f924ef2 100644 --- a/components/control/cpp/src/slave.cpp +++ b/components/control/cpp/src/slave.cpp @@ -53,6 +53,10 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false) net_->connect(url); }); + net->onConnect([this](ftl::net::Peer *peer) { + net_->broadcast("new_peer", peer->id()); + }); + loguru::add_callback("net_log", netLog, this, loguru::Verbosity_INFO); } diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 7959706416dce578c3113ac96e22b98018171921..94fbdc03814bb38a0d858aaf39d58de74ac65e4f 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -393,7 +393,7 @@ bool Peer::_data() { recv_buf_.reserve_buffer(kMaxMessage); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); - if (rc <= 0) { + if (rc < 0) { return false; } diff --git a/web-service/src/index.js b/web-service/src/index.js index 6a67fce8bf570c90ccee5ac617d662e4a2eac6c3..bbf582d7591608b7c73cb5014504f6bd128a7406 100644 --- a/web-service/src/index.js +++ b/web-service/src/index.js @@ -41,12 +41,33 @@ app.get('/stream/depth', (req, res) => { //app.get('/stream', (req, res)) +function checkStreams(peer) { + if (!peer.master) { + peer.rpc("list_streams", (streams) => { + console.log("STREAMS", streams); + for (let i=0; i<streams.length; i++) { + //uri_to_peer[streams[i]] = peer; + peer_uris[peer.string_id].push(streams[i]); + + uri_data[streams[i]] = { + peer: peer, + title: "", + rgb: null, + depth: null, + pose: null + }; + } + }); + } +} + app.ws('/', (ws, req) => { console.log("New web socket request"); let p = new Peer(ws); p.on("connect", (peer) => { + console.log("Node connected..."); peer.rpc("node_details", (details) => { let obj = JSON.parse(details[0]); @@ -58,23 +79,7 @@ app.ws('/', (ws, req) => { peer_uris[peer.string_id] = []; peer_by_id[peer.string_id] = peer; - if (!peer.master) { - peer.rpc("list_streams", (streams) => { - console.log("STREAMS", streams); - for (let i=0; i<streams.length; i++) { - //uri_to_peer[streams[i]] = peer; - peer_uris[peer.string_id].push(streams[i]); - - uri_data[streams[i]] = { - peer: peer, - title: "", - rgb: null, - depth: null, - pose: null - }; - } - }); - } + checkStreams(peer); }); }); @@ -94,14 +99,20 @@ app.ws('/', (ws, req) => { if (peer_by_id.hasOwnProperty(peer.string_id)) delete peer_by_id[peer.string_id]; }); + p.bind("new_peer", (id) => { + checkStreams(p); + }); + p.bind("list_streams", () => { return Object.keys(uri_data); }); p.bind("find_stream", (uri) => { if (uri_data.hasOwnProperty(uri)) { + console.log("Stream found: ", uri); return [Peer.uuid]; } else { + console.log("Stream not found: ", uri) return null; // or []?? } }); @@ -136,5 +147,5 @@ app.ws('/', (ws, req) => { }); console.log("Listening or port 8080"); -app.listen(8080); +app.listen(80);