Skip to content
Snippets Groups Projects
Commit 9fd2d8b3 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Resolves #373 Frameset timestamp errors

parent 70d1fb1f
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,7 @@
#define POOL_SIZE 10
//#define DEBUG_MUTEX
#define DEBUG_MUTEX
#define MUTEX_TIMEOUT 2
#if defined DEBUG_MUTEX
......
......@@ -15,30 +15,33 @@ class Transactional {
static_assert(std::is_pointer<T>::value, "Transactional type must be a pointer");
public:
/// Construct an invalid transaction; the throwing accessors below reject use.
Transactional() : ref_(nullptr), mtx_(nullptr) {}
/// Take a shared lock on *mtx for the lifetime of this transaction.
Transactional(T obj, SHARED_MUTEX *mtx) : ref_(obj), mtx_(mtx), lock_(*mtx_) {}
/// As above, but additionally invoke `complete(obj)` after the lock is released.
Transactional(T obj, SHARED_MUTEX *mtx, const std::function<void(T)> &complete) : ref_(obj), mtx_(mtx), lock_(*mtx_), completed_(complete) {}
Transactional(const Transactional &)=delete;
~Transactional() {
	// Release the shared lock (if owned) BEFORE running the completion
	// callback, so the callback may take its own locks without deadlock.
	if (lock_) lock_.unlock();
	if (completed_) completed_(ref_);
}
// Move: steal the reference and completion callback from `t`.
// NOTE(review): this acquires a FRESH shared lock via *mtx_ instead of
// transferring t.lock_, so moving a default-constructed object
// (mtx_ == nullptr) would dereference null — TODO confirm moved objects
// always hold a mutex.
Transactional(Transactional &&t) : ref_(t.ref_), mtx_(t.mtx_), lock_(*mtx_), completed_(t.completed_) {
	t.completed_ = nullptr;
}
Transactional &operator=(const Transactional &)=delete;
/// True when this transaction refers to a live frameset.
bool isValid() const { return ref_ != nullptr; }
operator bool() const { return ref_ != nullptr; }
// Accessors throw rather than dereference an invalid (null) frameset.
T operator->() { if (!ref_) throw FTL_Error("Use of invalid frameset"); return ref_; }
const T operator->() const { if (!ref_) throw FTL_Error("Use of invalid frameset"); return ref_; }
T operator*() { if (!ref_) throw FTL_Error("Use of invalid frameset"); return ref_; }
const T operator*() const { if (!ref_) throw FTL_Error("Use of invalid frameset"); return ref_; }
private:
T ref_;                                // non-owning pointer to the frameset
SHARED_MUTEX *mtx_;                    // nullptr for an invalid transaction
SHARED_LOCK_TYPE(SHARED_MUTEX) lock_;  // shared lock held while transaction is live
std::function<void(T)> completed_;     // optional callback run after unlock
};
......
......@@ -223,7 +223,12 @@ static void trigger_jobs() {
// If last job in list then do in this thread
if (active_jobs == static_cast<int>(jobs[kTimerMain].size())+1) {
lk.unlock();
bool doremove = !pj->job.trigger(ts);
bool doremove = true;
try {
doremove = !pj->job.trigger(ts);
} catch(const std::exception &e) {
LOG(ERROR) << "Exception in timer job: " << e.what();
}
pj->active = false;
active_jobs--;
if (doremove) removeJob(pj->id);
......@@ -231,7 +236,12 @@ static void trigger_jobs() {
break;
} else {
ftl::pool.push([pj,ts](int id) {
bool doremove = !pj->job.trigger(ts);
bool doremove = true;
try {
doremove = !pj->job.trigger(ts);
} catch(const std::exception &e) {
LOG(ERROR) << "Exception in timer job: " << e.what();
}
pj->active = false;
active_jobs--;
if (doremove) removeJob(pj->id);
......
......@@ -41,8 +41,8 @@ struct NetImplDetail {
//#define TCP_SEND_BUFFER_SIZE (512*1024)
//#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1)
#define TCP_SEND_BUFFER_SIZE (52*1024) // Was 256
#define TCP_RECEIVE_BUFFER_SIZE (52*1024) // Was 256
#define TCP_SEND_BUFFER_SIZE (128*1024) // Was 256
#define TCP_RECEIVE_BUFFER_SIZE (128*1024) // Was 256
callback_t ftl::net::Universe::cbid__ = 0;
......
......@@ -153,6 +153,8 @@ class ForeignBuilder : public BaseBuilder {
std::atomic<int> jobs_;
volatile bool skip_;
ftl::Handle main_id_;
size_t max_buffer_size_ = 16;
size_t completion_size_ = 8;
std::string name_;
......
......@@ -58,7 +58,7 @@ LockedFrameSet LocalBuilder::get(int64_t timestamp, size_t ix) {
frameset_ = _allocate(timestamp);
}
LockedFrameSet lfs(frameset_.get(), frameset_->smtx);
LockedFrameSet lfs(frameset_.get(), &frameset_->smtx);
return lfs;
}
......@@ -68,7 +68,7 @@ LockedFrameSet LocalBuilder::get(int64_t timestamp) {
frameset_ = _allocate(timestamp);
}
LockedFrameSet lfs(frameset_.get(), frameset_->smtx);
LockedFrameSet lfs(frameset_.get(), &frameset_->smtx);
return lfs;
}
......@@ -227,20 +227,27 @@ LockedFrameSet ForeignBuilder::get(int64_t timestamp) {
UNIQUE_LOCK(mutex_, lk);
auto fs = _get(timestamp);
LockedFrameSet lfs(fs.get(), fs->smtx, [this,fs](ftl::data::FrameSet *d) {
if (fs->isComplete()) {
if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) {
UNIQUE_LOCK(mutex_, lk);
_schedule();
if (fs) {
LockedFrameSet lfs(fs.get(), &fs->smtx, [this,fs](ftl::data::FrameSet *d) {
if (fs->isComplete()) {
if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) {
UNIQUE_LOCK(mutex_, lk);
_schedule();
}
}
}
});
return lfs;
});
return lfs;
} else {
return LockedFrameSet();
}
}
std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_get(int64_t timestamp) {
if (timestamp <= last_frame_) {
throw FTL_Error("Frameset already completed: " << timestamp << " (" << last_frame_ << ")");
//throw FTL_Error("Frameset already completed: " << timestamp << " (" << last_frame_ << ")");
LOG(ERROR) << "Frameset already completed: " << timestamp << " (" << last_frame_ << ")";
return nullptr;
}
auto fs = _findFrameset(timestamp);
......@@ -253,9 +260,9 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_get(int64_t timestamp) {
_schedule();
}
if (fs->test(ftl::data::FSFlag::STALE)) {
/*if (fs->test(ftl::data::FSFlag::STALE)) {
throw FTL_Error("Frameset already completed");
}
}*/
return fs;
}
......@@ -265,10 +272,13 @@ LockedFrameSet ForeignBuilder::get(int64_t timestamp, size_t ix) {
if (timestamp <= 0) throw FTL_Error("Invalid frame timestamp (" << timestamp << ")");
auto fs = _get(timestamp);
if (!fs) throw FTL_Error("No frameset for time " << timestamp);
LockedFrameSet lfs(fs.get(), fs->smtx);
return lfs;
if (fs) {
LockedFrameSet lfs(fs.get(), &fs->smtx);
return lfs;
} else {
return LockedFrameSet();
}
} else {
if (timestamp <= 0 || ix >= 32) throw FTL_Error("Invalid frame timestamp or index (" << timestamp << ", " << ix << ")");
......@@ -280,24 +290,28 @@ LockedFrameSet ForeignBuilder::get(int64_t timestamp, size_t ix) {
auto fs = _get(timestamp);
if (ix >= fs->frames.size()) {
// FIXME: Check that no access to frames can occur without lock
UNIQUE_LOCK(fs->smtx, flk);
while (fs->frames.size() < size_) {
fs->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(fs->frameset(), + fs->frames.size()), fs->timestamp())));
if (fs) {
if (ix >= fs->frames.size()) {
// FIXME: Check that no access to frames can occur without lock
UNIQUE_LOCK(fs->smtx, flk);
while (fs->frames.size() < size_) {
fs->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(fs->frameset(), + fs->frames.size()), fs->timestamp())));
}
}
}
LockedFrameSet lfs(fs.get(), fs->smtx, [this,fs](ftl::data::FrameSet *d) {
if (fs->isComplete()) {
if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) {
UNIQUE_LOCK(mutex_, lk);
_schedule();
LockedFrameSet lfs(fs.get(), &fs->smtx, [this,fs](ftl::data::FrameSet *d) {
if (fs->isComplete()) {
if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) {
UNIQUE_LOCK(mutex_, lk);
_schedule();
}
}
}
});
});
return lfs;
return lfs;
} else {
return LockedFrameSet();
}
}
}
......@@ -376,6 +390,12 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_getFrameset() {
while (N-- > 0 && i != framesets_.end()) ++i;
if (i != framesets_.end()) f = *i;
} else {
// Force complete of old frame
if (framesets_.size() >= completion_size_) {
LOG(WARNING) << "Forced completion: " << framesets_.back()->timestamp();
framesets_.back()->mask = 0xFF;
}
// Always choose oldest frameset when it completes
if (framesets_.size() > 0 && framesets_.back()->isComplete()) f = framesets_.back();
}
......@@ -412,9 +432,10 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_getFrameset() {
}
std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_addFrameset(int64_t timestamp) {
if (framesets_.size() >= 32) {
if (framesets_.size() >= max_buffer_size_) {
LOG(WARNING) << "Frameset buffer full, resetting: " << timestamp;
framesets_.clear();
//framesets_.pop_back();
}
auto newf = std::make_shared<FrameSet>(pool_, ftl::data::FrameID(id_,255), timestamp, size_);
......
......@@ -44,7 +44,7 @@ Receiver::~Receiver() {
// Feed a locally produced frame straight back into this receiver's builder.
// The builder may return an invalid frameset (e.g. the timestamp was already
// completed), in which case the change is silently dropped.
void Receiver::loopback(ftl::data::Frame &f, ftl::codecs::Channel c) {
	auto &build = builder(f.frameset());
	auto fs = build.get(f.timestamp(), f.source());
	if (fs) fs->frames[f.source()].informChange(c, build.changeType(), f.getAnyMutable(c));
}
ftl::streams::BaseBuilder &Receiver::builder(uint32_t id) {
......@@ -135,41 +135,57 @@ Receiver::InternalAudioStates &Receiver::_getAudioFrame(const StreamPacket &spkt
void Receiver::_processData(const StreamPacket &spkt, const Packet &pkt) {
auto &build = builder(spkt.streamID);
auto fs = build.get(spkt.timestamp, spkt.frame_number);
auto &f = (spkt.frame_number == 255) ? **fs : fs->frames[spkt.frame_number];
// Remove LIVE capability if stream hints it is recorded
if (spkt.channel == Channel::Capabilities && (spkt.hint_capability & ftl::codecs::kStreamCap_Recorded)) {
std::any data;
ftl::data::decode_type<std::unordered_set<Capability>>(data, pkt.data);
if (fs) {
auto &f = (spkt.frame_number == 255) ? **fs : fs->frames[spkt.frame_number];
auto &cap = *std::any_cast<std::unordered_set<Capability>>(&data);
if (cap.count(Capability::LIVE)) {
cap.erase(Capability::LIVE);
// Remove LIVE capability if stream hints it is recorded
if (spkt.channel == Channel::Capabilities && (spkt.hint_capability & ftl::codecs::kStreamCap_Recorded)) {
std::any data;
ftl::data::decode_type<std::unordered_set<Capability>>(data, pkt.data);
auto &cap = *std::any_cast<std::unordered_set<Capability>>(&data);
if (cap.count(Capability::LIVE)) {
cap.erase(Capability::LIVE);
}
cap.emplace(Capability::STREAMED);
f.informChange(spkt.channel, build.changeType(), data);
} else if (spkt.channel == Channel::Pose && pkt.codec == ftl::codecs::codec_t::POSE) {
// TODO: Remove this eventually, it allows old FTL files to work
std::any data;
auto &pose = data.emplace<Eigen::Matrix4d>();
pose = Eigen::Map<Eigen::Matrix4d>((double*)pkt.data.data());
f.informChange(spkt.channel, build.changeType(), data);
} else {
f.informChange(spkt.channel, build.changeType(), pkt);
}
cap.emplace(Capability::STREAMED);
f.informChange(spkt.channel, build.changeType(), data);
} else if (spkt.channel == Channel::Pose && pkt.codec == ftl::codecs::codec_t::POSE) {
// TODO: Remove this eventually, it allows old FTL files to work
std::any data;
auto &pose = data.emplace<Eigen::Matrix4d>();
pose = Eigen::Map<Eigen::Matrix4d>((double*)pkt.data.data());
f.informChange(spkt.channel, build.changeType(), data);
} else {
f.informChange(spkt.channel, build.changeType(), pkt);
}
if (spkt.channel == Channel::Calibration) {
const auto &calibration = std::get<0>(f.get<std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, int>>(Channel::Calibration));
InternalVideoStates &ividstate = _getVideoFrame(spkt);
ividstate.width = calibration.width;
ividstate.height = calibration.height;
}
if (spkt.channel == Channel::Calibration) {
const auto &calibration = std::get<0>(f.get<std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, int>>(Channel::Calibration));
InternalVideoStates &ividstate = _getVideoFrame(spkt);
ividstate.width = calibration.width;
ividstate.height = calibration.height;
}
// TODO: Adjust metadata also for recorded streams
// TODO: Adjust metadata also for recorded streams
fs->localTimestamp = spkt.localTimestamp;
_finishPacket(fs, spkt.frame_number);
fs->localTimestamp = spkt.localTimestamp;
_finishPacket(fs, spkt.frame_number);
// Still need to get the calibration data even if frameset is lost.
} else if (spkt.channel == Channel::Calibration) {
//LOG(WARNING) << "Calibration being missed in data";
InternalVideoStates &ividstate = _getVideoFrame(spkt);
std::any tany;
ftl::data::decode_type<std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, int>>(tany, pkt.data);
auto *cal = std::any_cast<std::tuple<ftl::rgbd::Camera, ftl::codecs::Channel, int>>(&tany);
if (cal) {
auto &calibration = std::get<0>(*cal);
ividstate.width = calibration.width;
ividstate.height = calibration.height;
}
}
}
ftl::audio::Decoder *Receiver::_createAudioDecoder(InternalAudioStates &frame, const ftl::codecs::Packet &pkt) {
......@@ -185,23 +201,28 @@ void Receiver::_processAudio(const StreamPacket &spkt, const Packet &pkt) {
auto &build = builder(spkt.streamID);
auto fs = build.get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1);
auto &frame = fs->frames[spkt.frame_number];
auto &audiolist = frame.createChange<std::list<ftl::audio::Audio>>(spkt.channel, build.changeType(), pkt);
auto &audio = audiolist.emplace_back();
if (fs) {
auto &frame = fs->frames[spkt.frame_number];
ftl::audio::Decoder *dec = _createAudioDecoder(state, pkt);
if (!dec) {
LOG(ERROR) << "Could get an audio decoder";
return;
}
if (!dec->decode(pkt, audio.data())) {
LOG(ERROR) << "Audio decode failed";
return;
}
auto &audiolist = frame.createChange<std::list<ftl::audio::Audio>>(spkt.channel, build.changeType(), pkt);
auto &audio = audiolist.emplace_back();
fs->localTimestamp = spkt.localTimestamp;
_finishPacket(fs, spkt.frame_number);
ftl::audio::Decoder *dec = _createAudioDecoder(state, pkt);
if (!dec) {
LOG(ERROR) << "Could get an audio decoder";
return;
}
if (!dec->decode(pkt, audio.data())) {
LOG(ERROR) << "Audio decode failed";
return;
}
fs->localTimestamp = spkt.localTimestamp;
_finishPacket(fs, spkt.frame_number);
} else {
LOG(WARNING) << "Audio data being lost";
}
}
namespace sgm {
......@@ -272,6 +293,11 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) {
}
}
if (!fs) {
LOG(WARNING) << "Dropping a video frame";
return;
}
auto cvstream = cv::cuda::StreamAccessor::wrapStream(decoder->stream());
// Mark a frameset as being partial
......@@ -319,7 +345,10 @@ void Receiver::_finishPacket(ftl::streams::LockedFrameSet &fs, size_t fix) {
if (frame.packet_tx > 0 && frame.packet_tx == frame.packet_rx) {
fs->completed(fix);
if (fs->isComplete()) timestamp_ = fs->timestamp();
if (fs->isComplete()) {
//LOG(INFO) << "COMPLETE: " << fs->timestamp() << ", " << fix;
timestamp_ = fs->timestamp();
}
frame.packet_tx = 0;
frame.packet_rx = 0;
}
......@@ -330,8 +359,12 @@ void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) {
if (spkt.channel == Channel::EndFrame) {
auto fs = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1);
fs->frames[spkt.frame_number].packet_tx = static_cast<int>(pkt.packet_count);
_finishPacket(fs, spkt.frame_number);
if (fs) {
fs->frames[spkt.frame_number].packet_tx = static_cast<int>(pkt.packet_count);
//LOG(INFO) << "EXPECTED " << fs->frames[spkt.frame_number].packet_tx << " for " << int(spkt.frame_number);
_finishPacket(fs, spkt.frame_number);
}
return;
}
......@@ -340,15 +373,18 @@ void Receiver::processPackets(const StreamPacket &spkt, const Packet &pkt) {
if (spkt.streamID < 255 && !(spkt.flags & ftl::codecs::kFlagRequest)) {
// Get the frameset
auto fs = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1);
const auto *cs = stream_;
const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID());
fs->localTimestamp = spkt.localTimestamp;
if (fs) {
const auto *cs = stream_;
const auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID());
for (auto &frame : fs->frames) {
frame.markAvailable(spkt.channel);
fs->localTimestamp = spkt.localTimestamp;
for (auto &frame : fs->frames) {
frame.markAvailable(spkt.channel);
}
_finishPacket(fs, spkt.frame_number);
}
_finishPacket(fs, spkt.frame_number);
}
return;
}
......
......@@ -154,8 +154,19 @@ void Sender::_send(ftl::rgbd::FrameSet &fs, ftl::codecs::StreamPacket &spkt, con
spkt.flags = ftl::codecs::kFlagCompleted;
}*/
if (spkt.frame_number == 255) LOG(WARNING) << "Bad frame number";
if (spkt.frame_number == 255) ++fs.frames[0].packet_tx;
else if (spkt.frame_number < fs.frames.size()) ++fs.frames[spkt.frame_number].packet_tx;
else if (spkt.frame_number < fs.frames.size() && fs.frames[spkt.frame_number].source() == spkt.frame_number) ++fs.frames[spkt.frame_number].packet_tx;
else {
// Find the correct frame
for (auto &f : fs.frames) {
if (f.source() == spkt.frame_number) {
++f.packet_tx;
break;
}
}
}
stream_->post(spkt, pkt);
}
......@@ -183,7 +194,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode
pkt.codec = codec_t::Invalid;
for (size_t i=0; i<fs.frames.size(); ++i) {
spkt.frame_number = i;
spkt.frame_number = fs.frames[i].source();
pkt.packet_count = static_cast<uint8_t>(fs.frames[i].packet_tx+1); // FIXME: 255 limit currently
_send(fs, spkt, pkt);
}
......@@ -268,7 +279,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c, bool noencode
spkt.timestamp = fs.timestamp();
spkt.localTimestamp = fs.localTimestamp;
spkt.streamID = fs.frameset(); //fs.id;
spkt.frame_number = i;
spkt.frame_number = frame.source();
spkt.channel = c;
//spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0;
......@@ -494,7 +505,7 @@ void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset)
spkt.timestamp = fs.timestamp();
spkt.localTimestamp = fs.localTimestamp;
spkt.streamID = fs.frameset();
spkt.frame_number = offset;
spkt.frame_number = fs.frames[offset].source();
spkt.channel = c;
auto &tile = _getTile(fs.id(), cc);
......@@ -589,7 +600,7 @@ void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset)
spkt.timestamp = fs.timestamp();
spkt.localTimestamp = fs.localTimestamp;
spkt.streamID = fs.frameset();
spkt.frame_number = i;
spkt.frame_number = fs.frames[i].source();
spkt.channel = c;
//spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0;
......@@ -630,7 +641,7 @@ void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset)
spkt.timestamp = fs.timestamp();
spkt.localTimestamp = fs.localTimestamp;
spkt.streamID = fs.frameset();
spkt.frame_number = i++;
spkt.frame_number = f.source();
spkt.channel = c;
//spkt.flags = (last_flush) ? ftl::codecs::kFlagCompleted : 0;
......
......@@ -115,6 +115,171 @@ TEST_CASE( "Send and receiver via encoding" ) {
delete sender;
}
// Stress test: drive framesets through Sender -> TestStream -> Receiver from
// multiple threads and check that received framesets stay ordered and complete.
TEST_CASE( "Multi-thread stability testing" ) {
// Shared configuration root for every component below.
json_t global = json_t{{"$id","ftl://test"}};
ftl::config::configure(global);
ftl::data::Pool pool(5,7);
json_t rcfg = json_t{
{"$id","ftl://test/1"}
};
auto *receiver = ftl::create<Receiver>(rcfg, &pool);
json_t scfg = json_t{
{"$id","ftl://test/2"}
};
auto *sender = ftl::create<Sender>(scfg);
json_t cfg2 = json_t{
{"$id","ftl://test/3"}
};
TestStream stream(cfg2);
receiver->setStream(&stream);
// Disable buffering so framesets are delivered as soon as they complete.
receiver->set("frameset_buffer_size", 0);
sender->setStream(&stream);
sender->resetSender(); // FIXME: Why is this needed?
// Produce frames on a 20ms timer for the duration of the sections below.
ftl::timer::setInterval(20);
ftl::timer::start(false);
SECTION("One frame, two channel") {
stream.select(0, {Channel::Colour}, true);
// Forward every flushed AUTO_SEND frameset to the sender (onto the stream).
auto h1 = pool.onFlushSet([sender](ftl::data::FrameSet &fs, ftl::codecs::Channel c) {
if (!fs.test(ftl::data::FSFlag::AUTO_SEND)) return true;
//LOG(INFO) << "FLUSH: " << fs.timestamp() << ", " << int(c);
sender->post(fs, c);
return true;
});
int count = 0;
ftl::data::FrameSetPtr result = nullptr;
// Receive framesets back and check the ordering/content invariants.
auto h = receiver->onFrameSet([&count,&result](const ftl::data::FrameSetPtr &fs) {
LOG(INFO) << "FS RECV: " << fs->timestamp();
count++;
// Timestamps must advance by at least one timer interval (20ms).
if (result) REQUIRE( result->timestamp() <= fs->timestamp()-20 );
REQUIRE( fs->frames.size() == 1 );
REQUIRE( fs->frames[0].hasChannel(Channel::Colour) );
result = fs;
return true;
});
// Producer: one single-frame frameset per timer tick.
auto h2 = ftl::timer::add(ftl::timer::timerlevel_t::kTimerMain, [&pool](int64_t ts) {
Frame f = pool.allocate(ftl::data::FrameID(0,0), ts);
f.store();
auto &mat = f.create<cv::cuda::GpuMat>(Channel::Colour);
mat.create(480, 640, CV_8UC4);
mat.setTo(cv::Scalar(0,0,0,0));
auto &calib = f.cast<ftl::rgbd::Frame>().setLeft();
calib.width = 640;
calib.height = 480;
auto fsptr = FrameSet::fromFrame(f);
FrameSet &fs = *fsptr;
fs.set(ftl::data::FSFlag::AUTO_SEND);
fsptr->flush(Channel::Calibration);
// Flush colour from a worker thread to exercise cross-thread flushing.
ftl::pool.push([fsptr](int id) { fsptr->flush(Channel::Colour); });
return true;
});
// Wait up to ~10 seconds for 100 framesets to arrive.
int i=1000;
while (i-- > 0 && count < 100) std::this_thread::sleep_for(std::chrono::milliseconds(10));
REQUIRE( count >= 100 );
}
SECTION("Two frame, two channel") {
stream.select(0, {Channel::Colour}, true);
auto h1 = pool.onFlushSet([sender](ftl::data::FrameSet &fs, ftl::codecs::Channel c) {
if (!fs.test(ftl::data::FSFlag::AUTO_SEND)) return true;
//LOG(INFO) << "FLUSH: " << fs.timestamp() << ", " << int(c) << ", " << fs.frames[0].source();
sender->post(fs, c);
return true;
});
int count = 0;
ftl::data::FrameSetPtr result = nullptr;
auto h = receiver->onFrameSet([&count,&result](const ftl::data::FrameSetPtr &fs) {
LOG(INFO) << "FS RECV: " << fs->timestamp();
count++;
if (result) {
REQUIRE( result->timestamp() <= fs->timestamp()-20 );
//REQUIRE( fs->frames.size() == 2 );
REQUIRE( fs->isComplete() );
REQUIRE( fs->frames[0].hasChannel(Channel::Colour) );
// The second frame may legitimately be absent (e.g. forced completion).
if (fs->frames.size() > 1) REQUIRE( fs->frames[1].hasChannel(Channel::Colour) );
}
result = fs;
return true;
});
// NOTE(review): pool2 appears unused inside the lambda — TODO confirm intent.
ftl::data::Pool pool2(5,7);
// Producer: each tick emits TWO frames (sources 0 and 1) from separate
// worker-pool threads, so arrival order at the builder is not fixed.
auto h2 = ftl::timer::add(ftl::timer::timerlevel_t::kTimerMain, [&pool,&pool2](int64_t ts) {
ftl::pool.push([&pool, ts](int id) {
Frame f = pool.allocate(ftl::data::FrameID(0,0), ts);
f.store();
auto &mat = f.create<cv::cuda::GpuMat>(Channel::Colour);
mat.create(480, 640, CV_8UC4);
mat.setTo(cv::Scalar(0,0,0,0));
auto &calib = f.cast<ftl::rgbd::Frame>().setLeft();
calib.width = 640;
calib.height = 480;
auto fsptr = FrameSet::fromFrame(f);
FrameSet &fs = *fsptr;
fs.set(ftl::data::FSFlag::AUTO_SEND);
fsptr->flush(Channel::Calibration);
ftl::pool.push([fsptr](int id) { fsptr->flush(Channel::Colour); });
});
ftl::pool.push([&pool, ts](int id) {
Frame f = pool.allocate(ftl::data::FrameID(0,1), ts);
f.store();
auto &mat = f.create<cv::cuda::GpuMat>(Channel::Colour);
mat.create(480, 640, CV_8UC4);
mat.setTo(cv::Scalar(0,0,0,0));
auto &calib = f.cast<ftl::rgbd::Frame>().setLeft();
calib.width = 640;
calib.height = 480;
auto fsptr = FrameSet::fromFrame(f);
FrameSet &fs = *fsptr;
fs.set(ftl::data::FSFlag::AUTO_SEND);
fsptr->flush(Channel::Calibration);
ftl::pool.push([fsptr](int id) { fsptr->flush(Channel::Colour); });
});
return true;
});
int i=1000;
while (i-- > 0 && count < 100) std::this_thread::sleep_for(std::chrono::milliseconds(10));
REQUIRE( count >= 100 );
}
LOG(INFO) << "DONE";
// Restore timer defaults and drain the worker pool before teardown.
ftl::timer::reset();
ftl::timer::setInterval(50);
ftl::pool.clear_queue();
while (ftl::pool.n_idle() != ftl::pool.size()) std::this_thread::sleep_for(std::chrono::milliseconds(10));
delete receiver;
delete sender;
}
TEST_CASE( "Response via loopback" ) {
json_t global = json_t{{"$id","ftl://test"}};
ftl::config::configure(global);
......
......@@ -78,7 +78,7 @@ class FrameSet : public ftl::data::Frame {
/**
 * Are all frames complete within this frameset?
 *
 * Uses >= rather than == so that a forced completion (which sets the mask
 * to 0xFF) still reports complete even when fewer than 8 frames exist.
 */
inline bool isComplete() { return mask != 0 && ftl::popcount(mask) >= frames.size(); }
/**
* Check that a given frame is valid in this frameset.
......
......@@ -25,8 +25,8 @@ Frame Pool::allocate(FrameID id, int64_t timestamp) {
UNIQUE_LOCK(mutex_, lk);
auto &pool = _getPool(id);
if (timestamp < pool.last_timestamp) {
timestamp = pool.last_timestamp;
if (timestamp <= pool.last_timestamp) {
//timestamp = pool.last_timestamp;
//throw FTL_Error("New frame timestamp is older than previous: " << timestamp << " vs " << pool.last_timestamp);
}
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment