Newer
Older
/**
* @file netstream.hpp
* @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope
*/
#include "../universe.hpp"
#include <ftl/threads.hpp>
#include <ftl/protocol/packet.hpp>
#include <ftl/protocol/streams.hpp>
#include <ftl/handle.hpp>
namespace ftl {
namespace protocol {
namespace detail {
struct StreamClient {
ftl::UUID peerid;
std::atomic<int> txcount; // Frames sent since last request
int txmax; // Frames to send in request
std::atomic<uint32_t> channels; // A channel mask, those that have been requested
uint8_t quality;
};
}
/**
* The maximum number of frames a client can request in a single request.
*/
static const int kMaxFrames = 100;
struct NetStats {
};
/**
* Send and receive packets over a network. This class manages the connection
* of clients or the discovery of a stream and deals with bitrate adaptations.
* Each packet post is forwarded to each connected client that is still active.
*/
class Net : public Stream {
public:
Net(const std::string &uri, ftl::net::Universe *net, bool host = false);
virtual ~Net();
bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) override;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
bool begin() override;
bool end() override;
bool active() override;
bool enable(FrameID id) override;
bool enable(FrameID id, ftl::protocol::Channel c) override;
bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override;
void reset() override;
void refresh() override;
void setProperty(ftl::protocol::StreamProperty opt, std::any value) override;
std::any getProperty(ftl::protocol::StreamProperty opt) override;
bool supportsProperty(ftl::protocol::StreamProperty opt) override;
StreamType type() const override;
inline const ftl::UUID &getPeer() const {
if (host_) { throw FTL_Error("Net::getPeer() not possible, hosting stream"); }
if (!peer_) { throw FTL_Error("steram::Net has no valid Peer. Not found earlier?"); }
return *peer_;
}
inline ftl::Handle onClientConnect(const std::function<bool(ftl::net::Peer*)> &cb) { return connect_cb_.on(cb); }
/**
* Return the average bitrate of all streams since the last call to this
* function. Units are Mbps.
*/
static NetStats getStatistics();
static void installRPC(ftl::net::Universe *net);
static constexpr int kFramesToRequest = 30;
// Unit test support
virtual void hasPosted(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) {}
void inject(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &);
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
private:
SHARED_MUTEX mutex_;
bool active_ = false;
ftl::net::Universe *net_;
int64_t clock_adjust_ = 0;
ftl::UUID time_peer_;
std::optional<ftl::UUID> peer_;
int64_t last_frame_ = 0;
int64_t last_ping_ = 0;
int64_t frame_time_ = 0;
std::string uri_;
std::string base_uri_;
const bool host_;
int tally_ = 0;
std::array<std::atomic<int>, 32> reqtally_ = {0};
ftl::protocol::ChannelSet last_selected_;
uint8_t bitrate_ = 200;
std::atomic_int64_t bytes_received_ = 0;
int64_t last_completion_ = 0;
int64_t time_at_last_ = 0;
float required_bps_ = 0.0f;
float actual_bps_ = 0.0f;
bool paused_ = false;
int frames_to_request_ = kFramesToRequest;
std::string name_;
ftl::Handler<ftl::net::Peer*> connect_cb_;
uint32_t local_fsid_ = 0;
static std::atomic_size_t req_bitrate__;
static std::atomic_size_t tx_bitrate__;
static std::atomic_size_t rx_sample_count__;
static std::atomic_size_t tx_sample_count__;
static int64_t last_msg__;
static MUTEX msg_mtx__;
std::list<detail::StreamClient> clients_;
bool _enable(FrameID id);
bool _processRequest(ftl::net::Peer *p, ftl::protocol::StreamPacket *spkt, const ftl::protocol::DataPacket &pkt);
void _checkRXRate(size_t rx_size, int64_t rx_latency, int64_t ts);
void _checkTXRate(size_t tx_size, int64_t tx_latency, int64_t ts);
bool _sendRequest(
ftl::protocol::Channel c,
uint8_t frameset,
uint8_t frames,
uint8_t count,
uint8_t bitrate,
bool doreset = false);
void _cleanUp();
void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, const DataPacket &pkt);
} // namespace protocol
} // namespace ftl