diff --git a/components/codecs/include/ftl/codecs/packet.hpp b/components/codecs/include/ftl/codecs/packet.hpp index cfe7714daabfdfd596fbcb675c702f01a5064cd8..f19ffcba2f7ace52778a509b2c3e55b0cebf6d7b 100644 --- a/components/codecs/include/ftl/codecs/packet.hpp +++ b/components/codecs/include/ftl/codecs/packet.hpp @@ -91,6 +91,7 @@ struct StreamPacket { int64_t originClockDelta; // Not message packet / saved unsigned int hint_capability; // Is this a video stream, for example size_t hint_source_total; // Number of tracks per frame to expect + int retry_count = 0; // Decode retry count MSGPACK_DEFINE(timestamp, streamID, frame_number, channel, flags); diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp index 20106e841bb4db67cc8dbe9806d9a939af427ee1..1eae72dcb730880ceba1e0aaf8d3b34f278bbcd4 100644 --- a/components/streams/src/builder.cpp +++ b/components/streams/src/builder.cpp @@ -214,6 +214,11 @@ ForeignBuilder::~ForeignBuilder() { while (jobs_ > 0) { sleep_for(milliseconds(10)); } + + // Also make sure to get unique lock on any processing framesets. + for (auto &f : framesets_) { + UNIQUE_LOCK(f->smtx, lk); + } } LockedFrameSet ForeignBuilder::get(int64_t timestamp) { diff --git a/components/streams/src/filestream.cpp b/components/streams/src/filestream.cpp index 501c815cbdea840716afebb659a238e65aebb35d..9121abbdd435a9005fff30197c7acbbdc53a8218 100644 --- a/components/streams/src/filestream.cpp +++ b/components/streams/src/filestream.cpp @@ -217,13 +217,16 @@ bool File::tick(int64_t ts) { #endif if (jobs_ > 0) { - //LOG(ERROR) << "STILL HAS JOBS"; return true; } + bool has_data = false; + // Check buffer first for frames already read { UNIQUE_LOCK(data_mutex_, dlk); + if (data_.size() > 0) has_data = true; + for (auto i = data_.begin(); i != data_.end(); ++i) { if (std::get<0>(*i).timestamp <= timestamp_) { ++jobs_; @@ -274,18 +277,20 @@ bool File::tick(int64_t ts) { // This should only occur for first few frames, generally otherwise // the data buffer is already several frames ahead so is processed // above. Hence, no need to bother parallelising this bit. - if (std::get<0>(data).timestamp <= timestamp_) { + /*if (std::get<0>(data).timestamp <= timestamp_) { std::get<0>(data).timestamp = ts; //if (cb_) { dlk.lock(); try { + LOG(INFO) << "EARLY TRIGGER: " << std::get<0>(data).timestamp << " - " << int(std::get<0>(data).channel); cb_.trigger(std::get<0>(data),std::get<1>(data)); } catch (std::exception &e) { LOG(ERROR) << "Exception in packet callback: " << e.what(); } data_.pop_back(); //} - } else if (version_ < 5 && lastData) { + }*/ + if (version_ < 5 && lastData) { // For versions < 5, add completed flag to previous data std::get<0>(*lastData).flags |= ftl::codecs::kFlagCompleted; } @@ -295,7 +300,7 @@ bool File::tick(int64_t ts) { } } - timestamp_ += interval_; + if (has_data) timestamp_ += interval_; if (data_.size() == 0 && value("looping", true)) { buffer_in_.reset(); diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index 37ab291bf7fe96f01babd6ea202f7b6b6e5280c5..7dade1b0b27ba3dacd4ee6e5682020b1c8ee2b72 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -312,7 +312,19 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { int height = ividstate.height; //calibration.height; if (width == 0 || height == 0) { - LOG(WARNING) << "No calibration, skipping frame: " << spkt.timestamp; + // Attempt to retry the decode later + // Make a copy of the packets into a thread job + // FIXME: Check that thread pool does not explode + if (spkt.retry_count < 10) { + LOG(WARNING) << "No calibration, retrying: " << spkt.timestamp; + ftl::pool.push([this, spkt, pkt](int id) mutable { + ++const_cast<StreamPacket&>(spkt).retry_count; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + _processVideo(spkt, pkt); + }); + } else { + LOG(WARNING) << "No calibration, failed frame: " << spkt.timestamp; + } return; } diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index 594124c58bd7cd2d66f3f0ff0c65cfa44394075b..1b1700d792d77dac74d106a623700daabb214ad3 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -198,14 +198,21 @@ std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t std::any &Frame::createAny(ftl::codecs::Channel c) { if (status_ != FrameStatus::STORED) throw FTL_Error("Cannot create before store or after flush"); - //UNIQUE_LOCK(mutex(), lk); + ftl::data::Frame::ChannelData *d; - auto &d = data_[c]; - if (d.status != ftl::data::ChannelStatus::FLUSHED) { - d.status = ftl::data::ChannelStatus::VALID; - d.encoded.clear(); + if (parent_) { + UNIQUE_LOCK(mutex(), lk); + d = &(data_[c]); touch(c); - return d.data; + } else { + d = &(data_[c]); + touch(c); + } + + if (d->status != ftl::data::ChannelStatus::FLUSHED) { + d->status = ftl::data::ChannelStatus::VALID; + d->encoded.clear(); + return d->data; } else { throw FTL_Error("Channel is flushed and read-only: " << static_cast<unsigned int>(c)); }