diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index 003e8cc48c0a6c0257123e3fec9e43dad2e0d431..7885c167e44dd2a3f377bd88997890bd7fe1301e 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -194,6 +194,11 @@ static void run(ftl::Configurable *root) { //net.broadcast("grab"); // To sync cameras scene->nextFrame(); + // TODO(Nick) Improve sync further... + for (size_t i = 0; i < inputs.size(); i++) { + if (inputs[i].source->isReady()) inputs[i].source->grab(); + } + for (size_t i = 0; i < inputs.size(); i++) { if (!inputs[i].source->isReady()) { inputs[i].params.m_imageWidth = 0; @@ -220,7 +225,6 @@ static void run(ftl::Configurable *root) { // Get the RGB-Depth frame from input Source *input = inputs[i].source; Mat rgb, depth; - input->grab(); input->getFrames(rgb,depth); active += 1; diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 52e082373276edc14ea553fb0dc13ee6f8a37e20..1541b56fdcb3a8c416150253037d360995dc76c6 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -389,6 +389,8 @@ void Peer::error(int e) { } void Peer::data() { + // TODO(Nick) Should not enter here twice if recv call has yet to be + // processed. //if (!is_waiting_) return; //is_waiting_ = false; std::unique_lock<std::recursive_mutex> lk(recv_mtx_); diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index b0a72d1f395ff5f222862f3164e642935f96fad8..af510a71038b486325a79b7744471dcbb068c817 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -307,6 +307,9 @@ void Universe::_run() { block.tv_usec = 10000; selres = select(n+1, &sfdread_, 0, &sfderror_, &block); + // NOTE Nick: Is it possible that not all the recvs have been called before I + // again reach a select call!? What are the consequences of this? A double recv attempt? + //Some kind of error occured, it is usually possible to recover from this. if (selres < 0) { switch (errno) { diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index a52bfea4e11f7c21b05dd777a904b778e584dcec..7c56e5636753b2161b912dc19ac6f0eb9455b9b4 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -70,12 +70,6 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch depth_.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f)); N_--; - if (N_ == 0) { - N_ += 10; - if (!host_->getNet()->send(peer_, "get_stream", *host_->get<string>("uri"), 10, 0, host_->getNet()->id(), *host_->get<string>("uri"))) { - active_ = false; - } - } } void NetSource::setPose(const Eigen::Matrix4f &pose) { @@ -134,7 +128,12 @@ void NetSource::_updateURI() { } bool NetSource::grab() { - // net_.broadcast("grab"); + if (N_ == 0) { + N_ += 10; + if (!host_->getNet()->send(peer_, "get_stream", *host_->get<string>("uri"), 10, 0, host_->getNet()->id(), *host_->get<string>("uri"))) { + active_ = false; + } + } return true; } diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index b3b093e492107809bb451587b52d277e24f7e0e4..00a370fc6c8222514fc0ad6e0421c21692c82376 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -139,7 +139,7 @@ void Streamer::stop() { } void Streamer::poll() { - double wait = 1.0f / 25.0f; + double wait = 1.0f / 25.0f; // TODO(Nick) Should be in config auto start = std::chrono::high_resolution_clock::now(); // Create frame jobs at correct FPS interval _schedule(); @@ -194,22 +194,10 @@ void Streamer::_schedule() { for (auto s : sources_) { string uri = s.first; - //shared_lock<shared_mutex> slk(s.second->mutex); - // CHECK Should never be true now - /*if (s.second->state != 0) { - if (!late_) LOG(WARNING) << "Stream not ready to schedule on time: " << uri; - late_ = true; - continue; - } else { - late_ = false; - }*/ - // No point in doing work if no clients if (s.second->clients[0].size() == 0) { - //LOG(ERROR) << "Stream has no clients: " << uri; continue; } - //slk.unlock(); // There will be two jobs for this source... unique_lock<mutex> lk(job_mtx); @@ -220,15 +208,7 @@ void Streamer::_schedule() { pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) { StreamSource *src = sources_[uri]; - //auto start = std::chrono::high_resolution_clock::now(); - //try { - src->src->grab(); - //} catch(...) { - // LOG(ERROR) << "Grab Exception for: " << uri; - //} - /*std::chrono::duration<double> elapsed = - std::chrono::high_resolution_clock::now() - start; - LOG(INFO) << "GRAB Elapsed: " << elapsed.count();*/ + src->src->grab(); // CHECK (Nick) Can state be an atomic instead? unique_lock<shared_mutex> lk(src->mutex); @@ -259,7 +239,15 @@ void Streamer::_schedule() { cv::Mat d2; src->depth.convertTo(d2, CV_16UC1, 16*100); vector<unsigned char> d_buf; - cv::imencode(".png", d2, d_buf); + + // Setting 1 = fast but large + // Setting 9 = small but slow + // Anything up to 8 causes minimal if any impact on frame rate + // on my (Nicks) laptop, but 9 halves the frame rate. + vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 5}; // Default is 1 for fast, 9 = small but slow. + cv::imencode(".png", d2, d_buf, pngparams); + + //LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb"; auto i = src->clients[0].begin(); while (i != src->clients[0].end()) { @@ -274,7 +262,6 @@ void Streamer::_schedule() { (*i).txcount++; if ((*i).txcount >= (*i).txmax) { LOG(INFO) << "Remove client: " << (*i).uri; - //unique_lock<shared_mutex> lk(src->mutex); i = src->clients[0].erase(i); } else { i++;