/**
 * @file netstream.hpp
 * @copyright Copyright (c) 2022 University of Turku, MIT License
 * @author Nicolas Pope
 */

#pragma once

#include <any>
#include <array>
#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>

#include "../universe.hpp"
#include <ftl/threads.hpp>
#include <ftl/protocol/packet.hpp>
#include <ftl/protocol/streams.hpp>
#include <ftl/handle.hpp>
#include "packetmanager.hpp"

namespace ftl {
namespace protocol {

namespace detail {
/**
 * Bookkeeping record for one client of a stream, identified by its peer UUID.
 *
 * All scalar members carry default initializers so that a default-constructed
 * record never exposes indeterminate values (before C++20 a default-constructed
 * std::atomic is left uninitialized).
 */
struct StreamClient {
    ftl::UUID peerid;
    std::atomic<int> txcount{0};        // Frames sent since last request
    std::atomic<uint32_t> channels{0};  // A channel mask, those that have been requested
    uint8_t quality = 0;                // NOTE(review): meaning/range not visible here; zero only avoids garbage
};
}

/**
 * Receive / transmit rate pair, as returned by Net::getStatistics().
 * That function documents its result in Mbps.
 *
 * Members are zero-initialized so a default-constructed instance is well
 * defined; aggregate initialization (`NetStats{rx, tx}`) still works.
 */
struct NetStats {
    float rxRate = 0.0f;
    float txRate = 0.0f;
};

/**
 * Send and receive packets over a network. This class manages the connection
 * of clients or the discovery of a stream and deals with bitrate adaptations.
 * Each packet post is forwarded to each connected client that is still active.
 */
class Net : public Stream {
 public:
    Net(const std::string &uri, ftl::net::Universe *net, bool host = false);
    virtual ~Net();

Nicolas Pope's avatar
Nicolas Pope committed
    bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) override;

    bool begin() override;
    bool end() override;
    bool active() override;
    bool active(FrameID id) override;

    bool enable(FrameID id) override;
    bool enable(FrameID id, ftl::protocol::Channel c) override;
    bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override;

    void reset() override;
    void refresh() override;

    void setProperty(ftl::protocol::StreamProperty opt, std::any value) override;
    std::any getProperty(ftl::protocol::StreamProperty opt) override;
    bool supportsProperty(ftl::protocol::StreamProperty opt) override;
    StreamType type() const override;

    inline const ftl::UUID &getPeer() const {
        if (host_) { throw FTL_Error("Net::getPeer() not possible, hosting stream"); }
        if (!peer_) { throw FTL_Error("steram::Net has no valid Peer. Not found earlier?"); }
        return *peer_;
    }

    inline ftl::Handle onClientConnect(const std::function<bool(ftl::net::Peer*)> &cb) { return connect_cb_.on(cb); }

    /**
     * Return the average bitrate of all streams since the last call to this
     * function. Units are Mbps.
     */
    static NetStats getStatistics();

    static void installRPC(ftl::net::Universe *net);

    static constexpr int kFramesToRequest = 30;

    // Unit test support
Nicolas Pope's avatar
Nicolas Pope committed
    virtual void hasPosted(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) {}
Nicolas Pope's avatar
Nicolas Pope committed
    void inject(const ftl::protocol::StreamPacket &, ftl::protocol::DataPacket &);

 private:
    SHARED_MUTEX mutex_;
    bool active_ = false;
    ftl::net::Universe *net_;
    std::optional<ftl::UUID> peer_;
    std::string uri_;
    std::string base_uri_;
    const bool host_;
    std::array<std::atomic_int, 5> tally_ = {};
Nicolas Pope's avatar
Nicolas Pope committed
    uint8_t bitrate_ = 255;
    std::atomic_int64_t bytes_received_ = 0;
    bool paused_ = false;
    int frames_to_request_ = kFramesToRequest;
    std::string name_;
    ftl::PacketManager mgr_;
    ftl::Handler<ftl::net::Peer*> connect_cb_;
    int64_t buffering_ = 0;

    static std::atomic_size_t req_bitrate__;
    static std::atomic_size_t tx_bitrate__;
    static std::atomic_size_t rx_sample_count__;
    static std::atomic_size_t tx_sample_count__;
    static int64_t last_msg__;
    static MUTEX msg_mtx__;

    struct PacketBuffer {
        ftl::protocol::PacketPair packets;
        ftl::net::Peer *peer = nullptr;
        std::atomic_bool done = false;
    };

    struct FrameState {
        ftl::protocol::FrameID id;
        std::atomic_int active = 0;
        SHARED_MUTEX mtx;
        std::list<PacketBuffer> buffer;
        int64_t base_pkt_ts_ = 0;
        int64_t base_local_ts_ = 0;
    };

    SHARED_MUTEX statesMtx_;
    std::unordered_map<uint32_t, std::unique_ptr<FrameState>> frameStates_;

    std::unordered_map<ftl::protocol::FrameID, std::list<detail::StreamClient>> clients_;
    std::thread thread_;
    FrameState *_getFrameState(FrameID id);
    bool _enable(FrameID id);
Nicolas Pope's avatar
Nicolas Pope committed
    bool _processRequest(ftl::net::Peer *p, const ftl::protocol::StreamPacket *spkt, ftl::protocol::DataPacket &pkt);
    void _checkRXRate(size_t rx_size, int64_t rx_latency, int64_t ts);
    void _checkTXRate(size_t tx_size, int64_t tx_latency, int64_t ts);
    bool _sendRequest(
        ftl::protocol::Channel c,
        uint8_t frameset,
        uint8_t frames,
        uint8_t count,
        uint8_t bitrate,
        bool doreset = false);
    void _cleanUp();
Nicolas Pope's avatar
Nicolas Pope committed
    void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
    void _run();
}  // namespace protocol
}  // namespace ftl