Skip to content
Snippets Groups Projects
Commit c42a7563 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'master' into feature/146/upsample

parents d8af5453 82e627db
No related branches found
No related tags found
1 merge request!88Implements #146 upsampling option
......@@ -214,10 +214,10 @@ class Universe : public ftl::Configurable {
std::map<ftl::UUID, ftl::net::Peer*> peer_ids_;
ftl::UUID id_;
ftl::net::Dispatcher disp_;
std::thread thread_;
std::list<ReconnectInfo> reconnects_;
size_t phase_;
std::list<ftl::net::Peer*> garbage_;
std::thread thread_;
struct ConnHandler {
callback_t id;
......
......@@ -24,12 +24,12 @@ using ftl::net::callback_t;
callback_t ftl::net::Universe::cbid__ = 0;
Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this), phase_(0) {
Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) {
_installBindings();
}
Universe::Universe(nlohmann::json &config) :
Configurable(config), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this), phase_(0) {
Configurable(config), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) {
_installBindings();
}
......
......@@ -35,7 +35,7 @@ static const unsigned int kFrameDropLimit = 5;
struct StreamSource {
ftl::rgbd::Source *src;
std::atomic<unsigned int> jobs; // Busy or ready to swap?
std::atomic<int> jobs; // Busy or ready to swap?
std::atomic<unsigned int> clientCount;
cv::Mat rgb; // Tx buffer
cv::Mat depth; // Tx buffer
......
......@@ -150,8 +150,6 @@ void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<un
// Lock to allow buffer swap
UNIQUE_LOCK(mutex_,lk2);
chunk_count_ = 0;
// Swap the double buffers
cv::Mat tmp;
tmp = rgb_;
......@@ -161,6 +159,7 @@ void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<un
depth_ = d_depth_;
d_depth_ = tmp;
chunk_count_ = 0;
timestamp_ = current_frame_;
current_frame_ = frame;
}
......
......@@ -165,10 +165,10 @@ ftl::rgbd::detail::Source *Source::_createDeviceImpl(const ftl::URI &uri) {
void Source::getFrames(cv::Mat &rgb, cv::Mat &depth) {
SHARED_LOCK(mutex_,lk);
//rgb_.copyTo(rgb);
//depth_.copyTo(depth);
rgb = rgb_;
depth = depth_;
rgb_.copyTo(rgb);
depth_.copyTo(depth);
//rgb = rgb_;
//depth = depth_;
/*cv::Mat tmp;
tmp = rgb;
......@@ -254,8 +254,11 @@ bool Source::compute(int N, int B) {
depth_ = impl_->depth_;
impl_->depth_ = tmp;*/
// TODO:(Nick) Reduce buffer copies
impl_->rgb_.copyTo(rgb_);
impl_->depth_.copyTo(depth_);
//rgb_ = impl_->rgb_;
//depth_ = impl_->depth_;
return true;
}
return false;
......
......@@ -263,30 +263,31 @@ void Streamer::run(bool block) {
void Streamer::_swap(StreamSource *src) {
if (src->jobs == 0) {
UNIQUE_LOCK(src->mutex,lk);
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[b].erase(i);
--src->clientCount;
} else {
i++;
if (src->jobs == 0) {
for (unsigned int b=0; b<10; ++b) {
auto i = src->clients[b].begin();
while (i != src->clients[b].end()) {
// Client request completed so remove from list
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
i = src->clients[b].erase(i);
--src->clientCount;
} else {
i++;
}
}
}
}
src->src->getFrames(src->rgb, src->depth);
src->src->swap();
src->src->swap();
src->src->getFrames(src->rgb, src->depth);
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = 0;
src->frame++;
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = -1;
src->frame++;
}
}
}
......@@ -310,19 +311,13 @@ void Streamer::wait() {
}
void Streamer::_schedule(StreamSource *src) {
// There will be two jobs for this source...
//UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 2 + kChunkCount;
//lk.unlock();
if (src == nullptr || src->jobs > 0) return;
//StreamSource *src = sources_[uri];
if (src == nullptr || src->jobs != 0) return;
jobs_ += 2 + kChunkCount;
src->jobs = 2 + kChunkCount;
// Grab / capture job
ftl::pool.push([this,src](int id) {
//auto start = std::chrono::high_resolution_clock::now();
auto start = std::chrono::high_resolution_clock::now();
int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
int64_t target = now / mspf_;
......@@ -374,7 +369,7 @@ void Streamer::_schedule(StreamSource *src) {
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
// Compute job
......@@ -395,7 +390,7 @@ void Streamer::_schedule(StreamSource *src) {
// Mark job as finished
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
// Create jobs for each chunk
......@@ -415,7 +410,7 @@ void Streamer::_schedule(StreamSource *src) {
_swap(src);
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
if (jobs_ == 0) job_cv_.notify_one();
});
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment