diff --git a/components/common/cpp/include/ftl/timer.hpp b/components/common/cpp/include/ftl/timer.hpp index ecb93ecf3410fd60c4655767a2678e2034202b97..bbfe33b7e42ea63bb2fbe4c107142222c4608f1d 100644 --- a/components/common/cpp/include/ftl/timer.hpp +++ b/components/common/cpp/include/ftl/timer.hpp @@ -80,7 +80,7 @@ void setClockAdjustment(int64_t ms); * If all high precision callbacks together take more than 1ms to complete, a * warning is produced. */ -const TimerHandle add(timerlevel_t, const std::function<void(int64_t ts)> &); +const TimerHandle add(timerlevel_t, const std::function<bool(int64_t ts)> &); /** * Initiate the timer and optionally block the current process. @@ -92,6 +92,8 @@ void start(bool block=false); */ void stop(bool wait=true); +size_t count(timerlevel_t); + /** * Stop and clear all callbacks. Used for testing purposes. */ diff --git a/components/common/cpp/src/timer.cpp b/components/common/cpp/src/timer.cpp index 6064c1f744225c0c1541d0a617d2dc6a5cf0ee33..e00a7a9d17c40568e5b8bba668cf6aed14bbf9bc 100644 --- a/components/common/cpp/src/timer.cpp +++ b/components/common/cpp/src/timer.cpp @@ -28,7 +28,7 @@ static int last_id = 0; struct TimerJob { int id; - function<void(int64_t)> job; + function<bool(int64_t)> job; volatile bool active; bool paused; int multiplier; @@ -66,8 +66,14 @@ static void waitTimePoint() { auto idle_job = jobs[kTimerIdle10].begin(); while (idle_job != jobs[kTimerIdle10].end() && msdelay >= 10 && sincelast != mspf) { (*idle_job).active = true; - (*idle_job).job(now); - (*idle_job++).active = false; + bool doremove = !(*idle_job).job(now); + + if (doremove) { + idle_job = jobs[kTimerIdle10].erase(idle_job); + LOG(INFO) << "Timer job removed"; + } else { + (*idle_job++).active = false; + } now = get_time(); msdelay = mspf - (now % mspf); } @@ -98,7 +104,7 @@ void ftl::timer::setClockAdjustment(int64_t ms) { clock_adjust += ms; } -const TimerHandle ftl::timer::add(timerlevel_t l, const std::function<void(int64_t ts)> &f) { +const TimerHandle ftl::timer::add(timerlevel_t l, const std::function<bool(int64_t ts)> &f) { if (l < 0 || l >= kTimerMAXLEVEL) return {-1}; UNIQUE_LOCK(mtx, lk); @@ -150,9 +156,10 @@ static void trigger_jobs() { j.active = true; active_jobs++; ftl::pool.push([&j,ts](int id) { - j.job(ts); + bool doremove = !j.job(ts); j.active = false; active_jobs--; + if (doremove) removeJob(j.id); }); } } @@ -195,6 +202,11 @@ void ftl::timer::stop(bool wait) { } } +size_t ftl::timer::count(ftl::timer::timerlevel_t l) { + if (l < 0 || l >= kTimerMAXLEVEL) return 0; + return jobs[l].size(); +} + void ftl::timer::reset() { stop(true); for (int i=0; i<ftl::timer::kTimerMAXLEVEL; i++) { diff --git a/components/common/cpp/test/timer_unit.cpp b/components/common/cpp/test/timer_unit.cpp index 6244341a17d3fdf91034c8fbd5c298f92bad4e49..c1d6776299969bb55fe3d9b9c9a91bd3d35430d6 100644 --- a/components/common/cpp/test/timer_unit.cpp +++ b/components/common/cpp/test/timer_unit.cpp @@ -19,7 +19,7 @@ TEST_CASE( "Timer::add() High Precision Accuracy" ) { auto rc = ftl::timer::add(ftl::timer::kTimerHighPrecision, [&didrun](int64_t ts) { didrun = true; ftl::timer::stop(false); - return; + return true; }); REQUIRE( (rc.id >= 0) ); @@ -37,7 +37,7 @@ TEST_CASE( "Timer::add() High Precision Accuracy" ) { didrun = true; std::this_thread::sleep_for(std::chrono::milliseconds(5)); ftl::timer::stop(false); - return; + return true; }); REQUIRE( (rc.id >= 0) ); @@ -54,7 +54,7 @@ TEST_CASE( "Timer::add() High Precision Accuracy" ) { auto rc = ftl::timer::add(ftl::timer::kTimerHighPrecision, [&didrun](int64_t ts) { didrun[0] = true; ftl::timer::stop(false); - return; + return true; }); REQUIRE( (rc.id >= 0) ); @@ -62,13 +62,13 @@ TEST_CASE( "Timer::add() High Precision Accuracy" ) { ftl::timer::add(ftl::timer::kTimerHighPrecision, [&didrun](int64_t ts) { didrun[1] = true; ftl::timer::stop(false); - return; + return true; }); ftl::timer::add(ftl::timer::kTimerHighPrecision, [&didrun](int64_t ts) { didrun[2] = true; ftl::timer::stop(false); - return; + return true; }); ftl::timer::start(true); @@ -78,6 +78,61 @@ TEST_CASE( "Timer::add() High Precision Accuracy" ) { } } +TEST_CASE( "Timer::add() Idle10 job" ) { + SECTION( "Quick idle job" ) { + bool didrun = false; + + ftl::timer::reset(); + + auto rc = ftl::timer::add(ftl::timer::kTimerIdle10, [&didrun](int64_t ts) { + didrun = true; + ftl::timer::stop(false); + return true; + }); + + REQUIRE( (rc.id >= 0) ); + + ftl::timer::start(true); + REQUIRE( didrun == true ); + } + + SECTION( "Slow idle job" ) { + bool didrun = false; + + ftl::timer::reset(); + + auto rc = ftl::timer::add(ftl::timer::kTimerIdle10, [&didrun](int64_t ts) { + didrun = true; + std::this_thread::sleep_for(std::chrono::milliseconds(60)); + ftl::timer::stop(false); + return true; + }); + + REQUIRE( (rc.id >= 0) ); + + ftl::timer::start(true); + REQUIRE( didrun == true ); + } + + SECTION( "Return remove idle job" ) { + bool didrun = false; + + ftl::timer::reset(); + + auto rc = ftl::timer::add(ftl::timer::kTimerIdle10, [&didrun](int64_t ts) { + didrun = true; + ftl::timer::stop(false); + return false; + }); + + REQUIRE( (rc.id >= 0) ); + + ftl::timer::start(true); + REQUIRE( didrun == true ); + REQUIRE( ftl::timer::count(ftl::timer::kTimerIdle10) == 0 ); + } +} + TEST_CASE( "Timer::add() Main job" ) { SECTION( "Quick main job" ) { bool didrun = false; @@ -87,7 +142,7 @@ TEST_CASE( "Timer::add() Main job" ) { auto rc = ftl::timer::add(ftl::timer::kTimerMain, [&didrun](int64_t ts) { didrun = true; ftl::timer::stop(false); - return; + return true; }); REQUIRE( (rc.id >= 0) ); @@ -105,7 +160,7 @@ TEST_CASE( "Timer::add() Main job" ) { didrun = true; std::this_thread::sleep_for(std::chrono::milliseconds(60)); ftl::timer::stop(false); - return; + return true; }); REQUIRE( (rc.id >= 0) ); @@ -124,19 +179,37 @@ TEST_CASE( "Timer::add() Main job" ) { job1++; std::this_thread::sleep_for(std::chrono::milliseconds(60)); ftl::timer::stop(false); - return; + return true; }); REQUIRE( (rc.id >= 0) ); ftl::timer::add(ftl::timer::kTimerMain, [&job2](int64_t ts) { job2++; - return; + return true; }); ftl::timer::start(true); REQUIRE( (job1 == 1 && job2 == 2) ); } + + SECTION( "Return remove main job" ) { + bool didrun = false; + + ftl::timer::reset(); + + auto rc = ftl::timer::add(ftl::timer::kTimerMain, [&didrun](int64_t ts) { + didrun = true; + ftl::timer::stop(false); + return false; + }); + + REQUIRE( (rc.id >= 0) ); + + ftl::timer::start(true); + REQUIRE( didrun == true ); + REQUIRE( ftl::timer::count(ftl::timer::kTimerMain) == 0 ); + } } TEST_CASE( "TimerHandle::cancel()" ) { @@ -147,7 +220,7 @@ TEST_CASE( "TimerHandle::cancel()" ) { ftl::timer::add(ftl::timer::kTimerMain, [&didjob](int64_t ts) { didjob = true; ftl::timer::stop(false); - return; + return true; }); // Fake Handle @@ -164,7 +237,7 @@ TEST_CASE( "TimerHandle::cancel()" ) { auto id = ftl::timer::add(ftl::timer::kTimerMain, [&didjob](int64_t ts) { didjob = true; ftl::timer::stop(false); - return; + return true; }); id.cancel(); diff --git a/components/rgbd-sources/src/group.cpp b/components/rgbd-sources/src/group.cpp index af8dd00e17ec7307dd5c8af25a3d490953d447fb..66bce0b89228dc34720589b67de466f7893394b4 100644 --- a/components/rgbd-sources/src/group.cpp +++ b/components/rgbd-sources/src/group.cpp @@ -140,26 +140,29 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { cap_id_ = ftl::timer::add(ftl::timer::kTimerHighPrecision, [this](int64_t ts) { skip_ = jobs_ != 0; // Last frame not finished so skip all steps - if (skip_) return; + if (skip_) return true; last_ts_ = ts; for (auto s : sources_) { s->capture(ts); } + + return true; }); // 2. After capture, swap any internal source double buffers swap_id_ = ftl::timer::add(ftl::timer::kTimerSwap, [this](int64_t ts) { - if (skip_) return; + if (skip_) return true; for (auto s : sources_) { s->swap(); } + return true; }); // 3. Issue IO retrieve ad compute jobs before finding a valid // frame at required latency to pass to callback. main_id_ = ftl::timer::add(ftl::timer::kTimerMain, [this,cb](int64_t ts) { - if (skip_) return; + if (skip_) return true; jobs_++; for (auto s : sources_) { @@ -196,6 +199,7 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { } jobs_--; + return true; }); ftl::timer::start(true); diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index d8cca195fe8fe4b45365ab43c80862d394fcd0e5..bf7304f72f0f7ace305d15da555f1813d1552d03 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -182,20 +182,23 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID try { mastertime = net_->call<int64_t>(peer, "__ping__"); } catch (...) { - //timer_job_.cancel(); - return; + // Reset time peer and remove timer + time_peer_ = ftl::UUID(0); + return false; } auto elapsed = std::chrono::high_resolution_clock::now() - start; int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(); auto clock_adjust = mastertime - (ftl::timer::get_time() + (latency/2)); - //if (clock_adjust > 0) { + if (clock_adjust > 0) { LOG(INFO) << "Clock adjustment: " << clock_adjust; //LOG(INFO) << "Latency: " << (latency / 2); //LOG(INFO) << "Local: " << std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() << ", master: " << mastertime; ftl::timer::setClockAdjustment(clock_adjust); - //} + } + + return true; }); } }