Newer
Older
#include <vector>
#include <thread>
#include <chrono>
using ftl::rgbd::Camera;
using std::shared_mutex;
using std::unique_lock;
using std::vector;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
bool NetSource::_getCalibration(Universe &net, const UUID &peer, const string &src, ftl::rgbd::Camera &p) {
try {
while(true) {
auto buf = net.call<vector<unsigned char>>(peer_, "source_calibration", src);
if (buf.size() > 0) {
memcpy((char*)&p, buf.data(), buf.size());
if (sizeof(p) != buf.size()) {
LOG(ERROR) << "Corrupted calibration";
return false;
}
LOG(INFO) << "Calibration received: " << p.cx << ", " << p.cy << ", " << p.fx << ", " << p.fy;
return true;
} else {
LOG(INFO) << "Could not get calibration, retrying";
sleep_for(milliseconds(500));
NetSource::NetSource(ftl::rgbd::Source *host)
: ftl::rgbd::detail::Source(host), active_(false) {
h_ = host_->getNet()->onConnect([this](ftl::net::Peer *p) {
if (active_) return;
LOG(INFO) << "NetSource restart...";
_updateURI();
});
void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
// Decode in temporary buffers to prevent long locks
cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb);
cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth);
// Lock host to prevent grab
UNIQUE_LOCK(host_->mutex(),lk);
rgb_ = tmp_rgb;
tmp_depth.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f));
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
void NetSource::_recvChunk(int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
cv::Mat tmp_rgb, tmp_depth;
if (!active_) return;
//LOG(INFO) << "Received chunk " << (int)chunk;
//try {
// Decode in temporary buffers to prevent long locks
cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb);
cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth);
// Build chunk head
int cx = (chunk % chunks_dim_) * chunk_width_;
int cy = (chunk / chunks_dim_) * chunk_height_;
cv::Rect roi(cx,cy,chunk_width_,chunk_height_);
cv::Mat chunkRGB = rgb_(roi);
//cv::Mat ichunkDepth = idepth_(roi);
cv::Mat chunkDepth = depth_(roi);
// Lock host to prevent grab
UNIQUE_LOCK(host_->mutex(),lk);
tmp_rgb.copyTo(chunkRGB);
//tmp_depth.convertTo(tmp_depth, CV_16UC1);
//if (delta) ichunkDepth = tmp_depth - ichunkDepth;
//tmp_depth.copyTo(ichunkDepth);
tmp_depth.convertTo(chunkDepth, CV_32FC1, 1.0f/(16.0f*10.0f));
if (chunk == 0) N_--;
//lk.unlock();
//} catch(...) {
// LOG(ERROR) << "Decode exception";
// return;
//}
}
void NetSource::setPose(const Eigen::Matrix4f &pose) {
if (!active_) return;
vector<unsigned char> vec((unsigned char*)pose.data(), (unsigned char*)(pose.data()+(pose.size())));
try {
if (!host_->getNet()->send(peer_, "set_pose", *host_->get<string>("uri"), vec)) {
// TODO(Nick) If URI changes then must unbind + rebind.
if (uri_.size() > 0) {
auto p = host_->getNet()->findOne<ftl::UUID>("find_stream", *uri);
if (!p) {
LOG(ERROR) << "Could not find stream: " << *uri;
return;
}
peer_ = *p;
has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_);
host_->getNet()->bind(*uri, [this](int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
_recvChunk(frame, chunk, delta, jpg, d);
});
N_ = 10;
// Initiate stream with request for first 10 frames
try {
host_->getNet()->send(peer_, "get_stream", *uri, 10, 0, host_->getNet()->id(), *uri);
} catch(...) {
LOG(ERROR) << "Could not connect to stream " << *uri;
}
// Update chunk details
chunks_dim_ = ftl::rgbd::kChunkDim;
chunk_width_ = params_.width / chunks_dim_;
chunk_height_ = params_.height / chunks_dim_;
rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0));
depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f);
//idepth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_16UC1, cv::Scalar(0));
uri_ = *uri;
active_ = true;
} else {
uri_ = "";
LOG(WARNING) << "NetSource URI is missing";
}
}
// Send one frame before end to prevent unwanted pause
if (N_ <= 2) {
N_ = 10;
if (!host_->getNet()->send(peer_, "get_stream", *host_->get<string>("uri"), 10, 0, host_->getNet()->id(), *host_->get<string>("uri"))) {
active_ = false;
}
}
}
bool NetSource::isReady() {
return has_calibration_ && !rgb_.empty();
}