diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 8c50709e2d6253f73849f5560c72eee9a6ff5b83..402c5bed22cc394fd08251f2d2625983b89ed48e 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -138,9 +138,7 @@ int Universe::_setDescriptors() { n = s->_socket(); } - //if (s->isWaiting()) { - FD_SET(s->_socket(), &sfdread_); - //} + FD_SET(s->_socket(), &sfdread_); FD_SET(s->_socket(), &sfderror_); } } @@ -154,17 +152,7 @@ void Universe::_installBindings(Peer *p) { } void Universe::_installBindings() { - /*bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool { - LOG(INFO) << "Subscription to " << uri << " by " << id.to_string(); - unique_lock<shared_mutex> lk(net_mutex_); - subscribers_[ftl::URI(uri).to_string()].push_back(id); - return true; - }); - - bind("__owner__", [this](const std::string &res) -> optional<UUID> { - if (owned_.count(res) > 0) return this_peer; - else return {}; - });*/ + } // Note: should be called inside a net lock @@ -173,6 +161,8 @@ void Universe::_cleanupPeers() { if (ftl::pool.n_idle() == ftl::pool.size()) { if (garbage_.size() > 0) LOG(INFO) << "Garbage collection"; while (garbage_.size() > 0) { + // FIXME: There is possibly still something with a peer pointer + // that is causing this throw an exception sometimes? delete garbage_.front(); garbage_.pop_front(); } @@ -287,8 +277,8 @@ void Universe::_run() { continue; } - // CHECK Could this mutex be the problem!? { + // TODO:(Nick) Shared lock unless connection is made UNIQUE_LOCK(net_mutex_,lk); //If connection request is waiting @@ -304,7 +294,7 @@ void Universe::_run() { if (csock != INVALID_SOCKET) { auto p = new Peer(csock, this, &disp_); peers_.push_back(p); - _installBindings(p); + //_installBindings(p); } } } diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 0143bc0b0a030e509629e5bb6185dd553d1c9683..c01237e1525dab9cb4844f54e5c7b4e3cef9160b 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -29,7 +29,9 @@ struct StreamClient { static const unsigned int kGrabbed = 0x1; static const unsigned int kRGB = 0x2; -static const unsigned int kDepth = 0x4; +static const unsigned int kDepth = 0x4; + +static const unsigned int kFrameDropLimit = 5; struct StreamSource { ftl::rgbd::Source *src; @@ -119,12 +121,18 @@ class Streamer : public ftl::Configurable { int64_t last_frame_; int64_t frame_no_; + int64_t mspf_; + float actual_fps_; + //int64_t last_dropped_; + //int drop_count_; + void _schedule(); void _swap(detail::StreamSource *); void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest); void _encodeAndTransmit(detail::StreamSource *src, int chunk); void _encodeChannel1(const cv::Mat &in, std::vector<unsigned char> &out, unsigned int b); bool _encodeChannel2(const cv::Mat &in, std::vector<unsigned char> &out, ftl::rgbd::channel_t c, unsigned int b); + void _decideFrameRate(int64_t framesdropped, int64_t msremainder); }; } diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 135395c57a1bc1f9983308719e88f7f49d5df8cf..12a76cc40d38f876f11cbc4919cf7bcb13a4bee5 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -161,7 +161,7 @@ void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<un depth_ = d_depth_; d_depth_ = tmp; - timestamp_ = current_frame_*40; // FIXME: Don't hardcode 40ms + timestamp_ = current_frame_; current_frame_ = frame; } @@ -277,13 +277,6 @@ void NetSource::_updateURI() { N_ = 0; - // Initiate stream with request for first 10 frames - //try { - // host_->getNet()->send(peer_, "get_stream", *uri, N_, 0, host_->getNet()->id(), *uri); - //} catch(...) { - // LOG(ERROR) << "Could not connect to stream " << *uri; - //} - // Update chunk details chunks_dim_ = ftl::rgbd::kChunkDim; chunk_width_ = params_.width / chunks_dim_; diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 3fd69afe3fd6568f16ed9a9c9b489fde36bdc64a..dd4a93d696e7d1af9392ba0176b0f5206ea0d5fb 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -4,6 +4,7 @@ #include <thread> #include <chrono> #include <tuple> +#include <algorithm> #include "bitrate_settings.hpp" @@ -31,6 +32,9 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) net_ = net; time_peer_ = ftl::UUID(0); clock_adjust_ = 0; + mspf_ = 1000 / value("fps", 25); + //last_dropped_ = 0; + //drop_count_ = 0; compress_level_ = value("compression", 1); @@ -199,42 +203,43 @@ void Streamer::remove(const std::string &) { } +void Streamer::_decideFrameRate(int64_t framesdropped, int64_t msremainder) { + actual_fps_ = 1000.0f / (float)((framesdropped+1)*mspf_+(msremainder)); + LOG(INFO) << "Actual FPS = " << actual_fps_; + + /*if (framesdropped > 0) { + // If N consecutive frames are dropped, work out new rate + if (last_dropped_/mspf_ >= last_frame_/mspf_ - 2*framesdropped) drop_count_++; + else drop_count_ = 0; + + last_dropped_ = last_frame_+mspf_; + + if (drop_count_ >= ftl::rgbd::detail::kFrameDropLimit) { + drop_count_ = 0; + + const int64_t actualmspf = std::min((int64_t)1000, framesdropped*mspf_+(mspf_ - msremainder)); + + LOG(WARNING) << "Suggest FPS @ " << (1000 / actualmspf); + //mspf_ = actualmspf; + + // Also notify all clients of change + } + } else { + // Perhaps we can boost framerate? + const int64_t actualmspf = std::min((int64_t)1000, framesdropped*mspf_+(mspf_ - msremainder)); + LOG(INFO) << "Boost framerate: " << (1000 / actualmspf); + //mspf_ = actualmspf; + }*/ +} + void Streamer::stop() { active_ = false; wait(); } void Streamer::poll() { - //double wait = 1.0f / 25.0f; // TODO:(Nick) Should be in config - //auto start = std::chrono::high_resolution_clock::now(); - //int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_; - - //int64_t msdelay = 40 - (now % 40); - //while (msdelay >= 20) { - // sleep_for(milliseconds(10)); - // now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; - // msdelay = 40 - (now % 40); - //} - //LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay; - // Create frame jobs at correct FPS interval _schedule(); - //std::function<void(int)> j = ftl::pool.pop(); - //if (j) j(-1); - - //std::chrono::duration<double> elapsed = - // std::chrono::high_resolution_clock::now() - start; - - //if (elapsed.count() >= wait) { - //LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count()); - //} else { - //LOG(INFO) << "Frame rate @ " << (1.0f / elapsed.count()); - // Otherwise, wait until next frame should start. - // FIXME:(Nick) Is this accurate enough? Almost certainly not - // TODO:(Nick) Synchronise by time corrections and use of fixed time points - // but this only works if framerate can be achieved. - //sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); - //} } void Streamer::run(bool block) { @@ -306,12 +311,6 @@ void Streamer::wait() { void Streamer::_schedule() { wait(); - //std::mutex job_mtx; - //std::condition_variable job_cv; - //int jobs = 0; - - //auto now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; - //LOG(INFO) << "Frame time = " << (now-(last_frame_*40)) << "ms"; // Prevent new clients during processing. SHARED_LOCK(mutex_,slk); @@ -339,28 +338,30 @@ void Streamer::_schedule() { auto start = std::chrono::high_resolution_clock::now(); int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_; - int64_t target = now / 40; + int64_t target = now / mspf_; - // TODO:(Nick) A now%40 == 0 should be accepted - if (target != last_frame_) LOG(WARNING) << "Frame " << "(" << (target-last_frame_) << ") dropped by " << (now%40) << "ms"; + // TODO:(Nick) A now%mspf_ == 0 should be accepted + if (target != last_frame_) { + LOG(WARNING) << "Frame " << "(" << (target-last_frame_) << ") dropped by " << (now%mspf_) << "ms"; + } + //_decideFrameRate(target-last_frame_, now%mspf_); // Use sleep_for for larger delays - int64_t msdelay = 40 - (now % 40); + int64_t msdelay = mspf_ - (now % mspf_); //LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay; while (msdelay >= 20) { sleep_for(milliseconds(10)); now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; - msdelay = 40 - (now % 40); + msdelay = mspf_ - (now % mspf_); } // Spin loop until exact grab time - //LOG(INFO) << "Spin Delay: " << (now / 40) << " = " << (40 - (now%40)); - target = now / 40; - while ((now/40) == target) { + target = now / mspf_; + while ((now/mspf_) == target) { _mm_pause(); // SSE2 nano pause intrinsic now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; }; - last_frame_ = now/40; + last_frame_ = now/mspf_; try { src->src->capture(); @@ -372,20 +373,13 @@ void Streamer::_schedule() { LOG(ERROR) << "Unknown exception when grabbing frame"; } - //now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; - //if (now%40 > 0) LOG(INFO) << "Grab in: " << (now%40) << "ms"; - - //std::chrono::duration<double> elapsed = - // std::chrono::high_resolution_clock::now() - start; - //LOG(INFO) << "Grab in " << elapsed.count() << "s"; - src->jobs--; _swap(src); // Mark job as finished std::unique_lock<std::mutex> lk(job_mtx_); --jobs_; - job_cv_.notify_one(); + if (jobs_ == 0) job_cv_.notify_one(); }); // Compute job @@ -426,7 +420,7 @@ void Streamer::_schedule() { _swap(src); std::unique_lock<std::mutex> lk(job_mtx_); --jobs_; - job_cv_.notify_one(); + if (jobs_ == 0) job_cv_.notify_one(); }); } } @@ -490,8 +484,8 @@ void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) { auto c = src->clients[b].begin(); while (c != src->clients[b].end()) { try { - // TODO:(Nick) Send pose and timestamp - if (!net_->send((*c).peerid, (*c).uri, frame_no_, chunk, delta, rgb_buf, d_buf)) { + // TODO:(Nick) Send pose + if (!net_->send((*c).peerid, (*c).uri, frame_no_*mspf_, chunk, delta, rgb_buf, d_buf)) { // Send failed so mark as client stream completed (*c).txcount = (*c).txmax; } else {