diff --git a/SDK/C/src/streams.cpp b/SDK/C/src/streams.cpp index e2a48c9bf2eaa5c35f3f0d2a017464e532dcef15..4584ec6267b2412bb8da7350178a8b95cd046632 100644 --- a/SDK/C/src/streams.cpp +++ b/SDK/C/src/streams.cpp @@ -66,7 +66,6 @@ ftlStream_t ftlCreateWriteStream(const char *uri) { s->sender = nullptr; s->pipelines = nullptr; s->video_fs = std::make_shared<ftl::data::FrameSet>(&s->pool, ftl::data::FrameID(0,0), ftl::timer::get_time()); - s->video_fs->count = 0; s->video_fs->mask = 0; s->interval = 40; //s->video_fs->frames.reserve(32); @@ -200,7 +199,6 @@ ftlError_t ftlIntrinsicsWriteLeft(ftlStream_t stream, int32_t sourceId, int32_t cam.baseline = baseline; cam.doffs = 0.0f; stream->video_fs->mask |= 1 << sourceId; - stream->video_fs->count++; //if (!stream->video_fs->frames[sourceId].origin()) { // stream->video_fs.frames[sourceId].setOrigin(&stream->video_states[sourceId]); //} diff --git a/applications/gui2/src/views/addsource.cpp b/applications/gui2/src/views/addsource.cpp index 3f9ad3ba0629be3074cecae40b5a122416c9573d..2d0a0209e8db9c82804d62670d9397904dc50f63 100644 --- a/applications/gui2/src/views/addsource.cpp +++ b/applications/gui2/src/views/addsource.cpp @@ -137,7 +137,7 @@ nanogui::Button *AddSourceWindow::_addButton(const std::string &s, nanogui::Widg button->setTooltip(s); button->setCallback([this, uri = s, hide]() { - if (hide) close(); + //if (hide) close(); ctrl_->add(uri); }); diff --git a/applications/gui2/src/widgets/imageview.hpp b/applications/gui2/src/widgets/imageview.hpp index 0209636fafd0049e4fcc8304db2b0240f15c8f2d..c5d520b628f8fc80a539689ebfea1f0f4638f7ee 100644 --- a/applications/gui2/src/widgets/imageview.hpp +++ b/applications/gui2/src/widgets/imageview.hpp @@ -193,7 +193,7 @@ public: using ImageView::ImageView; FTLImageView(nanogui::Widget* parent, GLuint imageID = -1) : ImageView(parent, imageID), was_valid_(false) {} - ~FTLImageView(); + virtual ~FTLImageView(); virtual void draw(NVGcontext* ctx) override; virtual nanogui::Vector2i preferredSize(NVGcontext* ctx) const override; diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index 765daa6aba477b94ccc7924484a9624c0cbf7e85..f74fb46daa7b0a7b2cae4a3217c27a4fbcc5b668 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -212,6 +212,9 @@ static void run(ftl::Configurable *root) { if (encodable != previous_encodable) sender->resetEncoders(fs->frameset()); previous_encodable = encodable; + // Must touch the depth channel to indicate it should be waited for + //for (auto &f : fs->frames) f.touch(Channel::Depth); + // Do all processing in another thread... ftl::pool.push([sender,&stats_count,&latency,&frames,&stats_time,pipeline,&busy,fs](int id) mutable { // Do pipeline here... if not still busy doing it diff --git a/components/codecs/include/ftl/codecs/channels.hpp b/components/codecs/include/ftl/codecs/channels.hpp index 598a6cf5ca841e835e717adc0e3b4d54355af94b..b5a03877b3d7d5469a2c5f457566507d7a251eb3 100644 --- a/components/codecs/include/ftl/codecs/channels.hpp +++ b/components/codecs/include/ftl/codecs/channels.hpp @@ -58,7 +58,8 @@ enum struct Channel : int { CalibrationData = 73, // Just for stereo intrinsics/extrinsics etc Thumbnail = 74, // Small JPG thumbnail, sometimes updated - Data = 2048, // Custom data, any codec. + Data = 2048, // Do not use + EndFrame = 2048, // Signify the last packet Faces = 2049, // Data about detected faces Transforms = 2050, // Transformation matrices for framesets Shapes3D = 2051, // Labeled 3D shapes diff --git a/components/codecs/include/ftl/codecs/packet.hpp b/components/codecs/include/ftl/codecs/packet.hpp index 4145ebaa879a37fdae83cb7c331e423cc2e653b0..1b37d88b4a07c0351bfe9bf92f1d0e427f9eed2e 100644 --- a/components/codecs/include/ftl/codecs/packet.hpp +++ b/components/codecs/include/ftl/codecs/packet.hpp @@ -38,8 +38,13 @@ struct Packet { ftl::codecs::codec_t codec; uint8_t reserved=0; uint8_t frame_count=1; // v4+ Frames included in this packet - uint8_t bitrate=0; // v4+ For multi-bitrate encoding, 0=highest - uint8_t flags=0; // Codec dependent flags (eg. I-Frame or P-Frame) + + uint8_t bitrate=0; // v4+ For multi-bitrate encoding, 0=highest + + union { + uint8_t flags=0; // Codec dependent flags (eg. I-Frame or P-Frame) + uint8_t packet_count; + }; std::vector<uint8_t> data; MSGPACK_DEFINE(codec, reserved, frame_count, bitrate, flags, data); diff --git a/components/common/cpp/include/ftl/utility/intrinsics.hpp b/components/common/cpp/include/ftl/utility/intrinsics.hpp new file mode 100644 index 0000000000000000000000000000000000000000..304156b7952f605a7a0e1f8408e31687aeb2475f --- /dev/null +++ b/components/common/cpp/include/ftl/utility/intrinsics.hpp @@ -0,0 +1,38 @@ +#ifndef _FTL_UTILITY_INTRINSICS_HPP_ +#define _FTL_UTILITY_INTRINSICS_HPP_ + +namespace ftl { + +inline unsigned int popcount(uint64_t bits) { + #if defined(_MSC_VER) + return __popcnt64(bits); + #elif defined(__GNUC__) + return __builtin_popcountl(bits); + #else + int count = 0; + while (bits != 0) { + bits = bits >> 1; + count += uint64_t(1) & bits; + } + return count; + #endif +} + +inline unsigned int popcount(uint32_t bits) { + #if defined(_MSC_VER) + return __popcnt(bits); + #elif defined(__GNUC__) + return __builtin_popcount(bits); + #else + int count = 0; + while (bits != 0) { + bits = bits >> 1; + count += uint32_t(1) & bits; + } + return count; + #endif +} + +} + +#endif diff --git a/components/operators/src/fusion/mvmls.cpp b/components/operators/src/fusion/mvmls.cpp index c16b3ab43097c4bda940a2f361592cd1fb5784b3..bdb69b6e6fd53777ec38e248414d72fd05ad2e52 100644 --- a/components/operators/src/fusion/mvmls.cpp +++ b/components/operators/src/fusion/mvmls.cpp @@ -47,7 +47,7 @@ bool MultiViewMLS::apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out, cuda bool show_consistency = config()->value("show_consistency", false); bool show_adjustment = config()->value("show_adjustment", false); - if (in.frames.size() < 1 || in.count == 0) return false; + if (in.frames.size() < 1 || in.mask == 0) return false; cv::Size size(0,0); for (auto &f : in.frames) { if (f.hasChannel(Channel::Depth)) { diff --git a/components/streams/include/ftl/streams/filestream.hpp b/components/streams/include/ftl/streams/filestream.hpp index 5b139546f69a590a2f791e6d2b8dd79df8b8abcc..27bee2cdee78401d93104aec99977e7c8ad651b9 100644 --- a/components/streams/include/ftl/streams/filestream.hpp +++ b/components/streams/include/ftl/streams/filestream.hpp @@ -75,6 +75,8 @@ class File : public Stream { ftl::Handle timer_; bool is_video_; bool save_data_; + bool needs_endframe_ = true; + std::vector<int> packet_counts_; //StreamCallback cb_; MUTEX mutex_; diff --git a/components/streams/include/ftl/streams/receiver.hpp b/components/streams/include/ftl/streams/receiver.hpp index 8903f50fa06b0661a89f5a771b2645cf57adbca0..7febc9e416794b762f7df9a02b0f385840c071a3 100644 --- a/components/streams/include/ftl/streams/receiver.hpp +++ b/components/streams/include/ftl/streams/receiver.hpp @@ -92,6 +92,7 @@ class Receiver : public ftl::Configurable, public ftl::data::Generator { ftl::audio::Decoder *_createAudioDecoder(InternalAudioStates &frame, const ftl::codecs::Packet &pkt); InternalVideoStates &_getVideoFrame(const ftl::codecs::StreamPacket &spkt, int ix=0); InternalAudioStates &_getAudioFrame(const ftl::codecs::StreamPacket &spkt, int ix=0); + void _finishPacket(ftl::streams::LockedFrameSet &fs, size_t fix); }; } diff --git a/components/streams/include/ftl/streams/sender.hpp b/components/streams/include/ftl/streams/sender.hpp index 05f4f82ced1386bf8ae6bdde85fb3d1557ba7711..8edbbf1d673877eec95bce185e4cad038823e1eb 100644 --- a/components/streams/include/ftl/streams/sender.hpp +++ b/components/streams/include/ftl/streams/sender.hpp @@ -92,11 +92,11 @@ class Sender : public ftl::Configurable { int bitrate_timeout_; //ftl::codecs::Encoder *_getEncoder(int fsid, int fid, ftl::codecs::Channel c); - void _encodeChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset, bool last_flush); + void _encodeChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset); void _encodeChannel(ftl::data::Frame &f, ftl::codecs::Channel c, bool reset); - void _encodeVideoChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset, bool last_flush); - void _encodeAudioChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset, bool last_flush); - void _encodeDataChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset, bool last_flush); + void _encodeVideoChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset); + void _encodeAudioChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset); + void _encodeDataChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset); void _encodeDataChannel(ftl::data::Frame &fs, ftl::codecs::Channel c, bool reset); int _generateTiles(const ftl::rgbd::FrameSet &fs, int offset, ftl::codecs::Channel c, cv::cuda::Stream &stream, bool); @@ -106,6 +106,7 @@ class Sender : public ftl::Configurable { ftl::audio::Encoder *_getAudioEncoder(int fsid, int sid, ftl::codecs::Channel c, ftl::codecs::Packet &pkt); void _sendPersistent(ftl::data::Frame &frame); + void _send(ftl::rgbd::FrameSet &fs, ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt); bool _checkNeedsIFrame(int64_t ts, bool injecting); uint8_t _getMinBitrate(); diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp index 11e077b647691bc2b7640e92bb5cb058f13e1ea3..a209a44e6a0a5e095f6cb9448ae625bc2f6a5991 100644 --- a/components/streams/src/builder.cpp +++ b/components/streams/src/builder.cpp @@ -5,6 +5,7 @@ #include <loguru.hpp> #include <chrono> +#include <bitset> using ftl::streams::BaseBuilder; using ftl::streams::ForeignBuilder; @@ -102,7 +103,6 @@ std::shared_ptr<ftl::data::FrameSet> LocalBuilder::_allocate(int64_t timestamp) newf->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(id_, i), timestamp))); } - newf->count = size_; newf->mask = 0xFF; newf->clearFlags(); return newf; @@ -240,7 +240,7 @@ LockedFrameSet ForeignBuilder::get(int64_t timestamp) { std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_get(int64_t timestamp) { if (timestamp <= last_frame_) { - throw FTL_Error("Frameset already completed: " << timestamp); + throw FTL_Error("Frameset already completed: " << timestamp << " (" << last_frame_ << ")"); } auto fs = _findFrameset(timestamp); @@ -384,25 +384,27 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_getFrameset() { // Lock to force completion of on going construction first UNIQUE_LOCK(f->smtx, slk); last_frame_ = f->timestamp(); - auto j = framesets_.erase(i); f->set(ftl::data::FSFlag::STALE); slk.unlock(); - int count = 0; + if (!f->isComplete()) LOG(WARNING) << "Dispatching incomplete frameset: " << f->timestamp() << " (" << std::bitset<16>( f->mask ).to_string() << ")"; + // Remove all previous framesets - // FIXME: Should do this in reverse order. - while (j!=framesets_.end()) { - ++count; + while (framesets_.size() > 0) { + ftl::data::FrameSetPtr &f2 = framesets_.back(); + if (f2.get() == f.get()) break; - auto f2 = *j; LOG(WARNING) << "FrameSet discarded: " << f2->timestamp(); + f2->set(ftl::data::FSFlag::DISCARD); { // Ensure frame processing is finished first UNIQUE_LOCK(f2->smtx, lk); - j = framesets_.erase(j); } + + framesets_.pop_back(); } + framesets_.pop_back(); return f; } @@ -420,7 +422,6 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_addFrameset(int64_t timest newf->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(id_, i), timestamp))); } - newf->count = 0; newf->mask = 0; newf->localTimestamp = timestamp; newf->clearFlags(); diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp index aa7878a4692f4ab570ff4a05430773a0c1f92cd0..20fe0c8276e42fe539564e2edaf89d6b75ad4918 100644 --- a/components/streams/src/feed.cpp +++ b/components/streams/src/feed.cpp @@ -169,7 +169,7 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) : handle_receiver_ = receiver_->onFrameSet( [this](const ftl::data::FrameSetPtr& fs) { if (value("drop_partial_framesets", false)) { - if (fs->count < static_cast<int>(fs->frames.size())) { + if (!fs->isComplete()) { LOG(WARNING) << "Dropping partial frameset: " << fs->timestamp(); return true; } diff --git a/components/streams/src/filestream.cpp b/components/streams/src/filestream.cpp index d7ac4bcf9e844326e121c4a9d6bb2d281dde55f3..019b03caf7181050a61a1084266c87c039792b2f 100644 --- a/components/streams/src/filestream.cpp +++ b/components/streams/src/filestream.cpp @@ -237,11 +237,37 @@ bool File::tick(int64_t ts) { { UNIQUE_LOCK(data_mutex_, dlk); if (data_.size() > 0) has_data = true; + + if (needs_endframe_) { + // Reset packet counts + for (auto &p : packet_counts_) p = 0; + } + + size_t frame_count = 0; for (auto i = data_.begin(); i != data_.end(); ++i) { if (std::get<0>(*i).timestamp <= timestamp_) { + auto &spkt = std::get<0>(*i); + auto &pkt = std::get<1>(*i); + ++jobs_; - std::get<0>(*i).timestamp = ts; + spkt.timestamp = ts; + + if (spkt.channel == Channel::EndFrame) needs_endframe_ = false; + + if (needs_endframe_) { + if (spkt.frame_number < 255) { + frame_count = std::max(frame_count, static_cast<size_t>(spkt.frame_number + pkt.frame_count)); + while (packet_counts_.size() < frame_count) packet_counts_.push_back(0); + for (int j=spkt.frame_number; j<spkt.frame_number+pkt.frame_count; ++j) ++packet_counts_[j]; + } else { + // Add frameset packets to frame 0 counts + frame_count = std::max(frame_count, size_t(1)); + while (packet_counts_.size() < frame_count) packet_counts_.push_back(0); + ++packet_counts_[0]; + } + } + ftl::pool.push([this,i](int id) { auto &spkt = std::get<0>(*i); auto &pkt = std::get<1>(*i); @@ -260,6 +286,36 @@ bool File::tick(int64_t ts) { data_.erase(i); --jobs_; }); + } else { + if (needs_endframe_) { + // Send final frame packet. + StreamPacket spkt; + spkt.timestamp = ts; + spkt.streamID = 0; // FIXME: Allow for non-zero framesets. + spkt.flags = 0; + spkt.channel = Channel::EndFrame; + + Packet pkt; + pkt.bitrate = 255; + pkt.codec = ftl::codecs::codec_t::Invalid; + pkt.packet_count = 1; + pkt.frame_count = 1; + + for (size_t i=0; i<frame_count; ++i) { + spkt.frame_number = i; + pkt.packet_count = packet_counts_[i]+1; + + try { + cb_.trigger(spkt, pkt); + } catch (const ftl::exception &e) { + LOG(ERROR) << "Exception in packet callback: " << e.what() << e.trace(); + } catch (std::exception &e) { + LOG(ERROR) << "Exception in packet callback: " << e.what(); + } + } + } + + break; } } } @@ -268,7 +324,6 @@ bool File::tick(int64_t ts) { while ((active_ && istream_->good()) || buffer_in_.nonparsed_size() > 0u) { UNIQUE_LOCK(data_mutex_, dlk); - auto *lastData = (data_.size() > 0) ? &data_.back() : nullptr; auto &data = data_.emplace_back(); dlk.unlock(); @@ -303,10 +358,10 @@ bool File::tick(int64_t ts) { data_.pop_back(); //} }*/ - if (version_ < 5 && lastData) { + //if (version_ < 5 && lastData) { // For versions < 5, add completed flag to previous data - std::get<0>(*lastData).flags |= ftl::codecs::kFlagCompleted; - } + // std::get<0>(*lastData).flags |= ftl::codecs::kFlagCompleted; + //} if (std::get<0>(data).timestamp > extended_ts) { break; diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index fa08ef87800749c995fa8539925f5d97a639b93e..efc4ffba2122fd4c9dbec1f346d39d58df923a97 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -102,20 +102,6 @@ void Receiver::registerBuilder(const std::shared_ptr<ftl::streams::BaseBuilder> })); } -//void Receiver::onAudio(const ftl::audio::FrameSet::Callback &cb) { -// audio_cb_ = cb; -//} - -/*void Receiver::_processConfig(InternalStates &frame, const ftl::codecs::Packet &pkt) { - std::string cfg; - auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size()); - unpacked.get().convert(cfg); - - LOG(INFO) << "Config Received: " << cfg; - // TODO: This needs to be put in safer / better location - //host_->set(std::get<0>(cfg), nlohmann::json::parse(std::get<1>(cfg))); -}*/ - void Receiver::_createDecoder(InternalVideoStates &frame, int chan, const ftl::codecs::Packet &pkt) { UNIQUE_LOCK(frame.mutex,lk); auto *decoder = frame.decoders[chan]; @@ -205,21 +191,8 @@ void Receiver::_processData(const StreamPacket &spkt, const Packet &pkt) { // TODO: Adjust metadata also for recorded streams - if (spkt.flags & ftl::codecs::kFlagCompleted) { - //UNIQUE_LOCK(vidstate.mutex, lk); - timestamp_ = spkt.timestamp; - fs->completed(spkt.frame_number); - } - fs->localTimestamp = spkt.localTimestamp; - - /*const auto *cs = stream_; - const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID()); - - if (f.hasAll(sel)) { - timestamp_ = spkt.timestamp; - fs->completed(spkt.frame_number); - }*/ + _finishPacket(fs, spkt.frame_number); } ftl::audio::Decoder *Receiver::_createAudioDecoder(InternalAudioStates &frame, const ftl::codecs::Packet &pkt) { @@ -241,11 +214,6 @@ void Receiver::_processAudio(const StreamPacket &spkt, const Packet &pkt) { auto &audiolist = frame.createChange<std::list<ftl::audio::Audio>>(spkt.channel, build.changeType(), pkt); auto &audio = audiolist.emplace_back(); - //size_t size = pkt.data.size()/sizeof(short); - //audio.data().resize(size); - //auto *ptr = (short*)pkt.data.data(); - //for (size_t i=0; i<size; i++) audio.data()[i] = ptr[i]; - ftl::audio::Decoder *dec = _createAudioDecoder(state, pkt); if (!dec) { LOG(ERROR) << "Could get an audio decoder"; @@ -256,41 +224,8 @@ void Receiver::_processAudio(const StreamPacket &spkt, const Packet &pkt) { return; } - if (spkt.flags & ftl::codecs::kFlagCompleted) { - //UNIQUE_LOCK(vidstate.mutex, lk); - timestamp_ = spkt.timestamp; - fs->completed(spkt.frame_number); - } - fs->localTimestamp = spkt.localTimestamp; - - // Generate settings from packet data - /*ftl::audio::AudioSettings settings; - settings.channels = (spkt.channel == Channel::AudioStereo) ? 2 : 1; - settings.frame_size = 960; - - switch (pkt.definition) { - case definition_t::hz48000 : settings.sample_rate = 48000; break; - case definition_t::hz44100 : settings.sample_rate = 44100; break; - default: settings.sample_rate = 48000; break; - }*/ - - //frame.state.setLeft(settings); - //frame.frame.setOrigin(&frame.state); - - /*if (audio_cb_) { - // Create an audio frameset wrapper. - ftl::audio::FrameSet fs; - fs.id = 0; - fs.timestamp = frame.timestamp; - //fs.originClockDelta; - fs.count = 1; - //fs.stale = false; - fs.clear(ftl::data::FSFlag::STALE); - frame.frame.swapTo(fs.frames.emplace_back()); - - audio_cb_(fs); - }*/ + _finishPacket(fs, spkt.frame_number); } namespace sgm { @@ -379,13 +314,6 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { auto cvstream = cv::cuda::StreamAccessor::wrapStream(decoder->stream()); - /*if (spkt.channel == Channel::Depth && (pkt.flags & 0x2)) { - cv::Mat tmp; - surface.download(tmp); - cv::imshow("Test", tmp); - cv::waitKey(1); - }*/ - // Mark a frameset as being partial if (pkt.flags & ftl::codecs::kFlagPartial) { fs->markPartial(); @@ -401,8 +329,6 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { //if (!frame.origin()) frame.setOrigin(&vidstate.state); if (frame.hasChannel(spkt.channel)) { - // FIXME: Is this a corruption in recording or in playback? - // Seems to occur in same place in ftl file, one channel is missing LOG(WARNING) << "Previous frame not complete: " << spkt.timestamp; } @@ -419,35 +345,37 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { // Must ensure all processing is finished before completing a frame. cudaSafeCall(cudaStreamSynchronize(decoder->stream())); - for (int i=0; i<pkt.frame_count; ++i) { - InternalVideoStates &vidstate = _getVideoFrame(spkt,i); - //auto &frame = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+i); - //auto &frame = fs->frames[spkt.frame_number+i]; + fs->localTimestamp = spkt.localTimestamp; - /*if (spkt.version < 5) { - const auto *cs = stream_; - const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID()); + for (int i=pkt.frame_count-1; i>=0; --i) { + _finishPacket(fs, spkt.frame_number+i); + } +} - UNIQUE_LOCK(vidstate.mutex, lk); - if (frame.availableAll(sel)) { - timestamp_ = spkt.timestamp; - fs->completed(spkt.frame_number+i); - } - }*/ +void Receiver::_finishPacket(ftl::streams::LockedFrameSet &fs, size_t fix) { + if (fix >= fs->frames.size()) fix = 0; - if (spkt.flags & ftl::codecs::kFlagCompleted) { - UNIQUE_LOCK(vidstate.mutex, lk); - timestamp_ = spkt.timestamp; - fs->completed(spkt.frame_number+i); - } - } + auto &frame = fs->frames[fix]; + ++frame.packet_rx; - fs->localTimestamp = spkt.localTimestamp; + if (frame.packet_tx > 0 && frame.packet_tx == frame.packet_rx) { + fs->completed(fix); + frame.packet_tx = 0; + frame.packet_rx = 0; + } } void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) { const unsigned int channum = (unsigned int)spkt.channel; + if (spkt.channel == Channel::EndFrame) { + auto fs = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); + timestamp_ = spkt.timestamp; + fs->frames[spkt.frame_number].packet_tx = static_cast<int>(pkt.packet_count); + _finishPacket(fs, spkt.frame_number); + return; + } + // No data packet means channel is only available. if (pkt.data.size() == 0) { if (spkt.streamID < 255 && !(spkt.flags & ftl::codecs::kFlagRequest)) { @@ -456,23 +384,12 @@ void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) { const auto *cs = stream_; const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID()); + fs->localTimestamp = spkt.localTimestamp; + for (auto &frame : fs->frames) { - //LOG(INFO) << "MARK " << frame.source() << " " << (int)spkt.channel; frame.markAvailable(spkt.channel); - - if (spkt.flags & ftl::codecs::kFlagCompleted) { - //UNIQUE_LOCK(vidstate.mutex, lk); // FIXME: Should have a lock here... - timestamp_ = spkt.timestamp; - fs->completed(frame.source()); - } - - //if (frame.availableAll(sel)) { - //LOG(INFO) << "FRAME COMPLETED " << frame.source(); - // fs->completed(frame.source()); - //} } - - fs->localTimestamp = spkt.localTimestamp; + _finishPacket(fs, spkt.frame_number); } return; } diff --git a/components/streams/src/sender.cpp b/components/streams/src/sender.cpp index ed9ac9f846541c503380f570816373d60c54b428..2035bcca56f598781de0c999cae0989082bdd165 100644 --- a/components/streams/src/sender.cpp +++ b/components/streams/src/sender.cpp @@ -118,7 +118,7 @@ void Sender::fakePost(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { for (size_t i=0; i<fs.frames.size(); ++i) { auto &frame = fs.frames[i]; - if (frame.hasOwn(c)) ++fs.flush_count; + if (frame.hasOwn(c)) ++frame.packet_tx; } } @@ -137,9 +137,46 @@ bool Sender::_checkNeedsIFrame(int64_t ts, bool injecting) { return injection_timestamp_ >= ts; } +void Sender::_send(ftl::rgbd::FrameSet &fs, ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + /*int ccount = 0; + for (size_t i=0; i<fs.frames.size(); ++i) ccount += fs.frames[i].changed().size(); + fs.flush_count += fs.frames.size(); + + if (ccount == fs.flush_count) { + spkt.flags = ftl::codecs::kFlagCompleted; + }*/ + + if (spkt.frame_number == 255) ++fs.frames[0].packet_tx; + else if (spkt.frame_number < fs.frames.size()) ++fs.frames[spkt.frame_number].packet_tx; + stream_->post(spkt, pkt); +} + void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode) { if (!stream_) return; + // Send quick message for this special channel. + if (c == Channel::EndFrame) { + StreamPacket spkt; + spkt.version = 5; + spkt.timestamp = fs.timestamp(); + spkt.localTimestamp = fs.localTimestamp; + spkt.streamID = fs.frameset(); + //spkt.frame_number = 0; + spkt.channel = c; + spkt.flags = ftl::codecs::kFlagCompleted; + + ftl::codecs::Packet pkt; + pkt.frame_count = 1; //fs.frames.size(); + pkt.codec = codec_t::Invalid; + + for (size_t i=0; i<fs.frames.size(); ++i) { + spkt.frame_number = i; + pkt.packet_count = static_cast<uint8_t>(fs.frames[i].packet_tx+1); // FIXME: 255 limit currently + _send(fs, spkt, pkt); + } + return; + } + std::unordered_set<ftl::codecs::Channel> selected; if (stream_->size() > 0) selected = stream_->selected(fs.frameset()); @@ -152,7 +189,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode bool needs_encoding = true; int valid_frames = 0; - int ccount = 0; + //int ccount = 0; int forward_count = 0; for (size_t i=0; i<fs.frames.size(); ++i) { @@ -160,14 +197,14 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode if (!frame.has(c)) continue; ++valid_frames; - ++fs.flush_count; + //++fs.flush_count; // TODO: Send entire persistent session on inject if (do_inject) { _sendPersistent(frame); } - ccount += frame.changed().size(); + //ccount += frame.changed().size(); if (selected.find(c) != selected.end() || (int)c >= 32) { // FIXME: Sends high res colour, but receive end currently broken @@ -191,7 +228,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode } } - bool last_flush = ccount == fs.flush_count; + //bool last_flush = ccount == fs.flush_count; // Don't do anything if channel not in any frames. if (valid_frames == 0) return; @@ -214,9 +251,10 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode spkt.streamID = fs.frameset(); //fs.id; spkt.frame_number = i; spkt.channel = c; - spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; + //spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; - stream_->post(spkt, packets.back()); + //stream_->post(spkt, packets.back()); + _send(fs, spkt, packets.back()); //} else if (packets.size() > 1) { // PROBLEMS //} @@ -236,20 +274,21 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode spkt.timestamp = fs.timestamp(); spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); - spkt.frame_number = 255; + spkt.frame_number = 0; spkt.channel = c; - spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; + //spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; Packet pkt; pkt.codec = codec_t::Any; pkt.frame_count = 1; pkt.bitrate = 0; pkt.flags = 0; - stream_->post(spkt, pkt); + //stream_->post(spkt, pkt); + _send(fs, spkt, pkt); } if (needs_encoding) { - _encodeChannel(fs, c, do_iframe, last_flush); + _encodeChannel(fs, c, do_iframe); } } @@ -259,7 +298,7 @@ void Sender::forceAvailable(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { spkt.timestamp = fs.timestamp(); spkt.localTimestamp = fs.localTimestamp; spkt.streamID = fs.frameset(); - spkt.frame_number = 255; + spkt.frame_number = 0; spkt.channel = c; Packet pkt; @@ -339,7 +378,7 @@ void Sender::post(ftl::data::Frame &frame, ftl::codecs::Channel c) { spkt.timestamp = frame.timestamp(); spkt.localTimestamp = spkt.timestamp; spkt.streamID = frame.frameset(); - spkt.frame_number = 255; + spkt.frame_number = 0; spkt.channel = c; Packet pkt; @@ -392,7 +431,7 @@ void Sender::setActiveEncoders(uint32_t fsid, const std::unordered_set<Channel> } } -void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset, bool last_flush) { +void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset) { bool isfloat = ftl::codecs::type(c) == CV_32F; bool lossless = (isfloat) ? value("lossless_float", false) : value("lossless_colour", false); @@ -438,7 +477,6 @@ void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset, spkt.streamID = fs.frameset(); spkt.frame_number = offset; spkt.channel = c; - spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; auto &tile = _getTile(fs.id(), cc); @@ -487,7 +525,7 @@ void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset, pkt.flags = 0; // In the event of partial frames, add a flag to indicate that - if (static_cast<size_t>(fs.count) < fs.frames.size()) pkt.flags |= ftl::codecs::kFlagPartial; + //if (static_cast<size_t>(fs.count) < fs.frames.size()) pkt.flags |= ftl::codecs::kFlagPartial; // Choose correct region of interest into the surface. cv::Rect roi = _generateROI(fs, cc, offset, is_stereo); @@ -496,7 +534,8 @@ void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset, FTL_Profile("Encoder",0.02); if (enc->encode(sroi, pkt)) { - stream_->post(spkt, pkt); + //stream_->post(spkt, pkt); + _send(fs, spkt, pkt); /*cv::Mat tmp; tile.surface.download(tmp); @@ -516,7 +555,7 @@ void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset, } } -void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset, bool last_flush) { +void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset) { // TODO: combine into multiple opus streams for (size_t i=0; i<fs.frames.size(); ++i) { @@ -533,7 +572,7 @@ void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset, spkt.streamID = fs.frameset(); spkt.frame_number = i; spkt.channel = c; - spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; + //spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; ftl::codecs::Packet pkt; pkt.codec = ftl::codecs::codec_t::OPUS; @@ -557,11 +596,12 @@ void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset, } } - stream_->post(spkt, pkt); + _send(fs, spkt, pkt); + //stream_->post(spkt, pkt); } } -void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset, bool last_flush) { +void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset) { int i=0; // TODO: Pack all frames into a single packet @@ -573,7 +613,7 @@ void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset, spkt.streamID = fs.frameset(); spkt.frame_number = i++; spkt.channel = c; - spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; + //spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0; ftl::codecs::Packet pkt; pkt.frame_count = 1; @@ -584,7 +624,8 @@ void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset, auto encoder = ftl::data::getTypeEncoder(f.type(c)); if (encoder) { if (encoder(f, c, pkt.data)) { - stream_->post(spkt, pkt); + //stream_->post(spkt, pkt); + _send(fs, spkt, pkt); } } else { LOG(WARNING) << "Missing msgpack encoder"; @@ -617,15 +658,15 @@ void Sender::_encodeDataChannel(ftl::data::Frame &f, Channel c, bool reset) { } } -void Sender::_encodeChannel(ftl::data::FrameSet &fs, Channel c, bool reset, bool last_flush) { +void Sender::_encodeChannel(ftl::data::FrameSet &fs, Channel c, bool reset) { int ic = int(c); if (ic < 32) { - _encodeVideoChannel(fs, c, reset, last_flush); + _encodeVideoChannel(fs, c, reset); } else if (ic < 64) { - _encodeAudioChannel(fs, c, reset, last_flush); + _encodeAudioChannel(fs, c, reset); } else { - _encodeDataChannel(fs, c, reset, last_flush); + _encodeDataChannel(fs, c, reset); } } diff --git a/components/streams/src/stream.cpp b/components/streams/src/stream.cpp index 57c9c5cd91b248ebec52cd36bc00db22ad61bd89..8861d0a794a4016f34079e6f475ec9e22a87bbc5 100644 --- a/components/streams/src/stream.cpp +++ b/components/streams/src/stream.cpp @@ -226,7 +226,8 @@ void Muxer::_notify(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Pa try { cb_.trigger(spkt, pkt); // spkt.frame_number < 255 && } catch (std::exception &e) { - LOG(ERROR) << "Exception in packet handler: " << e.what(); + LOG(ERROR) << "Exception in packet handler (" << int(spkt.channel) << "): " << e.what(); + //reset(); // Force stream reset here to get new i-frames } } diff --git a/components/streams/test/builder_unit.cpp b/components/streams/test/builder_unit.cpp index 9d2d7c0774420f60bb5acccde85683a657a87507..7041b9eae016b3fc19917064e7a67c0aba62c04f 100644 --- a/components/streams/test/builder_unit.cpp +++ b/components/streams/test/builder_unit.cpp @@ -159,7 +159,7 @@ TEST_CASE("ftl::streams::LocalBuilder can provide empty frames", "[]") { REQUIRE( fs->timestamp() == 100 ); REQUIRE( fs->frames.size() == 1 ); REQUIRE( fs->hasFrame(0) ); - REQUIRE( fs->count == 1 ); + REQUIRE( fs->mask != 0 ); } SECTION("multiple framesets frameset") { diff --git a/components/streams/test/filestream_unit.cpp b/components/streams/test/filestream_unit.cpp index 25c33b5800c5be2e3752fa48f1120675d1c88b88..6bb3bd1763303d066262d9d46106b99793b25257 100644 --- a/components/streams/test/filestream_unit.cpp +++ b/components/streams/test/filestream_unit.cpp @@ -110,7 +110,7 @@ TEST_CASE("ftl::stream::File write and read", "[stream]") { reader->tick(100); std::this_thread::sleep_for(std::chrono::milliseconds(10)); - REQUIRE( count == 1 ); + REQUIRE( count == 2 ); //REQUIRE( tspkt.timestamp == 0 ); //auto itime = tspkt.timestamp; @@ -118,7 +118,7 @@ TEST_CASE("ftl::stream::File write and read", "[stream]") { reader->tick(101); std::this_thread::sleep_for(std::chrono::milliseconds(10)); - REQUIRE( count == 1 ); + REQUIRE( count == 2 ); //REQUIRE( tspkt.timestamp == itime+ftl::timer::getInterval() ); count = 0; diff --git a/components/streams/test/receiver_unit.cpp b/components/streams/test/receiver_unit.cpp index 0f8b362afffc72e271f5a8638880d7dedf030969..55ea7c837f0ef33a6fa80bbd08a9e48dea447922 100644 --- a/components/streams/test/receiver_unit.cpp +++ b/components/streams/test/receiver_unit.cpp @@ -39,6 +39,27 @@ class TestStream : public ftl::stream::Stream { return true; } + bool postEnd(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, int count) { + ftl::codecs::Packet pkt2; + pkt2.codec = codec_t::Invalid; + pkt2.bitrate = 255; + pkt2.packet_count = count+1; + pkt2.frame_count = 1; + + ftl::codecs::StreamPacket spkt2; + spkt2.version = 4; + spkt2.timestamp = spkt.timestamp; + spkt2.frame_number = 0; + spkt2.channel = Channel::EndFrame; + spkt2.streamID = spkt.streamID; + + for (int i=0; i<pkt.frame_count; ++i) { + spkt2.frame_number = i; + post(spkt2, pkt2); + } + return post(spkt, pkt); + } + bool begin() override { return true; } bool end() override { return true; } bool active() override { return true; } @@ -100,8 +121,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { bool r = encoder.encode(m, pkt); REQUIRE( r ); - spkt.flags |= ftl::codecs::kFlagCompleted; - stream.post(spkt, pkt); + stream.postEnd(spkt, pkt, 2); int count = 0; auto h = receiver->onFrameSet([&count](const ftl::data::FrameSetPtr& fs) { @@ -131,8 +151,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { bool r = encoder.encode(m, pkt); REQUIRE( r ); - spkt.flags |= ftl::codecs::kFlagCompleted; - stream.post(spkt, pkt); + stream.postEnd(spkt, pkt, 2); std::atomic<int> mask = 0; auto h = receiver->onFrameSet([&mask](const ftl::data::FrameSetPtr& fs) { @@ -141,7 +160,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { }); spkt.streamID = 1; - stream.post(spkt, pkt); + stream.postEnd(spkt, pkt, 2); int i=10; while (i-- > 0 && mask != 3) std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -157,8 +176,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { bool r = encoder.encode(m, pkt); REQUIRE( r ); - spkt.flags |= ftl::codecs::kFlagCompleted; - stream.post(spkt, pkt); + stream.postEnd(spkt, pkt, 2); int count = 0; auto h = receiver->onFrameSet([&count](const ftl::data::FrameSetPtr& fs) { @@ -194,8 +212,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { bool r = encoder.encode(m, pkt); REQUIRE( r ); - spkt.flags |= ftl::codecs::kFlagCompleted; - stream.post(spkt, pkt); + stream.postEnd(spkt, pkt, 2); int count = 0; auto h = receiver->onFrameSet([&count](const ftl::data::FrameSetPtr& fs) { @@ -232,8 +249,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { bool r = encoder.encode(m, pkt); REQUIRE( r ); - spkt.flags |= ftl::codecs::kFlagCompleted; - stream.post(spkt, pkt); + stream.postEnd(spkt, pkt, 2); int count = 0; auto h = receiver->onFrameSet([&count](const ftl::data::FrameSetPtr& fs) { @@ -331,15 +347,13 @@ TEST_CASE( "Receiver sync bugs" ) { try { stream.post(spkt, pkt); } catch(...) {} spkt.timestamp = 10; spkt.channel = Channel::ColourHighRes; - spkt.flags |= ftl::codecs::kFlagCompleted; - try { stream.post(spkt, pkt); } catch(...) {} + try { stream.postEnd(spkt, pkt, 3); } catch(...) {} spkt.timestamp = 20; spkt.channel = Channel::Colour2; try { stream.post(spkt, pkt); } catch(...) {} spkt.timestamp = 20; spkt.channel = Channel::Colour; - spkt.flags |= ftl::codecs::kFlagCompleted; - try { stream.post(spkt, pkt); } catch(...) {} + try { stream.postEnd(spkt, pkt, 2); } catch(...) {} int i=10; while (i-- > 0 && count < 2) std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -418,9 +432,7 @@ TEST_CASE( "Receiver non zero buffer" ) { }); stream.post(spkt, pkt); - spkt.flags |= ftl::codecs::kFlagCompleted; spkt.timestamp += 10; - spkt.flags |= ftl::codecs::kFlagCompleted; stream.post(spkt, pkt); int i=10; @@ -464,7 +476,7 @@ TEST_CASE( "Receiver for data channels" ) { spkt.version = 4; spkt.timestamp = 10; spkt.frame_number = 0; - spkt.channel = Channel::Data; + spkt.channel = Channel::Configuration; spkt.streamID = 0; ftl::timer::start(false); @@ -475,8 +487,7 @@ TEST_CASE( "Receiver for data channels" ) { ftl::util::FTLVectorBuffer buf(pkt.data); msgpack::pack(buf, 5.0f); - spkt.flags |= ftl::codecs::kFlagCompleted; - stream.post(spkt, pkt); + stream.postEnd(spkt, pkt, 1); int count = 0; auto h = receiver->onFrameSet([&count](const ftl::data::FrameSetPtr& fs) { @@ -484,8 +495,8 @@ TEST_CASE( "Receiver for data channels" ) { REQUIRE( fs->timestamp() == 10 ); REQUIRE( fs->frames.size() == 1 ); - REQUIRE( fs->frames[0].hasChannel(Channel::Data) ); - REQUIRE( fs->frames[0].get<float>(Channel::Data) == 5.0f ); + REQUIRE( fs->frames[0].hasChannel(Channel::Configuration) ); + REQUIRE( fs->frames[0].get<float>(Channel::Configuration) == 5.0f ); return true; }); @@ -507,8 +518,7 @@ TEST_CASE( "Receiver for data channels" ) { // Need to have at least one frame for this to work spkt.frame_number = 0; - spkt.flags |= ftl::codecs::kFlagCompleted; - stream.post(spkt, pkt); + stream.postEnd(spkt, pkt, 2); int count = 0; auto h = receiver->onFrameSet([&count](const std::shared_ptr<ftl::data::FrameSet>& fs) { @@ -516,8 +526,8 @@ TEST_CASE( "Receiver for data channels" ) { REQUIRE( fs->timestamp() == 10 ); REQUIRE( fs->frames.size() == 1 ); - REQUIRE( fs->hasChannel(Channel::Data) ); - REQUIRE( fs->get<float>(Channel::Data) == 5.0f ); + REQUIRE( fs->hasChannel(Channel::Configuration) ); + REQUIRE( fs->get<float>(Channel::Configuration) == 5.0f ); return true; }); @@ -536,8 +546,7 @@ TEST_CASE( "Receiver for data channels" ) { calib.width = 1024; msgpack::pack(buf, calib); - spkt.flags |= ftl::codecs::kFlagCompleted; - stream.post(spkt, pkt); + stream.postEnd(spkt, pkt, 1); int count = 0; auto h = receiver->onFrameSet([&count](const ftl::data::FrameSetPtr& fs) { @@ -545,8 +554,8 @@ TEST_CASE( "Receiver for data channels" ) { REQUIRE( fs->timestamp() == 10 ); REQUIRE( fs->frames.size() == 1 ); - REQUIRE( fs->frames[0].hasChannel(Channel::Data) ); - REQUIRE( fs->frames[0].get<ftl::rgbd::Camera>(Channel::Data).width == 1024 ); + REQUIRE( fs->frames[0].hasChannel(Channel::Configuration) ); + REQUIRE( fs->frames[0].get<ftl::rgbd::Camera>(Channel::Configuration).width == 1024 ); return true; }); @@ -564,8 +573,7 @@ TEST_CASE( "Receiver for data channels" ) { Eigen::Matrix4d pose; msgpack::pack(buf, pose); - spkt.flags |= ftl::codecs::kFlagCompleted; - stream.post(spkt, pkt); + stream.postEnd(spkt, pkt, 1); int count = 0; auto h = receiver->onFrameSet([&count](const std::shared_ptr<ftl::data::FrameSet>& fs) { @@ -573,8 +581,8 @@ TEST_CASE( "Receiver for data channels" ) { REQUIRE( fs->timestamp() == 10 ); REQUIRE( fs->frames.size() == 1 ); - REQUIRE( fs->frames[0].hasChannel(Channel::Data) ); - fs->frames[0].get<Eigen::Matrix4d>(Channel::Data); + REQUIRE( fs->frames[0].hasChannel(Channel::Configuration) ); + fs->frames[0].get<Eigen::Matrix4d>(Channel::Configuration); return true; }); diff --git a/components/streams/test/recsend_unit.cpp b/components/streams/test/recsend_unit.cpp index 25ff480a7ed4a03c8f72f304eb8ef1aa757e359c..105b7f91b866404f4e0e4819c18150d9e25e079a 100644 --- a/components/streams/test/recsend_unit.cpp +++ b/components/streams/test/recsend_unit.cpp @@ -98,6 +98,7 @@ TEST_CASE( "Send and receiver via encoding" ) { }); sender->post(fs, Channel::Control); + sender->post(fs, Channel::EndFrame); int i=10; while (i-- > 0 && count < 1) std::this_thread::sleep_for(std::chrono::milliseconds(10)); diff --git a/components/streams/test/sender_unit.cpp b/components/streams/test/sender_unit.cpp index 8c8331607f16e44f125618f503fb92cc9f5d6311..847fbe8bca7d9cbe8088ac33fb389c75584331ae 100644 --- a/components/streams/test/sender_unit.cpp +++ b/components/streams/test/sender_unit.cpp @@ -89,7 +89,6 @@ TEST_CASE( "Sender::post() video frames" ) { SECTION("a single colour frame") { stream.select(0, {Channel::Colour}, true); - fs.count = 1; fs.mask = 1; fs.frames[0].create<cv::cuda::GpuMat>(Channel::Colour).create(cv::Size(1280,720), CV_8UC4); fs.frames[0].set<cv::cuda::GpuMat>(Channel::Colour).setTo(cv::Scalar(0)); @@ -102,7 +101,6 @@ TEST_CASE( "Sender::post() video frames" ) { REQUIRE( (int)spkt.frame_number == 0 ); REQUIRE( spkt.streamID == 0 ); REQUIRE( spkt.channel == Channel::Colour ); - REQUIRE( (spkt.flags & ftl::codecs::kFlagCompleted) ); REQUIRE( pkt.codec == codec_t::HEVC ); REQUIRE( pkt.data.size() > 0 ); REQUIRE( pkt.frame_count == 1 ); @@ -115,7 +113,6 @@ TEST_CASE( "Sender::post() video frames" ) { fs.resize(2); fs.frames[1].store(); - fs.count = 2; fs.mask = 3; fs.frames[0].create<cv::cuda::GpuMat>(Channel::Colour).create(cv::Size(1280,720), CV_8UC4); fs.frames[0].set<cv::cuda::GpuMat>(Channel::Colour).setTo(cv::Scalar(0)); @@ -130,7 +127,6 @@ TEST_CASE( "Sender::post() video frames" ) { REQUIRE( (int)spkt.frame_number == 0 ); REQUIRE( spkt.streamID == 0 ); REQUIRE( spkt.channel == Channel::Colour ); - REQUIRE( (spkt.flags & ftl::codecs::kFlagCompleted) ); REQUIRE( pkt.codec == codec_t::HEVC ); REQUIRE( pkt.data.size() > 0 ); REQUIRE( pkt.frame_count == 2 ); @@ -143,7 +139,6 @@ TEST_CASE( "Sender::post() video frames" ) { fs.resize(2); fs.frames[1].store(); - fs.count = 2; fs.mask = 3; fs.frames[0].create<cv::cuda::GpuMat>(Channel::Depth).create(cv::Size(1280,720), CV_32F); fs.frames[0].set<cv::cuda::GpuMat>(Channel::Depth).setTo(cv::Scalar(0.0f)); @@ -170,7 +165,6 @@ TEST_CASE( "Sender::post() video frames" ) { fs.resize(10); - fs.count = 10; fs.mask = 0x3FF; fs.frames[0].create<cv::cuda::GpuMat>(Channel::Depth).create(cv::Size(1280,720), CV_32F); fs.frames[0].set<cv::cuda::GpuMat>(Channel::Depth).setTo(cv::Scalar(0.0f)); @@ -202,7 +196,6 @@ TEST_CASE( "Sender::post() video frames" ) { fs.resize(2); fs.frames[1].store(); - fs.count = 2; fs.mask = 3; fs.frames[0].create<cv::cuda::GpuMat>(Channel::Depth).create(cv::Size(1280,720), CV_32F); fs.frames[0].set<cv::cuda::GpuMat>(Channel::Depth).setTo(cv::Scalar(0.0f)); @@ -228,7 +221,6 @@ TEST_CASE( "Sender::post() video frames" ) { SECTION("one frame and two channels") { stream.select(0, Channel::Colour + Channel::Depth, true); - fs.count = 1; fs.mask = 1; fs.frames[0].create<cv::cuda::GpuMat>(Channel::Colour).create(cv::Size(1280,720), CV_8UC4); fs.frames[0].set<cv::cuda::GpuMat>(Channel::Colour).setTo(cv::Scalar(0)); @@ -244,10 +236,8 @@ TEST_CASE( "Sender::post() video frames" ) { REQUIRE( (int)spkt.frame_number == 0 ); REQUIRE( spkt.streamID == 0 ); REQUIRE( spkt.channel == Channel::Depth ); - REQUIRE( !(prev_spkt.flags & ftl::codecs::kFlagCompleted) ); REQUIRE( prev_spkt.channel == Channel::Colour ); REQUIRE( prev_spkt.timestamp == 1000 ); - REQUIRE( (spkt.flags & ftl::codecs::kFlagCompleted) ); REQUIRE( pkt.codec == codec_t::HEVC ); REQUIRE( pkt.data.size() > 0 ); REQUIRE( pkt.frame_count == 1 ); @@ -298,7 +288,6 @@ TEST_CASE( "Sender request to control encoding" ) { codec_t::Any, 0, 255, 255, 0, {} }); - fs.count = 1; fs.mask = 1; fs.frames[0].create<cv::cuda::GpuMat>(Channel::Colour).create(cv::Size(1280,720), CV_8UC4); fs.frames[0].set<cv::cuda::GpuMat>(Channel::Colour).setTo(cv::Scalar(0)); @@ -356,7 +345,6 @@ TEST_CASE( "Sender::post() data channels" ) { SECTION("a single calibration channel") { stream.select(0, {Channel::Calibration}, true); - fs.count = 1; fs.mask = 1; auto &calib = std::get<0>(fs.frames[0].create<std::tuple<ftl::rgbd::Camera, Channel, int>>(Channel::Calibration)); calib.width = 1024; @@ -370,7 +358,6 @@ TEST_CASE( "Sender::post() data channels" ) { REQUIRE( (int)spkt.frame_number == 0 ); REQUIRE( spkt.streamID == 0 ); REQUIRE( spkt.channel == Channel::Calibration ); - REQUIRE( (spkt.flags & ftl::codecs::kFlagCompleted) ); REQUIRE( pkt.codec == codec_t::MSGPACK ); REQUIRE( pkt.data.size() > 0 ); REQUIRE( pkt.frame_count == 1 ); @@ -379,7 +366,6 @@ TEST_CASE( "Sender::post() data channels" ) { SECTION("a single pose channel") { stream.select(0, {Channel::Pose}, true); - fs.count = 1; fs.mask = 1; fs.frames[0].create<Eigen::Matrix4d>(Channel::Pose); @@ -398,23 +384,22 @@ TEST_CASE( "Sender::post() data channels" ) { } SECTION("a single custom channel") { - stream.select(0, {Channel::Data}, true); + stream.select(0, {Channel::Configuration}, true); - fs.count = 1; fs.mask = 1; - auto &vf = fs.frames[0].create<std::vector<float>>(Channel::Data); + auto &vf = fs.frames[0].create<std::vector<float>>(Channel::Configuration); vf.push_back(5.0f); vf.push_back(33.0f); fs.frames[0].flush(); - sender->post(fs, Channel::Data); + sender->post(fs, Channel::Configuration); REQUIRE( count == 1 ); REQUIRE( spkt.version == 5 ); REQUIRE( spkt.timestamp == 1000 ); REQUIRE( (int)spkt.frame_number == 0 ); REQUIRE( spkt.streamID == 0 ); - REQUIRE( spkt.channel == Channel::Data ); + REQUIRE( spkt.channel == Channel::Configuration ); REQUIRE( pkt.codec == codec_t::MSGPACK ); REQUIRE( pkt.data.size() > 0 ); REQUIRE( pkt.frame_count == 1 ); @@ -422,23 +407,22 @@ TEST_CASE( "Sender::post() data channels" ) { } SECTION("a single list channel") { - stream.select(0, {Channel::Data}, true); + stream.select(0, {Channel::Configuration}, true); - fs.count = 1; fs.mask = 1; - auto vf = fs.frames[0].create<std::list<float>>(Channel::Data); + auto vf = fs.frames[0].create<std::list<float>>(Channel::Configuration); vf = 5.0f; vf = 33.0f; fs.frames[0].flush(); - sender->post(fs, Channel::Data); + sender->post(fs, Channel::Configuration); REQUIRE( count == 1 ); REQUIRE( spkt.version == 5 ); REQUIRE( spkt.timestamp == 1000 ); REQUIRE( (int)spkt.frame_number == 0 ); REQUIRE( spkt.streamID == 0 ); - REQUIRE( spkt.channel == Channel::Data ); + REQUIRE( spkt.channel == Channel::Configuration ); REQUIRE( pkt.codec == codec_t::MSGPACK ); REQUIRE( pkt.data.size() > 0 ); REQUIRE( pkt.frame_count == 1 ); @@ -482,7 +466,6 @@ TEST_CASE( "Sender::post() audio channels" ) { stream.select(0, {Channel::AudioMono}, true); - fs.count = 1; fs.mask = 1; auto audio = fs.frames[0].create<std::list<ftl::audio::AudioFrame>>(Channel::AudioMono); @@ -510,7 +493,6 @@ TEST_CASE( "Sender::post() audio channels" ) { stream.select(0, {Channel::AudioMono}, true); - fs.count = 1; fs.mask = 1; auto audio = fs.frames[0].create<std::list<ftl::audio::AudioFrame>>(Channel::AudioMono); @@ -549,7 +531,6 @@ TEST_CASE( "Sender::post() audio channels" ) { stream.select(0, {Channel::AudioStereo}, true); - fs.count = 1; fs.mask = 1; auto audio = fs.frames[0].create<std::list<ftl::audio::AudioFrame>>(Channel::AudioStereo); diff --git a/components/structures/include/ftl/data/new_frame.hpp b/components/structures/include/ftl/data/new_frame.hpp index 9fa92ed140776c7cf2683c3b484dce61f21a5921..6bbd61bc21f9bcdd1ec78bb45ac49518ce3e3912 100644 --- a/components/structures/include/ftl/data/new_frame.hpp +++ b/components/structures/include/ftl/data/new_frame.hpp @@ -651,6 +651,12 @@ class Frame { // ========================================================================= + public: + std::atomic_int packet_tx = 0; /// Number of packets transmitted for frame + std::atomic_int packet_rx = 0; /// Number of packets received for frame + + // ========================================================================= + protected: std::any &createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t); diff --git a/components/structures/include/ftl/data/new_frameset.hpp b/components/structures/include/ftl/data/new_frameset.hpp index ad75c990ba0d6e6822774e257d560d1e6eea28ab..c471eb5a04cfffd6cd2e235069c35d9151dc93ca 100644 --- a/components/structures/include/ftl/data/new_frameset.hpp +++ b/components/structures/include/ftl/data/new_frameset.hpp @@ -4,6 +4,7 @@ #include <ftl/threads.hpp> #include <ftl/timer.hpp> #include <ftl/data/new_frame.hpp> +#include <ftl/utility/intrinsics.hpp> #include <functional> //#include <opencv2/opencv.hpp> @@ -18,7 +19,8 @@ static const size_t kMaxFramesInSet = 32; enum class FSFlag : int { STALE = 0, - PARTIAL = 1 + PARTIAL = 1, + DISCARD = 4 }; /** @@ -40,9 +42,10 @@ class FrameSet : public ftl::data::Frame { //int64_t timestamp; // Millisecond timestamp of all frames int64_t localTimestamp; std::vector<Frame> frames; - std::atomic<int> count; // Number of valid frames + //std::atomic<int> count=0; // Actual packet count + //std::atomic<int> expected=0; // Expected packet count std::atomic<unsigned int> mask; // Mask of all sources that contributed - std::atomic<int> flush_count; // How many channels have been flushed + //std::atomic<int> flush_count; // How many channels have been flushed SHARED_MUTEX smtx; //Eigen::Matrix4d pose; // Set to identity by default. @@ -74,7 +77,7 @@ class FrameSet : public ftl::data::Frame { /** * Are all frames complete within this frameset? */ - inline bool isComplete() { return static_cast<unsigned int>(count) == frames.size(); } + inline bool isComplete() { return ftl::popcount(mask) == frames.size(); } /** * Check that a given frame is valid in this frameset. diff --git a/components/structures/src/frameset.cpp b/components/structures/src/frameset.cpp index 4d4cbc88b03f5a149ba9d012af1f75182316ca69..f6c9ff4587491ca356820e856198b5ce33dcded8 100644 --- a/components/structures/src/frameset.cpp +++ b/components/structures/src/frameset.cpp @@ -6,8 +6,6 @@ using ftl::data::FrameSet; FrameSet::FrameSet(Pool *ppool, FrameID pid, int64_t ts, size_t psize) : Frame(ppool->allocate(FrameID(pid.frameset(),255), ts)), mask(0) { - - flush_count = 0; // Reset flush on store... frames.reserve(psize); } @@ -20,11 +18,7 @@ void ftl::data::FrameSet::completed(size_t ix) { if (ix == 255) { } else if (ix < frames.size()) { - // If already completed for given frame, then skip - if (mask & (1 << ix)) return; - mask |= (1 << ix); - ++count; } else { throw FTL_Error("Completing frame that does not exist: " << timestamp() << ":" << ix); } @@ -47,12 +41,10 @@ void ftl::data::FrameSet::resize(size_t s) { void ftl::data::FrameSet::moveTo(ftl::data::FrameSet &fs) { UNIQUE_LOCK(fs.mutex(), lk); Frame::moveTo(fs); - fs.count = static_cast<int>(count); fs.flags_ = (int)flags_; fs.mask = static_cast<unsigned int>(mask); fs.frames = std::move(frames); - count = 0; mask = 0; set(ftl::data::FSFlag::STALE); } @@ -111,6 +103,7 @@ void FrameSet::flush() { for (auto c : unflushed) { pool()->flush_fs_.trigger(*this, c); } + pool()->flush_fs_.trigger(*this, ftl::codecs::Channel::EndFrame); } void FrameSet::flush(ftl::codecs::Channel c) { @@ -129,7 +122,6 @@ void FrameSet::flush(ftl::codecs::Channel c) { std::shared_ptr<FrameSet> FrameSet::fromFrame(Frame &f) { auto sptr = std::make_shared<FrameSet>(f.pool(), f.id(), f.timestamp()); sptr->frames.push_back(std::move(f)); - sptr->count = 1; sptr->mask = 1; return sptr; } diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index 2b45a62a3b3729500ee041a1ee978e09d842420f..4181712fd367b04c04f0dbfd2af0b600c3473715 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -344,6 +344,8 @@ void Frame::moveTo(Frame &f) { f.pool_ = pool_; f.data_ = std::move(data_); f.changed_ = std::move(changed_); + f.packet_rx = (int)packet_rx; + f.packet_tx = (int)packet_tx; status_ = FrameStatus::RELEASED; } @@ -392,6 +394,8 @@ void Frame::reset() { status_ = FrameStatus::CREATED; mode_ = FrameMode::PRIMARY; available_ = 0; + packet_rx = 0; + packet_tx = 0; } void Frame::hardReset() {