* @file netstream.hpp
* @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope
#include "../universe.hpp"
#include <ftl/threads.hpp>
#include <ftl/protocol/packet.hpp>
#include <ftl/protocol/streams.hpp>
#include <ftl/handle.hpp>
namespace ftl {
namespace protocol {
namespace detail {
struct StreamClient {
ftl::UUID peerid;
std::atomic<int> txcount; // Frames sent since last request
std::atomic<uint32_t> channels; // A channel mask, those that have been requested
uint8_t quality;
* Send and receive packets over a network. This class manages the connection
* of clients or the discovery of a stream and deals with bitrate adaptations.
* Each packet post is forwarded to each connected client that is still active.
class Net : public Stream {
Net(const std::string &uri, ftl::net::Universe *net, bool host = false);
virtual ~Net();
bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) override;
bool begin() override;
bool end() override;
bool active() override;
bool enable(FrameID id) override;
bool enable(FrameID id, ftl::protocol::Channel c) override;
bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override;
void reset() override;
void refresh() override;
void setProperty(ftl::protocol::StreamProperty opt, std::any value) override;
std::any getProperty(ftl::protocol::StreamProperty opt) override;
bool supportsProperty(ftl::protocol::StreamProperty opt) override;
StreamType type() const override;
inline const ftl::UUID &getPeer() const {
if (host_) { throw FTL_Error("Net::getPeer() not possible, hosting stream"); }
if (!peer_) { throw FTL_Error("steram::Net has no valid Peer. Not found earlier?"); }
return *peer_;
inline ftl::Handle onClientConnect(const std::function<bool(ftl::net::Peer*)> &cb) { return connect_cb_.on(cb); }
* Return the average bitrate of all streams since the last call to this
* function. Units are Mbps.
static NetStats getStatistics();
static void installRPC(ftl::net::Universe *net);
static constexpr int kFramesToRequest = 30;
// Unit test support
virtual void hasPosted(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) {}
void inject(const ftl::protocol::StreamPacket &, ftl::protocol::DataPacket &);
bool active_ = false;
ftl::net::Universe *net_;
std::optional<ftl::UUID> peer_;
std::string uri_;
std::string base_uri_;
const bool host_;
std::array<std::atomic_int, 5> tally_ = {};
std::atomic_int64_t bytes_received_ = 0;
bool paused_ = false;
int frames_to_request_ = kFramesToRequest;
std::string name_;
ftl::Handler<ftl::net::Peer*> connect_cb_;
static std::atomic_size_t req_bitrate__;
static std::atomic_size_t tx_bitrate__;
static std::atomic_size_t rx_sample_count__;
static std::atomic_size_t tx_sample_count__;
static int64_t last_msg__;
static MUTEX msg_mtx__;
struct PacketBuffer {
ftl::protocol::PacketPair packets;
ftl::net::Peer *peer = nullptr;
std::atomic_bool done = false;
struct FrameState {
ftl::protocol::FrameID id;
std::list<PacketBuffer> buffer;
int64_t base_pkt_ts_ = 0;
int64_t base_local_ts_ = 0;
SHARED_MUTEX statesMtx_;
std::unordered_map<uint32_t, std::unique_ptr<FrameState>> frameStates_;
std::unordered_map<ftl::protocol::FrameID, std::list<detail::StreamClient>> clients_;
FrameState *_getFrameState(FrameID id);
bool _processRequest(ftl::net::Peer *p, const ftl::protocol::StreamPacket *spkt, ftl::protocol::DataPacket &pkt);
void _checkRXRate(size_t rx_size, int64_t rx_latency, int64_t ts);
void _checkTXRate(size_t tx_size, int64_t tx_latency, int64_t ts);
bool _sendRequest(
ftl::protocol::Channel c,
uint8_t frameset,
uint8_t frames,
uint8_t count,
uint8_t bitrate,
bool doreset = false);
void _cleanUp();
void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
} // namespace protocol
} // namespace ftl