Skip to content
Snippets Groups Projects

Partial fixes from chunk sync

Merged Nicolas Pope requested to merge bug/chunksync2 into master
1 file
+ 1
0
Compare changes
  • Side-by-side
  • Inline
@@ -263,30 +263,31 @@ void Streamer::run(bool block) {
void Streamer::_swap(StreamSource *src) {
if (src->jobs == 0) {
UNIQUE_LOCK(src->mutex,lk);
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[b].erase(i);
--src->clientCount;
} else {
i++;
if (src->jobs == 0) {
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[b].erase(i);
--src->clientCount;
} else {
i++;
}
}
}
}
src->src->getFrames(src->rgb, src->depth);
src->src->swap();
src->src->swap();
src->src->getFrames(src->rgb, src->depth);
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = 0;
src->frame++;
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = -1;
src->frame++;
}
}
}
@@ -310,19 +311,13 @@ void Streamer::wait() {
}
void Streamer::_schedule(StreamSource *src) {
// There will be two jobs for this source...
//UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 2 + kChunkCount;
//lk.unlock();
if (src == nullptr || src->jobs > 0) return;
//StreamSource *src = sources_[uri];
if (src == nullptr || src->jobs != 0) return;
jobs_ += 2 + kChunkCount;
src->jobs = 2 + kChunkCount;
// Grab / capture job
ftl::pool.push([this,src](int id) {
//auto start = std::chrono::high_resolution_clock::now();
auto start = std::chrono::high_resolution_clock::now();
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
int64_t target = now / mspf_;
@@ -374,7 +369,7 @@ void Streamer::_schedule(StreamSource *src) {
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
// Compute job
@@ -395,7 +390,7 @@ void Streamer::_schedule(StreamSource *src) {
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
// Create jobs for each chunk
@@ -415,7 +410,7 @@ void Streamer::_schedule(StreamSource *src) {
_swap(src);
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
}
}
Loading