diff --git a/applications/groupview/src/main.cpp b/applications/groupview/src/main.cpp index ed163daa525609ba131ff916e50b63ce92c6fcdf..03a6dad32260753f59e6373bcd37187f084a891a 100644 --- a/applications/groupview/src/main.cpp +++ b/applications/groupview/src/main.cpp @@ -87,7 +87,7 @@ void modeLeftRight(ftl::Configurable *root) { std::atomic<bool> new_frames = false; vector<Mat> rgb(n_cameras), rgb_new(n_cameras); - group.sync([&mutex, &new_frames, &rgb_new](const ftl::rgbd::FrameSet &frames) { + group.sync([&mutex, &new_frames, &rgb_new](ftl::rgbd::FrameSet &frames) { mutex.lock(); bool good = true; for (size_t i = 0; i < frames.channel1.size(); i ++) { @@ -170,14 +170,32 @@ void modeFrame(ftl::Configurable *root, int frames=1) { std::atomic<bool> grab = false; std::atomic<bool> video = false; - - std::vector<cv::Mat> rgb(sources.size()); - std::vector<cv::Mat> depth(sources.size()); - std::mutex m; - group.sync([&grab](const ftl::rgbd::FrameSet &fs) { - LOG(INFO) << "Complete set: " << fs.timestamp; + vector<cv::Mat> rgb(sources.size()); + vector<cv::Mat> depth(sources.size()); + MUTEX mtx; + + group.sync([&grab,&rgb,&depth,&mtx](ftl::rgbd::FrameSet &fs) { + UNIQUE_LOCK(mtx, lk); + //LOG(INFO) << "Complete set: " << fs.timestamp; if (!ftl::running) { return false; } + + + for (size_t i=0; i<fs.sources.size(); ++i) { + if (fs.channel1[i].empty() || fs.channel2[i].empty()) return true; + } + + cv::Mat show; + + stack(fs.channel1, show); + + cv::resize(show, show, cv::Size(1280,720)); + cv::namedWindow("Cameras", cv::WINDOW_KEEPRATIO | cv::WINDOW_NORMAL); + cv::imshow("Cameras", show); + + auto key = cv::waitKey(1); + if (key == 27) ftl::running = false; + if (key == 'g') grab = true; #ifdef HAVE_LIBARCHIVE if (grab) { @@ -197,23 +215,27 @@ void modeFrame(ftl::Configurable *root, int frames=1) { return true; }); - cv::Mat show; + /*cv::Mat show; while (ftl::running) { for (auto s : sources) s->grab(30); for (size_t i = 0; i < sources.size(); i++) { - do { sources[i]->getFrames(rgb[i], depth[i]); } + //do { sources[i]->getFrames(rgb[i], depth[i]); } while(rgb[i].empty()); } - stack(rgb, show); + { + UNIQUE_LOCK(mtx, lk); + stack(rgb, show); + } + cv::resize(show, show, cv::Size(1280,720)); cv::namedWindow("Cameras", cv::WINDOW_KEEPRATIO | cv::WINDOW_NORMAL); cv::imshow("Cameras", show); auto key = cv::waitKey(20); if (key == 27) break; if (key == 'g') grab = true; - } + }*/ } void modeVideo(ftl::Configurable *root) { diff --git a/applications/gui/src/camera.cpp b/applications/gui/src/camera.cpp index f1682a06d90bf6dedf74631708b1e927262a8616..f44521b3a81d06a4087a56a8eacf6d2bf4d4700b 100644 --- a/applications/gui/src/camera.cpp +++ b/applications/gui/src/camera.cpp @@ -129,7 +129,7 @@ ftl::gui::Camera::Camera(ftl::gui::Screen *screen, ftl::rgbd::Source *src) : scr rotmat_.setIdentity(); //up_ = Eigen::Vector3f(0,1.0f,0); lerpSpeed_ = 0.999f; - depth_ = false; + sdepth_ = false; ftime_ = (float)glfwGetTime(); pause_ = false; @@ -142,6 +142,14 @@ ftl::gui::Camera::Camera(ftl::gui::Screen *screen, ftl::rgbd::Source *src) : scr posewin_ = new PoseWindow(screen, src_->getURI()); posewin_->setTheme(screen->windowtheme); posewin_->setVisible(false); + + src->setCallback([this](int64_t ts, cv::Mat &rgb, cv::Mat &depth) { + UNIQUE_LOCK(mutex_, lk); + rgb_.create(rgb.size(), rgb.type()); + depth_.create(depth.size(), depth.type()); + cv::swap(rgb_,rgb); + cv::swap(depth_, depth); + }); } ftl::gui::Camera::~Camera() { @@ -281,13 +289,21 @@ static void drawEdges( const cv::Mat &in, cv::Mat &out, cv::addWeighted(edges, weight, out, 1.0, 0.0, out, CV_8UC3); } +bool ftl::gui::Camera::thumbnail(cv::Mat &thumb) { + UNIQUE_LOCK(mutex_, lk); + src_->grab(1,9); + if (rgb_.empty()) return false; + cv::resize(rgb_, thumb, cv::Size(320,180)); + return true; +} + const GLTexture &ftl::gui::Camera::captureFrame() { float now = (float)glfwGetTime(); delta_ = now - ftime_; ftime_ = now; if (src_ && src_->isReady()) { - cv::Mat rgb, depth; + UNIQUE_LOCK(mutex_, lk); // Lerp the Eye eye_[0] += (neye_[0] - eye_[0]) * lerpSpeed_ * delta_; @@ -300,37 +316,37 @@ const GLTexture &ftl::gui::Camera::captureFrame() { if (src_->hasCapabilities(ftl::rgbd::kCapMovable)) src_->setPose(viewPose); src_->grab(); - src_->getFrames(rgb, depth); + //src_->getFrames(rgb, depth); // When switching from right to depth, client may still receive // right images from previous batch (depth.channels() == 1 check) if (channel_ == ftl::rgbd::kChanDeviation && - depth.rows > 0 && depth.channels() == 1) + depth_.rows > 0 && depth_.channels() == 1) { if (!stats_) { - stats_ = new StatisticsImage(depth.size()); + stats_ = new StatisticsImage(depth_.size()); } - stats_->update(depth); + stats_->update(depth_); } cv::Mat tmp; switch(channel_) { case ftl::rgbd::kChanEnergy: - if (depth.rows == 0) { break; } - visualizeEnergy(depth, tmp, 10.0); + if (depth_.rows == 0) { break; } + visualizeEnergy(depth_, tmp, 10.0); texture_.update(tmp); break; case ftl::rgbd::kChanDepth: - if (depth.rows == 0) { break; } - visualizeDepthMap(depth, tmp, 7.0); - if (screen_->root()->value("showEdgesInDepth", false)) drawEdges(rgb, tmp); + if (depth_.rows == 0) { break; } + visualizeDepthMap(depth_, tmp, 7.0); + if (screen_->root()->value("showEdgesInDepth", false)) drawEdges(rgb_, tmp); texture_.update(tmp); break; case ftl::rgbd::kChanDeviation: - if (depth.rows == 0) { break; } + if (depth_.rows == 0) { break; } //imageSize = Vector2f(depth.cols, depth.rows); stats_->getStdDev(tmp); tmp.convertTo(tmp, CV_8U, 1000.0); @@ -342,14 +358,14 @@ const GLTexture &ftl::gui::Camera::captureFrame() { case ftl::rgbd::kChanConfidence: case ftl::rgbd::kChanNormals: case ftl::rgbd::kChanRight: - if (depth.rows == 0 || depth.type() != CV_8UC3) { break; } - texture_.update(depth); + if (depth_.rows == 0 || depth_.type() != CV_8UC3) { break; } + texture_.update(depth_); break; default: - if (rgb.rows == 0) { break; } + if (rgb_.rows == 0) { break; } //imageSize = Vector2f(rgb.cols,rgb.rows); - texture_.update(rgb); + texture_.update(rgb_); } } diff --git a/applications/gui/src/camera.hpp b/applications/gui/src/camera.hpp index 8d48e2c0648caa37ad3064f195db47329e0bf82c..f412d9c43d1809374566b1ba24c427ecda792695 100644 --- a/applications/gui/src/camera.hpp +++ b/applications/gui/src/camera.hpp @@ -40,6 +40,8 @@ class Camera { const GLTexture &captureFrame(); + bool thumbnail(cv::Mat &thumb); + nlohmann::json getMetaData(); StatisticsImage *stats_ = nullptr; @@ -58,10 +60,13 @@ class Camera { float ftime_; float delta_; float lerpSpeed_; - bool depth_; + bool sdepth_; bool pause_; ftl::rgbd::channel_t channel_; std::vector<ftl::rgbd::channel_t> channels_; + cv::Mat rgb_; + cv::Mat depth_; + MUTEX mutex_; }; } diff --git a/applications/gui/src/src_window.cpp b/applications/gui/src/src_window.cpp index e98ec081b1d23f34cfe541756cbd8925e4290dbc..e7ffc4b1e0e44caccd67cce5d60d34b481ec58d7 100644 --- a/applications/gui/src/src_window.cpp +++ b/applications/gui/src/src_window.cpp @@ -116,7 +116,7 @@ void SourceWindow::draw(NVGcontext *ctx) { cv::Mat t; auto *cam = cameras_[available_[i]]; if (cam) { - if (cam->source()->thumbnail(t)) { + if (cam->thumbnail(t)) { thumbs_[i].update(t); } else { refresh_thumbs_ = true; diff --git a/applications/reconstruct/include/ftl/voxel_scene.hpp b/applications/reconstruct/include/ftl/voxel_scene.hpp index d594d479f1f35ffc9cece087b867be2f4b781ee0..b1ee6f3bc1388a0c57398fc63971b7ccb163235a 100644 --- a/applications/reconstruct/include/ftl/voxel_scene.hpp +++ b/applications/reconstruct/include/ftl/voxel_scene.hpp @@ -10,6 +10,7 @@ #include <ftl/matrix_conversion.hpp> #include <ftl/voxel_hash.hpp> #include <ftl/depth_camera.hpp> +#include <ftl/rgbd/group.hpp> #include <unordered_set> namespace ftl { @@ -34,6 +35,8 @@ class SceneRep : public ftl::Configurable { */ int upload(); + int upload(ftl::rgbd::FrameSet &); + /** * Merge all camera frames into the voxel hash datastructure. */ diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index 46629bd0d8a5a2ec0e2ebbf8f2168bd0ecdccb57..97ff62de60d97d3497eb822d44fd2364001b848b 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -14,6 +14,7 @@ #include <ftl/virtual_source.hpp> #include <ftl/rgbd/streamer.hpp> #include <ftl/slave.hpp> +#include <ftl/rgbd/group.hpp> #include "splat_render.hpp" @@ -94,6 +95,7 @@ static void run(ftl::Configurable *root) { ftl::rgbd::Streamer *stream = ftl::create<ftl::rgbd::Streamer>(root, "stream", net); ftl::rgbd::Source *virt = ftl::create<ftl::rgbd::Source>(root, "virtual", net); ftl::render::Splatter *splat = new ftl::render::Splatter(scene); + ftl::rgbd::Group group; //auto virtimpl = new ftl::rgbd::VirtualSource(virt); //virt->customImplementation(virtimpl); @@ -103,11 +105,47 @@ static void run(ftl::Configurable *root) { for (size_t i=0; i<sources.size(); i++) { Source *in = sources[i]; in->setChannel(ftl::rgbd::kChanDepth); - stream->add(in); + //stream->add(in); scene->addSource(in); + group.addSource(in); } - int active = sources.size(); + stream->run(); + + bool busy = false; + + group.sync([scene,splat,virt,&busy,&slave](ftl::rgbd::FrameSet &fs) -> bool { + if (busy) { + LOG(INFO) << "Group frameset dropped: " << fs.timestamp; + return true; + } + busy = true; + scene->nextFrame(); + + // Send all frames to GPU, block until done? + // TODO: Allow non-block and keep frameset locked until later + if (!slave.isPaused()) scene->upload(fs); + + int64_t ts = fs.timestamp; + + ftl::pool.push([scene,splat,virt,&busy,ts,&slave](int id) { + // TODO: Release frameset here... + cudaSafeCall(cudaStreamSynchronize(scene->getIntegrationStream())); + + if (!slave.isPaused()) { + scene->integrate(); + scene->garbage(); + } + + // Don't render here... but update timestamp. + splat->render(ts, virt, scene->getIntegrationStream()); + busy = false; + }); + return true; + }); + + + /*int active = sources.size(); while (ftl::running) { if (active == 0) { LOG(INFO) << "Waiting for sources..."; @@ -144,7 +182,7 @@ static void run(ftl::Configurable *root) { // Start virtual camera rendering and previous frame compression stream->poll(); - } + }*/ } int main(int argc, char **argv) { diff --git a/applications/reconstruct/src/splat_render.cpp b/applications/reconstruct/src/splat_render.cpp index df9d99a9f3b48dd59c2c7dd0b508a6a32aaded53..cc52bb7bf33d8688a9fc1da9f3b0d07474aff811 100644 --- a/applications/reconstruct/src/splat_render.cpp +++ b/applications/reconstruct/src/splat_render.cpp @@ -13,7 +13,7 @@ Splatter::~Splatter() { } -void Splatter::render(ftl::rgbd::Source *src, cudaStream_t stream) { +void Splatter::render(int64_t ts, ftl::rgbd::Source *src, cudaStream_t stream) { if (!src->isReady()) return; const auto &camera = src->parameters(); @@ -71,7 +71,7 @@ void Splatter::render(ftl::rgbd::Source *src, cudaStream_t stream) { ftl::cuda::isosurface_point_image(scene_->getHashData(), depth1_, params, stream); //ftl::cuda::splat_points(depth1_, depth2_, params, stream); //ftl::cuda::dibr(depth2_, colour1_, scene_->cameraCount(), params, stream); - src->writeFrames(colour1_, depth2_, stream); + src->writeFrames(ts, colour1_, depth2_, stream); } else { ftl::cuda::clear_depth(depth1_, stream); ftl::cuda::clear_depth(depth3_, stream); @@ -90,17 +90,17 @@ void Splatter::render(ftl::rgbd::Source *src, cudaStream_t stream) { if (src->value("splatting", false)) { //ftl::cuda::splat_points(depth1_, colour1_, normal1_, depth2_, colour2_, params, stream); ftl::cuda::int_to_float(depth1_, depth2_, 1.0f / 1000.0f, stream); - src->writeFrames(colour1_, depth2_, stream); + src->writeFrames(ts, colour1_, depth2_, stream); } else { ftl::cuda::int_to_float(depth1_, depth2_, 1.0f / 1000.0f, stream); - src->writeFrames(colour1_, depth2_, stream); + src->writeFrames(ts, colour1_, depth2_, stream); } } else if (src->getChannel() == ftl::rgbd::kChanEnergy) { //ftl::cuda::int_to_float(depth1_, depth2_, 1.0f / 1000.0f, stream); //if (src->value("splatting", false)) { //ftl::cuda::splat_points(depth1_, colour1_, normal1_, depth2_, colour2_, params, stream); //ftl::cuda::int_to_float(depth1_, depth2_, 1.0f / 1000.0f, stream); - src->writeFrames(colour1_, depth2_, stream); + src->writeFrames(ts, colour1_, depth2_, stream); //} else { //ftl::cuda::int_to_float(depth1_, depth2_, 1.0f / 1000.0f, stream); // src->writeFrames(colour1_, depth2_, stream); @@ -114,14 +114,14 @@ void Splatter::render(ftl::rgbd::Source *src, cudaStream_t stream) { ftl::cuda::clear_depth(depth1_, stream); ftl::cuda::dibr(depth1_, colour1_, normal1_, depth2_, colour_tmp_, depth3_, scene_->cameraCount(), params, stream); - src->writeFrames(colour1_, colour2_, stream); + src->writeFrames(ts, colour1_, colour2_, stream); } else { if (src->value("splatting", false)) { //ftl::cuda::splat_points(depth1_, colour1_, normal1_, depth2_, colour2_, params, stream); - src->writeFrames(colour1_, depth2_, stream); + src->writeFrames(ts, colour1_, depth2_, stream); } else { ftl::cuda::int_to_float(depth1_, depth2_, 1.0f / 1000.0f, stream); - src->writeFrames(colour1_, depth2_, stream); + src->writeFrames(ts, colour1_, depth2_, stream); } } } diff --git a/applications/reconstruct/src/splat_render.hpp b/applications/reconstruct/src/splat_render.hpp index 7511e859ff6471dcba18d5aee5518f55045eef9b..828db11fad6bbb5e3aeeae0899bc15549fab1708 100644 --- a/applications/reconstruct/src/splat_render.hpp +++ b/applications/reconstruct/src/splat_render.hpp @@ -26,7 +26,7 @@ class Splatter { explicit Splatter(ftl::voxhash::SceneRep *scene); ~Splatter(); - void render(ftl::rgbd::Source *src, cudaStream_t stream=0); + void render(int64_t ts, ftl::rgbd::Source *src, cudaStream_t stream=0); void setOutputDevice(int); diff --git a/applications/reconstruct/src/voxel_scene.cpp b/applications/reconstruct/src/voxel_scene.cpp index 2dca323166936658a14870606a036baf80991b89..cba6e61e9e9134845e59d71dd6e69d4ad7ae8beb 100644 --- a/applications/reconstruct/src/voxel_scene.cpp +++ b/applications/reconstruct/src/voxel_scene.cpp @@ -195,6 +195,84 @@ int SceneRep::upload() { return active; } +int SceneRep::upload(ftl::rgbd::FrameSet &fs) { + int active = 0; + + for (size_t i=0; i<cameras_.size(); ++i) { + auto &cam = cameras_[i]; + + if (!cam.source->isReady()) { + cam.params.m_imageWidth = 0; + // TODO(Nick) : Free gpu allocs if was ready before + LOG(INFO) << "Source not ready: " << cam.source->getURI(); + continue; + } else { + auto in = cam.source; + + cam.params.fx = in->parameters().fx; + cam.params.fy = in->parameters().fy; + cam.params.mx = -in->parameters().cx; + cam.params.my = -in->parameters().cy; + + // Only now do we have camera parameters for allocations... + if (cam.params.m_imageWidth == 0) { + cam.params.m_imageWidth = in->parameters().width; + cam.params.m_imageHeight = in->parameters().height; + cam.params.m_sensorDepthWorldMax = in->parameters().maxDepth; + cam.params.m_sensorDepthWorldMin = in->parameters().minDepth; + cam.gpu.alloc(cam.params, true); + LOG(INFO) << "GPU Allocated camera " << i; + } + } + + cam.params.flags = m_frameCount; + } + + _updateCameraConstant(); + //cudaSafeCall(cudaDeviceSynchronize()); + + for (size_t i=0; i<cameras_.size(); ++i) { + auto &cam = cameras_[i]; + + // Get the RGB-Depth frame from input + Source *input = cam.source; + //Mat rgb, depth; + + // TODO(Nick) Direct GPU upload to save copy + //input->getFrames(rgb,depth); + + active += 1; + + //if (depth.cols == 0) continue; + + // Must be in RGBA for GPU + Mat rgbt, rgba; + cv::cvtColor(fs.channel1[i],rgbt, cv::COLOR_BGR2Lab); + cv::cvtColor(rgbt,rgba, cv::COLOR_BGR2BGRA); + + // Send to GPU and merge view into scene + //cam.gpu.updateParams(cam.params); + cam.gpu.updateData(fs.channel2[i], rgba, cam.stream); + + //setLastRigidTransform(input->getPose().cast<float>()); + + //make the rigid transform available on the GPU + //m_hashData.updateParams(m_hashParams, cv::cuda::StreamAccessor::getStream(cam.stream)); + + //if (i > 0) cudaSafeCall(cudaStreamSynchronize(cv::cuda::StreamAccessor::getStream(cameras_[i-1].stream))); + + //allocate all hash blocks which are corresponding to depth map entries + if (value("voxels", false)) _alloc(i, cv::cuda::StreamAccessor::getStream(cam.stream)); + + // Calculate normals + } + + // Must have finished all allocations and rendering before next integration + cudaSafeCall(cudaDeviceSynchronize()); + + return active; +} + void SceneRep::integrate() { /*for (size_t i=0; i<cameras_.size(); ++i) { auto &cam = cameras_[i]; diff --git a/components/common/cpp/CMakeLists.txt b/components/common/cpp/CMakeLists.txt index 60577c96040dae74e93ccfe6f8ee4ae0c7abc856..8759e81039afede246d4f7e783531da98a1bc473 100644 --- a/components/common/cpp/CMakeLists.txt +++ b/components/common/cpp/CMakeLists.txt @@ -6,6 +6,8 @@ set(COMMONSRC src/loguru.cpp src/opencv_to_pcl.cpp src/cuda_common.cpp + src/ctpl_stl.cpp + src/timer.cpp ) check_function_exists(uriParseSingleUriA HAVE_URIPARSESINGLE) diff --git a/components/common/cpp/include/ctpl_stl.h b/components/common/cpp/include/ctpl_stl.h index 82d8029850bad486e3e1acfcb9b3f51886e8f6d2..fac0de42a001711a7033dfc5142ff33a54323525 100644 --- a/components/common/cpp/include/ctpl_stl.h +++ b/components/common/cpp/include/ctpl_stl.h @@ -62,6 +62,10 @@ namespace ctpl { std::unique_lock<std::mutex> lock(this->mutex); return this->q.empty(); } + size_t size() { + std::unique_lock<std::mutex> lock(this->mutex); + return this->q.size(); + } private: std::queue<T> q; std::mutex mutex; @@ -87,6 +91,8 @@ namespace ctpl { int n_idle() { return this->nWaiting; } std::thread & get_thread(int i) { return *this->threads[i]; } + size_t q_size() { return this->q.size(); } + // change the number of threads in the pool // should be called from one thread, otherwise be careful to not interleave, also with this->stop() // nThreads must be >= 0 @@ -206,32 +212,7 @@ namespace ctpl { thread_pool & operator=(const thread_pool &);// = delete; thread_pool & operator=(thread_pool &&);// = delete; - void set_thread(int i) { - std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag - auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { - std::atomic<bool> & _flag = *flag; - std::function<void(int id)> * _f; - bool isPop = this->q.pop(_f); - while (true) { - while (isPop) { // if there is anything in the queue - std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred - (*_f)(i); - if (_flag) - return; // the thread is wanted to stop, return even if the queue is not empty yet - else - isPop = this->q.pop(_f); - } - // the queue is empty here, wait for the next command - std::unique_lock<std::mutex> lock(this->mutex); - ++this->nWaiting; - this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; }); - --this->nWaiting; - if (!isPop) - return; // if the queue is empty and this->isDone == true or *flag then return - } - }; - this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() - } + void set_thread(int i); void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; } diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp index 5de4f5f75b6ff8e7012bd45b5f2aca8a41bc7b63..b498ca9d00318835589092e3989ce227c3dc4866 100644 --- a/components/common/cpp/include/ftl/threads.hpp +++ b/components/common/cpp/include/ftl/threads.hpp @@ -8,7 +8,7 @@ #define POOL_SIZE 10 #define DEBUG_MUTEX -#define MUTEX_TIMEOUT 15 +#define MUTEX_TIMEOUT 20 #if defined DEBUG_MUTEX #include <loguru.hpp> diff --git a/components/common/cpp/include/ftl/timer.hpp b/components/common/cpp/include/ftl/timer.hpp new file mode 100644 index 0000000000000000000000000000000000000000..6af48cd1cd4b053e69586af88e90be3c4ded0c35 --- /dev/null +++ b/components/common/cpp/include/ftl/timer.hpp @@ -0,0 +1,101 @@ +#ifndef _FTL_COMMON_TIMER_HPP_ +#define _FTL_COMMON_TIMER_HPP_ + +#include <functional> + +namespace ftl { + +/** + * A single global timer mechanism to call functions with either high or low + * precision timing accuracy. This is used to provide accurate frame timing for + * capture and rendering sync and it is deliberately a namespace and not a class + * since the entire system should be operating at a single frame rate. It + * controls the entire pipelines behaviour. It uses timestamps that are + * multiples of the millisecond interval between frames. + */ +namespace timer { + +enum timerlevel_t { + kTimerHighPrecision = 0, + kTimerSwap, + kTimerMain, + kTimerIdle1, // 1ms jobs to optionally do whilst idle + kTimerIdle10, // 10ms jobs to optionally do whilst idle + kTimerMAXLEVEL +}; + +/** + * Represents a timer job for control purposes. Use to remove timer jobs in + * a destructor, for example. + */ +struct TimerHandle { + const int id = -1; + + /** + * Cancel the timer job. If currently executing it will block and wait for + * the job to complete. + */ + void cancel() const; + void pause() const; + void unpause() const; + + /** + * Do the timer job every N frames. + */ + void setMultiplier(unsigned int) const; + + /** + * Give the timer job a name for logging output. + */ + void setName(const std::string &) const; + + /** + * Allow copy assignment. + */ + TimerHandle &operator=(const TimerHandle &h) { const_cast<int&>(id) = h.id; return *this; } +}; + +/** + * Milliseconds between calls. + */ +void setInterval(int ms); + +int getInterval(); + +/** + * Add the specified number of milliseconds to the clock when generating + * timestamps. This is used to synchronise clocks on multiple machines as it + * influences the behaviour of the timer. + */ +void setClockAdjustment(int64_t ms); + +/** + * Add a timer callback with a given precision and ordering. The highest + * precision callbacks occur within 1ms of required time and should return + * almost immediately to prevent delays to other callbacks. Other precisions + * occur later and in separate thread jobs for each callback. If a callback + * fails to return before the next time step, it is skipped for that timestep. + * If all high precision callbacks together take more than 1ms to complete, a + * warning is produced. + */ +const TimerHandle add(timerlevel_t, const std::function<void(int64_t ts)> &); + +/** + * Initiate the timer and optionally block the current process. + */ +void start(bool block=false); + +/** + * Stop the timer after any current callbacks complete. Blocks until stopped. + */ +void stop(bool wait=true); + +/** + * Stop and clear all callbacks. Used for testing purposes. + */ +void reset(); + +} +} + +#endif // _FTL_COMMON_TIMER_HPP_ diff --git a/components/common/cpp/src/configuration.cpp b/components/common/cpp/src/configuration.cpp index f6666eaffceeec8a39dbd86a5266f99697dfa7ba..786c07e5f55750fde8a1f543b5549f1c1e980901 100644 --- a/components/common/cpp/src/configuration.cpp +++ b/components/common/cpp/src/configuration.cpp @@ -19,6 +19,7 @@ #include <ftl/configurable.hpp> #include <ftl/uri.hpp> #include <ftl/threads.hpp> +#include <ftl/timer.hpp> #include <fstream> #include <string> @@ -35,7 +36,7 @@ using ftl::is_file; using ftl::is_directory; using ftl::Configurable; -ctpl::thread_pool ftl::pool(POOL_SIZE); +ctpl::thread_pool ftl::pool(std::thread::hardware_concurrency()*2); // Store loaded configuration namespace ftl { @@ -515,6 +516,13 @@ Configurable *ftl::config::configure(int argc, char **argv, const std::string &r } }); + // Some global settings + ftl::timer::setInterval(rootcfg->value("fps",20)); + + int pool_size = rootcfg->value("thread_pool_factor", 2.0f)*std::thread::hardware_concurrency(); + if (pool_size != ftl::pool.size()) ftl::pool.resize(pool_size); + + //LOG(INFO) << "CONFIG: " << config["vision_default"]; //CHECK_EQ( &config, config_index["ftl://utu.fi"] ); diff --git a/components/common/cpp/src/ctpl_stl.cpp b/components/common/cpp/src/ctpl_stl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..08308a975dfc9fd92b1a0a5d22336f6d68f82a97 --- /dev/null +++ b/components/common/cpp/src/ctpl_stl.cpp @@ -0,0 +1,59 @@ +#ifdef WIN32 +#include <Ws2tcpip.h> +#include <windows.h> +#endif + +#include <ctpl_stl.h> + +void ctpl::thread_pool::set_thread(int i) { + std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag + auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { + std::atomic<bool> & _flag = *flag; + std::function<void(int id)> * _f; + bool isPop = this->q.pop(_f); + while (true) { + while (isPop) { // if there is anything in the queue + std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred + (*_f)(i); + if (_flag) + return; // the thread is wanted to stop, return even if the queue is not empty yet + else + isPop = this->q.pop(_f); + } + // the queue is empty here, wait for the next command + std::unique_lock<std::mutex> lock(this->mutex); + ++this->nWaiting; + this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; }); + --this->nWaiting; + if (!isPop) + return; // if the queue is empty and this->isDone == true or *flag then return + } + }; + this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() + + // For excess threads, ensure they only operate if needed. + /*if (i >= std::thread::hardware_concurrency()-1) { + #ifndef WIN32 + sched_param p; + p.sched_priority = sched_get_priority_min(SCHED_FIFO); + pthread_setschedparam(threads[i]->native_handle(), SCHED_FIFO, &p); + #endif + } else { + #ifndef WIN32 + sched_param p; + p.sched_priority = sched_get_priority_max(SCHED_FIFO); + pthread_setschedparam(threads[i]->native_handle(), SCHED_FIFO, &p); + #endif + }*/ + + /* + #ifdef WIN32 + SetThreadAffinityMask(this->threads[i]->native_handle(), 1 << i); + #else + cpu_set_t cpus; + CPU_ZERO(&cpus); + CPU_SET(i, &cpus); + pthread_setaffinity_np(this->threads[i]->native_handle(), sizeof(cpus), &cpus); + #endif + */ +} diff --git a/components/common/cpp/src/timer.cpp b/components/common/cpp/src/timer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5f96f9045aac2cf506f28e9440f787b5d3f6a8d9 --- /dev/null +++ b/components/common/cpp/src/timer.cpp @@ -0,0 +1,212 @@ +#include <ftl/timer.hpp> +#include <ftl/threads.hpp> +#include <loguru.hpp> + +#include <vector> +#include <list> +#include <chrono> + +#include <xmmintrin.h> + +using std::vector; +using std::list; +using std::function; +using std::chrono::time_point_cast; +using std::chrono::milliseconds; +using std::chrono::high_resolution_clock; +using std::this_thread::sleep_for; + +using namespace ftl::timer; + +static int64_t last_frame = 0; +static int64_t mspf = 50; +static int64_t clock_adjust = 0; +static bool active = false; +static std::atomic<int> active_jobs = 0; +static MUTEX mtx; +static int last_id = 0; + +struct TimerJob { + int id; + function<void(int64_t)> job; + volatile bool active; + bool paused; + int multiplier; + int countdown; + std::string name; +}; + +static list<TimerJob> jobs[kTimerMAXLEVEL]; + +static inline int64_t get_time() { + return time_point_cast<milliseconds>(high_resolution_clock::now()).time_since_epoch().count()+clock_adjust; +} + +static void waitTimePoint() { + auto start = high_resolution_clock::now(); + int64_t now = get_time(); + int64_t target = now / mspf; + int64_t msdelay = mspf - (now % mspf); + int64_t sincelast = now - last_frame*mspf; + + if (sincelast > mspf) LOG(WARNING) << "Frame " << "(" << (target-last_frame) << ") dropped by " << sincelast << "ms"; + + // Use sleep_for for larger delays + + //LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay; + while (msdelay >= 35 && sincelast != mspf) { + sleep_for(milliseconds(10)); + now = get_time(); + msdelay = mspf - (now % mspf); + } + + // Spin loop until exact grab time + //LOG(INFO) << "Spin Delay: " << (now / 40) << " = " << (40 - (now%40)); + + if (sincelast != mspf) { + target = now / mspf; + while ((now/mspf) == target) { + _mm_pause(); // SSE2 nano pause intrinsic + now = get_time(); + }; + } + last_frame = now/mspf; +} + +void ftl::timer::setInterval(int ms) { + mspf = ms; +} + +int ftl::timer::getInterval() { + return mspf; +} + +void ftl::timer::setClockAdjustment(int64_t ms) { + clock_adjust = ms; +} + +const TimerHandle ftl::timer::add(timerlevel_t l, const std::function<void(int64_t ts)> &f) { + if (l < 0 || l >= kTimerMAXLEVEL) return {-1}; + + UNIQUE_LOCK(mtx, lk); + int newid = last_id++; + jobs[l].push_back({newid, f, false, false, 0, 0, "NoName"}); + return {newid}; +} + +static void removeJob(int id) { + UNIQUE_LOCK(mtx, lk); + if (id < 0) return; + for (size_t j=0; j<kTimerMAXLEVEL; ++j) { + for (auto i=jobs[j].begin(); i!=jobs[j].end(); i++) { + if ((*i).id == id) { + while ((*i).active) { + sleep_for(milliseconds(10)); + } + + jobs[j].erase(i); + return; + } + } + } +} + +static void trigger_jobs() { + UNIQUE_LOCK(mtx, lk); + const int64_t ts = last_frame*mspf; + + // First do non-blocking high precision callbacks + const int64_t before = get_time(); + for (auto &j : jobs[kTimerHighPrecision]) { + j.job(ts); + } + const int64_t after = get_time(); + if (after - before > 0) LOG(WARNING) << "Precision jobs took too long (" << (after-before) << "ms)"; + + // Then do also non-blocking swap callbacks + for (auto &j : jobs[kTimerSwap]) { + j.job(ts); + } + + // Now use thread jobs to do more intensive callbacks + for (auto &j : jobs[kTimerMain]) { + if (j.active) { + DLOG(WARNING) << "Timer job too slow ... skipped for " << ts; + continue; + } + j.active = true; + active_jobs++; + ftl::pool.push([&j,ts](int id) { + j.job(ts); + j.active = false; + active_jobs--; + }); + } +} + +namespace ftl { + extern bool running; +} + +void ftl::timer::start(bool block) { + if (active) return; + active = true; + + if (block) { + active_jobs++; + while (ftl::running && active) { + waitTimePoint(); + trigger_jobs(); + } + active_jobs--; + } else { + ftl::pool.push([](int id) { + active_jobs++; + while (ftl::running && active) { + waitTimePoint(); + trigger_jobs(); + } + active_jobs--; + }); + } +} + +void ftl::timer::stop(bool wait) { + active = false; + + if (wait) { + // All callbacks must complete before returning. + while (active_jobs > 0) { + sleep_for(milliseconds(10)); + } + } +} + +void ftl::timer::reset() { + stop(true); + for (int i=0; i<ftl::timer::kTimerMAXLEVEL; i++) { + jobs[i].clear(); + } +} + +// ===== TimerHandle =========================================================== + +void ftl::timer::TimerHandle::cancel() const { + removeJob(id); +} + +void ftl::timer::TimerHandle::pause() const { + +} + +void ftl::timer::TimerHandle::unpause() const { + +} + +void ftl::timer::TimerHandle::setMultiplier(unsigned int N) const { + +} + +void ftl::timer::TimerHandle::setName(const std::string &name) const { + +} diff --git a/components/common/cpp/test/CMakeLists.txt b/components/common/cpp/test/CMakeLists.txt index e2a54abbf11716aa0077e21ba25e9ce58dfa521a..2ef8e4338ae4e5bb3b39f4eaf8c88ec343fb1b95 100644 --- a/components/common/cpp/test/CMakeLists.txt +++ b/components/common/cpp/test/CMakeLists.txt @@ -6,6 +6,7 @@ add_executable(configurable_unit ../src/config.cpp ../src/configuration.cpp ../src/loguru.cpp + ../src/ctpl_stl.cpp ./configurable_unit.cpp ) target_include_directories(configurable_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") @@ -24,9 +25,22 @@ target_link_libraries(uri_unit Threads::Threads ${OS_LIBS} ${URIPARSER_LIBRARIES}) +### Timer Unit ################################################################ +add_executable(timer_unit + ./tests.cpp + ../src/timer.cpp + ../src/config.cpp + ../src/loguru.cpp + ../src/ctpl_stl.cpp + ./timer_unit.cpp +) +target_include_directories(timer_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(timer_unit + Threads::Threads ${OS_LIBS}) add_test(ConfigurableUnitTest configurable_unit) add_test(URIUnitTest uri_unit) +# add_test(TimerUnitTest timer_unit) CI server can't achieve this diff --git a/components/common/cpp/test/configurable_unit.cpp b/components/common/cpp/test/configurable_unit.cpp index 428208f7465a85421eb1c7864f384a9dcc3f7a98..af44e026a552279da53986dde38079e7695a889b 100644 --- a/components/common/cpp/test/configurable_unit.cpp +++ b/components/common/cpp/test/configurable_unit.cpp @@ -6,6 +6,12 @@ using ftl::Configurable; using std::string; +namespace ftl { +namespace timer { +void setInterval(int i) {} +} +} + SCENARIO( "Configurable::get()" ) { GIVEN( "a non-existent property" ) { // cppcheck-suppress constStatement diff --git a/components/common/cpp/test/timer_unit.cpp b/components/common/cpp/test/timer_unit.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6244341a17d3fdf91034c8fbd5c298f92bad4e49 --- /dev/null +++ b/components/common/cpp/test/timer_unit.cpp @@ -0,0 +1,176 @@ +#include "catch.hpp" +#define LOGURU_REPLACE_GLOG 1 +#include <loguru.hpp> +#include <ftl/timer.hpp> +#include <ftl/threads.hpp> + +ctpl::thread_pool ftl::pool(4); + +namespace ftl { + bool running = true; +} + +TEST_CASE( "Timer::add() High Precision Accuracy" ) { + SECTION( "An instantly returning callback" ) { + bool didrun = false; + + ftl::timer::reset(); + + auto rc = ftl::timer::add(ftl::timer::kTimerHighPrecision, [&didrun](int64_t ts) { + didrun = true; + ftl::timer::stop(false); + return; + }); + + REQUIRE( (rc.id >= 0) ); + + ftl::timer::start(true); + REQUIRE( didrun == true ); + } + + SECTION( "A slow returning callback" ) { + bool didrun = false; + + ftl::timer::reset(); + + auto rc = ftl::timer::add(ftl::timer::kTimerHighPrecision, [&didrun](int64_t ts) { + didrun = true; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + ftl::timer::stop(false); + return; + }); + + REQUIRE( (rc.id >= 0) ); + + ftl::timer::start(true); + REQUIRE( didrun == true ); + } + + SECTION( "Multiple callback" ) { + bool didrun[3] = {false}; + + ftl::timer::reset(); + + auto rc = ftl::timer::add(ftl::timer::kTimerHighPrecision, [&didrun](int64_t ts) { + didrun[0] = true; + ftl::timer::stop(false); + return; + }); + + REQUIRE( (rc.id >= 0) ); + + ftl::timer::add(ftl::timer::kTimerHighPrecision, [&didrun](int64_t ts) { + didrun[1] = true; + ftl::timer::stop(false); + return; + }); + + ftl::timer::add(ftl::timer::kTimerHighPrecision, [&didrun](int64_t ts) { + didrun[2] = true; + ftl::timer::stop(false); + return; + }); + + ftl::timer::start(true); + REQUIRE( didrun[0] == true ); + REQUIRE( didrun[1] == true ); + REQUIRE( didrun[2] == true ); + } +} + +TEST_CASE( "Timer::add() Main job" ) { + SECTION( "Quick main job" ) { + bool didrun = false; + + ftl::timer::reset(); + + auto rc = ftl::timer::add(ftl::timer::kTimerMain, [&didrun](int64_t ts) { + didrun = true; + ftl::timer::stop(false); + return; + }); + + REQUIRE( (rc.id >= 0) ); + + ftl::timer::start(true); + REQUIRE( didrun == true ); + } + + SECTION( "Slow main job" ) { + bool didrun = false; + + ftl::timer::reset(); + + auto rc = ftl::timer::add(ftl::timer::kTimerMain, [&didrun](int64_t ts) { + didrun = true; + std::this_thread::sleep_for(std::chrono::milliseconds(60)); + ftl::timer::stop(false); + return; + }); + + REQUIRE( (rc.id >= 0) ); + + ftl::timer::start(true); + REQUIRE( didrun == true ); + } + + SECTION( "Slow and fast main jobs" ) { + int job1 = 0; + int job2 = 0; + + ftl::timer::reset(); + + auto rc = ftl::timer::add(ftl::timer::kTimerMain, [&job1](int64_t ts) { + job1++; + std::this_thread::sleep_for(std::chrono::milliseconds(60)); + ftl::timer::stop(false); + return; + }); + + REQUIRE( (rc.id >= 0) ); + + ftl::timer::add(ftl::timer::kTimerMain, [&job2](int64_t ts) { + job2++; + return; + }); + + ftl::timer::start(true); + REQUIRE( (job1 == 1 && job2 == 2) ); + } +} + +TEST_CASE( "TimerHandle::cancel()" ) { + SECTION( "Invalid id" ) { + bool didjob = false; + ftl::timer::reset(); + + ftl::timer::add(ftl::timer::kTimerMain, [&didjob](int64_t ts) { + didjob = true; + ftl::timer::stop(false); + return; + }); + + // Fake Handle + ftl::timer::TimerHandle h = {44}; + h.cancel(); + ftl::timer::start(true); + REQUIRE( didjob ); + } + + SECTION( "Valid id" ) { + bool didjob = false; + ftl::timer::reset(); + + auto id = ftl::timer::add(ftl::timer::kTimerMain, [&didjob](int64_t ts) { + didjob = true; + ftl::timer::stop(false); + return; + }); + + id.cancel(); + ftl::timer::start(false); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + ftl::timer::stop(); + REQUIRE( !didjob ); + } +} diff --git a/components/net/cpp/include/ftl/net/common.hpp b/components/net/cpp/include/ftl/net/common.hpp index 78325d09b717d232751c21040be591f259fe7cf8..1ff4c5bbd73f5888b7141a7be988e162d7e279a6 100644 --- a/components/net/cpp/include/ftl/net/common.hpp +++ b/components/net/cpp/include/ftl/net/common.hpp @@ -3,6 +3,7 @@ #ifndef WIN32 #include <unistd.h> +#include <sys/poll.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index 438fc171425b1f1b2cab12317d05eed2f95d8372..f1dfbb4c7f574fc7ce20a2e0196e2234ac24ae3d 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -168,6 +168,9 @@ class Peer { */ template <typename... ARGS> int send(const std::string &name, ARGS... args); + + template <typename... ARGS> + int try_send(const std::string &name, ARGS... args); /** * Bind a function to an RPC call name. Note: if an overriding dispatcher @@ -240,7 +243,7 @@ class Peer { ftl::net::Universe *universe_; // Origin net universe // Receive buffers - bool is_waiting_; + volatile bool is_waiting_; msgpack::unpacker recv_buf_; RECURSIVE_MUTEX recv_mtx_; bool ws_read_header_; @@ -277,6 +280,32 @@ int Peer::send(const std::string &s, ARGS... args) { return rc; } +template <typename... ARGS> +int Peer::try_send(const std::string &s, ARGS... args) { +#ifdef WIN32 + WSAPOLLFD fds; + fds.fd = sock_; + fds.events = POLLOUT; + int rc = WSAPoll(&fds, 1, 0); +#else + pollfd fds; + fds.fd = sock_; + fds.events = POLLOUT; + int rc = poll(&fds, 1, 0); +#endif + if (rc == SOCKET_ERROR) return -1; + if (rc == 0) return 0; + + UNIQUE_LOCK(send_mtx_, lk); + // Leave a blank entry for websocket header + if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); + auto args_obj = std::make_tuple(args...); + auto call_obj = std::make_tuple(0,s,args_obj); + msgpack::pack(send_buf_, call_obj); + rc = _send(); + return (rc < 0) ? -1 : 1; +} + template <typename F> void Peer::bind(const std::string &name, F func) { disp_->bind(name, func, diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index d0e10034993b2de34757f6fea64b5abbb1c8456b..6394e26e36aed185c3b57af30e55eb97c5ef5876 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -141,6 +141,9 @@ class Universe : public ftl::Configurable { template <typename... ARGS> bool send(const UUID &pid, const std::string &name, ARGS... args); + template <typename... ARGS> + int try_send(const UUID &pid, const std::string &name, ARGS... args); + template <typename R, typename... ARGS> std::optional<R> findOne(const std::string &name, ARGS... args); @@ -185,6 +188,9 @@ class Universe : public ftl::Configurable { ftl::net::callback_t onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)>&); void removeCallback(ftl::net::callback_t cbid); + + size_t getSendBufferSize() const { return send_size_; } + size_t getRecvBufferSize() const { return recv_size_; } private: void _run(); @@ -217,6 +223,13 @@ class Universe : public ftl::Configurable { std::list<ReconnectInfo> reconnects_; size_t phase_; std::list<ftl::net::Peer*> garbage_; + + size_t send_size_; + size_t recv_size_; + double periodic_time_; + int reconnect_attempts_; + + // NOTE: Must always be last member std::thread thread_; struct ConnHandler { @@ -385,6 +398,17 @@ bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) } +template <typename... ARGS> +int Universe::try_send(const ftl::UUID &pid, const std::string &name, ARGS... args) { + Peer *p = getPeer(pid); + if (p == nullptr) { + DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); + return false; + } + + return (p->isConnected()) ? p->try_send(name, args...) : -1; +} + /*template <typename... ARGS> void Universe::publish(const std::string &res, ARGS... args) { ftl::URI uri(res); diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 8adbeae8c42ed7b3bbd3215a561ae45aacffebf3..6738470fe013e533d769a1cb3c860537a0cfa4ff 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -18,6 +18,11 @@ #pragma comment(lib, "Rpcrt4.lib") #endif +#ifndef WIN32 +#include <sys/ioctl.h> +#include <linux/sockios.h> +#endif + #include <ftl/uri.hpp> #include <ftl/net/peer.hpp> #include <ftl/net/ws_internal.hpp> @@ -43,8 +48,6 @@ using ftl::net::Universe; using ftl::net::callback_t; using std::vector; -#define TCP_BUFFER_SIZE (1024*1024*10) - /*static std::string hexStr(const std::string &s) { const char *data = s.data(); @@ -64,7 +67,7 @@ ftl::UUID ftl::net::this_peer; //static ctpl::thread_pool pool(5); // TODO:(nick) Move to tcp_internal.cpp -static SOCKET tcpConnect(URI &uri) { +static SOCKET tcpConnect(URI &uri, int ssize, int rsize) { int rc; //sockaddr_in destAddr; @@ -87,10 +90,11 @@ static SOCKET tcpConnect(URI &uri) { int flags =1; if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; - int a = TCP_BUFFER_SIZE; + int a = rsize; if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } + a = ssize; if (setsockopt(csocket, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } @@ -178,10 +182,11 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals int flags =1; if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; - int a = TCP_BUFFER_SIZE; + int a = u->getRecvBufferSize(); if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } + a = u->getSendBufferSize(); if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } @@ -234,12 +239,12 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, u->getSendBufferSize(), u->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) status_ = kConnecting; else status_ = kReconnecting; } else if (uri.getProtocol() == URI::SCHEME_WS) { LOG(INFO) << "Websocket connect " << uri.getPath(); - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, u->getSendBufferSize(), u->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) { if (!ws_connect(sock_, uri)) { LOG(ERROR) << "Websocket connection failed"; @@ -302,7 +307,7 @@ bool Peer::reconnect() { LOG(INFO) << "Reconnecting to " << uri_ << " ..."; if (scheme_ == URI::SCHEME_TCP) { - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, universe_->getSendBufferSize(), universe_->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) { status_ = kConnecting; is_waiting_ = true; @@ -311,7 +316,7 @@ bool Peer::reconnect() { return false; } } else if (scheme_ == URI::SCHEME_WS) { - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, universe_->getSendBufferSize(), universe_->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) { if (!ws_connect(sock_, uri)) { return false; @@ -412,8 +417,12 @@ void Peer::error(int e) { void Peer::data() { { + //auto start = std::chrono::high_resolution_clock::now(); + //int64_t startts = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count(); UNIQUE_LOCK(recv_mtx_,lk); + //LOG(INFO) << "Pool size: " << ftl::pool.q_size(); + int rc=0; int c=0; @@ -426,8 +435,19 @@ void Peer::data() { } int cap = recv_buf_.buffer_capacity(); - rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), cap, 0); - //if (c > 0 && rc > 0) LOG(INFO) << "RECV: " << rc; + auto buf = recv_buf_.buffer(); + lk.unlock(); + + /*#ifndef WIN32 + int n; + unsigned int m = sizeof(n); + getsockopt(sock_,SOL_SOCKET,SO_RCVBUF,(void *)&n, &m); + + int pending; + ioctl(sock_, SIOCINQ, &pending); + if (pending > 100000) LOG(INFO) << "Buffer usage: " << float(pending) / float(n); + #endif*/ + rc = ftl::net::internal::recv(sock_, buf, cap, 0); if (rc >= cap-1) { LOG(WARNING) << "More than buffers worth of data received"; @@ -445,13 +465,23 @@ void Peer::data() { //if (rc == -1) break; ++c; + lk.lock(); recv_buf_.buffer_consumed(rc); //} while (rc > 0); - } - ftl::pool.push([this](int id) { - _data(); - }); + //auto end = std::chrono::high_resolution_clock::now(); + //int64_t endts = std::chrono::time_point_cast<std::chrono::milliseconds>(end).time_since_epoch().count(); + //if (endts - startts > 50) LOG(ERROR) << "Excessive delay"; + + if (is_waiting_) { + is_waiting_ = false; + lk.unlock(); + + ftl::pool.push([this](int id) { + _data(); + }); + } + } } bool Peer::_data() { @@ -467,10 +497,14 @@ bool Peer::_data() { ws_read_header_ = true; } - if (!recv_buf_.next(msg)) return false; - msgpack::object obj = msg.get(); + if (!recv_buf_.next(msg)) { + is_waiting_ = true; + return false; + } + lk.unlock(); + msgpack::object obj = msg.get(); ftl::pool.push([this](int id) { _data(); }); diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index c2b94acc1aeead0369601c7a65bf576c97d88832..a70e42141dfe096b1cb7f71cac27bbda2ed8671d 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -22,16 +22,38 @@ using std::optional; using ftl::config::json_t; using ftl::net::callback_t; +#define TCP_SEND_BUFFER_SIZE (512*1024) +#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1) + callback_t ftl::net::Universe::cbid__ = 0; -Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) { +Universe::Universe() : + Configurable(), + active_(true), + this_peer(ftl::net::this_peer), + phase_(0), + send_size_(TCP_SEND_BUFFER_SIZE), + recv_size_(TCP_RECEIVE_BUFFER_SIZE), + periodic_time_(1.0), + reconnect_attempts_(50), + thread_(Universe::__start, this) { _installBindings(); } Universe::Universe(nlohmann::json &config) : - Configurable(config), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) { + Configurable(config), + active_(true), + this_peer(ftl::net::this_peer), + phase_(0), + send_size_(value("tcp_send_buffer",TCP_SEND_BUFFER_SIZE)), + recv_size_(value("tcp_recv_buffer",TCP_RECEIVE_BUFFER_SIZE)), + periodic_time_(value("periodics", 1.0)), + reconnect_attempts_(value("reconnect_attempts",50)), + thread_(Universe::__start, this) { _installBindings(); + + LOG(INFO) << "SEND BUFFER SIZE = " << send_size_; } Universe::~Universe() { @@ -39,6 +61,11 @@ Universe::~Universe() { } void Universe::start() { + /*cpu_set_t cpus; + CPU_ZERO(&cpus); + CPU_SET(1, &cpus); + pthread_setaffinity_np(thread_.native_handle(), sizeof(cpus), &cpus);*/ + auto l = get<json_t>("listen"); if (l && (*l).is_array()) { @@ -181,7 +208,7 @@ void Universe::_cleanupPeers() { i = peers_.erase(i); if (p->status() == ftl::net::Peer::kReconnecting) { - reconnects_.push_back({50, 1.0f, p}); + reconnects_.push_back({reconnect_attempts_, 1.0f, p}); } else { //delete p; garbage_.push_back(p); @@ -245,7 +272,7 @@ void Universe::_run() { // Do periodics auto now = std::chrono::high_resolution_clock::now(); std::chrono::duration<double> elapsed = now - start; - if (elapsed.count() >= 1.0) { + if (elapsed.count() >= periodic_time_) { start = now; _periodic(); } diff --git a/components/net/cpp/test/peer_unit.cpp b/components/net/cpp/test/peer_unit.cpp index 09eb4bef9e70171b84e57f1e954e833aa9768566..c4ace71797063eef78bafee6e3784f0f5e4fa124 100644 --- a/components/net/cpp/test/peer_unit.cpp +++ b/components/net/cpp/test/peer_unit.cpp @@ -50,6 +50,9 @@ class Universe { callback_t onConnect(const std::function<void(Peer*)> &f) { return 0; } callback_t onDisconnect(const std::function<void(Peer*)> &f) { return 0; } + + size_t getSendBufferSize() const { return 10*1024; } + size_t getRecvBufferSize() const { return 10*1024; } }; } } diff --git a/components/rgbd-sources/include/ftl/rgbd/detail/source.hpp b/components/rgbd-sources/include/ftl/rgbd/detail/source.hpp index dcde8047ee902990dad4e03c550cd0f48a8480aa..25e19e39641b1643eb9ba8cdc3240a4a719834b3 100644 --- a/components/rgbd-sources/include/ftl/rgbd/detail/source.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/detail/source.hpp @@ -52,7 +52,12 @@ class Source { /** * Perform hardware data capture. */ - virtual bool capture() { return true; }; + virtual bool capture(int64_t ts)=0; + + /** + * Perform IO operation to get the data. + */ + virtual bool retrieve()=0; /** * Do any processing from previously captured frames... diff --git a/components/rgbd-sources/include/ftl/rgbd/group.hpp b/components/rgbd-sources/include/ftl/rgbd/group.hpp index 1fc92a772e4edb7ce21590429aa08c7f602ed2bd..8f1dd61a64ec9b9a520e35f4158ba881e6b96f0c 100644 --- a/components/rgbd-sources/include/ftl/rgbd/group.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/group.hpp @@ -2,6 +2,7 @@ #define _FTL_RGBD_GROUP_HPP_ #include <ftl/threads.hpp> +#include <ftl/timer.hpp> #include <opencv2/opencv.hpp> #include <vector> @@ -11,37 +12,109 @@ namespace rgbd { class Source; +/** + * Represents a set of synchronised frames, each with two channels. This is + * used to collect all frames from multiple computers that have the same + * timestamp. + */ struct FrameSet { - int64_t timestamp; - std::vector<Source*> sources; - std::vector<cv::Mat> channel1; - std::vector<cv::Mat> channel2; - int count; - unsigned int mask; + int64_t timestamp; // Millisecond timestamp of all frames + std::vector<Source*> sources; // All source objects involved. + std::vector<cv::Mat> channel1; // RGB + std::vector<cv::Mat> channel2; // Depth (usually) + volatile int count; // Number of valid frames + volatile unsigned int mask; // Mask of all sources that contributed + bool stale; // True if buffers have been invalidated + MUTEX mtx; }; +// Allows a latency of 20 frames maximum static const size_t kFrameBufferSize = 20; +/** + * Manage a group of RGB-D sources to obtain synchronised sets of frames from + * those sources. The Group class provides a synchronised callback mechanism + * that uses the high precision timer to ensure that it is called once per + * frame. The callback is not called if the frameset is not completed or + * is unavailable for some other reason. By default if the required frame is + * not available but there is an older frame available that has not been used + * then it will be used. This can be disabled. It is also possible to allow + * incomplete frames to be used, but this is disabled by default. + */ class Group { public: Group(); ~Group(); + /** + * Add a new source to the group. Framesets generated prior to the source + * being added will still be valid and will not contain a frame from this + * source. Sets generated after addition will require a frame from this + * source. + */ void addSource(ftl::rgbd::Source *); - void sync(int N=-1, int B=-1); - void sync(std::function<bool(const FrameSet &)>); + /** + * Add another group to this one. All sources in the other group are made + * available to this group in a synchronised way. There is additional + * overhead in supporting this as additional data copies are required + * internally for all the source frames. + */ + void addGroup(ftl::rgbd::Group *); - bool getFrames(FrameSet &, bool complete=false); + /** + * Provide a function to be called once per frame with a valid frameset + * at the specified latency. The function may not be called under certain + * conditions (missing frameset). No guarantee is made about the timing + * accuracy of the call, it should be close to the frame point. This + * function may or may not block. It is intended that the buffers within + * the frameset are swapped during the function call, meaning that the + * frameset data is no longer valid upon returning. + */ + void sync(std::function<bool(FrameSet &)>); + + /** @deprecated */ + //bool getFrames(FrameSet &, bool complete=false); + + /** To be deprecated in favour of ftl::timer::setInterval. + */ + void setFPS(int fps); + + /** + * Set the minimum number of frames latency. The latency is from the most + * recent frame obtained, meaning that the timestamp of the input frames is + * the reference point, this may already be several frames old. Latency + * does not correspond to actual current time. + */ + void setLatency(int frames) { latency_ = frames; } + + void stop() {} private: std::vector<FrameSet> framesets_; std::vector<Source*> sources_; size_t head_; - std::function<bool(const FrameSet &)> callback_; + std::function<bool(FrameSet &)> callback_; MUTEX mutex_; + int mspf_; + int latency_; + int64_t last_ts_; + std::atomic<int> jobs_; + volatile bool skip_; + ftl::timer::TimerHandle cap_id_; + ftl::timer::TimerHandle swap_id_; + ftl::timer::TimerHandle main_id_; + /* Insert a new frameset into the buffer, along with all intermediate + * framesets between the last in buffer and the new one. + */ void _addFrameset(int64_t timestamp); + + void _retrieveJob(ftl::rgbd::Source *); + void _computeJob(ftl::rgbd::Source *); + + /* Find a frameset with given latency in frames. */ + ftl::rgbd::FrameSet *_getFrameset(int f); }; } diff --git a/components/rgbd-sources/include/ftl/rgbd/source.hpp b/components/rgbd-sources/include/ftl/rgbd/source.hpp index f5e27cd882fe00c3c25dd97bcb9dbc69ea450d71..4ddf48cdb50e463619113e31edfe79d2ec7a6807 100644 --- a/components/rgbd-sources/include/ftl/rgbd/source.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/source.hpp @@ -69,9 +69,15 @@ class Source : public ftl::Configurable { channel_t getChannel() const { return channel_; } /** - * Perform the hardware or virtual frame grab operation. + * Perform the hardware or virtual frame grab operation. This should be + * fast and non-blocking. */ - bool capture(); + bool capture(int64_t ts); + + /** + * Download captured frame. This could be a blocking IO operation. + */ + bool retrieve(); /** * Between frames, do any required buffer swaps. @@ -90,7 +96,8 @@ class Source : public ftl::Configurable { * It is more optimal to perform capture and compute in parallel. */ bool grab(int N=-1, int B=-1) { - bool c = capture(); + bool c = capture(0); + c = c && retrieve(); swap(); return c && compute(N,B); } @@ -118,10 +125,10 @@ class Source : public ftl::Configurable { * their data provided from an external rendering class. This function only * works when there is no internal generator. */ - void writeFrames(const cv::Mat &rgb, const cv::Mat &depth); - void writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<uint> &depth, cudaStream_t stream); - void writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<float> &depth, cudaStream_t stream); - void writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<uchar4> &rgb2, cudaStream_t stream); + void writeFrames(int64_t ts, const cv::Mat &rgb, const cv::Mat &depth); + void writeFrames(int64_t ts, const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<uint> &depth, cudaStream_t stream); + void writeFrames(int64_t ts, const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<float> &depth, cudaStream_t stream); + void writeFrames(int64_t ts, const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<uchar4> &rgb2, cudaStream_t stream); int64_t timestamp() const { return timestamp_; } @@ -133,6 +140,8 @@ class Source : public ftl::Configurable { void uploadColour(cv::cuda::GpuMat&); void uploadDepth(cv::cuda::GpuMat&); + bool isVirtual() const { return impl_ == nullptr; } + /** * Get the source's camera intrinsics. */ @@ -198,8 +207,8 @@ class Source : public ftl::Configurable { SHARED_MUTEX &mutex() { return mutex_; } - std::function<void(int64_t, const cv::Mat &, const cv::Mat &)> &callback() { return callback_; } - void setCallback(std::function<void(int64_t, const cv::Mat &, const cv::Mat &)> cb); + std::function<void(int64_t, cv::Mat &, cv::Mat &)> &callback() { return callback_; } + void setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb); void removeCallback() { callback_ = nullptr; } @@ -217,7 +226,7 @@ class Source : public ftl::Configurable { channel_t channel_; cudaStream_t stream_; int64_t timestamp_; - std::function<void(int64_t, const cv::Mat &, const cv::Mat &)> callback_; + std::function<void(int64_t, cv::Mat &, cv::Mat &)> callback_; detail::Source *_createImplementation(); detail::Source *_createFileImpl(const ftl::URI &uri); diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 82dba747857eb0c6fc7ab62e52990f3e2a30ae67..3ca7b8574c972e829873c492677253702399bd05 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -5,6 +5,7 @@ #include <ftl/configuration.hpp> #include <ftl/configurable.hpp> #include <ftl/rgbd/source.hpp> +#include <ftl/rgbd/group.hpp> #include <ftl/net/universe.hpp> #include <ftl/threads.hpp> #include <string> @@ -15,8 +16,8 @@ namespace ftl { namespace rgbd { -static const int kChunkDim = 4; -static constexpr int kChunkCount = kChunkDim * kChunkDim; +//static const int kChunkDim = 4; +//static constexpr int kChunkCount = kChunkDim * kChunkDim; namespace detail { @@ -106,6 +107,7 @@ class Streamer : public ftl::Configurable { Source *get(const std::string &uri); private: + ftl::rgbd::Group group_; std::map<std::string, detail::StreamSource*> sources_; //ctpl::thread_pool pool_; SHARED_MUTEX mutex_; @@ -120,20 +122,20 @@ class Streamer : public ftl::Configurable { ftl::UUID time_peer_; int64_t last_frame_; int64_t frame_no_; + size_t chunk_count_; + size_t chunk_dim_; int64_t mspf_; float actual_fps_; //int64_t last_dropped_; //int drop_count_; - void _schedule(); - void _schedule(detail::StreamSource *); - void _swap(detail::StreamSource *); + void _transmit(ftl::rgbd::FrameSet &); + void _cleanUp(); void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest); - void _encodeAndTransmit(detail::StreamSource *src, int chunk); + void _encodeAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk); void _encodeChannel1(const cv::Mat &in, std::vector<unsigned char> &out, unsigned int b); bool _encodeChannel2(const cv::Mat &in, std::vector<unsigned char> &out, ftl::rgbd::channel_t c, unsigned int b); - void _decideFrameRate(int64_t framesdropped, int64_t msremainder); }; } diff --git a/components/rgbd-sources/src/group.cpp b/components/rgbd-sources/src/group.cpp index b8dbd6369376cb54f28bb912729a02d48d24dbc2..1d700e10856192dca7eca6a2bc9544e3b887af16 100644 --- a/components/rgbd-sources/src/group.cpp +++ b/components/rgbd-sources/src/group.cpp @@ -1,19 +1,46 @@ #include <ftl/rgbd/group.hpp> #include <ftl/rgbd/source.hpp> +#include <ftl/timer.hpp> + +#include <chrono> using ftl::rgbd::Group; using ftl::rgbd::Source; using ftl::rgbd::kFrameBufferSize; using std::vector; +using std::chrono::milliseconds; +using std::this_thread::sleep_for; Group::Group() : framesets_(kFrameBufferSize), head_(0) { framesets_[0].timestamp = -1; + jobs_ = 0; + skip_ = false; + //setFPS(20); + + mspf_ = ftl::timer::getInterval(); + + setLatency(5); } Group::~Group() { for (auto s : sources_) { s->removeCallback(); } + + main_id_.cancel(); + swap_id_.cancel(); + cap_id_.cancel(); + + UNIQUE_LOCK(mutex_, lk); + // Make sure all jobs have finished + while (jobs_ > 0) { + sleep_for(milliseconds(10)); + } +} + +void Group::setFPS(int fps) { + mspf_ = 1000 / fps; + ftl::timer::setInterval(mspf_); } void Group::addSource(ftl::rgbd::Source *src) { @@ -21,8 +48,11 @@ void Group::addSource(ftl::rgbd::Source *src) { size_t ix = sources_.size(); sources_.push_back(src); - src->setCallback([this,ix](int64_t timestamp, const cv::Mat &rgb, const cv::Mat &depth) { + src->setCallback([this,ix,src](int64_t timestamp, cv::Mat &rgb, cv::Mat &depth) { if (timestamp == 0) return; + + //LOG(INFO) << "SRC CB: " << timestamp << " (" << framesets_[head_].timestamp << ")"; + UNIQUE_LOCK(mutex_, lk); if (timestamp > framesets_[head_].timestamp) { // Add new frameset @@ -37,80 +67,221 @@ void Group::addSource(ftl::rgbd::Source *src) { for (size_t i=0; i<kFrameBufferSize; ++i) { FrameSet &fs = framesets_[(head_+kFrameBufferSize-i) % kFrameBufferSize]; if (fs.timestamp == timestamp) { + lk.unlock(); + UNIQUE_LOCK(fs.mtx, lk2); + //LOG(INFO) << "Adding frame: " << ix << " for " << timestamp; - rgb.copyTo(fs.channel1[ix]); - depth.copyTo(fs.channel2[ix]); + // Ensure channels match source mat format + fs.channel1[ix].create(rgb.size(), rgb.type()); + fs.channel2[ix].create(depth.size(), depth.type()); + + cv::swap(rgb, fs.channel1[ix]); + cv::swap(depth, fs.channel2[ix]); + ++fs.count; fs.mask |= (1 << ix); + if (fs.count == sources_.size()) { + //LOG(INFO) << "COMPLETE SET: " << fs.timestamp; + } else { + //LOG(INFO) << "INCOMPLETE SET (" << ix << "): " << fs.timestamp; + } + if (callback_ && fs.count == sources_.size()) { - //LOG(INFO) << "DOING CALLBACK"; if (callback_(fs)) { - //sources_[ix]->grab(); - //LOG(INFO) << "GRAB"; + // TODO: Remove callback if returns false? } + + // Reset count to prevent multiple reads of these frames + fs.count = 0; } return; } } - LOG(WARNING) << "Frame timestamp not found in buffer"; + DLOG(WARNING) << "Frame timestamp not found in buffer"; }); } -// TODO: This should be a callback -// Callback returns true if it wishes to continue receiving frames. -void Group::sync(int N, int B) { - for (auto s : sources_) { - s->capture(); - s->swap(); - s->compute(N,B); +void Group::addGroup(Group *grp) { + +} + +void Group::_retrieveJob(ftl::rgbd::Source *src) { + try { + src->retrieve(); + } catch (std::exception &ex) { + LOG(ERROR) << "Exception when retrieving frame"; + LOG(ERROR) << ex.what(); + } + catch (...) { + LOG(ERROR) << "Unknown exception when retrieving frame"; } } -void Group::sync(std::function<bool(const ftl::rgbd::FrameSet &)> cb) { - callback_ = cb; - sync(-1,-1); +void Group::_computeJob(ftl::rgbd::Source *src) { + try { + src->compute(); + } catch (std::exception &ex) { + LOG(ERROR) << "Exception when computing frame"; + LOG(ERROR) << ex.what(); + } + catch (...) { + LOG(ERROR) << "Unknown exception when computing frame"; + } } -bool Group::getFrames(ftl::rgbd::FrameSet &fs, bool complete) { - // Use oldest frameset or search back until first complete set is found? - if (complete) { - UNIQUE_LOCK(mutex_, lk); - // Search backwards to find match - for (size_t i=0; i<kFrameBufferSize; ++i) { - FrameSet &f = framesets_[(head_+kFrameBufferSize-i) % kFrameBufferSize]; - if (f.count == sources_.size()) { - LOG(INFO) << "Complete set found"; - fs = f; // FIXME: This needs to move or copy safely... - return true; +void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { + if (latency_ == 0) { + callback_ = cb; + } + + // 1. Capture camera frames with high precision + cap_id_ = ftl::timer::add(ftl::timer::kTimerHighPrecision, [this](int64_t ts) { + skip_ = jobs_ != 0; // Last frame not finished so skip all steps + + if (skip_) return; + + last_ts_ = ts; + for (auto s : sources_) { + s->capture(ts); + } + }); + + // 2. After capture, swap any internal source double buffers + swap_id_ = ftl::timer::add(ftl::timer::kTimerSwap, [this](int64_t ts) { + if (skip_) return; + for (auto s : sources_) { + s->swap(); + } + }); + + // 3. Issue IO retrieve ad compute jobs before finding a valid + // frame at required latency to pass to callback. + main_id_ = ftl::timer::add(ftl::timer::kTimerMain, [this,cb](int64_t ts) { + if (skip_) return; + jobs_++; + + for (auto s : sources_) { + jobs_ += 2; + + ftl::pool.push([this,s](int id) { + _retrieveJob(s); + --jobs_; + }); + ftl::pool.push([this,s](int id) { + _computeJob(s); + --jobs_; + }); + } + + // Find a previous frameset and specified latency and do the sync + // callback with that frameset. + if (latency_ > 0) { + ftl::rgbd::FrameSet *fs = nullptr; + + UNIQUE_LOCK(mutex_, lk); + fs = _getFrameset(latency_); + + if (fs) { + UNIQUE_LOCK(fs->mtx, lk2); + lk.unlock(); + cb(*fs); + + // The buffers are invalid after callback so mark stale + fs->stale = true; + } else { + //LOG(INFO) << "NO FRAME FOUND: " << last_ts_ - latency_*mspf_; } } - LOG(WARNING) << "No complete frame set found"; - return false; - } - return false; + jobs_--; + }); + + ftl::timer::start(true); +} + +//ftl::rgbd::FrameSet &Group::_getRelativeFrameset(int rel) { +// int idx = (rel < 0) ? (head_+kFrameBufferSize+rel)%kFrameBufferSize : (head_+rel)%kFrameBufferSize; +// return framesets_[idx]; +//} + +ftl::rgbd::FrameSet *Group::_getFrameset(int f) { + const int64_t lookfor = last_ts_-f*mspf_; + + for (size_t i=1; i<kFrameBufferSize; ++i) { + int idx = (head_+kFrameBufferSize-i)%kFrameBufferSize; + + if (framesets_[idx].timestamp == lookfor && framesets_[idx].count != sources_.size()) { + LOG(INFO) << "Required frame not complete (mask " << (framesets_[idx].timestamp) << ")"; + continue; + } + + if (framesets_[idx].stale) return nullptr; + + if (framesets_[idx].timestamp == lookfor && framesets_[idx].count == sources_.size()) { + //framesets_[idx].stale = false; + return &framesets_[idx]; + } else if (framesets_[idx].timestamp < lookfor && framesets_[idx].count == sources_.size()) { + //framesets_[idx].stale = true; + return &framesets_[idx]; + } + + } + return nullptr; } void Group::_addFrameset(int64_t timestamp) { - int count = (framesets_[head_].timestamp == -1) ? 1 : (timestamp - framesets_[head_].timestamp) / 40; - // Must make sure to also insert missing framesets - //LOG(INFO) << "Adding " << count << " framesets for " << timestamp << " head=" << framesets_[head_].timestamp; + int count = (framesets_[head_].timestamp == -1) ? 200 : (timestamp - framesets_[head_].timestamp) / mspf_; + //LOG(INFO) << "Massive timestamp difference: " << count; + + // Allow for massive timestamp changes (Windows clock adjust) + // Only add a single frameset for large changes + if (count < -int(kFrameBufferSize) || count >= kFrameBufferSize-1) { + head_ = (head_+1) % kFrameBufferSize; + + if (!framesets_[head_].mtx.try_lock()) { + LOG(ERROR) << "Frameset in use!!"; + return; + } + framesets_[head_].timestamp = timestamp; + framesets_[head_].count = 0; + framesets_[head_].mask = 0; + framesets_[head_].stale = false; + framesets_[head_].channel1.resize(sources_.size()); + framesets_[head_].channel2.resize(sources_.size()); + + if (framesets_[head_].sources.size() != sources_.size()) { + framesets_[head_].sources.clear(); + for (auto s : sources_) framesets_[head_].sources.push_back(s); + } + framesets_[head_].mtx.unlock(); + return; + } - //if (count > 10 || count < 1) return; + if (count < 1) return; + // Must make sure to also insert missing framesets for (int i=0; i<count; ++i) { - int64_t lt = (framesets_[head_].timestamp == -1) ? timestamp-40 : framesets_[head_].timestamp; + int64_t lt = (framesets_[head_].timestamp == -1) ? timestamp-mspf_ : framesets_[head_].timestamp; head_ = (head_+1) % kFrameBufferSize; - framesets_[head_].timestamp = lt+40; + + if (!framesets_[head_].mtx.try_lock()) { + LOG(ERROR) << "Frameset in use!!"; + break; + } + framesets_[head_].timestamp = lt+mspf_; framesets_[head_].count = 0; framesets_[head_].mask = 0; + framesets_[head_].stale = false; framesets_[head_].channel1.resize(sources_.size()); framesets_[head_].channel2.resize(sources_.size()); - framesets_[head_].sources.clear(); - for (auto s : sources_) framesets_[head_].sources.push_back(s); + if (framesets_[head_].sources.size() != sources_.size()) { + framesets_[head_].sources.clear(); + for (auto s : sources_) framesets_[head_].sources.push_back(s); + } + framesets_[head_].mtx.unlock(); } } diff --git a/components/rgbd-sources/src/image.hpp b/components/rgbd-sources/src/image.hpp index ccb6ede5c93ed32213a89cc65fe90ab5d1cfbc78..2e2391b7cb95b0c7b120d2992cf3e66c91ad3a1a 100644 --- a/components/rgbd-sources/src/image.hpp +++ b/components/rgbd-sources/src/image.hpp @@ -14,6 +14,8 @@ class ImageSource : public ftl::rgbd::detail::Source { } + bool capture(int64_t ts) { timestamp_ = ts; return true; } + bool retrieve() { return true; } bool compute(int n, int b) { return false; }; bool isReady() { return false; }; }; diff --git a/components/rgbd-sources/src/local.cpp b/components/rgbd-sources/src/local.cpp index cf5521352d4689798cfc12ada0787f27db252c89..03b80ff52f60c95d374cfee289c0b3880bb766bd 100644 --- a/components/rgbd-sources/src/local.cpp +++ b/components/rgbd-sources/src/local.cpp @@ -199,13 +199,7 @@ LocalSource::LocalSource(nlohmann::json &config, const string &vid) return true; }*/ -bool LocalSource::get(cv::cuda::GpuMat &l_out, cv::cuda::GpuMat &r_out, Calibrate *c, cv::cuda::Stream &stream) { - Mat l, r; - - // Use page locked memory - l = left_hm_.createMatHeader(); - r = right_hm_.createMatHeader(); - +bool LocalSource::grab() { if (!camera_a_) return false; if (!camera_a_->grab()) { @@ -228,6 +222,18 @@ bool LocalSource::get(cv::cuda::GpuMat &l_out, cv::cuda::GpuMat &r_out, Calibrat timestamp_ = timestamp; + return true; +} + +bool LocalSource::get(cv::cuda::GpuMat &l_out, cv::cuda::GpuMat &r_out, Calibrate *c, cv::cuda::Stream &stream) { + Mat l, r; + + // Use page locked memory + l = left_hm_.createMatHeader(); + r = right_hm_.createMatHeader(); + + if (!camera_a_) return false; + if (camera_b_ || !stereo_) { if (!camera_a_->retrieve(l)) { LOG(ERROR) << "Unable to read frame from camera A"; diff --git a/components/rgbd-sources/src/local.hpp b/components/rgbd-sources/src/local.hpp index 4fd71c5eede52acc480f8915c3370046aaccfd76..9f21f5cf79b86f7cdee9455052c55a829eaf0da7 100644 --- a/components/rgbd-sources/src/local.hpp +++ b/components/rgbd-sources/src/local.hpp @@ -24,6 +24,7 @@ class LocalSource : public Configurable { //bool left(cv::Mat &m); //bool right(cv::Mat &m); + bool grab(); bool get(cv::cuda::GpuMat &l, cv::cuda::GpuMat &r, Calibrate *c, cv::cuda::Stream &stream); unsigned int width() const { return width_; } diff --git a/components/rgbd-sources/src/middlebury_source.hpp b/components/rgbd-sources/src/middlebury_source.hpp index 25f58b05e7888ded29eed71454eb591fd30babed..21c7d1f1c07d96130be44156fd17666868ab739b 100644 --- a/components/rgbd-sources/src/middlebury_source.hpp +++ b/components/rgbd-sources/src/middlebury_source.hpp @@ -19,6 +19,8 @@ class MiddleburySource : public detail::Source { MiddleburySource(ftl::rgbd::Source *, const std::string &dir); ~MiddleburySource() {}; + bool capture(int64_t ts) { timestamp_ = ts; return true; } + bool retrieve() { return true; } bool compute(int n, int b); bool isReady() { return ready_; } diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 68f46fc2b6879f437d6cbf76b56145af334e9e7a..3cab6c86d5e6cf45dc1a22a0643e2e58c4f3cd73 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -8,6 +8,8 @@ #include <ftl/rgbd/streamer.hpp> +using ftl::rgbd::detail::NetFrame; +using ftl::rgbd::detail::NetFrameQueue; using ftl::rgbd::detail::NetSource; using ftl::net::Universe; using ftl::UUID; @@ -18,6 +20,52 @@ using std::this_thread::sleep_for; using std::chrono::milliseconds; using std::tuple; +// ===== NetFrameQueue ========================================================= + +NetFrameQueue::NetFrameQueue(int size) : frames_(size) { + for (auto &f : frames_) f.timestamp = -1; +} + +NetFrameQueue::~NetFrameQueue() { + +} + +NetFrame &NetFrameQueue::getFrame(int64_t ts, const cv::Size &s, int c1type, int c2type) { + UNIQUE_LOCK(mtx_, lk); + + // Find matching timestamp + for (auto &f : frames_) { + if (f.timestamp == ts) return f; + } + + // No match so find an empty slot + for (auto &f : frames_) { + if (f.timestamp == -1) { + f.timestamp = ts; + f.chunk_count = 0; + f.channel1.create(s, c1type); + f.channel2.create(s, c2type); + return f; + } + } + + // No empty slot, so give a fatal error + for (auto &f : frames_) { + LOG(ERROR) << "Stale frame: " << f.timestamp << " - " << f.chunk_count; + } + LOG(FATAL) << "Net Frame Queue not large enough: " << ts; + // FIXME: (Nick) Could auto resize the queue. + return frames_[0]; // To avoid missing return error... +} + +void NetFrameQueue::freeFrame(NetFrame &f) { + UNIQUE_LOCK(mtx_, lk); + f.timestamp = -1; +} + + +// ===== NetSource ============================================================= + bool NetSource::_getCalibration(Universe &net, const UUID &peer, const string &src, ftl::rgbd::Camera &p, ftl::rgbd::channel_t chan) { try { while(true) { @@ -60,7 +108,7 @@ bool NetSource::_getCalibration(Universe &net, const UUID &peer, const string &s } NetSource::NetSource(ftl::rgbd::Source *host) - : ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), current_frame_(0) { + : ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), queue_(3) { gamma_ = host->value("gamma", 1.0f); temperature_ = host->value("temperature", 6500); @@ -99,6 +147,9 @@ NetSource::NetSource(ftl::rgbd::Source *host) default_quality_ = host->value("quality", 0); }); + chunks_dim_ = host->value("chunking",4); + chunk_count_ = chunks_dim_*chunks_dim_; + _updateURI(); h_ = host_->getNet()->onConnect([this](ftl::net::Peer *p) { @@ -116,10 +167,28 @@ NetSource::~NetSource() { host_->getNet()->removeCallback(h_); } -void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { +void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + // TODO: Don't allocate these each chunk cv::Mat tmp_rgb, tmp_depth; - if (!active_) return; + //if (!active_ || ts == 0) return; + + const ftl::rgbd::channel_t chan = host_->getChannel(); + NetFrame &frame = queue_.getFrame(ts, cv::Size(params_.width, params_.height), CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3)); + + // Build chunk head + int cx = (chunk % chunks_dim_) * chunk_width_; + int cy = (chunk / chunks_dim_) * chunk_height_; + cv::Rect roi(cx,cy,chunk_width_,chunk_height_); + cv::Mat chunkRGB = frame.channel1(roi); + cv::Mat chunkDepth = frame.channel2(roi); + + auto start = std::chrono::high_resolution_clock::now(); + int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count(); + //LOG(INFO) << ts << " - Chunk Latency (" << chunk_count_ << ") = " << (now - ts) << " - " << ftl::pool.q_size(); + //if (now - ts > 160) { + // LOG(INFO) << "OLD PACKET: " << host_->getURI() << " (" << chunk << ") - " << ts << " (" << (now - ts) << ")"; + //} // Decode in temporary buffers to prevent long locks cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb); @@ -128,98 +197,63 @@ void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<un // Apply colour correction to chunk ftl::rgbd::colourCorrection(tmp_rgb, gamma_, temperature_); - // Build chunk head - int cx = (chunk % chunks_dim_) * chunk_width_; - int cy = (chunk / chunks_dim_) * chunk_height_; - // Make certain last frame has finished decoding before swap - while (frame > current_frame_ && chunk_count_ < 16 && chunk_count_ > 0) { - std::this_thread::yield(); - //std::function<void(int)> j = ftl::pool.pop(); - //if ((bool)j) j(-1); - //else std::this_thread::yield(); + // TODO:(Nick) Decode directly into double buffer if no scaling + + // Original size so just copy + if (tmp_rgb.cols == chunkRGB.cols) { + tmp_rgb.copyTo(chunkRGB); + if (!tmp_depth.empty() && tmp_depth.type() == CV_16U && chunkDepth.type() == CV_32F) { + tmp_depth.convertTo(chunkDepth, CV_32FC1, 1.0f/1000.0f); //(16.0f*10.0f)); + } else if (!tmp_depth.empty() && tmp_depth.type() == CV_8UC3 && chunkDepth.type() == CV_8UC3) { + tmp_depth.copyTo(chunkDepth); + } else { + // Silent ignore? + } + // Downsized so needs a scale up + } else { + cv::resize(tmp_rgb, chunkRGB, chunkRGB.size()); + tmp_depth.convertTo(tmp_depth, CV_32FC1, 1.0f/1000.0f); + if (!tmp_depth.empty() && tmp_depth.type() == CV_16U && chunkDepth.type() == CV_32F) { + tmp_depth.convertTo(tmp_depth, CV_32FC1, 1.0f/1000.0f); //(16.0f*10.0f)); + cv::resize(tmp_depth, chunkDepth, chunkDepth.size()); + } else if (!tmp_depth.empty() && tmp_depth.type() == CV_8UC3 && chunkDepth.type() == CV_8UC3) { + cv::resize(tmp_depth, chunkDepth, chunkDepth.size()); + } else { + // Silent ignore? + } } - //{ - // A new frame has been started... finish the last one - if (frame > current_frame_) { - // Lock host to prevent grab - UNIQUE_LOCK(host_->mutex(),lk); - if (frame > current_frame_) { - { - // Lock to allow buffer swap - UNIQUE_LOCK(mutex_,lk2); - - // Swap the double buffers - cv::Mat tmp; - tmp = rgb_; - rgb_ = d_rgb_; - d_rgb_ = tmp; - tmp = depth_; - depth_ = d_depth_; - d_depth_ = tmp; - - chunk_count_ = 0; - timestamp_ = current_frame_; - current_frame_ = frame; - } + if (timestamp_ > 0 && frame.timestamp <= timestamp_) { + LOG(ERROR) << "BAD DUPLICATE FRAME - " << timestamp_ - frame.timestamp; + return; + } + + ++frame.chunk_count; - if (host_->callback()) { - //ftl::pool.push([this](id) { - // UNIQUE_LOCK(host_->mutex(),lk); - host_->callback()(timestamp_, rgb_, depth_); - //}); - } - } - } else if (frame < current_frame_) { - LOG(WARNING) << "Chunk dropped"; - if (chunk == 0) N_--; - return; - } - //} + if (frame.chunk_count > chunk_count_) LOG(FATAL) << "TOO MANY CHUNKS"; - // TODO:(Nick) Decode directly into double buffer if no scaling + if (frame.chunk_count == chunk_count_) { + UNIQUE_LOCK(frame.mtx, flk); + timestamp_ = frame.timestamp; - { - SHARED_LOCK(mutex_, lk); - - cv::Rect roi(cx,cy,chunk_width_,chunk_height_); - cv::Mat chunkRGB = d_rgb_(roi); - cv::Mat chunkDepth = d_depth_(roi); - - // Original size so just copy - if (tmp_rgb.cols == chunkRGB.cols) { - tmp_rgb.copyTo(chunkRGB); - if (!tmp_depth.empty() && tmp_depth.type() == CV_16U && chunkDepth.type() == CV_32F) { - tmp_depth.convertTo(chunkDepth, CV_32FC1, 1.0f/1000.0f); //(16.0f*10.0f)); - } else if (!tmp_depth.empty() && tmp_depth.type() == CV_8UC3 && chunkDepth.type() == CV_8UC3) { - tmp_depth.copyTo(chunkDepth); - } else { - // Silent ignore? - } - // Downsized so needs a scale up - } else { - cv::resize(tmp_rgb, chunkRGB, chunkRGB.size()); - tmp_depth.convertTo(tmp_depth, CV_32FC1, 1.0f/1000.0f); - if (!tmp_depth.empty() && tmp_depth.type() == CV_16U && chunkDepth.type() == CV_32F) { - tmp_depth.convertTo(tmp_depth, CV_32FC1, 1.0f/1000.0f); //(16.0f*10.0f)); - cv::resize(tmp_depth, chunkDepth, chunkDepth.size()); - } else if (!tmp_depth.empty() && tmp_depth.type() == CV_8UC3 && chunkDepth.type() == CV_8UC3) { - cv::resize(tmp_depth, chunkDepth, chunkDepth.size()); + if (frame.timestamp >= 0 && frame.chunk_count == chunk_count_) { + //LOG(INFO) << "Frame finished: " << frame.timestamp; + auto cb = host_->callback(); + if (cb) { + cb(frame.timestamp, frame.channel1, frame.channel2); } else { - // Silent ignore? + LOG(ERROR) << "NO FRAME CALLBACK"; } - } - } - { - - ++chunk_count_; - } + queue_.freeFrame(frame); - if (chunk == 0) { - UNIQUE_LOCK(host_->mutex(),lk); - N_--; + { + // Decrement expected frame counter + //UNIQUE_LOCK(host_->mutex(),lk); + N_--; + } + } } } @@ -277,14 +311,14 @@ void NetSource::_updateURI() { N_ = 0; // Update chunk details - chunks_dim_ = ftl::rgbd::kChunkDim; + //chunks_dim_ = ftl::rgbd::kChunkDim; chunk_width_ = params_.width / chunks_dim_; chunk_height_ = params_.height / chunks_dim_; - chunk_count_ = 0; + //chunk_count_ = 0; rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0)); depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f); - d_rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0)); - d_depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f); + //d_rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0)); + //d_depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f); uri_ = *uri; active_ = true; @@ -321,7 +355,7 @@ bool NetSource::compute(int n, int b) { } if (!host_->getNet()->send(peer_, "get_stream", - *host_->get<string>("uri"), N_, minB_, + *host_->get<string>("uri"), maxN_, minB_, host_->getNet()->id(), *host_->get<string>("uri"))) { active_ = false; } diff --git a/components/rgbd-sources/src/net.hpp b/components/rgbd-sources/src/net.hpp index b3411f12cce2f1e0236475ca75f667429ac5e35d..23206896c844cf4a29aa844771d66347010305d6 100644 --- a/components/rgbd-sources/src/net.hpp +++ b/components/rgbd-sources/src/net.hpp @@ -13,6 +13,34 @@ namespace detail { static const int kDefaultFrameCount = 30; +/** + * Buffers for a single frame as it is being received over the network. + */ +struct NetFrame { + cv::Mat channel1; + cv::Mat channel2; + volatile int64_t timestamp; + std::atomic<int> chunk_count; + MUTEX mtx; +}; + +/** + * Manage multiple frames with their timestamp as an identifier. Once a frame + * is completed it should be freed from the queue for reuse. + */ +class NetFrameQueue { + public: + explicit NetFrameQueue(int size=2); + ~NetFrameQueue(); + + NetFrame &getFrame(int64_t ts, const cv::Size &, int c1type, int c2type); + void freeFrame(NetFrame &); + + private: + std::vector<NetFrame> frames_; + MUTEX mtx_; +}; + /** * RGBD source from either a stereo video file with left + right images, or * direct from two camera devices. A variety of algorithms are included for @@ -24,6 +52,8 @@ class NetSource : public detail::Source { explicit NetSource(ftl::rgbd::Source *); ~NetSource(); + bool capture(int64_t ts) { return true; } + bool retrieve() { return true; } bool compute(int n, int b); bool isReady(); @@ -35,7 +65,7 @@ class NetSource : public detail::Source { private: bool has_calibration_; ftl::UUID peer_; - int N_; + std::atomic<int> N_; bool active_; std::string uri_; ftl::net::callback_t h_; @@ -49,13 +79,16 @@ class NetSource : public detail::Source { int minB_; int maxN_; int default_quality_; + int chunk_count_; ftl::rgbd::channel_t prev_chan_; - int64_t current_frame_; - std::atomic<int> chunk_count_; + //volatile int64_t current_frame_; + //std::atomic<int> chunk_count_; // Double buffering - cv::Mat d_depth_; - cv::Mat d_rgb_; + //cv::Mat d_depth_; + //cv::Mat d_rgb_; + + NetFrameQueue queue_; bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p, ftl::rgbd::channel_t chan); void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); diff --git a/components/rgbd-sources/src/realsense_source.hpp b/components/rgbd-sources/src/realsense_source.hpp index 3b2d70c76c28608a3faa96af7c927bc0011804f3..4eb182032ffa67cbe121993ed981398a57e0470a 100644 --- a/components/rgbd-sources/src/realsense_source.hpp +++ b/components/rgbd-sources/src/realsense_source.hpp @@ -17,6 +17,8 @@ class RealsenseSource : public ftl::rgbd::detail::Source { RealsenseSource(ftl::rgbd::Source *host); ~RealsenseSource(); + bool capture(int64_t ts) { timestamp_ = ts; return true; } + bool retrieve() { return true; } bool compute(int n=-1, int b=-1); bool isReady(); diff --git a/components/rgbd-sources/src/snapshot.cpp b/components/rgbd-sources/src/snapshot.cpp index 372c6124d2e530685923823712e043309ec48b7a..8ce89fa080392eb755278f42d92c60a7b0fa0f1e 100644 --- a/components/rgbd-sources/src/snapshot.cpp +++ b/components/rgbd-sources/src/snapshot.cpp @@ -458,12 +458,10 @@ Snapshot SnapshotReader::readArchive() { else { LOG(INFO) << "Using old format snapshot archive"; - result.n_cameras = 1; + result.n_cameras = 0; result.n_frames = 1; - Mat &rgb = result.rgb_left.emplace_back().emplace_back(); - Mat &depth = result.depth_left.emplace_back().emplace_back(); - Mat &pose = result.extrinsic.emplace_back(); - Camera ¶ms = result.parameters.emplace_back(); + + std::map<string,int> cammap; for (auto const& [path, data] : files_) { if (path.rfind("-") == string::npos) { @@ -471,6 +469,25 @@ Snapshot SnapshotReader::readArchive() { continue; } string id = path.substr(0, path.find("-")); + int idx; + + if (cammap.find(id) == cammap.end()) { + cammap[id] = result.n_cameras; + idx = result.n_cameras; + result.n_cameras++; + + result.rgb_left.emplace_back().emplace_back(); + result.depth_left.emplace_back().emplace_back(); + result.extrinsic.emplace_back(); + result.parameters.emplace_back(); + } else { + idx = cammap[id]; + } + + Mat &rgb = result.rgb_left[idx][0]; + Mat &depth = result.depth_left[idx][0]; + Mat &pose = result.extrinsic[idx]; + Camera ¶ms = result.parameters[idx]; // TODO: verify that input is valid // TODO: check that earlier results are not overwritten (status) diff --git a/components/rgbd-sources/src/snapshot_source.cpp b/components/rgbd-sources/src/snapshot_source.cpp index 7eb5bc6740540a55e310d63560e028b18b4148fc..73db6b86c3861cd0aa1105f4d9ff7dcd9cbe9fe3 100644 --- a/components/rgbd-sources/src/snapshot_source.cpp +++ b/components/rgbd-sources/src/snapshot_source.cpp @@ -50,7 +50,9 @@ SnapshotSource::SnapshotSource(ftl::rgbd::Source *host, Snapshot &snapshot, cons LOG(INFO) << "POSE = " << pose; - host->setPose(pose); + host->setPose(pose); + + mspf_ = 1000 / host_->value("fps", 20); } bool SnapshotSource::compute(int n, int b) { @@ -60,6 +62,9 @@ bool SnapshotSource::compute(int n, int b) { snap_rgb_.copyTo(rgb_); snap_depth_.copyTo(depth_); + auto cb = host_->callback(); + if (cb) cb(timestamp_, rgb_, depth_); + frame_idx_ = (frame_idx_ + 1) % snapshot_.getFramesCount(); return true; diff --git a/components/rgbd-sources/src/snapshot_source.hpp b/components/rgbd-sources/src/snapshot_source.hpp index 41bfe690cc69cadc2dc43316c004f327d3f895f2..1200f460cfb0ed68cc7cc1573b66c9135c605404 100644 --- a/components/rgbd-sources/src/snapshot_source.hpp +++ b/components/rgbd-sources/src/snapshot_source.hpp @@ -17,6 +17,8 @@ class SnapshotSource : public detail::Source { SnapshotSource(ftl::rgbd::Source *, ftl::rgbd::Snapshot &snapshot, const std::string &id); ~SnapshotSource() {}; + bool capture(int64_t ts) { timestamp_ = ts; return true; } + bool retrieve() { return true; } bool compute(int n, int b); bool isReady() { return true; } @@ -29,6 +31,7 @@ class SnapshotSource : public detail::Source { cv::Mat snap_rgb_; cv::Mat snap_depth_; + int mspf_; }; } diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp index c52c4100c11acd626bc7d7412cb301793351b727..10c9210f183c8f51d0b1e5424f0e4e441321cd86 100644 --- a/components/rgbd-sources/src/source.cpp +++ b/components/rgbd-sources/src/source.cpp @@ -165,6 +165,7 @@ ftl::rgbd::detail::Source *Source::_createDeviceImpl(const ftl::URI &uri) { } void Source::getFrames(cv::Mat &rgb, cv::Mat &depth) { + if (bool(callback_)) LOG(WARNING) << "Cannot use getFrames and callback in source"; SHARED_LOCK(mutex_,lk); rgb_.copyTo(rgb); depth_.copyTo(depth); @@ -231,8 +232,14 @@ void Source::reset() { impl_ = _createImplementation(); } -bool Source::capture() { - if (impl_) return impl_->capture(); +bool Source::capture(int64_t ts) { + //timestamp_ = ts; + if (impl_) return impl_->capture(ts); + else return true; +} + +bool Source::retrieve() { + if (impl_) return impl_->retrieve(); else return true; } @@ -265,17 +272,19 @@ bool Source::compute(int N, int B) { return false; } -void Source::writeFrames(const cv::Mat &rgb, const cv::Mat &depth) { +void Source::writeFrames(int64_t ts, const cv::Mat &rgb, const cv::Mat &depth) { if (!impl_) { UNIQUE_LOCK(mutex_,lk); rgb.copyTo(rgb_); depth.copyTo(depth_); + timestamp_ = ts; } } -void Source::writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<uint> &depth, cudaStream_t stream) { +void Source::writeFrames(int64_t ts, const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<uint> &depth, cudaStream_t stream) { if (!impl_) { UNIQUE_LOCK(mutex_,lk); + timestamp_ = ts; rgb_.create(rgb.height(), rgb.width(), CV_8UC4); cudaSafeCall(cudaMemcpy2DAsync(rgb_.data, rgb_.step, rgb.devicePtr(), rgb.pitch(), rgb_.cols * sizeof(uchar4), rgb_.rows, cudaMemcpyDeviceToHost, stream)); depth_.create(depth.height(), depth.width(), CV_32SC1); @@ -288,9 +297,10 @@ void Source::writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl: } } -void Source::writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<float> &depth, cudaStream_t stream) { +void Source::writeFrames(int64_t ts, const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<float> &depth, cudaStream_t stream) { if (!impl_) { UNIQUE_LOCK(mutex_,lk); + timestamp_ = ts; rgb.download(rgb_, stream); //rgb_.create(rgb.height(), rgb.width(), CV_8UC4); //cudaSafeCall(cudaMemcpy2DAsync(rgb_.data, rgb_.step, rgb.devicePtr(), rgb.pitch(), rgb_.cols * sizeof(uchar4), rgb_.rows, cudaMemcpyDeviceToHost, stream)); @@ -302,12 +312,15 @@ void Source::writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl: cudaSafeCall(cudaStreamSynchronize(stream_)); cv::cvtColor(rgb_,rgb_, cv::COLOR_BGRA2BGR); cv::cvtColor(rgb_,rgb_, cv::COLOR_Lab2BGR); + + if (callback_) callback_(timestamp_, rgb_, depth_); } } -void Source::writeFrames(const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<uchar4> &rgb2, cudaStream_t stream) { +void Source::writeFrames(int64_t ts, const ftl::cuda::TextureObject<uchar4> &rgb, const ftl::cuda::TextureObject<uchar4> &rgb2, cudaStream_t stream) { if (!impl_) { UNIQUE_LOCK(mutex_,lk); + timestamp_ = ts; rgb.download(rgb_, stream); //rgb_.create(rgb.height(), rgb.width(), CV_8UC4); //cudaSafeCall(cudaMemcpy2DAsync(rgb_.data, rgb_.step, rgb.devicePtr(), rgb.pitch(), rgb_.cols * sizeof(uchar4), rgb_.rows, cudaMemcpyDeviceToHost, stream)); @@ -332,7 +345,7 @@ bool Source::thumbnail(cv::Mat &t) { return true; } else if (impl_) { UNIQUE_LOCK(mutex_,lk); - impl_->capture(); + impl_->capture(0); impl_->swap(); impl_->compute(1, 9); impl_->rgb_.copyTo(rgb_); @@ -357,7 +370,7 @@ const ftl::rgbd::Camera Source::parameters(ftl::rgbd::channel_t chan) const { return (impl_) ? impl_->parameters(chan) : parameters(); } -void Source::setCallback(std::function<void(int64_t, const cv::Mat &, const cv::Mat &)> cb) { +void Source::setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb) { if (bool(callback_)) LOG(ERROR) << "Source already has a callback: " << getURI(); callback_ = cb; } diff --git a/components/rgbd-sources/src/stereovideo.cpp b/components/rgbd-sources/src/stereovideo.cpp index d83fdb194a426659b33166aef64d30d367653a0e..57e57fbbd245ee9e8f4e7038d4b24b85b4259145 100644 --- a/components/rgbd-sources/src/stereovideo.cpp +++ b/components/rgbd-sources/src/stereovideo.cpp @@ -153,7 +153,13 @@ static void disparityToDepth(const cv::cuda::GpuMat &disparity, cv::cuda::GpuMat cv::cuda::divide(val, disparity, depth, 1.0f / 1000.0f, -1, stream); } -bool StereoVideoSource::capture() { +bool StereoVideoSource::capture(int64_t ts) { + timestamp_ = ts; + lsrc_->grab(); + return true; +} + +bool StereoVideoSource::retrieve() { lsrc_->get(cap_left_, cap_right_, calib_, stream2_); stream2_.waitForCompletion(); return true; @@ -199,6 +205,9 @@ bool StereoVideoSource::compute(int n, int b) { left_.download(rgb_, stream_); stream_.waitForCompletion(); // TODO:(Nick) Move to getFrames } + + auto cb = host_->callback(); + if (cb) cb(timestamp_, rgb_, depth_); return true; } diff --git a/components/rgbd-sources/src/stereovideo.hpp b/components/rgbd-sources/src/stereovideo.hpp index 9d01da4cdebe0ca2bb2a1e91832ca016f7116645..88b71a068dfe9d596697afd0b93ded1cb7c034e1 100644 --- a/components/rgbd-sources/src/stereovideo.hpp +++ b/components/rgbd-sources/src/stereovideo.hpp @@ -27,7 +27,8 @@ class StereoVideoSource : public detail::Source { ~StereoVideoSource(); void swap(); - bool capture(); + bool capture(int64_t ts); + bool retrieve(); bool compute(int n, int b); bool isReady(); Camera parameters(channel_t chan); diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index e52e20e394bca414e7b2a08e8c628481cf2fe1a0..7a47a207f7e9d3b3f5d0617b39818adeceed2c42 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -1,4 +1,5 @@ #include <ftl/rgbd/streamer.hpp> +#include <ftl/timer.hpp> #include <vector> #include <optional> #include <thread> @@ -32,10 +33,18 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) net_ = net; time_peer_ = ftl::UUID(0); clock_adjust_ = 0; - mspf_ = 1000 / value("fps", 25); + mspf_ = ftl::timer::getInterval(); //1000 / value("fps", 20); //last_dropped_ = 0; //drop_count_ = 0; + chunk_dim_ = value("chunking",4); + chunk_count_ = chunk_dim_*chunk_dim_; + + LOG(INFO) << "CHUNK COUNT = " << chunk_count_; + + //group_.setFPS(value("fps", 20)); + group_.setLatency(10); + compress_level_ = value("compression", 1); net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> { @@ -101,8 +110,6 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) net->bind("set_channel", [this](const string &uri, unsigned int chan) { SHARED_LOCK(mutex_,slk); - LOG(INFO) << "SET CHANNEL " << chan; - if (sources_.find(uri) != sources_.end()) { sources_[uri]->src->setChannel((ftl::rgbd::channel_t)chan); } @@ -139,6 +146,8 @@ void Streamer::add(Source *src) { s->frame = 0; s->clientCount = 0; sources_[src->getID()] = s; + + group_.addSource(src); } LOG(INFO) << "Streaming: " << src->getID(); @@ -172,6 +181,7 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID LOG(INFO) << "Clock adjustment: " << clock_adjust_; LOG(INFO) << "Latency: " << (latency / 2); LOG(INFO) << "Local: " << std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() << ", master: " << mastertime; + ftl::timer::setClockAdjustment(clock_adjust_); } } @@ -182,7 +192,7 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID for (auto &client : s->clients[rate]) { // If already listening, just update chunk counters if (client.peerid == peer) { - client.txmax = N * kChunkCount; + client.txmax = N * chunk_count_; client.txcount = 0; return; } @@ -193,7 +203,7 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID c.peerid = peer; c.uri = dest; c.txcount = 0; - c.txmax = N * kChunkCount; + c.txmax = N * chunk_count_; ++s->clientCount; } @@ -205,255 +215,117 @@ void Streamer::remove(const std::string &) { } -void Streamer::_decideFrameRate(int64_t framesdropped, int64_t msremainder) { - actual_fps_ = 1000.0f / (float)((framesdropped+1)*mspf_+(msremainder)); - LOG(INFO) << "Actual FPS = " << actual_fps_; - - /*if (framesdropped > 0) { - // If N consecutive frames are dropped, work out new rate - if (last_dropped_/mspf_ >= last_frame_/mspf_ - 2*framesdropped) drop_count_++; - else drop_count_ = 0; - - last_dropped_ = last_frame_+mspf_; - - if (drop_count_ >= ftl::rgbd::detail::kFrameDropLimit) { - drop_count_ = 0; - - const int64_t actualmspf = std::min((int64_t)1000, framesdropped*mspf_+(mspf_ - msremainder)); - - LOG(WARNING) << "Suggest FPS @ " << (1000 / actualmspf); - //mspf_ = actualmspf; - - // Also notify all clients of change - } - } else { - // Perhaps we can boost framerate? - const int64_t actualmspf = std::min((int64_t)1000, framesdropped*mspf_+(mspf_ - msremainder)); - LOG(INFO) << "Boost framerate: " << (1000 / actualmspf); - //mspf_ = actualmspf; - }*/ -} - void Streamer::stop() { - active_ = false; - wait(); -} - -void Streamer::poll() { - // Create frame jobs at correct FPS interval - _schedule(); + group_.stop(); } void Streamer::run(bool block) { - active_ = true; - if (block) { - while (ftl::running && active_) { - poll(); - } + group_.sync([this](FrameSet &fs) -> bool { + _transmit(fs); + return true; + }); } else { // Create thread job for frame ticking ftl::pool.push([this](int id) { - while (ftl::running && active_) { - poll(); - } + group_.sync([this](FrameSet &fs) -> bool { + _transmit(fs); + return true; + }); }); } } -// Must be called in source locked state or src.state must be atomic -void Streamer::_swap(StreamSource *src) { - if (src->jobs == 0) { +void Streamer::_cleanUp() { + for (auto &s : sources_) { + StreamSource *src = s.second; UNIQUE_LOCK(src->mutex,lk); - if (src->jobs == 0) { - for (unsigned int b=0; b<10; ++b) { - auto i = src->clients[b].begin(); - while (i != src->clients[b].end()) { - // Client request completed so remove from list - if ((*i).txcount >= (*i).txmax) { - LOG(INFO) << "Remove client: " << (*i).uri; - i = src->clients[b].erase(i); - --src->clientCount; - } else { - i++; - } + + for (unsigned int b=0; b<10; ++b) { + auto i = src->clients[b].begin(); + while (i != src->clients[b].end()) { + // Client request completed so remove from list + if ((*i).txcount >= (*i).txmax) { + LOG(INFO) << "Remove client: " << (*i).uri; + i = src->clients[b].erase(i); + --src->clientCount; + } else { + i++; } } - - src->src->swap(); - src->src->getFrames(src->rgb, src->depth); - - //if (!src->rgb.empty() && src->prev_depth.empty()) { - //src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0)); - //LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows; - //} - src->jobs = -1; - src->frame++; } } } -void Streamer::wait() { - // Do some jobs in this thread, might as well... - std::function<void(int)> j; - while ((bool)(j=ftl::pool.pop())) { - j(-1); - } +void Streamer::_transmit(ftl::rgbd::FrameSet &fs) { + // Prevent new clients during processing. + SHARED_LOCK(mutex_,slk); - // Wait for all jobs to complete before finishing frame - //UNIQUE_LOCK(job_mtx_, lk); - std::unique_lock<std::mutex> lk(job_mtx_); - job_cv_.wait_for(lk, std::chrono::seconds(20), [this]{ return jobs_ == 0; }); - if (jobs_ != 0) { - LOG(FATAL) << "Deadlock detected"; + if (fs.sources.size() != sources_.size()) { + LOG(ERROR) << "Incorrect number of sources in frameset: " << fs.sources.size() << " vs " << sources_.size(); + return; } - // Capture frame number? - frame_no_ = last_frame_; -} - -void Streamer::_schedule(StreamSource *src) { - if (src == nullptr || src->jobs > 0) return; - - jobs_ += 2 + kChunkCount; - src->jobs = 2 + kChunkCount; + int totalclients = 0; - // Grab / capture job - ftl::pool.push([this,src](int id) { - auto start = std::chrono::high_resolution_clock::now(); - int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_; - int64_t target = now / mspf_; - int64_t msdelay = mspf_ - (now % mspf_); + frame_no_ = fs.timestamp; - if (target != last_frame_ && msdelay != mspf_) LOG(WARNING) << "Frame " << "(" << (target-last_frame_) << ") dropped by " << (now%mspf_) << "ms"; + for (int j=0; j<fs.sources.size(); ++j) { + StreamSource *src = sources_[fs.sources[j]->getID()]; - // Use sleep_for for larger delays - - //LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay; - while (msdelay >= 20 && msdelay < mspf_) { - sleep_for(milliseconds(10)); - now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; - msdelay = mspf_ - (now % mspf_); - } - - // Spin loop until exact grab time - //LOG(INFO) << "Spin Delay: " << (now / 40) << " = " << (40 - (now%40)); - - if (msdelay != mspf_) { - target = now / mspf_; - while ((now/mspf_) == target) { - _mm_pause(); // SSE2 nano pause intrinsic - now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; - }; - } - last_frame_ = now/mspf_; - - try { - src->src->capture(); - } catch (std::exception &ex) { - LOG(ERROR) << "Exception when grabbing frame"; - LOG(ERROR) << ex.what(); - } - catch (...) { - LOG(ERROR) << "Unknown exception when grabbing frame"; - } - - //now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; - //if (now%40 > 0) LOG(INFO) << "Grab in: " << (now%40) << "ms"; + // Don't do any work in the following cases + if (!src) continue; + if (!fs.sources[j]->isReady()) continue; + if (src->clientCount == 0) continue; + if (fs.channel1[j].empty() || (fs.sources[j]->getChannel() != ftl::rgbd::kChanNone && fs.channel2[j].empty())) continue; - //std::chrono::duration<double> elapsed = - // std::chrono::high_resolution_clock::now() - start; - //LOG(INFO) << "Grab in " << elapsed.count() << "s"; + totalclients += src->clientCount; - src->jobs--; - _swap(src); - - // Mark job as finished - std::unique_lock<std::mutex> lk(job_mtx_); - --jobs_; - if (jobs_ == 0) job_cv_.notify_one(); - }); + // Create jobs for each chunk + for (int i=0; i<chunk_count_; ++i) { + // Add chunk job to thread pool + ftl::pool.push([this,&fs,j,i,src](int id) { + int chunk = i; + try { + _encodeAndTransmit(src, fs.channel1[j], fs.channel2[j], chunk); + } catch(...) { + LOG(ERROR) << "Encode Exception: " << chunk; + } - // Compute job - ftl::pool.push([this,src](int id) { - try { - src->src->compute(); - } catch (std::exception &ex) { - LOG(ERROR) << "Exception when computing frame"; - LOG(ERROR) << ex.what(); + //src->jobs--; + std::unique_lock<std::mutex> lk(job_mtx_); + --jobs_; + if (jobs_ == 0) job_cv_.notify_one(); + }); } - catch (...) { - LOG(ERROR) << "Unknown exception when computing frame"; - } - - src->jobs--; - _swap(src); - - // Mark job as finished - std::unique_lock<std::mutex> lk(job_mtx_); - --jobs_; - if (jobs_ == 0) job_cv_.notify_one(); - }); - - // Create jobs for each chunk - for (int i=0; i<kChunkCount; ++i) { - // Add chunk job to thread pool - ftl::pool.push([this,src,i](int id) { - int chunk = i; - try { - if (!src->rgb.empty() && (src->src->getChannel() == ftl::rgbd::kChanNone || !src->depth.empty())) { - _encodeAndTransmit(src, chunk); - } - } catch(...) { - LOG(ERROR) << "Encode Exception: " << chunk; - } - src->jobs--; - _swap(src); - std::unique_lock<std::mutex> lk(job_mtx_); - --jobs_; - if (jobs_ == 0) job_cv_.notify_one(); - }); + jobs_ += chunk_count_; } -} - -void Streamer::_schedule() { - wait(); - //std::mutex job_mtx; - //std::condition_variable job_cv; - //int jobs = 0; - - //auto now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_; - //LOG(INFO) << "Frame time = " << (now-(last_frame_*40)) << "ms"; - - // Prevent new clients during processing. - SHARED_LOCK(mutex_,slk); - - for (auto s : sources_) { - string uri = s.first; - // No point in doing work if no clients - if (s.second->clientCount == 0) { - continue; - } - - _schedule(s.second); + std::unique_lock<std::mutex> lk(job_mtx_); + job_cv_.wait_for(lk, std::chrono::seconds(20), [this]{ return jobs_ == 0; }); + if (jobs_ != 0) { + LOG(FATAL) << "Deadlock detected"; } + + // Go to sleep if no clients instead of spinning the cpu + if (totalclients == 0 || sources_.size() == 0) sleep_for(milliseconds(200)); + else _cleanUp(); } -void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) { - bool hasChan2 = (!src->depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone); +void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const cv::Mat &depth, int chunk) { + bool hasChan2 = (!depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone); bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not - int chunk_width = src->rgb.cols / kChunkDim; - int chunk_height = src->rgb.rows / kChunkDim; + int chunk_width = rgb.cols / chunk_dim_; + int chunk_height = rgb.rows / chunk_dim_; // Build chunk heads - int cx = (chunk % kChunkDim) * chunk_width; - int cy = (chunk / kChunkDim) * chunk_height; + int cx = (chunk % chunk_dim_) * chunk_width; + int cy = (chunk / chunk_dim_) * chunk_height; cv::Rect roi(cx,cy,chunk_width,chunk_height); vector<unsigned char> rgb_buf; - cv::Mat chunkRGB = src->rgb(roi); + cv::Mat chunkRGB = rgb(roi); cv::Mat chunkDepth; //cv::Mat chunkDepthPrev = src->prev_depth(roi); @@ -461,7 +333,7 @@ void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) { vector<unsigned char> d_buf; if (hasChan2) { - chunkDepth = src->depth(roi); + chunkDepth = depth(roi); if (chunkDepth.type() == CV_32F) chunkDepth.convertTo(d2, CV_16UC1, 1000); // 16*10); else d2 = chunkDepth; //if (delta) d3 = (d2 * 2) - chunkDepthPrev; @@ -485,8 +357,8 @@ void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) { // TODO:(Nick) could reuse downscales } else { cv::Mat downrgb, downdepth; - cv::resize(chunkRGB, downrgb, cv::Size(bitrate_settings[b].width / kChunkDim, bitrate_settings[b].height / kChunkDim)); - if (hasChan2) cv::resize(d2, downdepth, cv::Size(bitrate_settings[b].width / kChunkDim, bitrate_settings[b].height / kChunkDim)); + cv::resize(chunkRGB, downrgb, cv::Size(bitrate_settings[b].width / chunk_dim_, bitrate_settings[b].height / chunk_dim_)); + if (hasChan2) cv::resize(d2, downdepth, cv::Size(bitrate_settings[b].width / chunk_dim_, bitrate_settings[b].height / chunk_dim_)); _encodeChannel1(downrgb, rgb_buf, b); if (hasChan2) _encodeChannel2(downdepth, d_buf, src->src->getChannel(), b); @@ -500,11 +372,12 @@ void Streamer::_encodeAndTransmit(StreamSource *src, int chunk) { while (c != src->clients[b].end()) { try { // TODO:(Nick) Send pose - if (!net_->send((*c).peerid, (*c).uri, frame_no_*mspf_, chunk, delta, rgb_buf, d_buf)) { + if (!net_->send((*c).peerid, (*c).uri, frame_no_, chunk, delta, rgb_buf, d_buf)) { // Send failed so mark as client stream completed (*c).txcount = (*c).txmax; } else { ++(*c).txcount; + //LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk; } } catch(...) { (*c).txcount = (*c).txmax; diff --git a/components/rgbd-sources/test/source_unit.cpp b/components/rgbd-sources/test/source_unit.cpp index 1bbb336aaad3a6c25f3e515b89c7e6142da5b852..4b14bee1887c956a2b646009b82d779e663b97d7 100644 --- a/components/rgbd-sources/test/source_unit.cpp +++ b/components/rgbd-sources/test/source_unit.cpp @@ -29,6 +29,8 @@ class ImageSource : public ftl::rgbd::detail::Source { last_type = "image"; } + bool capture(int64_t ts) { return true; } + bool retrieve() { return true; } bool compute(int n, int b) { return true; }; bool isReady() { return true; }; }; @@ -42,6 +44,8 @@ class StereoVideoSource : public ftl::rgbd::detail::Source { last_type = "video"; } + bool capture(int64_t ts) { return true; } + bool retrieve() { return true; } bool compute(int n, int b) { return true; }; bool isReady() { return true; }; }; @@ -52,6 +56,8 @@ class NetSource : public ftl::rgbd::detail::Source { last_type = "net"; } + bool capture(int64_t ts) { return true; } + bool retrieve() { return true; } bool compute(int n, int b) { return true; }; bool isReady() { return true; }; }; @@ -62,6 +68,8 @@ class SnapshotSource : public ftl::rgbd::detail::Source { last_type = "snapshot"; } + bool capture(int64_t ts) { return true; } + bool retrieve() { return true; } bool compute(int n, int b) { return true; }; bool isReady() { return true; }; }; @@ -72,6 +80,8 @@ class RealsenseSource : public ftl::rgbd::detail::Source { last_type = "realsense"; } + bool capture(int64_t ts) { return true; } + bool retrieve() { return true; } bool compute(int n, int b) { return true; }; bool isReady() { return true; }; }; @@ -82,6 +92,8 @@ class MiddleburySource : public ftl::rgbd::detail::Source { last_type = "middlebury"; } + bool capture(int64_t ts) { return true; } + bool retrieve() { return true; } bool compute(int n, int b) { return true; }; bool isReady() { return true; }; };