Newer
Older
/**
* @file streams.hpp
* @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope
*/
#pragma once
#include <string>
#include <vector>
#include <unordered_set>
#include <any>
#include <unordered_map>
#include <memory>
#include <ftl/handle.hpp>
#include <ftl/threads.hpp>
#include <ftl/protocol/channels.hpp>
#include <ftl/protocol/channelSet.hpp>
#include <ftl/protocol/packet.hpp>
/** Represents a request for data through a stream */
FrameID id;
ftl::protocol::Channel channel;
int bitrate;
int count;
ftl::protocol::Codec codec;
/**
* The maximum number of frames a client can request in a single request.
*/
static const int kMaxFrames = 100;
using RequestCallback = std::function<bool(const ftl::protocol::Request&)>;
using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &)>;
/**
* @brief Enumeration of possible stream properties. Not all properties are supported
* by all stream types, but they allow additional control and data access.
*
*/
kObservers,
kURI,
kPaused,
kBytesSent,
kBytesReceived,
kLatency,
kFrameRate,
kName,
kDescription,
kTags,
kUser
/**
* @brief A hint about the streams capabilities.
*
*/
kLive, // Net stream
kRecorded // File stream
};
/**
* Base stream class to be implemented. Provides encode and decode functionality
* around a generic packet read and write mechanism. Some specialisations will
* provide and automatically handle control signals.
*
* Streams are bidirectional, frames can be both received and written.
*/
class Stream {
/**
* @brief If available, get a human readable name for the stream.
*
* @return std::string
*/
virtual std::string name() const;
/**
* @brief Register a callback to receive packets. The callback is called at the available
* frame rate, where each frame may consist of multiple packets and therefore multiple
* callbacks. Each callback can occur in a different thread, therefore all packets for
* a frame could be triggered in parallel.
*
* @param cb
* @return ftl::Handle
*/
ftl::Handle onPacket(const StreamCallback &cb) { return cb_.on(cb); }
/**
* @brief Register a callback for frame and channel requests. Remote machines can send
* requests, at which point the data should be generated and sent properly.
*
* @param cb
* @return ftl::Handle
*/
ftl::Handle onRequest(const std::function<bool(const Request &)> &cb) { return request_cb_.on(cb); }
/**
* @brief Send packets, either to file or over the network. Packets should follow
* the overall protocol rules, detailed elsewhere.
*
* @return true if sent
* @return false if dropped
*/
virtual bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &) = 0;
// TODO(Nick): Add methods for: pause, paused, statistics
/**
* @brief Start the stream. Calls to the onPacket callback will only occur after
* a call to this function (and before a call to end()).
*/
virtual bool begin() = 0;
/**
* @brief Terminate the stream. This will stop callbacks and will close all resources.
*
* @return true
* @return false
*/
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
virtual bool end() = 0;
/**
* Is the stream active? Generally true if begin() has been called, false
* initially and after end(). However, it may go false for other reasons.
* If false, no calls to onPacket will occur and posts will be ignored.
*/
virtual bool active() = 0;
/**
* @brief Clear all state. This will remove all information about available
* and enabled frames or channels. You will then need to enable frames and
* channels again. If active the stream will remain active.
*
*/
virtual void reset();
/**
* @brief Re-request all channels and state. This will also cause video encoding
* to generate new I-frames as if a new connection is made. All persistent data
* channels would also become available. For file streams this would reset the
* stream to the beginning of the file.
*
*/
virtual void refresh();
/**
* @brief Check if a frame is available.
*
* @param id Frameset and frame number
* @return true if data is available for the frame
* @return false if no data has been seen
*/
bool available(FrameID id) const;
/**
* @brief Check if a channel for a frame is available.
*
* @param id Frameset and frame number
* @param channel
* @return true if there is channel data
* @return false if the channel does not exist
*/
bool available(FrameID id, ftl::protocol::Channel channel) const;
/**
* @brief Check if all channels in a set are available.
*
* @param id Frameset and frame number
* @param channels Set of channels to check
* @return true if all channels exist
* @return false if one or more do not exist
*/
bool available(FrameID id, const ftl::protocol::ChannelSet channels) const;
/**
* @brief Register a callback for when new frames and channels become available.
*
* @param cb
* @return ftl::Handle
*/
ftl::Handle onAvailable(const std::function<bool(FrameID, ftl::protocol::Channel)> &cb) { return avail_cb_.on(cb); }
/**
* @brief Get all channels seen for a frame. If the frame does not exist then
* an empty set is returned.
*
* @param id Frameset and frame number
* @return Set of all seen channels, or empty.
*/
ftl::protocol::ChannelSet channels(FrameID id) const;
/**
* @brief Obtain a set of all active channels for a frame.
*
* @param id Frameset and frame number
* @return ftl::protocol::ChannelSet
*/
ftl::protocol::ChannelSet enabledChannels(FrameID id) const;
/**
* @brief Get all available frames in the stream.
*
* @return Set of frame IDs
*/
std::unordered_set<FrameID> frames() const;
/**
* @brief Get all enabled frames in the stream.
*
* @return Set of frame IDs
*/
std::unordered_set<FrameID> enabled() const;
/**
* @brief Check if a frame is enabled.
*
* @param id Frameset and frame number
* @return true if data for this frame is enabled.
* @return false if data not enabled or frame does not exist.
*/
bool enabled(FrameID id) const;
/**
* @brief Check if a specific channel is enabled for a frame.
*
* @param id Frameset and frame number
* @param channel
* @return true if the channel is active
* @return false if the channel is not active or does not exist
*/
bool enabled(FrameID id, ftl::protocol::Channel channel) const;
/**
* Number of framesets in stream.
*/
inline size_t size() const { return state_.size(); }
/**
* @brief Activate a frame. This allows availability information to be gathered
* for the frame which might not otherwise be available. However, data is likely
* missing unless a channel is enabled.
*
* @param id Frameset and frame number
* @return true if the frame could be enabled
* @return false if the frame could not be found or enabled
*/
virtual bool enable(FrameID id);
/**
* @brief Request a specific channel in a frame. Once the request is made data
* should become available if it exists. This will also enable the frame if
* not already enabled.
*
* @param id Frameset and frame number
* @param channel
* @return true if the channel is available and enabled
* @return false if the channel does not exist
*/
virtual bool enable(FrameID id, ftl::protocol::Channel channel);
/**
* @brief Activate a set of channels for a frame. Requests for data for each
* given channel are sent and the data should then become available.
*
* @param id Frameset and frame number
* @param channels a set of all channels to activate
* @return true if all channels could be enabled
* @return false if some channel could not be enabled
*/
virtual bool enable(FrameID id, const ftl::protocol::ChannelSet &channels);
/**
* @brief Disable an entire frame. If the frame is not available or is already
* disabled then this method has no effect.
*
* @param id
*/
virtual void disable(FrameID id);
/**
* @brief Disable a specific channel in a frame. If not available or already
* disabled then this method has no effect.
*
* @param id
* @param channel
*/
virtual void disable(FrameID id, ftl::protocol::Channel channel);
/**
* @brief Disable a set of channels in a frame. If not available or already
* disabled then this method has no effect.
*
* @param id
* @param channels
*/
virtual void disable(FrameID id, const ftl::protocol::ChannelSet &channels);
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
/**
* @brief Set a stream property to a new value. If the property is not supported,
* is readonly or an invalid value type is given, then an exception is thrown.
* Check if the property is supported first.
*
* @param opt
* @param value
*/
virtual void setProperty(ftl::protocol::StreamProperty opt, std::any value) = 0;
/**
* @brief Get the value of a stream property. If the property is not supported then
* an exception is thrown. The result is an `any` object that should be casted
* correctly by the user.
*
* @param opt
* @return std::any
*/
virtual std::any getProperty(ftl::protocol::StreamProperty opt) = 0;
/**
* @brief Check if a property is supported. No exceptions are thrown.
*
* @param opt
* @return true if the property is at least readable
* @return false if the property is unsupported
*/
virtual bool supportsProperty(ftl::protocol::StreamProperty opt) = 0;
/**
* @brief Get the streams type hint.
*
* @return StreamType
*/
virtual StreamType type() const { return StreamType::kUnknown; }
/**
* @brief Register a callback for asynchronous stream errors.
*
* @param cb
* @return ftl::Handle
*/
ftl::Handle onError(const std::function<bool(ftl::protocol::Error, const std::string &)> &cb) {
return error_cb_.on(cb);
}
/** Mark the channel and frame as available */
void seen(FrameID id, ftl::protocol::Channel channel);
protected:
/** Dispatch packets to callbacks */
void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt);
/** Dispatch a request */
void request(const Request &req);
/** Report a stream error */
void error(ftl::protocol::Error, const std::string &str);
mutable SHARED_MUTEX mtx_;
private:
struct FSState {
bool enabled = false;
ftl::protocol::ChannelSet selected;
ftl::protocol::ChannelSet available;
// TODO(Nick): Add a name and metadata
};
ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::DataPacket&> cb_;
ftl::Handler<const Request &> request_cb_;
ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_;
ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_;
std::unordered_map<int, FSState> state_;
using StreamPtr = std::shared_ptr<Stream>;
} // namespace protocol
} // namespace ftl