Skip to content
Snippets Groups Projects
Commit 73270500 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Partial fixes from chunk sync

parent 1d80b05c
Branches
Tags
No related merge requests found
......@@ -35,7 +35,7 @@ static const unsigned int kFrameDropLimit = 5;
struct StreamSource {
ftl::rgbd::Source *src;
std::atomic<unsigned int> jobs; // Busy or ready to swap?
std::atomic<int> jobs; // Busy or ready to swap?
std::atomic<unsigned int> clientCount;
cv::Mat rgb; // Tx buffer
cv::Mat depth; // Tx buffer
......
......@@ -165,10 +165,10 @@ ftl::rgbd::detail::Source *Source::_createDeviceImpl(const ftl::URI &uri) {
void Source::getFrames(cv::Mat &rgb, cv::Mat &depth) {
SHARED_LOCK(mutex_,lk);
//rgb_.copyTo(rgb);
//depth_.copyTo(depth);
rgb = rgb_;
depth = depth_;
rgb_.copyTo(rgb);
depth_.copyTo(depth);
//rgb = rgb_;
//depth = depth_;
/*cv::Mat tmp;
tmp = rgb;
......@@ -244,7 +244,7 @@ bool Source::compute(int N, int B) {
return true;
} else if (impl_ && impl_->compute(N,B)) {
timestamp_ = impl_->timestamp_;
/*cv::Mat tmp;
cv::Mat tmp;
rgb_.create(impl_->rgb_.size(), impl_->rgb_.type());
depth_.create(impl_->depth_.size(), impl_->depth_.type());
tmp = rgb_;
......@@ -252,10 +252,10 @@ bool Source::compute(int N, int B) {
impl_->rgb_ = tmp;
tmp = depth_;
depth_ = impl_->depth_;
impl_->depth_ = tmp;*/
impl_->depth_ = tmp;
impl_->rgb_.copyTo(rgb_);
impl_->depth_.copyTo(depth_);
//impl_->rgb_.copyTo(rgb_);
//impl_->depth_.copyTo(depth_);
return true;
}
return false;
......
......@@ -263,7 +263,7 @@ void Streamer::run(bool block) {
void Streamer::_swap(StreamSource *src) {
if (src->jobs == 0) {
UNIQUE_LOCK(src->mutex,lk);
if (src->jobs == 0) {
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
......@@ -278,17 +278,18 @@ void Streamer::_swap(StreamSource *src) {
}
}
src->src->getFrames(src->rgb, src->depth);
src->src->swap();
src->src->getFrames(src->rgb, src->depth);
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = 0;
src->jobs = -1;
src->frame++;
}
}
}
void Streamer::wait() {
// Do some jobs in this thread, might as well...
......@@ -310,19 +311,13 @@ void Streamer::wait() {
}
void Streamer::_schedule(StreamSource *src) {
// There will be two jobs for this source...
//UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 2 + kChunkCount;
//lk.unlock();
if (src == nullptr || src->jobs > 0) return;
//StreamSource *src = sources_[uri];
if (src == nullptr || src->jobs != 0) return;
jobs_ += 2 + kChunkCount;
src->jobs = 2 + kChunkCount;
// Grab / capture job
ftl::pool.push([this,src](int id) {
//auto start = std::chrono::high_resolution_clock::now();
auto start = std::chrono::high_resolution_clock::now();
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
int64_t target = now / mspf_;
......@@ -374,7 +369,7 @@ void Streamer::_schedule(StreamSource *src) {
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
// Compute job
......@@ -395,7 +390,7 @@ void Streamer::_schedule(StreamSource *src) {
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
// Create jobs for each chunk
......@@ -415,7 +410,7 @@ void Streamer::_schedule(StreamSource *src) {
_swap(src);
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment