Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • nicolaspope/ftl
1 result
Select Git revision
Show changes
Commits on Source (2)
......@@ -35,7 +35,7 @@ static const unsigned int kFrameDropLimit = 5;
struct StreamSource {
ftl::rgbd::Source *src;
std::atomic<unsigned int> jobs; // Busy or ready to swap?
std::atomic<int> jobs; // Busy or ready to swap?
std::atomic<unsigned int> clientCount;
cv::Mat rgb; // Tx buffer
cv::Mat depth; // Tx buffer
......
......@@ -165,10 +165,10 @@ ftl::rgbd::detail::Source *Source::_createDeviceImpl(const ftl::URI &uri) {
void Source::getFrames(cv::Mat &rgb, cv::Mat &depth) {
SHARED_LOCK(mutex_,lk);
//rgb_.copyTo(rgb);
//depth_.copyTo(depth);
rgb = rgb_;
depth = depth_;
rgb_.copyTo(rgb);
depth_.copyTo(depth);
//rgb = rgb_;
//depth = depth_;
/*cv::Mat tmp;
tmp = rgb;
......@@ -244,7 +244,7 @@ bool Source::compute(int N, int B) {
return true;
} else if (impl_ && impl_->compute(N,B)) {
timestamp_ = impl_->timestamp_;
/*cv::Mat tmp;
cv::Mat tmp;
rgb_.create(impl_->rgb_.size(), impl_->rgb_.type());
depth_.create(impl_->depth_.size(), impl_->depth_.type());
tmp = rgb_;
......@@ -252,10 +252,10 @@ bool Source::compute(int N, int B) {
impl_->rgb_ = tmp;
tmp = depth_;
depth_ = impl_->depth_;
impl_->depth_ = tmp;*/
impl_->depth_ = tmp;
impl_->rgb_.copyTo(rgb_);
impl_->depth_.copyTo(depth_);
//impl_->rgb_.copyTo(rgb_);
//impl_->depth_.copyTo(depth_);
return true;
}
return false;
......
......@@ -263,30 +263,31 @@ void Streamer::run(bool block) {
void Streamer::_swap(StreamSource *src) {
if (src->jobs == 0) {
UNIQUE_LOCK(src->mutex,lk);
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[b].erase(i);
--src->clientCount;
} else {
i++;
if (src->jobs == 0) {
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[b].erase(i);
--src->clientCount;
} else {
i++;
}
}
}
}
src->src->getFrames(src->rgb, src->depth);
src->src->swap();
src->src->swap();
src->src->getFrames(src->rgb, src->depth);
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = 0;
src->frame++;
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = -1;
src->frame++;
}
}
}
......@@ -310,19 +311,13 @@ void Streamer::wait() {
}
void Streamer::_schedule(StreamSource *src) {
// There will be two jobs for this source...
//UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 2 + kChunkCount;
//lk.unlock();
if (src == nullptr || src->jobs > 0) return;
//StreamSource *src = sources_[uri];
if (src == nullptr || src->jobs != 0) return;
jobs_ += 2 + kChunkCount;
src->jobs = 2 + kChunkCount;
// Grab / capture job
ftl::pool.push([this,src](int id) {
//auto start = std::chrono::high_resolution_clock::now();
auto start = std::chrono::high_resolution_clock::now();
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
int64_t target = now / mspf_;
......@@ -374,7 +369,7 @@ void Streamer::_schedule(StreamSource *src) {
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
// Compute job
......@@ -395,7 +390,7 @@ void Streamer::_schedule(StreamSource *src) {
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
// Create jobs for each chunk
......@@ -415,7 +410,7 @@ void Streamer::_schedule(StreamSource *src) {
_swap(src);
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
}
}
......