diff --git a/components/net/cpp/include/ftl/net/common.hpp b/components/net/cpp/include/ftl/net/common.hpp index 61914d81a93684f488330974a8cedf92c29681e8..78325d09b717d232751c21040be591f259fe7cf8 100644 --- a/components/net/cpp/include/ftl/net/common.hpp +++ b/components/net/cpp/include/ftl/net/common.hpp @@ -6,6 +6,7 @@ #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> +#include <netinet/tcp.h> #include <netdb.h> #include <arpa/inet.h> #define INVALID_SOCKET -1 diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index db9e703ddbe7054bf7e46fb52cebcc2d9fc6b690..0c71db7fe8b1fe07dcb31ff3833679ef85d6846c 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -370,7 +370,12 @@ bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); return false; } +#ifdef WIN32 + return p->isConnected() && p->send(name, args...) >= 0; +#else return p->isConnected() && p->send(name, args...) > 0; +#endif + } /*template <typename... ARGS> diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 853b91d26c213a74fc83a5f99706e7b6282dec76..9b54887ac683d4ac444e2776c93ac18ec117ed5b 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -30,6 +30,7 @@ #include <algorithm> #include <tuple> #include <chrono> +#include <vector> using std::tuple; using std::get; @@ -41,6 +42,7 @@ using std::chrono::seconds; using ftl::net::Universe; using ftl::net::callback_t; using std::mutex; +using std::vector; using std::recursive_mutex; using std::unique_lock; @@ -83,6 +85,9 @@ static SOCKET tcpConnect(URI &uri) { return INVALID_SOCKET; } + //int flags =1; + //if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; + addrinfo hints = {}, *addrs; hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; @@ -163,6 +168,9 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals is_waiting_ = true; scheme_ = ftl::URI::SCHEME_TCP; + + //int flags =1; + //if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; // Send the initiating handshake if valid if (status_ == kConnecting) { @@ -585,10 +593,16 @@ int Peer::_send() { auto send_vec = send_buf_.vector(); auto send_size = send_buf_.vector_size(); - int c = 0; + vector<WSABUF> wsabuf(send_size); + for (int i = 0; i < send_size; i++) { - c += ftl::net::internal::send(sock_, (char*)send_vec[i].iov_base, (int)send_vec[i].iov_len, 0); + wsabuf[i].len = (ULONG)send_vec[i].iov_len; + wsabuf[i].buf = (char*)send_vec[i].iov_base; + //c += ftl::net::internal::send(sock_, (char*)send_vec[i].iov_base, (int)send_vec[i].iov_len, 0); } + + DWORD bytessent; + int c = WSASend(sock_, wsabuf.data(), send_size, (LPDWORD)&bytessent, 0, NULL, NULL); #else int c = ftl::net::internal::writev(sock_, send_buf_.vector(), (int)send_buf_.vector_size()); #endif diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 0286512fc9eb40dfb81663f2141d828281855107..665f9af9d54350dfc7bbd8acef5173d36986e255 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -16,6 +16,8 @@ namespace ftl { namespace rgbd { +static const int kChunkDim = 4; + namespace detail { struct StreamClient { @@ -31,13 +33,14 @@ static const unsigned int kDepth = 0x4; struct StreamSource { ftl::rgbd::Source *src; - std::atomic<unsigned int> state; // Busy or ready to swap? + std::atomic<unsigned int> jobs; // Busy or ready to swap? cv::Mat rgb; // Tx buffer cv::Mat depth; // Tx buffer - std::vector<unsigned char> rgb_buf; - std::vector<unsigned char> d_buf; + cv::Mat prev_rgb; + cv::Mat prev_depth; std::vector<detail::StreamClient> clients[10]; // One list per bitrate std::shared_mutex mutex; + unsigned long long frame; }; } @@ -109,6 +112,7 @@ class Streamer : public ftl::Configurable { std::mutex job_mtx_; std::condition_variable job_cv_; std::atomic<int> jobs_; + int compress_level_; void _schedule(); void _swap(detail::StreamSource *); diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index ef3fdd5d270288cd6c73422d0651cb40bced7b82..a527665c29d4d1ca5f301f1eae05ff7a8618bba7 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -4,6 +4,8 @@ #include <chrono> #include <shared_mutex> +#include <ftl/rgbd/streamer.hpp> + using ftl::rgbd::detail::NetSource; using ftl::net::Universe; using ftl::UUID; @@ -76,6 +78,42 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch //lk.unlock(); } +void NetSource::_recvChunk(int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + cv::Mat tmp_rgb, tmp_depth; + + if (!active_) return; + + //LOG(INFO) << "Received chunk " << (int)chunk; + + //try { + // Decode in temporary buffers to prevent long locks + cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb); + cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth); + + // Build chunk head + int cx = (chunk % chunks_dim_) * chunk_width_; + int cy = (chunk / chunks_dim_) * chunk_height_; + + cv::Rect roi(cx,cy,chunk_width_,chunk_height_); + cv::Mat chunkRGB = rgb_(roi); + //cv::Mat ichunkDepth = idepth_(roi); + cv::Mat chunkDepth = depth_(roi); + + // Lock host to prevent grab + UNIQUE_LOCK(host_->mutex(),lk); + tmp_rgb.copyTo(chunkRGB); + //tmp_depth.convertTo(tmp_depth, CV_16UC1); + //if (delta) ichunkDepth = tmp_depth - ichunkDepth; + //tmp_depth.copyTo(ichunkDepth); + tmp_depth.convertTo(chunkDepth, CV_32FC1, 1.0f/(16.0f*10.0f)); + if (chunk == 0) N_--; + //lk.unlock(); + //} catch(...) { + // LOG(ERROR) << "Decode exception"; + // return; + //} +} + void NetSource::setPose(const Eigen::Matrix4f &pose) { if (!active_) return; @@ -110,8 +148,8 @@ void NetSource::_updateURI() { has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_); - host_->getNet()->bind(*uri, [this](const vector<unsigned char> &jpg, const vector<unsigned char> &d) { - _recv(jpg, d); + host_->getNet()->bind(*uri, [this](int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + _recvChunk(frame, chunk, delta, jpg, d); }); N_ = 10; @@ -123,6 +161,14 @@ void NetSource::_updateURI() { LOG(ERROR) << "Could not connect to stream " << *uri; } + // Update chunk details + chunks_dim_ = ftl::rgbd::kChunkDim; + chunk_width_ = params_.width / chunks_dim_; + chunk_height_ = params_.height / chunks_dim_; + rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0)); + depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f); + //idepth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_16UC1, cv::Scalar(0)); + uri_ = *uri; active_ = true; } else { diff --git a/components/rgbd-sources/src/net.hpp b/components/rgbd-sources/src/net.hpp index de4a3b00b78bc8322a82f4ec598263d35202d69d..3a986d0647ae465e355dbf887e283f61a43b53b3 100644 --- a/components/rgbd-sources/src/net.hpp +++ b/components/rgbd-sources/src/net.hpp @@ -37,9 +37,14 @@ class NetSource : public detail::Source { std::string uri_; ftl::net::callback_t h_; std::mutex mutex_; + int chunks_dim_; + int chunk_width_; + int chunk_height_; + cv::Mat idepth_; bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p); void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); + void _recvChunk(int frame, int chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); void _updateURI(); }; diff --git a/components/rgbd-sources/src/realsense_source.cpp b/components/rgbd-sources/src/realsense_source.cpp index 0766adb60669ae126cd89ee0faac66a2333e29cf..54fd1888d3a25446d57c8c91dbe2cfd3c9d45594 100644 --- a/components/rgbd-sources/src/realsense_source.cpp +++ b/components/rgbd-sources/src/realsense_source.cpp @@ -39,7 +39,7 @@ RealsenseSource::~RealsenseSource() { bool RealsenseSource::grab() { rs2::frameset frames = pipe_.wait_for_frames(); //rs2::align align(RS2_STREAM_DEPTH); - //frames = align_to_depth_.process(frames); //align_to_depth_.process(frames); + frames = align_to_depth_.process(frames); //align_to_depth_.process(frames); rs2::depth_frame depth = frames.get_depth_frame(); float w = depth.get_width(); diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 251f77f82f28fff82fe1588c8d367b4031de197c..9e693533815f95aa08bbe368e66bef71e245aa78 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -27,6 +27,8 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) active_ = false; net_ = net; + + compress_level_ = value("compression", 1); net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> { SHARED_LOCK(mutex_,slk); @@ -100,7 +102,9 @@ void Streamer::add(Source *src) { StreamSource *s = new StreamSource; s->src = src; - s->state = 0; + //s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0); + s->jobs = 0; + s->frame = 0; sources_[src->getID()] = s; } @@ -197,31 +201,27 @@ void Streamer::run(bool block) { // Must be called in source locked state or src.state must be atomic void Streamer::_swap(StreamSource *src) { - if (src->state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kRGB | ftl::rgbd::detail::kDepth)) { + if (src->jobs == 0) { UNIQUE_LOCK(src->mutex,lk); - if (src->rgb_buf.size() > 0 && src->d_buf.size() > 0) { - auto i = src->clients[0].begin(); - while (i != src->clients[0].end()) { - try { - // TODO(Nick) Send pose and timestamp - if (!net_->send((*i).peerid, (*i).uri, src->rgb_buf, src->d_buf)) { - (*i).txcount = (*i).txmax; - } - } catch(...) { - (*i).txcount = (*i).txmax; - } - (*i).txcount++; - if ((*i).txcount >= (*i).txmax) { - LOG(INFO) << "Remove client: " << (*i).uri; - i = src->clients[0].erase(i); - } else { - i++; - } + auto i = src->clients[0].begin(); + while (i != src->clients[0].end()) { + (*i).txcount++; + if ((*i).txcount >= (*i).txmax) { + LOG(INFO) << "Remove client: " << (*i).uri; + i = src->clients[0].erase(i); + } else { + i++; } } + src->src->getFrames(src->rgb, src->depth); - src->state = 0; + //if (!src->rgb.empty() && src->prev_depth.empty()) { + //src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0)); + //LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows; + //} + src->jobs = 0; + src->frame++; } } @@ -256,11 +256,12 @@ void Streamer::_schedule() { // There will be two jobs for this source... //UNIQUE_LOCK(job_mtx_,lk); - jobs_ += 3; + jobs_ += 1 + kChunkDim*kChunkDim; //lk.unlock(); StreamSource *src = sources_[uri]; - if (src == nullptr || src->state != 0) continue; + if (src == nullptr || src->jobs != 0) continue; + src->jobs = 1 + kChunkDim*kChunkDim; // Grab job ftl::pool.push([this,src](int id) { @@ -273,7 +274,8 @@ void Streamer::_schedule() { // CHECK (Nick) Can state be an atomic instead? //UNIQUE_LOCK(src->mutex, lk); - src->state |= ftl::rgbd::detail::kGrabbed; + src->jobs--; + //src->state |= ftl::rgbd::detail::kGrabbed; _swap(src); // Mark job as finished @@ -281,8 +283,62 @@ void Streamer::_schedule() { job_cv_.notify_one(); }); + // Create jobs for each chunk + for (int i=0; i<(kChunkDim*kChunkDim); i++) { + ftl::pool.push([this,src](int id, int chunk) { + if (!src->rgb.empty() && !src->depth.empty()) { + bool delta = (chunk+src->frame) % 8 > 0; + int chunk_width = src->rgb.cols / kChunkDim; + int chunk_height = src->rgb.rows / kChunkDim; + + // Build chunk head + int cx = (chunk % kChunkDim) * chunk_width; + int cy = (chunk / kChunkDim) * chunk_height; + + cv::Rect roi(cx,cy,chunk_width,chunk_height); + vector<unsigned char> rgb_buf; + cv::Mat chunkRGB = src->rgb(roi); + cv::Mat chunkDepth = src->depth(roi); + //cv::Mat chunkDepthPrev = src->prev_depth(roi); + + cv::imencode(".jpg", chunkRGB, rgb_buf); + + cv::Mat d2, d3; + vector<unsigned char> d_buf; + chunkDepth.convertTo(d2, CV_16UC1, 16*10); + //if (delta) d3 = (d2 * 2) - chunkDepthPrev; + //else d3 = d2; + //d2.copyTo(chunkDepthPrev); + vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, compress_level_}; // Default is 1 for fast, 9 = small but slow. + cv::imencode(".png", d2, d_buf, pngparams); + + //LOG(INFO) << "Sending chunk " << chunk << " : size = " << (d_buf.size()+rgb_buf.size()) / 1024 << "kb"; + + UNIQUE_LOCK(src->mutex,lk); + auto i = src->clients[0].begin(); + while (i != src->clients[0].end()) { + try { + // TODO(Nick) Send pose and timestamp + if (!net_->send((*i).peerid, (*i).uri, 0, chunk, delta, rgb_buf, d_buf)) { + (*i).txcount = (*i).txmax; + } + } catch(...) { + (*i).txcount = (*i).txmax; + } + i++; + } + } + + //src->state |= ftl::rgbd::detail::kRGB; + src->jobs--; + _swap(src); + --jobs_; + job_cv_.notify_one(); + }, i); + } + // Compress colour job - ftl::pool.push([this,src](int id) { + /*pool_.push([this,src](int id) { if (!src->rgb.empty()) { auto start = std::chrono::high_resolution_clock::now(); @@ -321,7 +377,7 @@ void Streamer::_schedule() { _swap(src); --jobs_; job_cv_.notify_one(); - }); + });*/ // Transmit job // For any single source and bitrate there is only one thread