Skip to content
Snippets Groups Projects
Commit 41f11e15 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/89/chunking' into 'master'

Implements #89 PNG chunking

Closes #89 and #8

See merge request nicolas.pope/ftl!40
parents a2a3c5c4 70737117
Branches
Tags
1 merge request!40Implements #89 PNG chunking
Pipeline #11610 passed
......@@ -6,6 +6,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <arpa/inet.h>
#define INVALID_SOCKET -1
......
......@@ -370,7 +370,12 @@ bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args)
DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string();
return false;
}
#ifdef WIN32
return p->isConnected() && p->send(name, args...) >= 0;
#else
return p->isConnected() && p->send(name, args...) > 0;
#endif
}
/*template <typename... ARGS>
......
......@@ -30,6 +30,7 @@
#include <algorithm>
#include <tuple>
#include <chrono>
#include <vector>
using std::tuple;
using std::get;
......@@ -41,6 +42,7 @@ using std::chrono::seconds;
using ftl::net::Universe;
using ftl::net::callback_t;
using std::mutex;
using std::vector;
using std::recursive_mutex;
using std::unique_lock;
......@@ -83,6 +85,9 @@ static SOCKET tcpConnect(URI &uri) {
return INVALID_SOCKET;
}
//int flags =1;
//if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
addrinfo hints = {}, *addrs;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
......@@ -164,6 +169,9 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals
is_waiting_ = true;
scheme_ = ftl::URI::SCHEME_TCP;
//int flags =1;
//if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
// Send the initiating handshake if valid
if (status_ == kConnecting) {
// Install return handshake handler.
......@@ -585,10 +593,16 @@ int Peer::_send() {
auto send_vec = send_buf_.vector();
auto send_size = send_buf_.vector_size();
int c = 0;
vector<WSABUF> wsabuf(send_size);
for (int i = 0; i < send_size; i++) {
c += ftl::net::internal::send(sock_, (char*)send_vec[i].iov_base, (int)send_vec[i].iov_len, 0);
wsabuf[i].len = (ULONG)send_vec[i].iov_len;
wsabuf[i].buf = (char*)send_vec[i].iov_base;
//c += ftl::net::internal::send(sock_, (char*)send_vec[i].iov_base, (int)send_vec[i].iov_len, 0);
}
DWORD bytessent;
int c = WSASend(sock_, wsabuf.data(), send_size, (LPDWORD)&bytessent, 0, NULL, NULL);
#else
int c = ftl::net::internal::writev(sock_, send_buf_.vector(), (int)send_buf_.vector_size());
#endif
......
......@@ -16,6 +16,8 @@
namespace ftl {
namespace rgbd {
static const int kChunkDim = 4;
namespace detail {
struct StreamClient {
......@@ -31,13 +33,14 @@ static const unsigned int kDepth = 0x4;
struct StreamSource {
ftl::rgbd::Source *src;
std::atomic<unsigned int> state; // Busy or ready to swap?
std::atomic<unsigned int> jobs; // Busy or ready to swap?
cv::Mat rgb; // Tx buffer
cv::Mat depth; // Tx buffer
std::vector<unsigned char> rgb_buf;
std::vector<unsigned char> d_buf;
cv::Mat prev_rgb;
cv::Mat prev_depth;
std::vector<detail::StreamClient> clients[10]; // One list per bitrate
std::shared_mutex mutex;
unsigned long long frame;
};
}
......@@ -109,6 +112,7 @@ class Streamer : public ftl::Configurable {
std::mutex job_mtx_;
std::condition_variable job_cv_;
std::atomic<int> jobs_;
int compress_level_;
void _schedule();
void _swap(detail::StreamSource *);
......
......@@ -4,6 +4,8 @@
#include <chrono>
#include <shared_mutex>
#include <ftl/rgbd/streamer.hpp>
using ftl::rgbd::detail::NetSource;
using ftl::net::Universe;
using ftl::UUID;
......@@ -76,6 +78,42 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch
//lk.unlock();
}
void NetSource::_recvChunk(int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
cv::Mat tmp_rgb, tmp_depth;
if (!active_) return;
//LOG(INFO) << "Received chunk " << (int)chunk;
//try {
// Decode in temporary buffers to prevent long locks
cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb);
cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth);
// Build chunk head
int cx = (chunk % chunks_dim_) * chunk_width_;
int cy = (chunk / chunks_dim_) * chunk_height_;
cv::Rect roi(cx,cy,chunk_width_,chunk_height_);
cv::Mat chunkRGB = rgb_(roi);
//cv::Mat ichunkDepth = idepth_(roi);
cv::Mat chunkDepth = depth_(roi);
// Lock host to prevent grab
UNIQUE_LOCK(host_->mutex(),lk);
tmp_rgb.copyTo(chunkRGB);
//tmp_depth.convertTo(tmp_depth, CV_16UC1);
//if (delta) ichunkDepth = tmp_depth - ichunkDepth;
//tmp_depth.copyTo(ichunkDepth);
tmp_depth.convertTo(chunkDepth, CV_32FC1, 1.0f/(16.0f*10.0f));
if (chunk == 0) N_--;
//lk.unlock();
//} catch(...) {
// LOG(ERROR) << "Decode exception";
// return;
//}
}
void NetSource::setPose(const Eigen::Matrix4f &pose) {
if (!active_) return;
......@@ -110,8 +148,8 @@ void NetSource::_updateURI() {
has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_);
host_->getNet()->bind(*uri, [this](const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
_recv(jpg, d);
host_->getNet()->bind(*uri, [this](int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
_recvChunk(frame, chunk, delta, jpg, d);
});
N_ = 10;
......@@ -123,6 +161,14 @@ void NetSource::_updateURI() {
LOG(ERROR) << "Could not connect to stream " << *uri;
}
// Update chunk details
chunks_dim_ = ftl::rgbd::kChunkDim;
chunk_width_ = params_.width / chunks_dim_;
chunk_height_ = params_.height / chunks_dim_;
rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0));
depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f);
//idepth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_16UC1, cv::Scalar(0));
uri_ = *uri;
active_ = true;
} else {
......
......@@ -37,9 +37,14 @@ class NetSource : public detail::Source {
std::string uri_;
ftl::net::callback_t h_;
std::mutex mutex_;
int chunks_dim_;
int chunk_width_;
int chunk_height_;
cv::Mat idepth_;
bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p);
void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
void _recvChunk(int frame, int chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
void _updateURI();
};
......
......@@ -39,7 +39,7 @@ RealsenseSource::~RealsenseSource() {
bool RealsenseSource::grab() {
rs2::frameset frames = pipe_.wait_for_frames();
//rs2::align align(RS2_STREAM_DEPTH);
//frames = align_to_depth_.process(frames); //align_to_depth_.process(frames);
frames = align_to_depth_.process(frames); //align_to_depth_.process(frames);
rs2::depth_frame depth = frames.get_depth_frame();
float w = depth.get_width();
......
......@@ -28,6 +28,8 @@ Streamer::Streamer(nlohmann::json &config, Universe *net)
active_ = false;
net_ = net;
compress_level_ = value("compression", 1);
net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> {
SHARED_LOCK(mutex_,slk);
......@@ -100,7 +102,9 @@ void Streamer::add(Source *src) {
StreamSource *s = new StreamSource;
s->src = src;
s->state = 0;
//s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0);
s->jobs = 0;
s->frame = 0;
sources_[src->getID()] = s;
}
......@@ -197,20 +201,11 @@ void Streamer::run(bool block) {
// Must be called in source locked state or src.state must be atomic
void Streamer::_swap(StreamSource *src) {
if (src->state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kRGB | ftl::rgbd::detail::kDepth)) {
if (src->jobs == 0) {
UNIQUE_LOCK(src->mutex,lk);
if (src->rgb_buf.size() > 0 && src->d_buf.size() > 0) {
auto i = src->clients[0].begin();
while (i != src->clients[0].end()) {
try {
// TODO(Nick) Send pose and timestamp
if (!net_->send((*i).peerid, (*i).uri, src->rgb_buf, src->d_buf)) {
(*i).txcount = (*i).txmax;
}
} catch(...) {
(*i).txcount = (*i).txmax;
}
(*i).txcount++;
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
......@@ -219,9 +214,14 @@ void Streamer::_swap(StreamSource *src) {
i++;
}
}
}
src->src->getFrames(src->rgb, src->depth);
src->state = 0;
//if (!src->rgb.empty() && src->prev_depth.empty()) {
//src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0));
//LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows;
//}
src->jobs = 0;
src->frame++;
}
}
......@@ -256,11 +256,12 @@ void Streamer::_schedule() {
// There will be two jobs for this source...
//UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 3;
jobs_ += 1 + kChunkDim*kChunkDim;
//lk.unlock();
StreamSource *src = sources_[uri];
if (src == nullptr || src->state != 0) continue;
if (src == nullptr || src->jobs != 0) continue;
src->jobs = 1 + kChunkDim*kChunkDim;
// Grab job
ftl::pool.push([this,src](int id) {
......@@ -273,7 +274,8 @@ void Streamer::_schedule() {
// CHECK (Nick) Can state be an atomic instead?
//UNIQUE_LOCK(src->mutex, lk);
src->state |= ftl::rgbd::detail::kGrabbed;
src->jobs--;
//src->state |= ftl::rgbd::detail::kGrabbed;
_swap(src);
// Mark job as finished
......@@ -281,8 +283,62 @@ void Streamer::_schedule() {
job_cv_.notify_one();
});
// Create jobs for each chunk
for (int i=0; i<(kChunkDim*kChunkDim); i++) {
ftl::pool.push([this,src](int id, int chunk) {
if (!src->rgb.empty() && !src->depth.empty()) {
bool delta = (chunk+src->frame) % 8 > 0;
int chunk_width = src->rgb.cols / kChunkDim;
int chunk_height = src->rgb.rows / kChunkDim;
// Build chunk head
int cx = (chunk % kChunkDim) * chunk_width;
int cy = (chunk / kChunkDim) * chunk_height;
cv::Rect roi(cx,cy,chunk_width,chunk_height);
vector<unsigned char> rgb_buf;
cv::Mat chunkRGB = src->rgb(roi);
cv::Mat chunkDepth = src->depth(roi);
//cv::Mat chunkDepthPrev = src->prev_depth(roi);
cv::imencode(".jpg", chunkRGB, rgb_buf);
cv::Mat d2, d3;
vector<unsigned char> d_buf;
chunkDepth.convertTo(d2, CV_16UC1, 16*10);
//if (delta) d3 = (d2 * 2) - chunkDepthPrev;
//else d3 = d2;
//d2.copyTo(chunkDepthPrev);
vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, compress_level_}; // Default is 1 for fast, 9 = small but slow.
cv::imencode(".png", d2, d_buf, pngparams);
//LOG(INFO) << "Sending chunk " << chunk << " : size = " << (d_buf.size()+rgb_buf.size()) / 1024 << "kb";
UNIQUE_LOCK(src->mutex,lk);
auto i = src->clients[0].begin();
while (i != src->clients[0].end()) {
try {
// TODO(Nick) Send pose and timestamp
if (!net_->send((*i).peerid, (*i).uri, 0, chunk, delta, rgb_buf, d_buf)) {
(*i).txcount = (*i).txmax;
}
} catch(...) {
(*i).txcount = (*i).txmax;
}
i++;
}
}
//src->state |= ftl::rgbd::detail::kRGB;
src->jobs--;
_swap(src);
--jobs_;
job_cv_.notify_one();
}, i);
}
// Compress colour job
ftl::pool.push([this,src](int id) {
/*pool_.push([this,src](int id) {
if (!src->rgb.empty()) {
auto start = std::chrono::high_resolution_clock::now();
......@@ -321,7 +377,7 @@ void Streamer::_schedule() {
_swap(src);
--jobs_;
job_cv_.notify_one();
});
});*/
// Transmit job
// For any single source and bitrate there is only one thread
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment