Something went wrong on our end
-
Nicolas Pope authoredNicolas Pope authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
streams.hpp 10.42 KiB
/**
* @file streams.hpp
* @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope
*/
#pragma once
#include <string>
#include <vector>
#include <unordered_set>
#include <any>
#include <unordered_map>
#include <memory>
#include <ftl/handle.hpp>
#include <ftl/threads.hpp>
#include <ftl/protocol/channels.hpp>
#include <ftl/protocol/channelSet.hpp>
#include <ftl/protocol/packet.hpp>
#include <ftl/protocol/frameid.hpp>
#include <ftl/protocol/error.hpp>
namespace ftl {
namespace protocol {
/** Represents a request for data through a stream */
struct Request {
FrameID id;
ftl::protocol::Channel channel;
int bitrate;
int count;
ftl::protocol::Codec codec;
};
using RequestCallback = std::function<bool(const ftl::protocol::Request&)>;
using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &)>;
/**
* @brief Enumeration of possible stream properties. Not all properties are supported
* by all stream types, but they allow additional control and data access.
*
*/
enum struct StreamProperty {
kInvalid = 0,
kLooping,
kSpeed,
kBitrate,
kMaxBitrate,
kAdaptiveBitrate,
kObservers,
kURI,
kPaused,
kBytesSent,
kBytesReceived,
kLatency,
kFrameRate,
kName,
kDescription,
kTags,
kUser
};
enum struct StreamType {
kMixed, // Multiple types of stream
kUnknown,
kLive, // Net stream
kRecorded // File stream
};
/**
* Base stream class to be implemented. Provides encode and decode functionality
* around a generic packet read and write mechanism. Some specialisations will
* provide and automatically handle control signals.
*
* Streams are bidirectional, frames can be both received and written.
*/
class Stream {
public:
virtual ~Stream() {}
virtual std::string name() const;
/**
* Obtain all packets for next frame. The provided callback function is
* called once for every packet. This function might continue to call the
* callback even after the read function returns, for example with a
* NetStream.
*/
ftl::Handle onPacket(const StreamCallback &cb) { return cb_.on(cb); }
/**
* @brief Register a callback for frame and channel requests. Remote machines can send
* requests, at which point the data should be generated and sent properly.
*
* @param cb
* @return ftl::Handle
*/
ftl::Handle onRequest(const std::function<bool(const Request &)> &cb) { return request_cb_.on(cb); }
/**
* @brief Send packets, either to file or over the network. Packets should follow
* the overall protocol rules, detailed elsewhere.
*
* @return true if sent
* @return false if dropped
*/
virtual bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) = 0;
// TODO(Nick): Add methods for: pause, paused, statistics
/**
* Start the stream. Calls to the onPacket callback will only occur after
* a call to this function (and before a call to end()).
*/
virtual bool begin() = 0;
virtual bool end() = 0;
/**
* Is the stream active? Generally true if begin() has been called, false
* initially and after end(). However, it may go false for other reasons.
* If false, no calls to onPacket will occur and posts will be ignored.
*/
virtual bool active() = 0;
/**
* @brief Clear all state. This will remove all information about available
* and enabled frames or channels. You will then need to enable frames and
* channels again. If active the stream will remain active.
*
*/
virtual void reset();
/**
* @brief Re-request all channels and state. This will also cause video encoding
* to generate new I-frames as if a new connection is made. All persistent data
* channels would also become available. For file streams this would reset the
* stream to the beginning of the file.
*
*/
virtual void refresh();
/**
* @brief Check if a frame is available.
*
* @param id Frameset and frame number
* @return true if data is available for the frame
* @return false if no data has been seen
*/
bool available(FrameID id) const;
/**
* @brief Check if a channel for a frame is available.
*
* @param id Frameset and frame number
* @param channel
* @return true if there is channel data
* @return false if the channel does not exist
*/
bool available(FrameID id, ftl::protocol::Channel channel) const;
/**
* @brief Check if all channels in a set are available.
*
* @param id Frameset and frame number
* @param channels Set of channels to check
* @return true if all channels exist
* @return false if one or more do not exist
*/
bool available(FrameID id, const ftl::protocol::ChannelSet channels) const;
/**
* @brief Register a callback for when new frames and channels become available.
*
* @param cb
* @return ftl::Handle
*/
ftl::Handle onAvailable(const std::function<bool(FrameID, ftl::protocol::Channel)> &cb) { return avail_cb_.on(cb); }
/**
* @brief Get all channels seen for a frame. If the frame does not exist then
* an empty set is returned.
*
* @param id Frameset and frame number
* @return Set of all seen channels, or empty.
*/
ftl::protocol::ChannelSet channels(FrameID id) const;
/**
* @brief Obtain a set of all active channels for a frame.
*
* @param id Frameset and frame number
* @return ftl::protocol::ChannelSet
*/
ftl::protocol::ChannelSet enabledChannels(FrameID id) const;
/**
* @brief Get all available frames in the stream.
*
* @return Set of frame IDs
*/
std::unordered_set<FrameID> frames() const;
/**
* @brief Get all enabled frames in the stream.
*
* @return Set of frame IDs
*/
std::unordered_set<FrameID> enabled() const;
/**
* @brief Check if a frame is enabled.
*
* @param id Frameset and frame number
* @return true if data for this frame is enabled.
* @return false if data not enabled or frame does not exist.
*/
bool enabled(FrameID id) const;
/**
* @brief Check if a specific channel is enabled for a frame.
*
* @param id Frameset and frame number
* @param channel
* @return true if the channel is active
* @return false if the channel is not active or does not exist
*/
bool enabled(FrameID id, ftl::protocol::Channel channel) const;
/**
* Number of framesets in stream.
*/
inline size_t size() const { return state_.size(); }
/**
* @brief Activate a frame. This allows availability information to be gathered
* for the frame which might not otherwise be available. However, data is likely
* missing unless a channel is enabled.
*
* @param id Frameset and frame number
* @return true if the frame could be enabled
* @return false if the frame could not be found or enabled
*/
virtual bool enable(FrameID id);
/**
* @brief Request a specific channel in a frame. Once the request is made data
* should become available if it exists. This will also enable the frame if
* not already enabled.
*
* @param id Frameset and frame number
* @param channel
* @return true if the channel is available and enabled
* @return false if the channel does not exist
*/
virtual bool enable(FrameID id, ftl::protocol::Channel channel);
/**
* @brief Activate a set of channels for a frame. Requests for data for each
* given channel are sent and the data should then become available.
*
* @param id Frameset and frame number
* @param channels a set of all channels to activate
* @return true if all channels could be enabled
* @return false if some channel could not be enabled
*/
virtual bool enable(FrameID id, const ftl::protocol::ChannelSet &channels);
// TODO(Nick): Disable
/**
* @brief Set a stream property to a new value. If the property is not supported,
* is readonly or an invalid value type is given, then an exception is thrown.
* Check if the property is supported first.
*
* @param opt
* @param value
*/
virtual void setProperty(ftl::protocol::StreamProperty opt, std::any value) = 0;
/**
* @brief Get the value of a stream property. If the property is not supported then
* an exception is thrown. The result is an `any` object that should be casted
* correctly by the user.
*
* @param opt
* @return std::any
*/
virtual std::any getProperty(ftl::protocol::StreamProperty opt) = 0;
/**
* @brief Check if a property is supported. No exceptions are thrown.
*
* @param opt
* @return true if the property is at least readable
* @return false if the property is unsupported
*/
virtual bool supportsProperty(ftl::protocol::StreamProperty opt) = 0;
virtual StreamType type() const { return StreamType::kUnknown; }
/**
* @brief Register a callback for asynchronous stream errors.
*
* @param cb
* @return ftl::Handle
*/
ftl::Handle onError(const std::function<bool(ftl::protocol::Error, const std::string &)> &cb) {
return error_cb_.on(cb);
}
protected:
/** Dispatch packets to callbacks */
void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt);
/** Mark the channel and frame as available */
void seen(FrameID id, ftl::protocol::Channel channel);
/** Dispatch a request */
void request(const Request &req);
/** Report a stream error */
void error(ftl::protocol::Error, const std::string &str);
mutable SHARED_MUTEX mtx_;
private:
struct FSState {
bool enabled = false;
ftl::protocol::ChannelSet selected;
ftl::protocol::ChannelSet available;
// TODO(Nick): Add a name and metadata
};
ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::DataPacket&> cb_;
ftl::Handler<const Request &> request_cb_;
ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_;
ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_;
std::unordered_map<int, FSState> state_;
};
using StreamPtr = std::shared_ptr<Stream>;
} // namespace protocol
} // namespace ftl