Skip to content
Snippets Groups Projects
Commit 1e4ba9e7 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/chunksync2' into 'master'

Partial fixes from chunk sync

See merge request nicolas.pope/ftl!85
parents 1d80b05c 73270500
No related branches found
No related tags found
1 merge request!85Partial fixes from chunk sync
Pipeline #12683 passed
......@@ -35,7 +35,7 @@ static const unsigned int kFrameDropLimit = 5;
struct StreamSource {
ftl::rgbd::Source *src;
std::atomic<unsigned int> jobs; // Busy or ready to swap?
std::atomic<int> jobs; // Busy or ready to swap?
std::atomic<unsigned int> clientCount;
cv::Mat rgb; // Tx buffer
cv::Mat depth; // Tx buffer
......
......@@ -165,10 +165,10 @@ ftl::rgbd::detail::Source *Source::_createDeviceImpl(const ftl::URI &uri) {
void Source::getFrames(cv::Mat &rgb, cv::Mat &depth) {
SHARED_LOCK(mutex_,lk);
//rgb_.copyTo(rgb);
//depth_.copyTo(depth);
rgb = rgb_;
depth = depth_;
rgb_.copyTo(rgb);
depth_.copyTo(depth);
//rgb = rgb_;
//depth = depth_;
/*cv::Mat tmp;
tmp = rgb;
......@@ -244,7 +244,7 @@ bool Source::compute(int N, int B) {
return true;
} else if (impl_ && impl_->compute(N,B)) {
timestamp_ = impl_->timestamp_;
/*cv::Mat tmp;
cv::Mat tmp;
rgb_.create(impl_->rgb_.size(), impl_->rgb_.type());
depth_.create(impl_->depth_.size(), impl_->depth_.type());
tmp = rgb_;
......@@ -252,10 +252,10 @@ bool Source::compute(int N, int B) {
impl_->rgb_ = tmp;
tmp = depth_;
depth_ = impl_->depth_;
impl_->depth_ = tmp;*/
impl_->depth_ = tmp;
impl_->rgb_.copyTo(rgb_);
impl_->depth_.copyTo(depth_);
//impl_->rgb_.copyTo(rgb_);
//impl_->depth_.copyTo(depth_);
return true;
}
return false;
......
......@@ -263,30 +263,31 @@ void Streamer::run(bool block) {
void Streamer::_swap(StreamSource *src) {
if (src->jobs == 0) {
UNIQUE_LOCK(src->mutex,lk);
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[b].erase(i);
--src->clientCount;
} else {
i++;
if (src->jobs == 0) {
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[b].erase(i);
--src->clientCount;
} else {
i++;
}
}
}
}
src->src->getFrames(src->rgb, src->depth);
src->src->swap();
src->src->swap();
src->src->getFrames(src->rgb, src->depth);
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = 0;
src->frame++;
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = -1;
src->frame++;
}
}
}
......@@ -310,19 +311,13 @@ void Streamer::wait() {
}
void Streamer::_schedule(StreamSource *src) {
// There will be two jobs for this source...
//UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 2 + kChunkCount;
//lk.unlock();
if (src == nullptr || src->jobs > 0) return;
//StreamSource *src = sources_[uri];
if (src == nullptr || src->jobs != 0) return;
jobs_ += 2 + kChunkCount;
src->jobs = 2 + kChunkCount;
// Grab / capture job
ftl::pool.push([this,src](int id) {
//auto start = std::chrono::high_resolution_clock::now();
auto start = std::chrono::high_resolution_clock::now();
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
int64_t target = now / mspf_;
......@@ -374,7 +369,7 @@ void Streamer::_schedule(StreamSource *src) {
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
// Compute job
......@@ -395,7 +390,7 @@ void Streamer::_schedule(StreamSource *src) {
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
// Create jobs for each chunk
......@@ -415,7 +410,7 @@ void Streamer::_schedule(StreamSource *src) {
_swap(src);
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment