Skip to content
Snippets Groups Projects
streams.hpp 11.7 KiB
Newer Older
Nicolas Pope's avatar
Nicolas Pope committed
/**
 * @file streams.hpp
 * @copyright Copyright (c) 2022 University of Turku, MIT License
 * @author Nicolas Pope
 */

#pragma once

#include <any>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <ftl/handle.hpp>
#include <ftl/threads.hpp>
#include <ftl/protocol/channels.hpp>
#include <ftl/protocol/channelSet.hpp>
#include <ftl/protocol/packet.hpp>
#include <ftl/protocol/frameid.hpp>
#include <ftl/protocol/error.hpp>

namespace ftl {
namespace protocol {
/** Represents a request for data through a stream */
Nicolas Pope's avatar
Nicolas Pope committed
struct Request {
    FrameID id;
    ftl::protocol::Channel channel;
    int bitrate;
    int count;
    ftl::protocol::Codec codec;
Nicolas Pope's avatar
Nicolas Pope committed
};

/** Callback signature for handling a Request; returns a bool to the dispatching handler. */
using RequestCallback = std::function<bool(const ftl::protocol::Request&)>;

/** Callback signature for received packets: the stream packet header plus its data packet. */
using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &)>;

/**
 * @brief Enumeration of possible stream properties. Not all properties are supported
 * by all stream types, but they allow additional control and data access.
 *
 * kInvalid is pinned to 0; the remaining enumerators take consecutive values
 * in declaration order.
 */
enum struct StreamProperty {
    kInvalid = 0,
    kLooping,
    kSpeed,
    kBitrate,
    kMaxBitrate,
    kAdaptiveBitrate,
    kObservers,
    kURI,
    kPaused,
    kBytesSent,
    kBytesReceived,
    kLatency,
    kFrameRate,
    kName,
    kDescription,
    kTags,
    kUser
};

/**
 * @brief A hint about the stream's capabilities.
 *
 * Enumerator values follow declaration order, so kMixed is the zero value.
 */
enum struct StreamType {
    kMixed,     // Multiple types of stream
    kUnknown,
    kLive,      // Net stream
    kRecorded   // File stream
};

/**
 * Base stream class to be implemented. Provides encode and decode functionality
 * around a generic packet read and write mechanism. Some specialisations will
 * provide and automatically handle control signals.
 *
 * Streams are bidirectional, frames can be both received and written.
 */
class Stream {
 public:
    virtual ~Stream() {}

    /**
     * @brief If available, get a human readable name for the stream.
     * 
     * @return std::string 
     */
    virtual std::string name() const;

    /**
     * @brief Register a callback to receive packets. The callback is called at the available
     * frame rate, where each frame may consist of multiple packets and therefore multiple
     * callbacks. Each callback can occur in a different thread, therefore all packets for
     * a frame could be triggered in parallel.
     * 
     * @param cb 
     * @return ftl::Handle 
     */
    ftl::Handle onPacket(const StreamCallback &cb) { return cb_.on(cb); }

    /**
     * @brief Register a callback for frame and channel requests. Remote machines can send
     * requests, at which point the data should be generated and sent properly.
     * 
     * @param cb 
     * @return ftl::Handle 
     */
    ftl::Handle onRequest(const std::function<bool(const Request &)> &cb) { return request_cb_.on(cb); }

    /**
     * @brief Send packets, either to file or over the network. Packets should follow
     * the overall protocol rules, detailed elsewhere.
     * 
     * @return true if sent
     * @return false if dropped
     */
Nicolas Pope's avatar
Nicolas Pope committed
    virtual bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) = 0;

    // TODO(Nick): Add methods for: pause, paused, statistics

    /**
     * @brief Start the stream. Calls to the onPacket callback will only occur after
     * a call to this function (and before a call to end()).
     */
    virtual bool begin() = 0;

    /**
     * @brief Terminate the stream. This will stop callbacks and will close all resources.
     * 
     * @return true 
     * @return false 
     */
    virtual bool end() = 0;

    /**
     * Is the stream active? Generally true if begin() has been called, false
     * initially and after end(). However, it may go false for other reasons.
     * If false, no calls to onPacket will occur and posts will be ignored.
     */
    virtual bool active() = 0;

    /**
     * @brief Clear all state. This will remove all information about available
     * and enabled frames or channels. You will then need to enable frames and
     * channels again. If active the stream will remain active.
     * 
     */
    virtual void reset();

    /**
     * @brief Re-request all channels and state. This will also cause video encoding
     * to generate new I-frames as if a new connection is made. All persistent data
     * channels would also become available. For file streams this would reset the
     * stream to the beginning of the file.
     * 
     */
    virtual void refresh();

    /**
     * @brief Check if a frame is available.
     * 
     * @param id Frameset and frame number
     * @return true if data is available for the frame
     * @return false if no data has been seen
     */
    bool available(FrameID id) const;

    /**
     * @brief Check if a channel for a frame is available.
     * 
     * @param id Frameset and frame number
     * @param channel 
     * @return true if there is channel data
     * @return false if the channel does not exist
     */
    bool available(FrameID id, ftl::protocol::Channel channel) const;

    /**
     * @brief Check if all channels in a set are available.
     * 
     * @param id Frameset and frame number
     * @param channels Set of channels to check
     * @return true if all channels exist
     * @return false if one or more do not exist
     */
    bool available(FrameID id, const ftl::protocol::ChannelSet channels) const;

    /**
     * @brief Register a callback for when new frames and channels become available.
     * 
     * @param cb 
     * @return ftl::Handle 
     */
    ftl::Handle onAvailable(const std::function<bool(FrameID, ftl::protocol::Channel)> &cb) { return avail_cb_.on(cb); }

