From b66546590523000de3d89ec0da0f820e4f1d6523 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nicolas.pope@utu.fi> Date: Thu, 18 Jul 2019 08:35:35 +0300 Subject: [PATCH] Resolves #50 stream sync --- applications/reconstruct/src/main.cpp | 5 +- components/net/cpp/src/peer.cpp | 10 +- .../include/ftl/rgbd/detail/source.hpp | 3 +- .../rgbd-sources/include/ftl/rgbd/source.hpp | 3 + .../include/ftl/rgbd/streamer.hpp | 4 + components/rgbd-sources/src/net.cpp | 34 ++++++- components/rgbd-sources/src/net.hpp | 7 +- components/rgbd-sources/src/source.cpp | 24 +++-- components/rgbd-sources/src/stereovideo.cpp | 6 +- components/rgbd-sources/src/streamer.cpp | 98 +++++++++++++++---- 10 files changed, 154 insertions(+), 40 deletions(-) diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index 712b3232b..4a3f9b823 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -126,9 +126,10 @@ static void run(ftl::Configurable *root) { active = scene->upload(); // Make sure previous virtual camera frame has finished rendering - stream->wait(); + //stream->wait(); + cudaSafeCall(cudaStreamSynchronize(scene->getIntegrationStream())); - LOG(INFO) << "Heap: " << scene->getHeapFreeCount(); + //LOG(INFO) << "Heap: " << scene->getHeapFreeCount(); // Merge new frames into the voxel structure scene->integrate(); diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 8e279090d..ab627b720 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -195,8 +195,9 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals LOG(INFO) << "Peer elected to disconnect: " << id().to_string(); }); - bind("__ping__", [this](unsigned long long timestamp) { - return timestamp; + bind("__ping__", [this]() { + auto now = std::chrono::high_resolution_clock::now(); + return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count(); }); send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); @@ -269,8 +270,9 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), LOG(INFO) << "Peer elected to disconnect: " << id().to_string(); }); - bind("__ping__", [this](unsigned long long timestamp) { - return timestamp; + bind("__ping__", [this]() { + auto now = std::chrono::high_resolution_clock::now(); + return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count(); }); } } diff --git a/components/rgbd-sources/include/ftl/rgbd/detail/source.hpp b/components/rgbd-sources/include/ftl/rgbd/detail/source.hpp index 8ebdab720..326e7c332 100644 --- a/components/rgbd-sources/include/ftl/rgbd/detail/source.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/detail/source.hpp @@ -24,7 +24,7 @@ class Source { friend class ftl::rgbd::Source; public: - explicit Source(ftl::rgbd::Source *host) : capabilities_(0), host_(host), params_({0}) { } + explicit Source(ftl::rgbd::Source *host) : capabilities_(0), host_(host), params_({0}), timestamp_(0) { } virtual ~Source() {} /** @@ -41,6 +41,7 @@ class Source { ftl::rgbd::Camera params_; cv::Mat rgb_; cv::Mat depth_; + int64_t timestamp_; //Eigen::Matrix4f pose_; }; diff --git a/components/rgbd-sources/include/ftl/rgbd/source.hpp b/components/rgbd-sources/include/ftl/rgbd/source.hpp index a1190849e..64321d36a 100644 --- a/components/rgbd-sources/include/ftl/rgbd/source.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/source.hpp @@ -118,6 +118,8 @@ class Source : public ftl::Configurable { void writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<uint> &depth, cudaStream_t stream); void writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<float> &depth, cudaStream_t stream); + int64_t timestamp() const { return (impl_) ? impl_->timestamp_ : 0; } + /** * Directly upload source RGB and Depth to GPU. */ @@ -201,6 +203,7 @@ class Source : public ftl::Configurable { bool paused_; bool bullet_; channel_t channel_; + cudaStream_t stream_; detail::Source *_createImplementation(); detail::Source *_createFileImpl(const ftl::URI &uri); diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 30631b7f9..621063858 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -114,6 +114,10 @@ class Streamer : public ftl::Configurable { std::condition_variable job_cv_; std::atomic<int> jobs_; int compress_level_; + int64_t clock_adjust_; + ftl::UUID time_peer_; + int64_t last_frame_; + int64_t frame_no_; void _schedule(); void _swap(detail::StreamSource *); diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 7dca9d416..587db2ca4 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -59,7 +59,7 @@ bool NetSource::_getCalibration(Universe &net, const UUID &peer, const string &s } NetSource::NetSource(ftl::rgbd::Source *host) - : ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1) { + : ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), current_frame_(0) { gamma_ = host->value("gamma", 1.0f); temperature_ = host->value("temperature", 6500); @@ -105,11 +105,31 @@ NetSource::~NetSource() { host_->getNet()->removeCallback(h_); } -void NetSource::_recvChunk(int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { +void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { cv::Mat tmp_rgb, tmp_depth; if (!active_) return; + // A new frame has been started... finish the last one + if (frame > current_frame_) { + // Lock host to prevent grab + UNIQUE_LOCK(host_->mutex(),lk); + + // Swap the double buffers + cv::Mat tmp; + tmp = rgb_; + rgb_ = d_rgb_; + d_rgb_ = tmp; + tmp = depth_; + depth_ = d_depth_; + d_depth_ = tmp; + + timestamp_ = current_frame_*40; // FIXME: Don't hardcode 40ms + current_frame_ = frame; + } else if (frame < current_frame_) { + LOG(WARNING) << "Chunk dropped"; + } + // Decode in temporary buffers to prevent long locks cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb); if (d.size() > 0) cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth); @@ -123,10 +143,12 @@ void NetSource::_recvChunk(int frame, int chunk, bool delta, const vector<unsign // Lock host to prevent grab UNIQUE_LOCK(host_->mutex(),lk); + + // TODO:(Nick) Decode directly into double buffer if no scaling cv::Rect roi(cx,cy,chunk_width_,chunk_height_); - cv::Mat chunkRGB = rgb_(roi); - cv::Mat chunkDepth = depth_(roi); + cv::Mat chunkRGB = d_rgb_(roi); + cv::Mat chunkDepth = d_depth_(roi); // Original size so just copy if (tmp_rgb.cols == chunkRGB.cols) { @@ -191,7 +213,7 @@ void NetSource::_updateURI() { has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_); - host_->getNet()->bind(*uri, [this](int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + host_->getNet()->bind(*uri, [this](int64_t frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { _recvChunk(frame, chunk, delta, jpg, d); }); @@ -210,6 +232,8 @@ void NetSource::_updateURI() { chunk_height_ = params_.height / chunks_dim_; rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0)); depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f); + d_rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0)); + d_depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f); uri_ = *uri; active_ = true; diff --git a/components/rgbd-sources/src/net.hpp b/components/rgbd-sources/src/net.hpp index 3f8a86df3..766053faa 100644 --- a/components/rgbd-sources/src/net.hpp +++ b/components/rgbd-sources/src/net.hpp @@ -47,10 +47,15 @@ class NetSource : public detail::Source { int maxN_; int default_quality_; ftl::rgbd::channel_t prev_chan_; + int64_t current_frame_; + + // Double buffering + cv::Mat d_depth_; + cv::Mat d_rgb_; bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p); void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); - void _recvChunk(int frame, int chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); + void _recvChunk(int64_t frame, int chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); void _updateURI(); }; diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp index c5ad451de..d2ab4e416 100644 --- a/components/rgbd-sources/src/source.cpp +++ b/components/rgbd-sources/src/source.cpp @@ -29,6 +29,7 @@ using ftl::rgbd::capability_t; Source::Source(ftl::config::json_t &cfg) : Configurable(cfg), pose_(Eigen::Matrix4d::Identity()), net_(nullptr) { impl_ = nullptr; params_ = {0}; + stream_ = 0; reset(); on("uri", [this](const ftl::config::Event &e) { @@ -40,6 +41,7 @@ Source::Source(ftl::config::json_t &cfg) : Configurable(cfg), pose_(Eigen::Matri Source::Source(ftl::config::json_t &cfg, ftl::net::Universe *net) : Configurable(cfg), pose_(Eigen::Matrix4d::Identity()), net_(net) { impl_ = nullptr; params_ = {0}; + stream_ = 0; reset(); on("uri", [this](const ftl::config::Event &e) { @@ -220,7 +222,12 @@ void Source::reset() { bool Source::grab() { UNIQUE_LOCK(mutex_,lk); - if (impl_ && impl_->grab(-1,-1)) { + if (!impl_ && stream_ != 0) { + cudaSafeCall(cudaStreamSynchronize(stream_)); + if (depth_.type() == CV_32SC1) depth_.convertTo(depth_, CV_32F, 1.0f / 1000.0f); + stream_ = 0; + return true; + } else if (impl_ && impl_->grab(-1,-1)) { impl_->rgb_.copyTo(rgb_); impl_->depth_.copyTo(depth_); return true; @@ -243,9 +250,9 @@ void Source::writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl: cudaSafeCall(cudaMemcpy2DAsync(rgb_.data, rgb_.step, rgb.devicePtr(), rgb.pitch(), rgb_.cols * sizeof(uchar4), rgb_.rows, cudaMemcpyDeviceToHost, stream)); depth_.create(depth.height(), depth.width(), CV_32SC1); cudaSafeCall(cudaMemcpy2DAsync(depth_.data, depth_.step, depth.devicePtr(), depth.pitch(), depth_.cols * sizeof(uint), depth_.rows, cudaMemcpyDeviceToHost, stream)); - cudaSafeCall(cudaStreamSynchronize(stream)); // TODO:(Nick) Don't wait here. - - depth_.convertTo(depth_, CV_32F, 1.0f / 1000.0f); + //cudaSafeCall(cudaStreamSynchronize(stream)); // TODO:(Nick) Don't wait here. + stream_ = stream; + //depth_.convertTo(depth_, CV_32F, 1.0f / 1000.0f); } else { LOG(ERROR) << "writeFrames cannot be done on this source: " << getURI(); } @@ -258,12 +265,17 @@ void Source::writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl: cudaSafeCall(cudaMemcpy2DAsync(rgb_.data, rgb_.step, rgb.devicePtr(), rgb.pitch(), rgb_.cols * sizeof(uchar4), rgb_.rows, cudaMemcpyDeviceToHost, stream)); depth_.create(depth.height(), depth.width(), CV_32FC1); cudaSafeCall(cudaMemcpy2DAsync(depth_.data, depth_.step, depth.devicePtr(), depth.pitch(), depth_.cols * sizeof(float), depth_.rows, cudaMemcpyDeviceToHost, stream)); - cudaSafeCall(cudaStreamSynchronize(stream)); // TODO:(Nick) Don't wait here. + stream_ = stream; } } bool Source::thumbnail(cv::Mat &t) { - if (impl_) { + if (!impl_ && stream_ != 0) { + cudaSafeCall(cudaStreamSynchronize(stream_)); + if (depth_.type() == CV_32SC1) depth_.convertTo(depth_, CV_32F, 1.0f / 1000.0f); + stream_ = 0; + return true; + } else if (impl_) { UNIQUE_LOCK(mutex_,lk); impl_->grab(1, 9); impl_->rgb_.copyTo(rgb_); diff --git a/components/rgbd-sources/src/stereovideo.cpp b/components/rgbd-sources/src/stereovideo.cpp index 6e1c321a1..dfb8df8c4 100644 --- a/components/rgbd-sources/src/stereovideo.cpp +++ b/components/rgbd-sources/src/stereovideo.cpp @@ -135,19 +135,19 @@ bool StereoVideoSource::grab(int n, int b) { //rgb_ = lsrc_->cachedLeft(); depth_tmp_.download(depth_, stream_); - stream_.waitForCompletion(); + stream_.waitForCompletion(); // TODO:(Nick) Move to getFrames } else if (chan == ftl::rgbd::kChanRight) { lsrc_->get(left_, right_, stream_); calib_->rectifyStereo(left_, right_, stream_); left_.download(rgb_, stream_); right_.download(depth_, stream_); - stream_.waitForCompletion(); + stream_.waitForCompletion(); // TODO:(Nick) Move to getFrames } else { lsrc_->get(left_, right_, stream_); calib_->rectifyStereo(left_, right_, stream_); //rgb_ = lsrc_->cachedLeft(); left_.download(rgb_, stream_); - stream_.waitForCompletion(); + stream_.waitForCompletion(); // TODO:(Nick) Move to getFrames } return true; } diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 74738e587..f9d253a9a 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -29,6 +29,8 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) active_ = false; net_ = net; + time_peer_ = ftl::UUID(0); + clock_adjust_ = 0; compress_level_ = value("compression", 1); @@ -99,13 +101,13 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) } }); - net->bind("sync_streams", [this](unsigned long long time) { + //net->bind("sync_streams", [this](unsigned long long time) { // Calc timestamp delta - }); + //}); - net->bind("ping_streamer", [this](unsigned long long time) -> unsigned long long { - return time; - }); + //net->bind("ping_streamer", [this](unsigned long long time) -> unsigned long long { + // return time; + //}); } Streamer::~Streamer() { @@ -119,8 +121,6 @@ Streamer::~Streamer() { } void Streamer::add(Source *src) { - StreamSource *s = nullptr; - { UNIQUE_LOCK(mutex_,ulk); if (sources_.find(src->getID()) != sources_.end()) return; @@ -148,9 +148,24 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID if (rate < 0 || rate >= 10) return; if (N < 0 || N > ftl::rgbd::kMaxFrames) return; - LOG(INFO) << "Adding Stream Peer: " << peer.to_string() << " rate=" << rate << " N=" << N; + DLOG(INFO) << "Adding Stream Peer: " << peer.to_string() << " rate=" << rate << " N=" << N; s = sources_[source]; + + // Set a time peer for clock sync + if (time_peer_ == ftl::UUID(0)) { + time_peer_ = peer; + + // Also do a time sync (but should be repeated periodically) + auto start = std::chrono::high_resolution_clock::now(); + int64_t mastertime = net_->call<int64_t>(peer, "__ping__"); + auto elapsed = std::chrono::high_resolution_clock::now() - start; + int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(); + clock_adjust_ = mastertime - (std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() + (latency/2)); + LOG(INFO) << "Clock adjustment: " << clock_adjust_; + LOG(INFO) << "Latency: " << (latency / 2); + LOG(INFO) << "Local: " << std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() << ", master: " << mastertime; + } } if (!s) return; // No matching stream @@ -189,24 +204,36 @@ void Streamer::stop() { } void Streamer::poll() { - double wait = 1.0f / 25.0f; // TODO:(Nick) Should be in config - auto start = std::chrono::high_resolution_clock::now(); + //double wait = 1.0f / 25.0f; // TODO:(Nick) Should be in config + //auto start = std::chrono::high_resolution_clock::now(); + //int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_; + + //int64_t msdelay = 40 - (now % 40); + //while (msdelay >= 20) { + // sleep_for(milliseconds(10)); + // now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; + // msdelay = 40 - (now % 40); + //} + //LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay; + // Create frame jobs at correct FPS interval _schedule(); + //std::function<void(int)> j = ftl::pool.pop(); + //if (j) j(-1); - std::chrono::duration<double> elapsed = - std::chrono::high_resolution_clock::now() - start; + //std::chrono::duration<double> elapsed = + // std::chrono::high_resolution_clock::now() - start; - if (elapsed.count() >= wait) { - LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count()); - } else { + //if (elapsed.count() >= wait) { + //LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count()); + //} else { //LOG(INFO) << "Frame rate @ " << (1.0f / elapsed.count()); // Otherwise, wait until next frame should start. // FIXME:(Nick) Is this accurate enough? Almost certainly not // TODO:(Nick) Synchronise by time corrections and use of fixed time points // but this only works if framerate can be achieved. - sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); - } + //sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); + //} } void Streamer::run(bool block) { @@ -246,6 +273,7 @@ void Streamer::_swap(StreamSource *src) { } src->src->getFrames(src->rgb, src->depth); + //if (!src->rgb.empty() && src->prev_depth.empty()) { //src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0)); //LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows; @@ -269,6 +297,9 @@ void Streamer::wait() { if (jobs_ != 0) { LOG(FATAL) << "Deadlock detected"; } + + // Capture frame number? + frame_no_ = last_frame_; } void Streamer::_schedule() { @@ -277,6 +308,9 @@ void Streamer::_schedule() { //std::condition_variable job_cv; //int jobs = 0; + //auto now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; + //LOG(INFO) << "Frame time = " << (now-(last_frame_*40)) << "ms"; + // Prevent new clients during processing. SHARED_LOCK(mutex_,slk); @@ -301,6 +335,31 @@ void Streamer::_schedule() { ftl::pool.push([this,src](int id) { //auto start = std::chrono::high_resolution_clock::now(); + auto start = std::chrono::high_resolution_clock::now(); + int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_; + int64_t target = now / 40; + + // TODO:(Nick) A now%40 == 0 should be accepted + if (target != last_frame_) LOG(WARNING) << "Frame " << "(" << (target-last_frame_) << ") dropped by " << (now%40) << "ms"; + + // Use sleep_for for larger delays + int64_t msdelay = 40 - (now % 40); + //LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay; + while (msdelay >= 20) { + sleep_for(milliseconds(10)); + now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; + msdelay = 40 - (now % 40); + } + + // Spin loop until exact grab time + //LOG(INFO) << "Spin Delay: " << (now / 40) << " = " << (40 - (now%40)); + target = now / 40; + while ((now/40) == target) { + _mm_pause(); // SSE2 nano pause intrinsic + now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; + }; + last_frame_ = now/40; + try { src->src->grab(); } catch (std::exception &ex) { @@ -311,6 +370,9 @@ void Streamer::_schedule() { LOG(ERROR) << "Unknown exception when grabbing frame"; } + //now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; + //if (now%40 > 0) LOG(INFO) << "Grab in: " << (now%40) << "ms"; + //std::chrono::duration<double> elapsed = // std::chrono::high_resolution_clock::now() - start; //LOG(INFO) << "Grab in " << elapsed.count() << "s"; @@ -405,7 +467,7 @@ void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) { while (c != src->clients[b].end()) { try { // TODO:(Nick) Send pose and timestamp - if (!net_->send((*c).peerid, (*c).uri, 0, chunk, delta, rgb_buf, d_buf)) { + if (!net_->send((*c).peerid, (*c).uri, frame_no_, chunk, delta, rgb_buf, d_buf)) { // Send failed so mark as client stream completed (*c).txcount = (*c).txmax; } else { -- GitLab