diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp index c40ed095b5075afe0b4df7409c48ace45b8328cc..1e60a20077da11dedc9fc026f6f9932a2c2d9609 100644 --- a/components/common/cpp/include/ftl/threads.hpp +++ b/components/common/cpp/include/ftl/threads.hpp @@ -7,7 +7,7 @@ #define POOL_SIZE 10 -//#define DEBUG_MUTEX +#define DEBUG_MUTEX #define MUTEX_TIMEOUT 2 #if defined DEBUG_MUTEX diff --git a/components/common/cpp/include/ftl/transactional.hpp b/components/common/cpp/include/ftl/transactional.hpp index 298fa5d2a9187c01e3bf244c72e8593316cb3ecb..54659b1d35706f150e19e14eb0e5d5e4b1b54296 100644 --- a/components/common/cpp/include/ftl/transactional.hpp +++ b/components/common/cpp/include/ftl/transactional.hpp @@ -15,30 +15,33 @@ class Transactional { static_assert(std::is_pointer<T>::value, "Transactional type must be a pointer"); public: - Transactional(T obj, SHARED_MUTEX &mtx) : ref_(obj), mtx_(mtx), lock_(mtx_) {} - Transactional(T obj, SHARED_MUTEX &mtx, const std::function<void(T)> &complete) : ref_(obj), mtx_(mtx), lock_(mtx_), completed_(complete) {} + Transactional() : ref_(nullptr), mtx_(nullptr) {} + Transactional(T obj, SHARED_MUTEX *mtx) : ref_(obj), mtx_(mtx), lock_(*mtx_) {} + Transactional(T obj, SHARED_MUTEX *mtx, const std::function<void(T)> &complete) : ref_(obj), mtx_(mtx), lock_(*mtx_), completed_(complete) {} Transactional(const Transactional &)=delete; - Transactional()=delete; ~Transactional() { - lock_.unlock(); + if (lock_) lock_.unlock(); if (completed_) completed_(ref_); } - Transactional(Transactional &&t) : ref_(t.ref_), mtx_(t.mtx_), lock_(mtx_), completed_(t.completed_) { + Transactional(Transactional &&t) : ref_(t.ref_), mtx_(t.mtx_), lock_(*mtx_), completed_(t.completed_) { t.completed_ = nullptr; } Transactional &operator=(const Transactional &)=delete; - T operator->() { return ref_; } - const T operator->() const { return ref_; } + bool isValid() const { return ref_ != nullptr; } + operator bool() const { return ref_ != nullptr; } - T operator*() { return ref_; } - const T operator*() const { return ref_; } + T operator->() { if (!ref_) throw FTL_Error("Use of invalid frameset"); return ref_; } + const T operator->() const { if (!ref_) throw FTL_Error("Use of invalid frameset"); return ref_; } + + T operator*() { if (!ref_) throw FTL_Error("Use of invalid frameset"); return ref_; } + const T operator*() const { if (!ref_) throw FTL_Error("Use of invalid frameset"); return ref_; } private: T ref_; - SHARED_MUTEX &mtx_; + SHARED_MUTEX *mtx_; SHARED_LOCK_TYPE(SHARED_MUTEX) lock_; std::function<void(T)> completed_; }; diff --git a/components/common/cpp/src/timer.cpp b/components/common/cpp/src/timer.cpp index b249b0f6c6655846e50ced9946dd36ad943a5cd7..3e9764a918b963571be8a0c2ca4d7ba6e950165e 100644 --- a/components/common/cpp/src/timer.cpp +++ b/components/common/cpp/src/timer.cpp @@ -223,7 +223,12 @@ static void trigger_jobs() { // If last job in list then do in this thread if (active_jobs == static_cast<int>(jobs[kTimerMain].size())+1) { lk.unlock(); - bool doremove = !pj->job.trigger(ts); + bool doremove = true; + try { + doremove = !pj->job.trigger(ts); + } catch(const std::exception &e) { + LOG(ERROR) << "Exception in timer job: " << e.what(); + } pj->active = false; active_jobs--; if (doremove) removeJob(pj->id); @@ -231,7 +236,12 @@ static void trigger_jobs() { break; } else { ftl::pool.push([pj,ts](int id) { - bool doremove = !pj->job.trigger(ts); + bool doremove = true; + try { + doremove = !pj->job.trigger(ts); + } catch(const std::exception &e) { + LOG(ERROR) << "Exception in timer job: " << e.what(); + } pj->active = false; active_jobs--; if (doremove) removeJob(pj->id); diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 84633c833ca937f897a26dd8381f593c5a02a99b..a36bf29ceb84d9dd178cefcfe96287e2c16cc9a9 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -41,8 +41,8 @@ struct NetImplDetail { //#define TCP_SEND_BUFFER_SIZE (512*1024) //#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1) -#define TCP_SEND_BUFFER_SIZE (52*1024) // Was 256 -#define TCP_RECEIVE_BUFFER_SIZE (52*1024) // Was 256 +#define TCP_SEND_BUFFER_SIZE (128*1024) // Was 256 +#define TCP_RECEIVE_BUFFER_SIZE (128*1024) // Was 256 callback_t ftl::net::Universe::cbid__ = 0; diff --git a/components/streams/include/ftl/streams/builder.hpp b/components/streams/include/ftl/streams/builder.hpp index cd8f6b5faac472fba05471a2766173ac6b477e05..466399b4afb261562c4b6274eee1acf642b5b94f 100644 --- a/components/streams/include/ftl/streams/builder.hpp +++ b/components/streams/include/ftl/streams/builder.hpp @@ -153,6 +153,8 @@ class ForeignBuilder : public BaseBuilder { std::atomic<int> jobs_; volatile bool skip_; ftl::Handle main_id_; + size_t max_buffer_size_ = 16; + size_t completion_size_ = 8; std::string name_; diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp index 4a80bbdbc34bfc988a8b7069835d6e89bfc994e2..e279b70ccb3520e219aa7a5c2960701104ce9ba8 100644 --- a/components/streams/src/builder.cpp +++ b/components/streams/src/builder.cpp @@ -58,7 +58,7 @@ LockedFrameSet LocalBuilder::get(int64_t timestamp, size_t ix) { frameset_ = _allocate(timestamp); } - LockedFrameSet lfs(frameset_.get(), frameset_->smtx); + LockedFrameSet lfs(frameset_.get(), &frameset_->smtx); return lfs; } @@ -68,7 +68,7 @@ LockedFrameSet LocalBuilder::get(int64_t timestamp) { frameset_ = _allocate(timestamp); } - LockedFrameSet lfs(frameset_.get(), frameset_->smtx); + LockedFrameSet lfs(frameset_.get(), &frameset_->smtx); return lfs; } @@ -227,20 +227,27 @@ LockedFrameSet ForeignBuilder::get(int64_t timestamp) { UNIQUE_LOCK(mutex_, lk); auto fs = _get(timestamp); - LockedFrameSet lfs(fs.get(), fs->smtx, [this,fs](ftl::data::FrameSet *d) { - if (fs->isComplete()) { - if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) { - UNIQUE_LOCK(mutex_, lk); - _schedule(); + + if (fs) { + LockedFrameSet lfs(fs.get(), &fs->smtx, [this,fs](ftl::data::FrameSet *d) { + if (fs->isComplete()) { + if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) { + UNIQUE_LOCK(mutex_, lk); + _schedule(); + } } - } - }); - return lfs; + }); + return lfs; + } else { + return LockedFrameSet(); + } } std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_get(int64_t timestamp) { if (timestamp <= last_frame_) { - throw FTL_Error("Frameset already completed: " << timestamp << " (" << last_frame_ << ")"); + //throw FTL_Error("Frameset already completed: " << timestamp << " (" << last_frame_ << ")"); + LOG(ERROR) << "Frameset already completed: " << timestamp << " (" << last_frame_ << ")"; + return nullptr; } auto fs = _findFrameset(timestamp); @@ -253,9 +260,9 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_get(int64_t timestamp) { _schedule(); } - if (fs->test(ftl::data::FSFlag::STALE)) { + /*if (fs->test(ftl::data::FSFlag::STALE)) { throw FTL_Error("Frameset already completed"); - } + }*/ return fs; } @@ -265,10 +272,13 @@ LockedFrameSet ForeignBuilder::get(int64_t timestamp, size_t ix) { if (timestamp <= 0) throw FTL_Error("Invalid frame timestamp (" << timestamp << ")"); auto fs = _get(timestamp); - if (!fs) throw FTL_Error("No frameset for time " << timestamp); - LockedFrameSet lfs(fs.get(), fs->smtx); - return lfs; + if (fs) { + LockedFrameSet lfs(fs.get(), &fs->smtx); + return lfs; + } else { + return LockedFrameSet(); + } } else { if (timestamp <= 0 || ix >= 32) throw FTL_Error("Invalid frame timestamp or index (" << timestamp << ", " << ix << ")"); @@ -280,24 +290,28 @@ LockedFrameSet ForeignBuilder::get(int64_t timestamp, size_t ix) { auto fs = _get(timestamp); - if (ix >= fs->frames.size()) { - // FIXME: Check that no access to frames can occur without lock - UNIQUE_LOCK(fs->smtx, flk); - while (fs->frames.size() < size_) { - fs->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(fs->frameset(), + fs->frames.size()), fs->timestamp()))); + if (fs) { + if (ix >= fs->frames.size()) { + // FIXME: Check that no access to frames can occur without lock + UNIQUE_LOCK(fs->smtx, flk); + while (fs->frames.size() < size_) { + fs->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(fs->frameset(), + fs->frames.size()), fs->timestamp()))); + } } - } - LockedFrameSet lfs(fs.get(), fs->smtx, [this,fs](ftl::data::FrameSet *d) { - if (fs->isComplete()) { - if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) { - UNIQUE_LOCK(mutex_, lk); - _schedule(); + LockedFrameSet lfs(fs.get(), &fs->smtx, [this,fs](ftl::data::FrameSet *d) { + if (fs->isComplete()) { + if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) { + UNIQUE_LOCK(mutex_, lk); + _schedule(); + } } - } - }); + }); - return lfs; + return lfs; + } else { + return LockedFrameSet(); + } } } @@ -376,6 +390,12 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_getFrameset() { while (N-- > 0 && i != framesets_.end()) ++i; if (i != framesets_.end()) f = *i; } else { + // Force complete of old frame + if (framesets_.size() >= completion_size_) { + LOG(WARNING) << "Forced completion: " << framesets_.back()->timestamp(); + framesets_.back()->mask = 0xFF; + } + // Always choose oldest frameset when it completes if (framesets_.size() > 0 && framesets_.back()->isComplete()) f = framesets_.back(); } @@ -412,9 +432,10 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_getFrameset() { } std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_addFrameset(int64_t timestamp) { - if (framesets_.size() >= 32) { + if (framesets_.size() >= max_buffer_size_) { LOG(WARNING) << "Frameset buffer full, resetting: " << timestamp; framesets_.clear(); + //framesets_.pop_back(); } auto newf = std::make_shared<FrameSet>(pool_, ftl::data::FrameID(id_,255), timestamp, size_); diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index aee169927ace489075499ec5738fd2fa246c6da1..67945bbf463720c6b7dd53617f454fa2a49d3b07 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -44,7 +44,7 @@ Receiver::~Receiver() { void Receiver::loopback(ftl::data::Frame &f, ftl::codecs::Channel c) { auto &build = builder(f.frameset()); auto fs = build.get(f.timestamp(), f.source()); - fs->frames[f.source()].informChange(c, build.changeType(), f.getAnyMutable(c)); + if (fs) fs->frames[f.source()].informChange(c, build.changeType(), f.getAnyMutable(c)); } ftl::streams::BaseBuilder &Receiver::builder(uint32_t id) { @@ -135,41 +135,57 @@ Receiver::InternalAudioStates &Receiver::_getAudioFrame(const StreamPacket &spkt void Receiver::_processData(const StreamPacket &spkt, const Packet &pkt) { auto &build = builder(spkt.streamID); auto fs = build.get(spkt.timestamp, spkt.frame_number); - auto &f = (spkt.frame_number == 255) ? **fs : fs->frames[spkt.frame_number]; - // Remove LIVE capability if stream hints it is recorded - if (spkt.channel == Channel::Capabilities && (spkt.hint_capability & ftl::codecs::kStreamCap_Recorded)) { - std::any data; - ftl::data::decode_type<std::unordered_set<Capability>>(data, pkt.data); + if (fs) { + auto &f = (spkt.frame_number == 255) ? **fs : fs->frames[spkt.frame_number]; - auto &cap = *std::any_cast<std::unordered_set<Capability>>(&data); - if (cap.count(Capability::LIVE)) { - cap.erase(Capability::LIVE); + // Remove LIVE capability if stream hints it is recorded + if (spkt.channel == Channel::Capabilities && (spkt.hint_capability & ftl::codecs::kStreamCap_Recorded)) { + std::any data; + ftl::data::decode_type<std::unordered_set<Capability>>(data, pkt.data); + + auto &cap = *std::any_cast<std::unordered_set<Capability>>(&data); + if (cap.count(Capability::LIVE)) { + cap.erase(Capability::LIVE); + } + cap.emplace(Capability::STREAMED); + + f.informChange(spkt.channel, build.changeType(), data); + } else if (spkt.channel == Channel::Pose && pkt.codec == ftl::codecs::codec_t::POSE) { + // TODO: Remove this eventually, it allows old FTL files to work + std::any data; + auto &pose = data.emplace<Eigen::Matrix4d>(); + pose = Eigen::Map<Eigen::Matrix4d>((double*)pkt.data.data()); + f.informChange(spkt.channel, build.changeType(), data); + } else { + f.informChange(spkt.channel, build.changeType(), pkt); } - cap.emplace(Capability::STREAMED); - - f.informChange(spkt.channel, build.changeType(), data); - } else if (spkt.channel == Channel::Pose && pkt.codec == ftl::codecs::codec_t::POSE) { - // TODO: Remove this eventually, it allows old FTL files to work - std::any data; - auto &pose = data.emplace<Eigen::Matrix4d>(); - pose = Eigen::Map<Eigen::Matrix4d>((double*)pkt.data.data()); - f.informChange(spkt.channel, build.changeType(), data); - } else { - f.informChange(spkt.channel, build.changeType(), pkt); - } - if (spkt.channel == Channel::Calibration) { - const auto &calibration = std::get<0>(f.get<std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, int>>(Channel::Calibration)); - InternalVideoStates &ividstate = _getVideoFrame(spkt); - ividstate.width = calibration.width; - ividstate.height = calibration.height; - } + if (spkt.channel == Channel::Calibration) { + const auto &calibration = std::get<0>(f.get<std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, int>>(Channel::Calibration)); + InternalVideoStates &ividstate = _getVideoFrame(spkt); + ividstate.width = calibration.width; + ividstate.height = calibration.height; + } - // TODO: Adjust metadata also for recorded streams + // TODO: Adjust metadata also for recorded streams - fs->localTimestamp = spkt.localTimestamp; - _finishPacket(fs, spkt.frame_number); + fs->localTimestamp = spkt.localTimestamp; + _finishPacket(fs, spkt.frame_number); + + // Still need to get the calibration data even if frameset is lost. + } else if (spkt.channel == Channel::Calibration) { + //LOG(WARNING) << "Calibration being missed in data"; + InternalVideoStates &ividstate = _getVideoFrame(spkt); + std::any tany; + ftl::data::decode_type<std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, int>>(tany, pkt.data); + auto *cal = std::any_cast<std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, int>>(&tany); + if (cal) { + auto &calibration = std::get<0>(*cal); + ividstate.width = calibration.width; + ividstate.height = calibration.height; + } + } } ftl::audio::Decoder *Receiver::_createAudioDecoder(InternalAudioStates &frame, const ftl::codecs::Packet &pkt) { @@ -185,23 +201,28 @@ void Receiver::_processAudio(const StreamPacket &spkt, const Packet &pkt) { auto &build = builder(spkt.streamID); auto fs = build.get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); - auto &frame = fs->frames[spkt.frame_number]; - auto &audiolist = frame.createChange<std::list<ftl::audio::Audio>>(spkt.channel, build.changeType(), pkt); - auto &audio = audiolist.emplace_back(); + if (fs) { + auto &frame = fs->frames[spkt.frame_number]; - ftl::audio::Decoder *dec = _createAudioDecoder(state, pkt); - if (!dec) { - LOG(ERROR) << "Could get an audio decoder"; - return; - } - if (!dec->decode(pkt, audio.data())) { - LOG(ERROR) << "Audio decode failed"; - return; - } + auto &audiolist = frame.createChange<std::list<ftl::audio::Audio>>(spkt.channel, build.changeType(), pkt); + auto &audio = audiolist.emplace_back(); - fs->localTimestamp = spkt.localTimestamp; - _finishPacket(fs, spkt.frame_number); + ftl::audio::Decoder *dec = _createAudioDecoder(state, pkt); + if (!dec) { + LOG(ERROR) << "Could get an audio decoder"; + return; + } + if (!dec->decode(pkt, audio.data())) { + LOG(ERROR) << "Audio decode failed"; + return; + } + + fs->localTimestamp = spkt.localTimestamp; + _finishPacket(fs, spkt.frame_number); + } else { + LOG(WARNING) << "Audio data being lost"; + } } namespace sgm { @@ -272,6 +293,11 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { } } + if (!fs) { + LOG(WARNING) << "Dropping a video frame"; + return; + } + auto cvstream = cv::cuda::StreamAccessor::wrapStream(decoder->stream()); // Mark a frameset as being partial @@ -319,7 +345,10 @@ void Receiver::_finishPacket(ftl::streams::LockedFrameSet &fs, size_t fix) { if (frame.packet_tx > 0 && frame.packet_tx == frame.packet_rx) { fs->completed(fix); - if (fs->isComplete()) timestamp_ = fs->timestamp(); + if (fs->isComplete()) { + //LOG(INFO) << "COMPLETE: " << fs->timestamp() << ", " << fix; + timestamp_ = fs->timestamp(); + } frame.packet_tx = 0; frame.packet_rx = 0; } @@ -330,8 +359,12 @@ void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) { if (spkt.channel == Channel::EndFrame) { auto fs = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); - fs->frames[spkt.frame_number].packet_tx = static_cast<int>(pkt.packet_count); - _finishPacket(fs, spkt.frame_number); + + if (fs) { + fs->frames[spkt.frame_number].packet_tx = static_cast<int>(pkt.packet_count); + //LOG(INFO) << "EXPECTED " << fs->frames[spkt.frame_number].packet_tx << " for " << int(spkt.frame_number); + _finishPacket(fs, spkt.frame_number); + } return; } @@ -340,15 +373,18 @@ void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) { if (spkt.streamID < 255 && !(spkt.flags & ftl::codecs::kFlagRequest)) { // Get the frameset auto fs = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); - const auto *cs = stream_; - const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID()); - fs->localTimestamp = spkt.localTimestamp; + if (fs) { + const auto *cs = stream_; + const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID()); - for (auto &frame : fs->frames) { - frame.markAvailable(spkt.channel); + fs->localTimestamp = spkt.localTimestamp; + + for (auto &frame : fs->frames) { + frame.markAvailable(spkt.channel); + } + _finishPacket(fs, spkt.frame_number); } - _finishPacket(fs, spkt.frame_number); } return; } diff --git a/components/streams/src/sender.cpp b/components/streams/src/sender.cpp index 5e153f82ee5d145148c5d63d0b61fed5efc6f735..ee79773d0093d46e379aa9c56195b49afa7b3b54 100644 --- a/components/streams/src/sender.cpp +++ b/components/streams/src/sender.cpp @@ -154,8 +154,19 @@ void Sender::_send(ftl::rgbd::FrameSet &fs, ftl::codecs::StreamPacket &spkt, con spkt.flags = ftl::codecs::kFlagCompleted; }*/ + if (spkt.frame_number == 255) LOG(WARNING) << "Bad frame number"; + if (spkt.frame_number == 255) ++fs.frames[0].packet_tx; - else if (spkt.frame_number < fs.frames.size()) ++fs.frames[spkt.frame_number].packet_tx; + else if (spkt.frame_number < fs.frames.size() && fs.frames[spkt.frame_number].source() == spkt.frame_number) ++fs.frames[spkt.frame_number].packet_tx; + else { + // Find the correct frame + for (auto &f : fs.frames) { + if (f.source() == spkt.frame_number) { + ++f.packet_tx; + break; + } + } + } stream_->post(spkt, pkt); } @@ -183,7 +194,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode pkt.codec = codec_t::Invalid; for (size_t i=0; i<fs.frames.size(); ++i) { - spkt.frame_number = i; + spkt.frame_number = fs.frames[i].source(); pkt.packet_count = static_cast<uint8_t>(fs.frames[i].packet_tx+1); // FIXME: 255 limit currently _send(fs, spkt, pkt); } @@ -268,7 +279,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode spkt.timestamp = fs.timestamp(); spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); //fs.id; - spkt.frame_number = i; + spkt.frame_number = frame.source(); spkt.channel = c; //spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; @@ -494,7 +505,7 @@ void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset) spkt.timestamp = fs.timestamp(); spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); - spkt.frame_number = offset; + spkt.frame_number = fs.frames[offset].source(); spkt.channel = c; auto &tile = _getTile(fs.id(), cc); @@ -589,7 +600,7 @@ void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset) spkt.timestamp = fs.timestamp(); spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); - spkt.frame_number = i; + spkt.frame_number = fs.frames[i].source(); spkt.channel = c; //spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; @@ -630,7 +641,7 @@ void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset) spkt.timestamp = fs.timestamp(); spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); - spkt.frame_number = i++; + spkt.frame_number = f.source(); spkt.channel = c; //spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; diff --git a/components/streams/test/recsend_unit.cpp b/components/streams/test/recsend_unit.cpp index 105b7f91b866404f4e0e4819c18150d9e25e079a..b1f0c9f1f33713d2f930daeb982639afda50e62b 100644 --- a/components/streams/test/recsend_unit.cpp +++ b/components/streams/test/recsend_unit.cpp @@ -115,6 +115,171 @@ TEST_CASE( "Send and receiver via encoding" ) { delete sender; } +TEST_CASE( "Multi-thread stability testing" ) { + json_t global = json_t{{"$id","ftl://test"}}; + ftl::config::configure(global); + + ftl::data::Pool pool(5,7); + + json_t rcfg = json_t{ + {"$id","ftl://test/1"} + }; + auto *receiver = ftl::create<Receiver>(rcfg, &pool); + + json_t scfg = json_t{ + {"$id","ftl://test/2"} + }; + auto *sender = ftl::create<Sender>(scfg); + + json_t cfg2 = json_t{ + {"$id","ftl://test/3"} + }; + + TestStream stream(cfg2); + + receiver->setStream(&stream); + receiver->set("frameset_buffer_size", 0); + sender->setStream(&stream); + sender->resetSender(); // FIXME: Why is this needed? + + ftl::timer::setInterval(20); + ftl::timer::start(false); + + SECTION("One frame, two channel") { + stream.select(0, {Channel::Colour}, true); + + auto h1 = pool.onFlushSet([sender](ftl::data::FrameSet &fs, ftl::codecs::Channel c) { + if (!fs.test(ftl::data::FSFlag::AUTO_SEND)) return true; + + //LOG(INFO) << "FLUSH: " << fs.timestamp() << ", " << int(c); + sender->post(fs, c); + return true; + }); + + int count = 0; + ftl::data::FrameSetPtr result = nullptr; + auto h = receiver->onFrameSet([&count,&result](const ftl::data::FrameSetPtr &fs) { + LOG(INFO) << "FS RECV: " << fs->timestamp(); + count++; + if (result) REQUIRE( result->timestamp() <= fs->timestamp()-20 ); + REQUIRE( fs->frames.size() == 1 ); + REQUIRE( fs->frames[0].hasChannel(Channel::Colour) ); + result = fs; + return true; + }); + + auto h2 = ftl::timer::add(ftl::timer::timerlevel_t::kTimerMain, [&pool](int64_t ts) { + Frame f = pool.allocate(ftl::data::FrameID(0,0), ts); + f.store(); + auto &mat = f.create<cv::cuda::GpuMat>(Channel::Colour); + mat.create(480, 640, CV_8UC4); + mat.setTo(cv::Scalar(0,0,0,0)); + + auto &calib = f.cast<ftl::rgbd::Frame>().setLeft(); + calib.width = 640; + calib.height = 480; + + auto fsptr = FrameSet::fromFrame(f); + FrameSet &fs = *fsptr; + fs.set(ftl::data::FSFlag::AUTO_SEND); + fsptr->flush(Channel::Calibration); + ftl::pool.push([fsptr](int id) { fsptr->flush(Channel::Colour); }); + return true; + }); + + int i=1000; + while (i-- > 0 && count < 100) std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + REQUIRE( count >= 100 ); + + } + + SECTION("Two frame, two channel") { + stream.select(0, {Channel::Colour}, true); + + auto h1 = pool.onFlushSet([sender](ftl::data::FrameSet &fs, ftl::codecs::Channel c) { + if (!fs.test(ftl::data::FSFlag::AUTO_SEND)) return true; + + //LOG(INFO) << "FLUSH: " << fs.timestamp() << ", " << int(c) << ", " << fs.frames[0].source(); + sender->post(fs, c); + return true; + }); + + int count = 0; + ftl::data::FrameSetPtr result = nullptr; + auto h = receiver->onFrameSet([&count,&result](const ftl::data::FrameSetPtr &fs) { + LOG(INFO) << "FS RECV: " << fs->timestamp(); + count++; + if (result) { + REQUIRE( result->timestamp() <= fs->timestamp()-20 ); + //REQUIRE( fs->frames.size() == 2 ); + REQUIRE( fs->isComplete() ); + REQUIRE( fs->frames[0].hasChannel(Channel::Colour) ); + if (fs->frames.size() > 1) REQUIRE( fs->frames[1].hasChannel(Channel::Colour) ); + } + result = fs; + return true; + }); + + ftl::data::Pool pool2(5,7); + + auto h2 = ftl::timer::add(ftl::timer::timerlevel_t::kTimerMain, [&pool,&pool2](int64_t ts) { + ftl::pool.push([&pool, ts](int id) { + Frame f = pool.allocate(ftl::data::FrameID(0,0), ts); + f.store(); + auto &mat = f.create<cv::cuda::GpuMat>(Channel::Colour); + mat.create(480, 640, CV_8UC4); + mat.setTo(cv::Scalar(0,0,0,0)); + + auto &calib = f.cast<ftl::rgbd::Frame>().setLeft(); + calib.width = 640; + calib.height = 480; + + auto fsptr = FrameSet::fromFrame(f); + FrameSet &fs = *fsptr; + fs.set(ftl::data::FSFlag::AUTO_SEND); + fsptr->flush(Channel::Calibration); + ftl::pool.push([fsptr](int id) { fsptr->flush(Channel::Colour); }); + }); + + ftl::pool.push([&pool, ts](int id) { + Frame f = pool.allocate(ftl::data::FrameID(0,1), ts); + f.store(); + auto &mat = f.create<cv::cuda::GpuMat>(Channel::Colour); + mat.create(480, 640, CV_8UC4); + mat.setTo(cv::Scalar(0,0,0,0)); + + auto &calib = f.cast<ftl::rgbd::Frame>().setLeft(); + calib.width = 640; + calib.height = 480; + + auto fsptr = FrameSet::fromFrame(f); + FrameSet &fs = *fsptr; + fs.set(ftl::data::FSFlag::AUTO_SEND); + fsptr->flush(Channel::Calibration); + ftl::pool.push([fsptr](int id) { fsptr->flush(Channel::Colour); }); + }); + return true; + }); + + int i=1000; + while (i-- > 0 && count < 100) std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + REQUIRE( count >= 100 ); + + } + + LOG(INFO) << "DONE"; + + ftl::timer::reset(); + ftl::timer::setInterval(50); + ftl::pool.clear_queue(); + while (ftl::pool.n_idle() != ftl::pool.size()) std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + delete receiver; + delete sender; +} + TEST_CASE( "Response via loopback" ) { json_t global = json_t{{"$id","ftl://test"}}; ftl::config::configure(global); diff --git a/components/structures/include/ftl/data/new_frameset.hpp b/components/structures/include/ftl/data/new_frameset.hpp index 90be7f7bee9c9f646ea3eb7c8c4cfb78e55d532b..f40fd085bebc98dae9779d10720cc9f9461ae280 100644 --- a/components/structures/include/ftl/data/new_frameset.hpp +++ b/components/structures/include/ftl/data/new_frameset.hpp @@ -78,7 +78,7 @@ class FrameSet : public ftl::data::Frame { /** * Are all frames complete within this frameset? */ - inline bool isComplete() { return mask != 0 && ftl::popcount(mask) == frames.size(); } + inline bool isComplete() { return mask != 0 && ftl::popcount(mask) >= frames.size(); } /** * Check that a given frame is valid in this frameset. diff --git a/components/structures/src/pool.cpp b/components/structures/src/pool.cpp index ff2484e135702e9c314ca62bae6dda705dbf9f11..c67d1f8a22e5a3aa1f0add7b427768ed0da8b3db 100644 --- a/components/structures/src/pool.cpp +++ b/components/structures/src/pool.cpp @@ -25,8 +25,8 @@ Frame Pool::allocate(FrameID id, int64_t timestamp) { UNIQUE_LOCK(mutex_, lk); auto &pool = _getPool(id); - if (timestamp < pool.last_timestamp) { - timestamp = pool.last_timestamp; + if (timestamp <= pool.last_timestamp) { + //timestamp = pool.last_timestamp; //throw FTL_Error("New frame timestamp is older than previous: " << timestamp << " vs " << pool.last_timestamp); }