    /**
     * @brief Get all channels seen for a frame. If the frame does not exist then
     * an empty set is returned.
     * 
     * @param id Frameset and frame number
     * @return Set of all seen channels, or empty. 
     */
    ftl::protocol::ChannelSet channels(FrameID id) const;

    /**
     * @brief Obtain a set of all active channels for a frame.
     * 
     * @param id Frameset and frame number
     * @return ftl::protocol::ChannelSet 
     */
    ftl::protocol::ChannelSet enabledChannels(FrameID id) const;

    /**
     * @brief Get all available frames in the stream.
     * 
     * @return Set of frame IDs
     */
    std::unordered_set<FrameID> frames() const;

    /**
     * @brief Get all enabled frames in the stream.
     * 
     * @return Set of frame IDs
     */
    std::unordered_set<FrameID> enabled() const;

    /**
     * @brief Check if a frame is enabled.
     * 
     * @param id Frameset and frame number
     * @return true if data for this frame is enabled.
     * @return false if data not enabled or frame does not exist.
     */
    bool enabled(FrameID id) const;

    /**
     * @brief Check if a specific channel is enabled for a frame.
     * 
     * @param id Frameset and frame number
     * @param channel 
     * @return true if the channel is active
     * @return false if the channel is not active or does not exist
     */
    bool enabled(FrameID id, ftl::protocol::Channel channel) const;

    /**
     * Number of framesets in stream.
     */
    inline size_t size() const { return state_.size(); }

    /**
     * @brief Activate a frame. This allows availability information to be gathered
     * for the frame which might not otherwise be available. However, data is likely
     * missing unless a channel is enabled.
     * 
     * @param id Frameset and frame number
     * @return true if the frame could be enabled
     * @return false if the frame could not be found or enabled
     */
    virtual bool enable(FrameID id);

    /**
     * @brief Request a specific channel in a frame. Once the request is made data
     * should become available if it exists. This will also enable the frame if
     * not already enabled.
     * 
     * @param id Frameset and frame number
     * @param channel 
     * @return true if the channel is available and enabled
     * @return false if the channel does not exist
     */
    virtual bool enable(FrameID id, ftl::protocol::Channel channel);

    /**
     * @brief Activate a set of channels for a frame. Requests for data for each
     * given channel are sent and the data should then become available.
     * 
     * @param id Frameset and frame number
     * @param channels a set of all channels to activate
     * @return true if all channels could be enabled
     * @return false if some channel could not be enabled
     */
    virtual bool enable(FrameID id, const ftl::protocol::ChannelSet &channels);

Nicolas Pope's avatar
Nicolas Pope committed
    /**
     * @brief Disable an entire frame. If the frame is not available or is already
     * disabled then this method has no effect.
     * 
     * @param id 
     */
    virtual void disable(FrameID id);

    /**
     * @brief Disable a specific channel in a frame. If not available or already
     * disabled then this method has no effect.
     * 
     * @param id 
     * @param channel 
     */
    virtual void disable(FrameID id, ftl::protocol::Channel channel);

    /**
     * @brief Disable a set of channels in a frame. If not available or already
     * disabled then this method has no effect.
     * 
     * @param id 
     * @param channels 
     */
    virtual void disable(FrameID id, const ftl::protocol::ChannelSet &channels);

    /**
     * @brief Set a stream property to a new value. If the property is not supported,
     * is readonly or an invalid value type is given, then an exception is thrown.
     * Check if the property is supported first.
     * 
     * @param opt 
     * @param value 
     */
    virtual void setProperty(ftl::protocol::StreamProperty opt, std::any value) = 0;

    /**
     * @brief Get the value of a stream property. If the property is not supported then
     * an exception is thrown. The result is an `any` object that should be casted
     * correctly by the user.
     * 
     * @param opt 
     * @return std::any 
     */
    virtual std::any getProperty(ftl::protocol::StreamProperty opt) = 0;

    /**
     * @brief Check if a property is supported. No exceptions are thrown.
     * 
     * @param opt 
     * @return true if the property is at least readable
     * @return false if the property is unsupported
     */
    virtual bool supportsProperty(ftl::protocol::StreamProperty opt) = 0;

    /**
     * @brief Get the streams type hint.
     * 
     * @return StreamType 
     */
    virtual StreamType type() const { return StreamType::kUnknown; }

    /**
     * @brief Register a callback for asynchronous stream errors.
     * 
     * @param cb 
     * @return ftl::Handle 
     */
    ftl::Handle onError(const std::function<bool(ftl::protocol::Error, const std::string &)> &cb) {
        return error_cb_.on(cb);
    }

 protected:
    /** Dispatch packets to callbacks */
Nicolas Pope's avatar
Nicolas Pope committed
    void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt);

    /** Mark the channel and frame as available */
    void seen(FrameID id, ftl::protocol::Channel channel);

    /** Dispatch a request */
    void request(const Request &req);

    /** Report a stream error */
    void error(ftl::protocol::Error, const std::string &str);

    mutable SHARED_MUTEX mtx_;

 private:
    struct FSState {
        bool enabled = false;
        ftl::protocol::ChannelSet selected;
        ftl::protocol::ChannelSet available;
        // TODO(Nick): Add a name and metadata
    };

Nicolas Pope's avatar
Nicolas Pope committed
    ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::DataPacket&> cb_;
    ftl::Handler<const Request &> request_cb_;
    ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_;
    ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_;
    std::unordered_map<int, FSState> state_;
Nicolas Pope's avatar
Nicolas Pope committed
};

/** Shared-ownership pointer to a Stream. */
using StreamPtr = std::shared_ptr<Stream>;

}  // namespace protocol
}  // namespace ftl