Skip to content
Snippets Groups Projects
Commit abc89122 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Increase stream start stability

parent 846fb05e
No related branches found
No related tags found
1 merge request!316Resolves #343 GUI and Frame Refactor
......@@ -91,6 +91,7 @@ struct StreamPacket {
int64_t originClockDelta; // Not message packet / saved
unsigned int hint_capability; // Is this a video stream, for example
size_t hint_source_total; // Number of tracks per frame to expect
int retry_count = 0; // Decode retry count
MSGPACK_DEFINE(timestamp, streamID, frame_number, channel, flags);
......
......@@ -214,6 +214,11 @@ ForeignBuilder::~ForeignBuilder() {
while (jobs_ > 0) {
sleep_for(milliseconds(10));
}
// Also make sure to get unique lock on any processing framesets.
for (auto &f : framesets_) {
UNIQUE_LOCK(f->smtx, lk);
}
}
LockedFrameSet ForeignBuilder::get(int64_t timestamp) {
......
......@@ -217,13 +217,16 @@ bool File::tick(int64_t ts) {
#endif
if (jobs_ > 0) {
//LOG(ERROR) << "STILL HAS JOBS";
return true;
}
bool has_data = false;
// Check buffer first for frames already read
{
UNIQUE_LOCK(data_mutex_, dlk);
if (data_.size() > 0) has_data = true;
for (auto i = data_.begin(); i != data_.end(); ++i) {
if (std::get<0>(*i).timestamp <= timestamp_) {
++jobs_;
......@@ -274,18 +277,20 @@ bool File::tick(int64_t ts) {
// This should only occur for first few frames, generally otherwise
// the data buffer is already several frames ahead so is processed
// above. Hence, no need to bother parallelising this bit.
if (std::get<0>(data).timestamp <= timestamp_) {
/*if (std::get<0>(data).timestamp <= timestamp_) {
std::get<0>(data).timestamp = ts;
//if (cb_) {
dlk.lock();
try {
LOG(INFO) << "EARLY TRIGGER: " << std::get<0>(data).timestamp << " - " << int(std::get<0>(data).channel);
cb_.trigger(std::get<0>(data),std::get<1>(data));
} catch (std::exception &e) {
LOG(ERROR) << "Exception in packet callback: " << e.what();
}
data_.pop_back();
//}
} else if (version_ < 5 && lastData) {
}*/
if (version_ < 5 && lastData) {
// For versions < 5, add completed flag to previous data
std::get<0>(*lastData).flags |= ftl::codecs::kFlagCompleted;
}
......@@ -295,7 +300,7 @@ bool File::tick(int64_t ts) {
}
}
timestamp_ += interval_;
if (has_data) timestamp_ += interval_;
if (data_.size() == 0 && value("looping", true)) {
buffer_in_.reset();
......
......@@ -312,7 +312,19 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) {
int height = ividstate.height; //calibration.height;
if (width == 0 || height == 0) {
LOG(WARNING) << "No calibration, skipping frame: " << spkt.timestamp;
// Attempt to retry the decode later
// Make a copy of the packets into a thread job
// FIXME: Check that thread pool does not explode
if (spkt.retry_count < 10) {
LOG(WARNING) << "No calibration, retrying: " << spkt.timestamp;
ftl::pool.push([this, spkt, pkt](int id) mutable {
++const_cast<StreamPacket&>(spkt).retry_count;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
_processVideo(spkt, pkt);
});
} else {
LOG(WARNING) << "No calibration, failed frame: " << spkt.timestamp;
}
return;
}
......
......@@ -198,14 +198,21 @@ std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t
std::any &Frame::createAny(ftl::codecs::Channel c) {
if (status_ != FrameStatus::STORED) throw FTL_Error("Cannot create before store or after flush");
//UNIQUE_LOCK(mutex(), lk);
ftl::data::Frame::ChannelData *d;
auto &d = data_[c];
if (d.status != ftl::data::ChannelStatus::FLUSHED) {
d.status = ftl::data::ChannelStatus::VALID;
d.encoded.clear();
if (parent_) {
UNIQUE_LOCK(mutex(), lk);
d = &(data_[c]);
touch(c);
return d.data;
} else {
d = &(data_[c]);
touch(c);
}
if (d->status != ftl::data::ChannelStatus::FLUSHED) {
d->status = ftl::data::ChannelStatus::VALID;
d->encoded.clear();
return d->data;
} else {
throw FTL_Error("Channel is flushed and read-only: " << static_cast<unsigned int>(c));
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment