diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index 49717205fbd7b7c306217dd4878c9f23569d2ea5..765daa6aba477b94ccc7924484a9624c0cbf7e85 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -63,6 +63,7 @@ static bool quiet = false; static void run(ftl::Configurable *root) { Universe *net = ftl::create<Universe>(root, "net"); + ftl::ctrl::Master ctrl(root, net); ftl::timer::setHighPrecision(true); @@ -89,8 +90,6 @@ static void run(ftl::Configurable *root) { } int sync_counter = 0; - ftl::ctrl::Master ctrl(root, net); - // Sync clocks! auto timer = ftl::timer::add(ftl::timer::kTimerMain, [&time_peer,&sync_counter,net](int64_t ts) { if (sync_counter-- <= 0 && time_peer != ftl::UUID(0) ) { diff --git a/components/codecs/include/ftl/codecs/packet.hpp b/components/codecs/include/ftl/codecs/packet.hpp index 65c07580e471f2362b3a4056bc03e78d26ead094..4145ebaa879a37fdae83cb7c331e423cc2e653b0 100644 --- a/components/codecs/include/ftl/codecs/packet.hpp +++ b/components/codecs/include/ftl/codecs/packet.hpp @@ -47,6 +47,7 @@ struct Packet { static constexpr unsigned int kStreamCap_Static = 0x01; static constexpr unsigned int kStreamCap_Recorded = 0x02; +static constexpr unsigned int kStreamCap_NewConnection = 0x04; /** V4 packets have no stream flags field */ struct StreamPacketV4 { @@ -90,6 +91,7 @@ struct StreamPacket { unsigned int hint_capability; // Is this a video stream, for example size_t hint_source_total; // Number of tracks per frame to expect int retry_count = 0; // Decode retry count + unsigned int hint_peerid=0; MSGPACK_DEFINE(timestamp, streamID, frame_number, channel, flags); diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp index 38021f099746a9c836269c9b695037808d299bee..c40ed095b5075afe0b4df7409c48ace45b8328cc 100644 --- a/components/common/cpp/include/ftl/threads.hpp +++ b/components/common/cpp/include/ftl/threads.hpp @@ -8,7 +8,7 @@ #define POOL_SIZE 10 //#define DEBUG_MUTEX -#define MUTEX_TIMEOUT 5 +#define MUTEX_TIMEOUT 2 #if defined DEBUG_MUTEX #include <loguru.hpp> diff --git a/components/common/cpp/include/ftl/utility/rollingavg.hpp b/components/common/cpp/include/ftl/utility/rollingavg.hpp new file mode 100644 index 0000000000000000000000000000000000000000..42d3123d8d55f12cf538a3ccd4ff2e312a9d4956 --- /dev/null +++ b/components/common/cpp/include/ftl/utility/rollingavg.hpp @@ -0,0 +1,42 @@ +#ifndef _FTL_ROLLING_AVERAGE_HPP_ +#define _FTL_ROLLING_AVERAGE_HPP_ + +namespace ftl { +namespace utility { + +/** + * General rolling average class where `SIZE` is the number of items to + * average over. This is a fast version which may possibily have issues with + * floating point errors, however these should average out as well. A more + * accurate version would be much slower. + */ +template <typename T, size_t SIZE> +struct RollingAvg { + RollingAvg() { + std::fill(vals_, vals_+SIZE, T(0)); + } + + /** + * Give a new value to add and return the rolling average including that + * new value. + */ + float operator()(T v) { + const size_t mix = (ix_++) % SIZE; + sum_ = sum_ - vals_[mix] + v; + vals_[mix] = v; + return float(sum_) / float(SIZE); + } + + /** Get current average. */ + inline float value() const { return sum_; } + + private: + T sum_ = 0; + T vals_[SIZE] = {0}; + size_t ix_ = 0; +}; + +} +} + +#endif \ No newline at end of file diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index 35cbe5bb2b749c13287327af02764b522e48cca0..0535fc22d2fcac5ef558605e67aedc00cf26787c 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -197,6 +197,8 @@ class Peer { void rawClose() { _badClose(false); } inline void noReconnect() { can_reconnect_ = false; } + + inline unsigned int localID() const { return local_id_; } public: static const int kMaxMessage = 10*1024*1024; // 10Mb currently @@ -266,6 +268,7 @@ class Peer { std::string uri_; // Original connection URI, or assumed URI ftl::UUID peerid_; // Received in handshake or allocated bool outgoing_; + unsigned int local_id_; ftl::net::Dispatcher *disp_; // For RPC call dispatch //std::vector<std::function<void(Peer &)>> open_handlers_; @@ -274,6 +277,7 @@ class Peer { std::map<int, std::unique_ptr<virtual_caller>> callbacks_; static std::atomic_int rpcid__; // Return ID for RPC calls + static std::atomic_int local_peer_ids__; }; // --- Inline Template Implementations ----------------------------------------- diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 368a5bbb2364f57c008a1f1071dff8ef603b7794..b3e2424f18b75c5b6f57efdf330857e2baffbf1b 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -48,18 +48,8 @@ using ftl::net::Universe; using ftl::net::callback_t; using std::vector; -/*static std::string hexStr(const std::string &s) -{ - const char *data = s.data(); - int len = s.size(); - std::stringstream ss; - ss << std::hex; - for(int i=0;i<len;++i) - ss << std::setw(2) << std::setfill('0') << (int)data[i]; - return ss.str(); -}*/ - std::atomic_int Peer::rpcid__ = 0; +std::atomic_int Peer::local_peer_ids__ = 0; // Global peer UUID ftl::UUID ftl::net::this_peer; @@ -188,6 +178,7 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals is_waiting_ = true; scheme_ = ftl::URI::SCHEME_TCP; outgoing_ = false; + local_id_ = local_peer_ids__++; int flags =1; if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; @@ -242,6 +233,7 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), status_ = kInvalid; sock_ = INVALID_SOCKET; outgoing_ = true; + local_id_ = local_peer_ids__++; disp_ = new Dispatcher(d); diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 5586196021db62eb75b2dcb67cb48c3b6d23909b..08e4820ff090d4cdf0b765c542a2cd6a2b61fcb8 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -38,8 +38,11 @@ struct NetImplDetail { } } -#define TCP_SEND_BUFFER_SIZE (512*1024) -#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1) +//#define TCP_SEND_BUFFER_SIZE (512*1024) +//#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1) + +#define TCP_SEND_BUFFER_SIZE (256*1024) +#define TCP_RECEIVE_BUFFER_SIZE (256*1024) callback_t ftl::net::Universe::cbid__ = 0; diff --git a/components/streams/CMakeLists.txt b/components/streams/CMakeLists.txt index cdbf7be84e68a78690ac5f5d794066e836f6dcbf..59afb96f6e9869cad49673e8bc9e135a99515d4c 100644 --- a/components/streams/CMakeLists.txt +++ b/components/streams/CMakeLists.txt @@ -12,6 +12,7 @@ set(STREAMSRC src/sender.cpp src/feed.cpp src/netstream.cpp + src/adaptive.cpp src/injectors.cpp src/parsers.cpp src/builder.cpp diff --git a/components/streams/include/ftl/streams/netstream.hpp b/components/streams/include/ftl/streams/netstream.hpp index f53b37dd7ea343e94bec2271c3a015e56ad10afd..65d0d1952ffd953d38507a4c81b957773a3482a8 100644 --- a/components/streams/include/ftl/streams/netstream.hpp +++ b/components/streams/include/ftl/streams/netstream.hpp @@ -1,7 +1,6 @@ #ifndef _FTL_STREAM_NETSTREAM_HPP_ #define _FTL_STREAM_NETSTREAM_HPP_ -#include <ftl/config.h> #include <ftl/net/universe.hpp> #include <ftl/threads.hpp> #include <ftl/codecs/packet.hpp> @@ -12,6 +11,8 @@ namespace ftl { namespace stream { +class AdaptiveBitrate; + namespace detail { struct StreamClient { ftl::UUID peerid; @@ -27,30 +28,15 @@ struct StreamClient { static const int kMaxFrames = 100; /** - * Allows network streaming of a number of RGB-Depth sources. Remote machines - * can discover available streams from an instance of Streamer. It also allows - * for adaptive bitrate streams where bandwidth can be monitored and different - * data rates can be requested, it is up to the remote machine to decide on - * desired bitrate. - * - * The capture and compression steps operate in different threads and each - * source and bitrate also operate on different threads. For a specific source - * and specific bitrate there is a single thread that sends data to all - * requesting clients. - * - * Use ftl::create<Streamer>(parent, name) to construct, don't construct - * directly. - * - * Note that the streamer attempts to maintain a fixed frame rate by - * monitoring job processing times and sleeping if there is spare time. + * Send and receive packets over a network. This class manages the connection + * of clients or the discovery of a stream and deals with bitrate adaptations. + * Each packet post is forwarded to each connected client that is still active. */ class Net : public Stream { public: Net(nlohmann::json &config, ftl::net::Universe *net); ~Net(); - //bool onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &) override; - bool post(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &) override; bool begin() override; @@ -73,12 +59,10 @@ class Net : public Stream { SHARED_MUTEX mutex_; bool active_; ftl::net::Universe *net_; - bool late_; int64_t clock_adjust_; ftl::UUID time_peer_; ftl::UUID peer_; int64_t last_frame_; - int64_t frame_no_; int64_t last_ping_; std::string uri_; std::string base_uri_; @@ -87,6 +71,14 @@ class Net : public Stream { std::array<std::atomic<int>,32> reqtally_; std::unordered_set<ftl::codecs::Channel> last_selected_; uint8_t bitrate_=255; + std::atomic_int64_t bytes_received_ = 0; + int64_t last_completion_ = 0; + int64_t time_at_last_ = 0; + float required_bps_; + float actual_bps_; + bool abr_enabled_; + + AdaptiveBitrate *abr_; ftl::Handler<ftl::net::Peer*> connect_cb_; @@ -97,16 +89,10 @@ class Net : public Stream { std::list<detail::StreamClient> clients_; - //StreamCallback cb_; - - bool _processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt); + bool _processRequest(ftl::net::Peer &p, ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt); void _checkDataRate(size_t tx_size, int64_t tx_latency, int64_t ts); bool _sendRequest(ftl::codecs::Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate); void _cleanUp(); - - //void _addClient(int N, int rate, const ftl::UUID &peer, const std::string &dest); - //void _transmitPacket(const ftl::codecs::Packet &pkt, ftl::codecs::Channel chan, int count); - //void _transmitPacket(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt); }; } diff --git a/components/streams/include/ftl/streams/sender.hpp b/components/streams/include/ftl/streams/sender.hpp index 0ae404bfb551c1210c25f46acf12a9524222fd8a..05f4f82ced1386bf8ae6bdde85fb3d1557ba7711 100644 --- a/components/streams/include/ftl/streams/sender.hpp +++ b/components/streams/include/ftl/streams/sender.hpp @@ -87,7 +87,7 @@ class Sender : public ftl::Configurable { std::unordered_map<int, EncodingState> state_; std::unordered_map<int, AudioState> audio_state_; - std::map<uint8_t, int64_t> bitrate_map_; + std::map<uint8_t, std::pair<int64_t,unsigned int>> bitrate_map_; SHARED_MUTEX bitrate_mtx_; int bitrate_timeout_; diff --git a/components/streams/src/adaptive.cpp b/components/streams/src/adaptive.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e34d5f8276d448a8402becf9e8aae1430dcfced0 --- /dev/null +++ b/components/streams/src/adaptive.cpp @@ -0,0 +1,136 @@ +#include "adaptive.hpp" + +#include <loguru.hpp> + +using ftl::stream::AdaptiveBitrate; +using ftl::stream::ABRState; +using ftl::stream::ABRNetworkStatus; + +ABRNetworkStatus AdaptiveBitrate::status(int t_frame, int t_recv) { + diff_error += t_recv - t_frame; + if (diff_error < 0) diff_error = 0; + + float avgdiff = avg_err_(diff_error); + + int change_dir = 0; + if (avgdiff > previous_diff_) change_dir = 1; + else if (avgdiff < previous_diff_) change_dir = -1; + float changer = avg_change_(change_dir); + previous_diff_ = avgdiff; + + //LOG(INFO) << "Status measure: " << avgdiff << ", " << diff_error << ", " << changer; + + if (next_status_-- > 0) return ABRNetworkStatus::Pending; + + if ((changer == 1.0f && avgdiff >= 100.0f) || avgdiff > 200.0f) { + next_status_ = 20; + return ABRNetworkStatus::Degrading; + } else if (changer < 0.0f && avgdiff >= 100.0f) { + next_status_ = 5; + return ABRNetworkStatus::Improving; + } else if (avgdiff > 50.0f) { + return ABRNetworkStatus::Pending; + } + + return ABRNetworkStatus::Stable; +} + +ABRState AdaptiveBitrate::nextState() { + return ABRState::Maintain; +} + +int AdaptiveBitrate::adjustment(int t_frame, int t_recv, int cur_rate) { + + auto s = status(t_frame, t_recv); + + if (s == ABRNetworkStatus::Degrading) { + stable_ = false; + if (last_increase_ > 0) known_unstable_ = std::min(known_unstable_, bitrate_); + if (known_stable_ >= bitrate_) known_stable_ = std::max(0, bitrate_-10); + bitrate_ = std::max(0, (last_increase_ > 0) ? bitrate_ - (2*last_increase_) : bitrate_/2); + LOG(INFO) << "Degrade to " << bitrate_; + last_increase_ = 0; + } + + if (s == ABRNetworkStatus::Stable) { + ++stable_count_; + } else { + stable_count_ = 0; + } + + if (stable_count_ >= ((stable_) ? 400 : 100)) { + stable_count_ = 0; + known_stable_ = std::max(known_stable_, bitrate_); + + if (known_stable_ < known_unstable_) { + if (known_unstable_ - known_stable_ > 10) { + bitrate_ += 10; + last_increase_ = 10; + } else if (known_unstable_ - known_stable_ > 2) { + bitrate_ += 2; + last_increase_ = 2; + } else { + known_unstable_ += 2; + last_increase_ = std::max(0, known_stable_ - bitrate_); + LOG(INFO) << "JUMP TO STABLE 1"; + stable_ = true; + bitrate_ = known_stable_; + } + } else if (known_unstable_ < known_stable_) { + known_unstable_ += 2; + last_increase_ = std::max(0, known_stable_ - bitrate_); + LOG(INFO) << "JUMP TO STABLE 2"; + bitrate_ += last_increase_; + } else { + last_increase_ = 2; + bitrate_ = known_stable_+2; + } + + if (last_increase_ > 0) LOG(INFO) << "Bitrate increase by " << last_increase_ << " to " << bitrate_; + } + + // Needs a mode + // First undo last change if incrementing, and then retry with smaller increment + // Need to wait after drop to work through the delayed buffer. + // If not working after N frames, decrement again + // Maintain record of max stable rate so far, if increasing causes problem then + // rapidly decrease and attempt to return to previous known stable position. + // If returning to known stable causes problems again, decrement known stable and try again. + + /*if (roll_ratio > 60.0f) { + bitrate_ = std::max(0, bitrate_-20); + } else if (roll_ratio > 30.0f) { + bitrate_ = std::max(0, bitrate_-2); + } else if (roll_ratio < 5.0f && cur_rate == bitrate_) { + bitrate_ = std::min(255, bitrate_+10); + } else if (roll_ratio < 10.0f && cur_rate == bitrate_) { + bitrate_ = std::min(255, bitrate_+2); + }*/ + + /*if (next_adjustment_-- <= 0) { + if (roll_ratio < 1.0f) { + bitrate_ = std::max(0, cur_rate-10); + LOG(INFO) << "Fast decrease bitrate to " << int(bitrate_); + pos_bitrate_ratio_ = 0; + next_adjustment_ = 20; + } else if (roll_ratio < 0.8f) { + bitrate_ = std::max(0, cur_rate-2); + LOG(INFO) << "Slow decrease bitrate to " << int(bitrate_); + pos_bitrate_ratio_ = 0; + next_adjustment_ = 6; + } else if (roll_ratio > 2.0f) { + bitrate_ = std::min(255, cur_rate+2); + increase_max_ = bitrate_; + LOG(INFO) << "Increase bitrate to " << int(bitrate_); + pos_bitrate_ratio_ = 0; + next_adjustment_ = 20; + } else { + pos_bitrate_ratio_ = 0; + } + }*/ + //LOG(INFO) << "Bandwidth Ratio = " << roll_ratio << " (" << bitrate_ << ")"; + + bitrate_ = std::min(bitrate_, max_rate_); + return bitrate_; +} + diff --git a/components/streams/src/adaptive.hpp b/components/streams/src/adaptive.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e6b0513e9e208015dc49e15ffd1316de56f75b69 --- /dev/null +++ b/components/streams/src/adaptive.hpp @@ -0,0 +1,65 @@ +#ifndef _FTL_STREAM_ADAPTIVE_HPP_ +#define _FTL_STREAM_ADAPTIVE_HPP_ + +#include <ftl/utility/rollingavg.hpp> + +namespace ftl { +namespace stream { + +enum class ABRNetworkStatus { + Stable=0, // Appears stable but not room to increase much + Improving, // Going in correct direction but not resolved + Pending, // Not enough information yet + Degrading, // Urgently decrease bitrate + Good // Could potentially increase bitrate +}; + +enum class ABRState { + Unknown, // Current network conditions unknown + Increased_Recover, // Moving back to past stable point + Increased_Wait, // Gentle increase, wait for network status + Maintain, // Stay at current rate for a while + Decreased_Wait, // Decrease and wait for network status + Decreased_50_Wait, // Rapid decrease and move to recover +}; + +class AdaptiveBitrate { + public: + explicit AdaptiveBitrate(int initial) : bitrate_(initial) {} + + inline void setMaxRate(int m) { max_rate_ = m; }; + + inline int current() const { return bitrate_; } + + int adjustment(int t_frame, int t_recv, int rec_rate); + + ABRNetworkStatus status(int, int); + + ABRState nextState(); + + private: + int max_rate_=200; + int bitrate_=32; + int increase_max_=-1; + int diff_error=0; + float previous_diff_=0.0f; + int next_status_=10; + int known_stable_=-1; + int known_unstable_=255; + int stable_count_=0; + int last_increase_=0; + bool stable_=false; + + ftl::utility::RollingAvg<int, 8u> avg_err_; + ftl::utility::RollingAvg<int, 8u> avg_ahead_; + ftl::utility::RollingAvg<float, 4u> avg_change_; + + int pos_bitrate_ratio_ = 0; + int delay_bitrate_increase_ = 20; + int next_adjustment_=0; +}; + +} +} + +#endif diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp index 101cdee7d309dd5140adbe45f10be929cacbb612..2823bb2e33311322bc8565c45de8bd37ad08bee8 100644 --- a/components/streams/src/netstream.cpp +++ b/components/streams/src/netstream.cpp @@ -1,4 +1,5 @@ #include <ftl/streams/netstream.hpp> +#include "adaptive.hpp" #define LOGURU_REPLACE_GLOG 1 #include <loguru.hpp> @@ -13,11 +14,9 @@ using ftl::codecs::StreamPacket; using ftl::codecs::Packet; using ftl::codecs::Channel; using ftl::codecs::codec_t; -using ftl::codecs::definition_t; using ftl::codecs::kAllFrames; using ftl::codecs::kAllFramesets; using std::string; -using std::vector; using std::optional; static constexpr int kTallyScale = 10; @@ -32,10 +31,11 @@ static std::atomic_flag has_bindings = ATOMIC_FLAG_INIT; static SHARED_MUTEX stream_mutex; Net::Net(nlohmann::json &config, ftl::net::Universe *net) : Stream(config), active_(false), net_(net), clock_adjust_(0), last_ping_(0) { + // First net stream needs to register these RPC handlers if (!has_bindings.test_and_set()) { if (net_->isBound("find_stream")) net_->unbind("find_stream"); net_->bind("find_stream", [net = net_](const std::string &uri, bool proxy) -> optional<ftl::UUID> { - LOG(INFO) << "REQUEST FIND STREAM: " << uri; + LOG(INFO) << "Request for stream: " << uri; ftl::URI u1(uri); std::string base = u1.getBaseURI(); @@ -53,10 +53,7 @@ Net::Net(nlohmann::json &config, ftl::net::Universe *net) : Stream(config), acti if (net_->isBound("list_streams")) net_->unbind("list_streams"); net_->bind("list_streams", [this]() { - LOG(INFO) << "REQUEST LIST STREAMS"; SHARED_LOCK(stream_mutex, lk); - //vector<string> streams; - //streams.push_back(uri_); // Send full original URI return net_streams; }); } @@ -64,9 +61,20 @@ Net::Net(nlohmann::json &config, ftl::net::Universe *net) : Stream(config), acti last_frame_ = 0; time_peer_ = ftl::UUID(0); - bitrate_ = static_cast<uint8_t>(std::max(0, std::min(255, value("bitrate", 255)))); + abr_ = new ftl::stream::AdaptiveBitrate(std::max(0, std::min(255, value("bitrate", 64)))); + + bitrate_ = abr_->current(); + abr_->setMaxRate(static_cast<uint8_t>(std::max(0, std::min(255, value("max_bitrate", 200))))); on("bitrate", [this]() { - bitrate_ = static_cast<uint8_t>(std::max(0, std::min(255, value("bitrate", 255)))); + abr_->setMaxRate(static_cast<uint8_t>(std::max(0, std::min(255, value("max_bitrate", 200))))); + }); + + abr_enabled_ = value("abr_enabled", true); + on("abr_enabled", [this]() { + abr_enabled_ = value("abr_enabled", true); + bitrate_ = (abr_enabled_) ? + abr_->current() : + static_cast<uint8_t>(std::max(0, std::min(255, value("bitrate", 64)))); tally_ = 0; }); } @@ -77,6 +85,8 @@ Net::~Net() { // FIXME: Wait to ensure no net callbacks are active. // Do something better than this std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + delete abr_; } bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { @@ -105,15 +115,7 @@ bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet while (c != clients_.end()) { auto &client = *c; - // Quality filter the packets - if (pkt.bitrate > 0 && pkt.bitrate != client.quality) { - //++c; - //LOG(INFO) << "Incorrect quality: " << (int)pkt.bitrate << " but requested " << (int)client.quality; - //continue; - } - try { - // FIXME: This doesn't work for file sources with file relative timestamps... short pre_transmit_latency = short(ftl::timer::get_time() - spkt.localTimestamp); if (!net_->send(client.peerid, @@ -142,8 +144,6 @@ bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet spkt, pkt)) { - } else { - // TODO: Some disconnect error } } catch(...) { // TODO: Some disconnect error @@ -151,6 +151,7 @@ bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet } } + // TODO: Don't always try and do this _cleanUp(); return true; @@ -172,25 +173,47 @@ bool Net::begin() { return false; } + // Add the RPC handler for the URI net_->bind(base_uri_, [this](ftl::net::Peer &p, short ttimeoff, const ftl::codecs::StreamPacket &spkt_raw, const ftl::codecs::Packet &pkt) { int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count(); if (!active_) return; StreamPacket spkt = spkt_raw; - // FIXME: see #335 - //spkt.timestamp -= clock_adjust_; spkt.localTimestamp = now - int64_t(ttimeoff); spkt.hint_capability = 0; spkt.hint_source_total = 0; spkt.version = 4; + spkt.hint_peerid = p.localID(); // Manage recuring requests if (!host_ && last_frame_ != spkt.timestamp) { UNIQUE_LOCK(mutex_, lk); if (last_frame_ != spkt.timestamp) { + + /*float bits_received = float(bytes_received_*8); + required_bps_ = (bits_received / float(spkt.timestamp - last_frame_)) * 1000.0f; + actual_bps_ = (bits_received / float(time_at_last_ - last_completion_)) * 1000.0f; + + float ratio = actual_bps_ / required_bps_;*/ + + int tf = spkt.timestamp - last_frame_; // Milliseconds per frame + int tc = now - last_completion_; // Milliseconds since last frame completed + last_completion_ = now; + bytes_received_ = 0; last_frame_ = spkt.timestamp; + lk.unlock(); + + // Apply adaptive bitrate adjustment if needed + if (abr_enabled_) { + int new_bitrate = abr_->adjustment(tf, tc, pkt.bitrate); + if (new_bitrate != bitrate_) { + bitrate_ = new_bitrate; + tally_ = 0; // Force request send + } + } + if (size() > 0) { // TODO: For all framesets auto sel = selected(0); @@ -223,6 +246,9 @@ bool Net::begin() { } } + bytes_received_ += pkt.data.size(); + //time_at_last_ = now; + // If hosting and no data then it is a request for data // Note: a non host can receive empty data, meaning data is available // but that you did not request it @@ -239,17 +265,15 @@ bool Net::begin() { select(spkt.frameSetID(), selected(spkt.frameSetID()) + spkt.channel); } - _processRequest(p, pkt); + _processRequest(p, spkt, pkt); } else { // FIXME: Allow availability to change... available(spkt.frameSetID()) += spkt.channel; //LOG(INFO) << "AVAILABLE: " << (int)spkt.channel; } - //if (cb_) { - cb_.trigger(spkt, pkt); - if (pkt.data.size() > 0) _checkDataRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp); - //} + cb_.trigger(spkt, pkt); + if (pkt.data.size() > 0) _checkDataRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp); }); // First find non-proxy version, then check for proxy version if no match @@ -258,7 +282,6 @@ bool Net::begin() { if (!p) { LOG(INFO) << "Hosting stream: " << uri_; - // TODO: Register URI as available. host_ = true; // Alias the URI to the configurable if not already @@ -268,6 +291,7 @@ bool Net::begin() { } { + // Add to list of available streams UNIQUE_LOCK(stream_mutex, lk); net_streams.push_back(uri_); } @@ -289,10 +313,9 @@ bool Net::begin() { net_->broadcast("add_stream", uri_); return true; - } else { - //LOG(INFO) << "Net cfg: " << net_->call<std::string>(*p, "get_cfg", uri_); } + // Not hosting... host_ = false; peer_ = *p; tally_ = 30*kTallyScale; @@ -390,11 +413,11 @@ void Net::_cleanUp() { * batches (max 255 unique frames by timestamp). Requests are in the form * of packets that match the request except the data component is empty. */ -bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) { - { - UNIQUE_LOCK(mutex_,lk); - bool found = false; +bool Net::_processRequest(ftl::net::Peer &p, ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + bool found = false; + { + SHARED_LOCK(mutex_,lk); // Does the client already exist for (auto &c : clients_) { if (c.peerid == p.id()) { @@ -402,49 +425,39 @@ bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) { c.txcount = 0; c.txmax = static_cast<int>(pkt.frame_count)*kTallyScale; found = true; + // break; } } + } + + // No existing client, so add a new one. + if (!found) { + { + UNIQUE_LOCK(mutex_,lk); - // No existing client, so add a new one. - if (!found) { auto &client = clients_.emplace_back(); client.peerid = p.id(); client.quality = 255; // TODO: Use quality given in packet client.txcount = 0; client.txmax = static_cast<int>(pkt.frame_count)*kTallyScale; - - try { - connect_cb_.trigger(&p); - } catch (const ftl::exception &e) { - LOG(ERROR) << "Exception in stream connect callback: " << e.what(); - } } - // First connected peer (or reconnecting peer) becomes a time server - /*if (time_peer_ == ftl::UUID(0)) { - time_peer_ = p.id(); - DLOG(INFO) << "Adding time peer"; - }*/ + spkt.hint_capability |= ftl::codecs::kStreamCap_NewConnection; + + try { + connect_cb_.trigger(&p); + } catch (const ftl::exception &e) { + LOG(ERROR) << "Exception in stream connect callback: " << e.what(); + } } return false; } void Net::_checkDataRate(size_t tx_size, int64_t tx_latency, int64_t ts) { - //float actual_mbps = (float(tx_size) * 8.0f * (1000.0f / float(tx_latency))) / 1048576.0f; - //float min_mbps = (float(tx_size) * 8.0f * (1000.0f / float(ftl::timer::getInterval()))) / 1048576.0f; - //if (actual_mbps > 0.0f && actual_mbps < min_mbps) LOG(WARNING) << "Bitrate = " << actual_mbps << "Mbps, min required = " << min_mbps << "Mbps"; - UNIQUE_LOCK(msg_mtx__,lk); req_bitrate__ += float(tx_size) * 8.0f; sample_count__ += 1.0f; - - /*if (ts - last_msg_ >= 1000) { - DLOG(INFO) << "Required Bitrate = " << (req_bitrate_ / float(ts - last_msg_) * 1000.0f / 1048576.0f) << "Mbps"; - last_msg_ = ts; - req_bitrate_ = 0.0f; - sample_count_ = 0.0f; - }*/ } float Net::getRequiredBitrate() { diff --git a/components/streams/src/sender.cpp b/components/streams/src/sender.cpp index 72ab29c7189a7c5f4d7304080ae054aa1f314f7b..ed9ac9f846541c503380f570816373d60c54b428 100644 --- a/components/streams/src/sender.cpp +++ b/components/streams/src/sender.cpp @@ -56,18 +56,19 @@ void Sender::setStream(ftl::stream::Stream*s) { // Update the min bitrate selection UNIQUE_LOCK(bitrate_mtx_, lk); if (bitrate_map_.size() > 0) { - if (now - bitrate_map_.begin()->second > bitrate_timeout_) { + while (bitrate_map_.size() > 0 && (now - bitrate_map_.begin()->second.first > bitrate_timeout_ || (bitrate_map_.begin()->second.second == spkt.hint_peerid && pkt.bitrate != bitrate_map_.begin()->first))) { + LOG(INFO) << "Remove bitrate " << int(bitrate_map_.begin()->first); bitrate_map_.erase(bitrate_map_.begin()); } } - bitrate_map_[pkt.bitrate] = now; + bitrate_map_[pkt.bitrate] = std::make_pair(now, spkt.hint_peerid); } //if (state_cb_) state_cb_(spkt.channel, spkt.streamID, spkt.frame_number); if (reqcb_) reqcb_(spkt,pkt); // Inject state packets - do_inject_.clear(); + if (spkt.hint_capability & ftl::codecs::kStreamCap_NewConnection) do_inject_.clear(); return true; }); @@ -395,9 +396,9 @@ void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset, bool isfloat = ftl::codecs::type(c) == CV_32F; bool lossless = (isfloat) ? value("lossless_float", false) : value("lossless_colour", false); - int bitrate = std::max(0, std::min(255, (isfloat) ? value("bitrate_float", 200) : value("bitrate_colour", 64))); - - bitrate = std::min(static_cast<uint8_t>(bitrate), _getMinBitrate()); + int max_bitrate = std::max(0, std::min(255, value("bitrate", 64))); + int bitrate = std::min(static_cast<uint8_t>(max_bitrate), _getMinBitrate()); + if (isfloat) bitrate = std::min(255, int(float(bitrate)*value("bitrate_float_scale", 1.5f))); //int min_bitrate = std::max(0, std::min(255, value("min_bitrate", 0))); // TODO: Use this codec_t codec = static_cast<codec_t>(