diff --git a/.gitignore b/.gitignore index c209462d2be82075286bde9a7c86ffd0003f8544..003cb9a1743cb674846ffadc7d850480227e07bf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ build +out *.deb *.zip **/config.cpp diff --git a/include/ftl/protocol/packet.hpp b/include/ftl/protocol/packet.hpp index 4994194851bc66254de0004fb74609b4eb88c609..16821d1019e90230378dbc2b446570c2ddf691a4 100644 --- a/include/ftl/protocol/packet.hpp +++ b/include/ftl/protocol/packet.hpp @@ -21,6 +21,7 @@ static constexpr uint8_t kFlagRequest = 0x01; ///< Used for empty data packet static constexpr uint8_t kFlagCompleted = 0x02; ///< Last packet for timestamp static constexpr uint8_t kFlagReset = 0x04; ///< Request full data, including key frames. static constexpr uint8_t kFlagFull = 0x04; ///< If set on EndFrame packet then that frame contained full data +static constexpr uint8_t kFlagOutOfBand = 0x08; ///< The data is not tied to a specific frame rate static constexpr uint8_t kAllFrames = 255; static constexpr uint8_t kAllFramesets = 255; diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp index fe8ce022fd9dbb9a70da6867987656dc948f8f91..7f08b4d2530cfaaf61dff21fb45d9388db4f36ab 100644 --- a/include/ftl/protocol/streams.hpp +++ b/include/ftl/protocol/streams.hpp @@ -63,7 +63,8 @@ enum struct StreamProperty { kDescription, kTags, kUser, - kRequestSize + kRequestSize, + kBuffering }; /** diff --git a/src/peer.cpp b/src/peer.cpp index be479fd2408b032a4cc8dc366a8e1b62c8f1c1b9..e6f4e3946bbe7666680cf92efb4adb8b2501411f 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -280,6 +280,10 @@ void Peer::_createJob() { } --job_count_; }); + + if (ftl::pool.size() == 0) { + --job_count_; + } } void Peer::data() { diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index 8b630a9f8ebad099645422996f738c54d74fe5ae..c6f596e5171297a1e857699e139a113be84364fe 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -9,6 +9,8 @@ #include <memory> #include <utility> #include <algorithm> +#include <thread> +#include <chrono> #include "netstream.hpp" #include <ftl/time.hpp> #include "../uuidMSGPACK.hpp" @@ -20,6 +22,9 @@ #ifndef WIN32 #include <unistd.h> #include <limits.h> +#else +#include <timeapi.h> +#pragma comment(lib, "Winmm") #endif using ftl::protocol::Net; @@ -37,6 +42,7 @@ using std::string; using std::optional; using std::chrono::time_point_cast; using std::chrono::milliseconds; +using std::this_thread::sleep_for; using std::chrono::high_resolution_clock; std::atomic_size_t Net::req_bitrate__ = 0; @@ -227,9 +233,6 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket // Manage recuring requests if (!host_ && spkt.channel == Channel::kEndFrame && localFrame.frameset() < tally_.size()) { - frame_time_ = spkt.timestamp - last_frame_; // Milliseconds per frame - last_frame_ = spkt.timestamp; - // Are we close to reaching the end of our frames request? if (tally_[localFrame.frameset()] <= 5) { // Yes, so send new requests @@ -287,6 +290,99 @@ Net::FrameState *Net::_getFrameState(FrameID id) { return p; } +void Net::_run() { + thread_ = std::thread([this]() { +#ifdef WIN32 + timeBeginPeriod(5); +#endif + while (active_) { + auto now = ftl::time::get_time(); + int64_t nextTs = now + 20; + + // For every state + SHARED_LOCK(statesMtx_, lk); + for (auto &s : frameStates_) { + auto *state = s.second.get(); + + //if (state->base_local_ts_ == 0) continue; + + int64_t cts = now - state->base_local_ts_; + + bool hasNext = false; + + // If there are any packets that should be dispatched + // Then create a thread for each and do it. + if (state->active == 0) { + auto current = state->buffer.begin(); + while (current != state->buffer.end()) { + if (!current->done) { + if (state->base_local_ts_ == 0) { + state->base_local_ts_ = now; + state->base_pkt_ts_ = current->packets.first.timestamp; + cts = 0; + } + + int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_; + + // Should the packet be dispatched yet + if (pts <= cts) { + cts = pts; // Prevent multi frames. + + StreamPacket *spkt; + DataPacket *pkt; + + spkt = ¤t->packets.first; + pkt = ¤t->packets.second; + + ++state->active; + ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) { + _processPacket(buf->peer, 0, *spkt, *pkt); + buf->done = true; + --state->active; + }); + } else { + int64_t next = pts + state->base_local_ts_; + nextTs = std::min(nextTs, next); + hasNext = true; + break; + } + } + + ++current; + } + } else { + LOG(WARNING) << "Already active"; + } + + if (!hasNext) { + nextTs = std::min(nextTs, now + 10); + // TODO: Also, increase buffering + } + + // Remove consumed packets. + UNIQUE_LOCK(state->mtx, lk2); + state->buffer.remove_if([](const PacketBuffer &i) { return static_cast<bool>(i.done); }); + } + lk.unlock(); + + auto used = ftl::time::get_time(); + int64_t spare = nextTs - used; + sleep_for(milliseconds(std::max(int64_t(1), spare))); + } +#ifdef WIN32 + timeEndPeriod(5); +#endif + }); + +#ifndef WIN32 + sched_param p; + p.sched_priority = sched_get_priority_max(SCHED_RR); + pthread_setschedparam(thread_.native_handle(), SCHED_RR, &p); +#else + SetThreadPriority(thread_.native_handle(), THREAD_PRIORITY_TIME_CRITICAL); +#endif +} + bool Net::begin() { if (active_) return true; @@ -306,36 +402,15 @@ bool Net::begin() { PacketMSGPACK &pkt) { auto *state = _getFrameState(FrameID(spkt_raw.streamID, spkt_raw.frame_number)); - { + if (!host_) { UNIQUE_LOCK(state->mtx, lk); // TODO(Nick): This buffer could be faster? - auto &ppair = state->buffer.emplace_back(); - ppair.first = spkt_raw; - ppair.second = std::move(pkt); - } - if (!state->active.test_and_set()) { - auto *pp = &p; - ftl::pool.push([this, pp, ttimeoff, state](int p) { - while (true) { - StreamPacket *spkt; - DataPacket *pkt; - { - UNIQUE_LOCK(state->mtx, lk); - if (state->buffer.size() == 0) { - state->active.clear(); - break; - } - auto &front = state->buffer.front(); - spkt = &front.first; - pkt = &front.second; - } - _processPacket(pp, ttimeoff, *spkt, *pkt); - { - UNIQUE_LOCK(state->mtx, lk); - state->buffer.pop_front(); - } - } - }); + auto &buf = state->buffer.emplace_back(); + buf.packets.first = spkt_raw; + buf.packets.second = std::move(pkt); + buf.peer = &p; + } else { + _processPacket(&p, ttimeoff, spkt_raw, pkt); } }); @@ -356,6 +431,8 @@ bool Net::begin() { active_ = true; } + if (!host_) _run(); + return true; } @@ -603,6 +680,7 @@ bool Net::end() { active_ = false; net_->unbind(base_uri_); + if (thread_.joinable()) thread_.join(); return true; } @@ -622,6 +700,7 @@ void Net::setProperty(ftl::protocol::StreamProperty opt, std::any value) { case StreamProperty::kPaused : paused_ = std::any_cast<bool>(value); break; case StreamProperty::kName : name_ = std::any_cast<std::string>(value); break; case StreamProperty::kRequestSize : frames_to_request_ = std::any_cast<int>(value); break; + case StreamProperty::kBuffering : buffering_ = static_cast<int64_t>(std::any_cast<float>(value) * 1000.0f); break; case StreamProperty::kObservers : case StreamProperty::kBytesSent : case StreamProperty::kBytesReceived : @@ -641,9 +720,10 @@ std::any Net::getProperty(ftl::protocol::StreamProperty opt) { case StreamProperty::kPaused : return paused_; case StreamProperty::kBytesSent : return 0; case StreamProperty::kBytesReceived : return int64_t(bytes_received_); - case StreamProperty::kFrameRate : return (frame_time_ > 0) ? 1000 / frame_time_ : 0; + case StreamProperty::kFrameRate : return 0; case StreamProperty::kLatency : return 0; case StreamProperty::kName : return name_; + case StreamProperty::kBuffering : return static_cast<float>(buffering_) / 1000.0f; case StreamProperty::kRequestSize : return frames_to_request_; default : throw FTL_Error("Unsupported property"); } @@ -661,6 +741,7 @@ bool Net::supportsProperty(ftl::protocol::StreamProperty opt) { case StreamProperty::kFrameRate : case StreamProperty::kName : case StreamProperty::kRequestSize : + case StreamProperty::kBuffering : case StreamProperty::kURI : return true; default : return false; } diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index f803be3d6ed55058d83a1eb21ce9887c93076b7b..95a4db10f74b973f850cc4737e032671b1bc9133 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -90,8 +90,6 @@ class Net : public Stream { bool active_ = false; ftl::net::Universe *net_; std::optional<ftl::UUID> peer_; - int64_t last_frame_ = 0; - int64_t frame_time_ = 0; std::string uri_; std::string base_uri_; const bool host_; @@ -103,6 +101,7 @@ class Net : public Stream { std::string name_; ftl::PacketManager mgr_; ftl::Handler<ftl::net::Peer*> connect_cb_; + int64_t buffering_ = 0; static std::atomic_size_t req_bitrate__; static std::atomic_size_t tx_bitrate__; @@ -111,17 +110,26 @@ class Net : public Stream { static int64_t last_msg__; static MUTEX msg_mtx__; + struct PacketBuffer { + ftl::protocol::PacketPair packets; + ftl::net::Peer *peer = nullptr; + std::atomic_bool done = false; + }; + struct FrameState { ftl::protocol::FrameID id; - std::atomic_flag active; + std::atomic_int active = 0; MUTEX mtx; - std::list<ftl::protocol::PacketPair> buffer; + std::list<PacketBuffer> buffer; + int64_t base_pkt_ts_ = 0; + int64_t base_local_ts_ = 0; }; SHARED_MUTEX statesMtx_; std::unordered_map<uint32_t, std::unique_ptr<FrameState>> frameStates_; std::unordered_map<ftl::protocol::FrameID, std::list<detail::StreamClient>> clients_; + std::thread thread_; FrameState *_getFrameState(FrameID id); bool _enable(FrameID id); @@ -137,6 +145,7 @@ class Net : public Stream { bool doreset = false); void _cleanUp(); void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt); + void _run(); }; } // namespace protocol diff --git a/test/filestream_unit.cpp b/test/filestream_unit.cpp index 64ed59d8f2871b6d649e459a4e2a048b210d3761..0398a15e72865b78315b04d633cb325fbdacb244 100644 --- a/test/filestream_unit.cpp +++ b/test/filestream_unit.cpp @@ -61,12 +61,16 @@ TEST_CASE("File write and read", "[stream]") { auto reader = ftl::getStream(filename); - StreamPacket tspkt = {5,0,0,1, Channel::kColour}; + StreamPacket tspkt[3] = { + {5,0,0,1, Channel::kColour}, + {5,0,0,1, Channel::kColour}, + {5,0,0,1, Channel::kColour} + }; std::atomic_int count = 0; auto h = reader->onPacket([&tspkt,&count](const StreamPacket &spkt, const DataPacket &pkt) { if (spkt.channel == Channel::kEndFrame) return true; - tspkt = spkt; + tspkt[spkt.streamID] = spkt; ++count; return true; }); @@ -77,9 +81,15 @@ TEST_CASE("File write and read", "[stream]") { reader->end(); REQUIRE( count == 3 ); - REQUIRE( tspkt.timestamp > 0 ); - REQUIRE( tspkt.streamID == 2 ); - REQUIRE( tspkt.channel == Channel::kScreen ); + REQUIRE( tspkt[2].timestamp > 0); + REQUIRE( tspkt[2].streamID == 2); + REQUIRE( tspkt[2].channel == Channel::kScreen); + REQUIRE(tspkt[1].timestamp > 0); + REQUIRE(tspkt[1].streamID == 1); + REQUIRE(tspkt[1].channel == Channel::kDepth); + REQUIRE(tspkt[0].timestamp > 0); + REQUIRE(tspkt[0].streamID == 0); + REQUIRE(tspkt[0].channel == Channel::kConfidence); } SECTION("write read multiple packets at different timestamps") { diff --git a/test/netstream_unit.cpp b/test/netstream_unit.cpp index 76ff57242bc00d4469e452fcf44b72b699e4bbd1..b098049620cae2c474a30fbe5707f5d498442192 100644 --- a/test/netstream_unit.cpp +++ b/test/netstream_unit.cpp @@ -5,6 +5,7 @@ #include <ftl/uri.hpp> #include <ftl/exception.hpp> #include <ftl/protocol/node.hpp> +#include <ftl/time.hpp> #include <thread> #include <chrono> @@ -29,6 +30,7 @@ class MockNetStream : public ftl::protocol::Net { void hasPosted(const StreamPacket &spkt, const DataPacket &pkt) override { lastSpkt = spkt; + ++postCount; } void forceSeen(FrameID id, Channel channel) { @@ -36,6 +38,7 @@ class MockNetStream : public ftl::protocol::Net { } StreamPacket lastSpkt; + std::atomic_int postCount = 0; }; // --- Tests ------------------------------------------------------------------- @@ -79,6 +82,57 @@ TEST_CASE("Net stream options") { REQUIRE( s1->lastSpkt.timestamp == 100 ); REQUIRE( std::any_cast<bool>(s1->getProperty(StreamProperty::kPaused)) ); } + + SECTION("can increase buffering") { + auto p = createMockPeer(0); + fakedata[0] = ""; + send_handshake(*p.get()); + p->data(); + sleep_for(milliseconds(50)); + + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); + + int64_t seenTs = 0; + int64_t delta = 0; + std::atomic_int count = 0; + + auto h = s1->onPacket([&seenTs, &count, &delta](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { + int64_t now = ftl::time::get_time(); + delta = now - seenTs; + seenTs = now; + ++count; + return true; + }); + + REQUIRE( s1->begin() ); + + ftl::protocol::StreamPacketMSGPACK spkt; + ftl::protocol::PacketMSGPACK pkt; + spkt.timestamp = 100; + spkt.streamID = 0; + spkt.frame_number = 0; + spkt.channel = Channel::kColour; + writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); + p->data(); + + while (count < 1) { + sleep_for(milliseconds(10)); + } + + spkt.timestamp = 130; + writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); + + s1->setProperty(ftl::protocol::StreamProperty::kBuffering, 0.1f); + + p->data(); + + while (count < 2) { + sleep_for(milliseconds(10)); + } + + REQUIRE(delta > 110); + REQUIRE(delta < 140); + } } TEST_CASE("Net stream sending requests") { @@ -153,9 +207,10 @@ TEST_CASE("Net stream sending requests") { spkt.timestamp = i; writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); - sleep_for(milliseconds(50)); } + while (s1->postCount < 2) sleep_for(milliseconds(10)); + REQUIRE( s1->lastSpkt.channel == Channel::kColour ); REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 ); } @@ -196,9 +251,10 @@ TEST_CASE("Net stream sending requests") { spkt.timestamp = i >> 1; writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); - sleep_for(milliseconds(50)); } + while (s1->postCount < 3) sleep_for(milliseconds(10)); + REQUIRE( s1->lastSpkt.channel == Channel::kColour ); REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 ); } @@ -217,6 +273,7 @@ TEST_CASE("Net stream sending requests") { ftl::protocol::StreamPacketMSGPACK spkt; ftl::protocol::PacketMSGPACK pkt; + spkt.timestamp = 0; spkt.streamID = 1; spkt.frame_number = 1; spkt.channel = Channel::kColour; diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp index fc798dd25597f819c911c6f99b7b52a2b0207d80..6cb9352f725ca31f15a634f5c3855e65d2f6ff88 100644 --- a/test/stream_integration.cpp +++ b/test/stream_integration.cpp @@ -5,6 +5,7 @@ #include <ftl/uri.hpp> #include <ftl/exception.hpp> #include <ftl/protocol/node.hpp> +#include <ftl/time.hpp> using ftl::protocol::FrameID; using ftl::protocol::StreamProperty; @@ -77,6 +78,7 @@ TEST_CASE("TCP Stream", "[net]") { REQUIRE( seenReq ); ftl::protocol::StreamPacket spkt; + spkt.timestamp = 0; spkt.streamID = 0; spkt.frame_number = 0; spkt.channel = ftl::protocol::Channel::kColour; @@ -121,6 +123,7 @@ TEST_CASE("TCP Stream", "[net]") { REQUIRE(s1->active(FrameID(0, 0)) == true); ftl::protocol::StreamPacket spkt; + spkt.timestamp = 0; spkt.streamID = 0; spkt.frame_number = 0; spkt.channel = ftl::protocol::Channel::kEndFrame; @@ -135,13 +138,74 @@ TEST_CASE("TCP Stream", "[net]") { } // TODO: Find better option - int k = 10; + int k = 20; while (--k > 0 && rcount < 30) { std::this_thread::sleep_for(std::chrono::milliseconds(20)); } REQUIRE( rcount == 30 ); } + SECTION("receives at correct rate") { + std::atomic_int rcount = 0; + std::atomic_int avgDelay = 0; + int64_t lastTS = 0; + auto s1 = ftl::createStream("ftl://mystream"); + REQUIRE( s1 ); + + auto s2 = self->getStream("ftl://mystream"); + REQUIRE( s2 ); + + auto h = s2->onPacket([&rcount, &lastTS, &avgDelay](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { + if (lastTS == 0) { + lastTS = ftl::time::get_time(); + } else { + int64_t now = ftl::time::get_time(); + int64_t delta = now - lastTS; + lastTS = now; + avgDelay += delta; + ++rcount; + } + return true; + }); + + s1->begin(); + s2->begin(); + + REQUIRE(s1->active(FrameID(0, 0)) == false); + + s2->enable(FrameID(0, 0)); + + // TODO: Find better option + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + REQUIRE(s1->active(FrameID(0, 0)) == true); + + ftl::protocol::StreamPacket spkt; + spkt.timestamp = 0; + spkt.streamID = 0; + spkt.frame_number = 0; + spkt.channel = ftl::protocol::Channel::kEndFrame; + ftl::protocol::DataPacket pkt; + pkt.bitrate = 10; + pkt.codec = ftl::protocol::Codec::kJPG; + pkt.packet_count = 1; + + for (int i=0; i<10; ++i) { + spkt.timestamp = i * 10; + s1->post(spkt, pkt); + } + + // TODO: Find better option + int k = 10; + while (--k > 0 && rcount < 9) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + const float delay = static_cast<float>(avgDelay) / static_cast<float>(rcount); + LOG(INFO) << "AVG DELAY = " << delay << ", " << rcount; + REQUIRE(delay > 8.0f); + REQUIRE(delay < 20.0f); + } + /*SECTION("handles out-of-order packets") { MUTEX mtx; std::vector<int64_t> times;