Skip to content
Snippets Groups Projects
Commit 37f75de2 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

#27 Code lint and documentation

parent f19e4e78
No related branches found
No related tags found
No related merge requests found
Showing
with 1217 additions and 946 deletions
...@@ -4,5 +4,4 @@ build ...@@ -4,5 +4,4 @@ build
**/config.cpp **/config.cpp
**/config.h **/config.h
_CPack_Packages _CPack_Packages
.vscode
.env .env
\ No newline at end of file
...@@ -64,6 +64,7 @@ ...@@ -64,6 +64,7 @@
"thread": "cpp", "thread": "cpp",
"typeinfo": "cpp", "typeinfo": "cpp",
"valarray": "cpp", "valarray": "cpp",
"variant": "cpp" "variant": "cpp",
"any": "cpp"
} }
} }
\ No newline at end of file
...@@ -111,7 +111,7 @@ if (WIN32) # TODO(nick) Should do based upon compiler (VS) ...@@ -111,7 +111,7 @@ if (WIN32) # TODO(nick) Should do based upon compiler (VS)
else() else()
add_definitions(-DUNIX) add_definitions(-DUNIX)
# -fdiagnostics-color # -fdiagnostics-color
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always -std=c++17 -fPIC -march=haswell -mavx2 -mfpmath=sse -Wall -Werror=unused-result -Werror=return-type") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always -std=c++17 -fPIC -march=haswell -mavx2 -mfpmath=sse -Wall -Werror")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -pg") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -pg")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -O3") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -O3")
......
...@@ -7,70 +7,68 @@ ...@@ -7,70 +7,68 @@
#pragma once #pragma once
#include <sstream> #include <sstream>
#include <string>
namespace ftl { namespace ftl {
class Formatter { class Formatter {
public: public:
Formatter() {} Formatter() {}
~Formatter() {} ~Formatter() {}
template <typename Type> template <typename Type>
inline Formatter & operator << (const Type & value) inline Formatter & operator << (const Type & value) {
{ stream_ << value;
stream_ << value; return *this;
return *this; }
}
inline std::string str() const { return stream_.str(); }
inline std::string str() const { return stream_.str(); } inline operator std::string () const { return stream_.str(); }
inline operator std::string () const { return stream_.str(); }
enum ConvertToString {
enum ConvertToString to_str
{ };
to_str inline std::string operator >> (ConvertToString) { return stream_.str(); }
};
inline std::string operator >> (ConvertToString) { return stream_.str(); } private:
std::stringstream stream_;
private:
std::stringstream stream_; Formatter(const Formatter &);
Formatter & operator = (Formatter &);
Formatter(const Formatter &);
Formatter & operator = (Formatter &);
}; };
/** /**
* Main FTL internal exception class. Use via Macro below. * Main FTL internal exception class. Use via Macro below.
*/ */
class exception : public std::exception class exception : public std::exception {
{ public:
public: explicit exception(const char *msg);
explicit exception(const char *msg); explicit exception(const Formatter &msg);
explicit exception(const Formatter &msg); ~exception();
~exception();
const char* what() const throw () { const char* what() const throw() {
processed_ = true; processed_ = true;
return msg_.c_str(); return msg_.c_str();
} }
std::string trace() const throw () { std::string trace() const throw() {
return decode_backtrace(); return decode_backtrace();
} }
void ignore() const { processed_ = true; } void ignore() const { processed_ = true; }
private: private:
std::string decode_backtrace() const; std::string decode_backtrace() const;
std::string msg_; std::string msg_;
mutable bool processed_; mutable bool processed_;
#ifdef __GNUC__ #ifdef __GNUC__
static const int TRACE_SIZE_MAX_ = 16; static const int TRACE_SIZE_MAX_ = 16;
void* trace_[TRACE_SIZE_MAX_]; void* trace_[TRACE_SIZE_MAX_];
int trace_size_; int trace_size_;
#endif #endif
}; };
} } // namespace ftl
#define FTL_Error(A) (ftl::exception(ftl::Formatter() << A << " [" << __FILE__ << ":" << __LINE__ << "]")) #define FTL_Error(A) (ftl::exception(ftl::Formatter() << A << " [" << __FILE__ << ":" << __LINE__ << "]"))
...@@ -6,24 +6,25 @@ ...@@ -6,24 +6,25 @@
#pragma once #pragma once
#include <ftl/threads.hpp>
#include <ftl/exception.hpp>
#include <functional> #include <functional>
#include <unordered_map> #include <unordered_map>
#include <utility>
#include <ftl/threads.hpp>
#include <ftl/exception.hpp>
namespace ftl { namespace ftl {
struct Handle; struct Handle;
struct BaseHandler { struct BaseHandler {
virtual void remove(const Handle &)=0; virtual void remove(const Handle &) = 0;
virtual void removeUnsafe(const Handle &)=0; virtual void removeUnsafe(const Handle &) = 0;
inline Handle make_handle(BaseHandler*, int); inline Handle make_handle(BaseHandler*, int);
protected: protected:
std::mutex mutex_; std::mutex mutex_;
int id_=0; int id_ = 0;
}; };
/** /**
...@@ -31,48 +32,58 @@ struct BaseHandler { ...@@ -31,48 +32,58 @@ struct BaseHandler {
* removed safely whenever the `Handle` instance is destroyed. * removed safely whenever the `Handle` instance is destroyed.
*/ */
struct [[nodiscard]] Handle { struct [[nodiscard]] Handle {
friend struct BaseHandler; friend struct BaseHandler;
/** /**
* Cancel the callback and invalidate the handle. * Cancel the callback and invalidate the handle.
*/ */
inline void cancel() { if (handler_) handler_->remove(*this); handler_ = nullptr; } inline void cancel() {
if (handler_) {
inline void innerCancel() { if (handler_) handler_->removeUnsafe(*this); handler_ = nullptr; } handler_->remove(*this);
}
inline int id() const { return id_; } handler_ = nullptr;
}
Handle() : handler_(nullptr), id_(0) {}
inline void innerCancel() {
Handle(const Handle &)=delete; if (handler_) {
Handle &operator=(const Handle &)=delete; handler_->removeUnsafe(*this);
}
inline Handle(Handle &&h) : handler_(nullptr) { handler_ = nullptr;
if (handler_) handler_->remove(*this); }
handler_ = h.handler_;
h.handler_ = nullptr; inline int id() const { return id_; }
id_ = h.id_;
} Handle() : handler_(nullptr), id_(0) {}
inline Handle &operator=(Handle &&h) { Handle(const Handle &) = delete;
if (handler_) handler_->remove(*this); Handle &operator=(const Handle &) = delete;
handler_ = h.handler_;
h.handler_ = nullptr; inline Handle(Handle &&h) : handler_(nullptr) {
id_ = h.id_; if (handler_) handler_->remove(*this);
return *this; handler_ = h.handler_;
} h.handler_ = nullptr;
id_ = h.id_;
inline ~Handle() { }
if (handler_) {
handler_->remove(*this); inline Handle &operator=(Handle &&h) {
} if (handler_) handler_->remove(*this);
} handler_ = h.handler_;
h.handler_ = nullptr;
private: id_ = h.id_;
BaseHandler *handler_; return *this;
int id_; }
Handle(BaseHandler *h, int id) : handler_(h), id_(id) {} inline ~Handle() {
if (handler_) {
handler_->remove(*this);
}
}
private:
BaseHandler *handler_;
int id_;
Handle(BaseHandler *h, int id) : handler_(h), id_(id) {}
}; };
/** /**
...@@ -85,116 +96,118 @@ struct [[nodiscard]] Handle { ...@@ -85,116 +96,118 @@ struct [[nodiscard]] Handle {
*/ */
template <typename ...ARGS> template <typename ...ARGS>
struct Handler : BaseHandler { struct Handler : BaseHandler {
Handler() {} Handler() {}
~Handler() { ~Handler() {
// Ensure all thread pool jobs are done // Ensure all thread pool jobs are done
while (jobs_ > 0 && ftl::pool.size() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2)); while (jobs_ > 0 && ftl::pool.size() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2));
} }
/** /**
* Add a new callback function. It returns a `Handle` object that must * Add a new callback function. It returns a `Handle` object that must
* remain in scope, the destructor of the `Handle` will remove the callback. * remain in scope, the destructor of the `Handle` will remove the callback.
*/ */
Handle on(const std::function<bool(ARGS...)> &f) { Handle on(const std::function<bool(ARGS...)> &f) {
std::unique_lock<std::mutex> lk(mutex_); std::unique_lock<std::mutex> lk(mutex_);
int id = id_++; int id = id_++;
callbacks_[id] = f; callbacks_[id] = f;
return make_handle(this, id); return make_handle(this, id);
} }
/** /**
* Safely trigger all callbacks. Note that `Handler` is locked when * Safely trigger all callbacks. Note that `Handler` is locked when
* triggering so callbacks cannot make modifications to it or they will * triggering so callbacks cannot make modifications to it or they will
* lock up. To remove a callback, return false from the callback, else * lock up. To remove a callback, return false from the callback, else
* return true. * return true.
*/ */
void trigger(ARGS ...args) { void trigger(ARGS ...args) {
bool hadFault = false; bool hadFault = false;
std::unique_lock<std::mutex> lk(mutex_); std::unique_lock<std::mutex> lk(mutex_);
for (auto i=callbacks_.begin(); i!=callbacks_.end(); ) { for (auto i = callbacks_.begin(); i != callbacks_.end(); ) {
bool keep = true; bool keep = true;
try { try {
keep = i->second(args...); keep = i->second(args...);
} catch(...) { } catch(...) {
hadFault = true; hadFault = true;
} }
if (!keep) i = callbacks_.erase(i); if (!keep) i = callbacks_.erase(i);
else ++i; else
} ++i;
if (hadFault) throw FTL_Error("Callback exception"); }
} if (hadFault) throw FTL_Error("Callback exception");
}
/**
* Call all the callbacks in another thread. The callbacks are done in a /**
* single thread, not in parallel. * Call all the callbacks in another thread. The callbacks are done in a
*/ * single thread, not in parallel.
void triggerAsync(ARGS ...args) { */
++jobs_; void triggerAsync(ARGS ...args) {
ftl::pool.push([this, args...](int id) { ++jobs_;
bool hadFault = false; ftl::pool.push([this, args...](int id) {
std::unique_lock<std::mutex> lk(mutex_); bool hadFault = false;
for (auto i=callbacks_.begin(); i!=callbacks_.end(); ) { std::unique_lock<std::mutex> lk(mutex_);
bool keep = true; for (auto i = callbacks_.begin(); i != callbacks_.end(); ) {
try { bool keep = true;
keep = i->second(args...); try {
} catch (...) { keep = i->second(args...);
hadFault = true; } catch (...) {
} hadFault = true;
if (!keep) i = callbacks_.erase(i); }
else ++i; if (!keep) i = callbacks_.erase(i);
} else
--jobs_; ++i;
if (hadFault) throw FTL_Error("Callback exception"); }
}); --jobs_;
} if (hadFault) throw FTL_Error("Callback exception");
});
/** }
* Each callback is called in its own thread job. Note: the return value
* of the callback is ignored in this case and does not allow callback /**
* removal via the return value. * Each callback is called in its own thread job. Note: the return value
*/ * of the callback is ignored in this case and does not allow callback
void triggerParallel(ARGS ...args) { * removal via the return value.
std::unique_lock<std::mutex> lk(mutex_); */
jobs_ += callbacks_.size(); void triggerParallel(ARGS ...args) {
for (auto i=callbacks_.begin(); i!=callbacks_.end(); ++i) { std::unique_lock<std::mutex> lk(mutex_);
ftl::pool.push([this, f = i->second, args...](int id) { jobs_ += callbacks_.size();
try { for (auto i = callbacks_.begin(); i != callbacks_.end(); ++i) {
f(args...); ftl::pool.push([this, f = i->second, args...](int id) {
} catch (const ftl::exception &e) { try {
--jobs_; f(args...);
throw e; } catch (const ftl::exception &e) {
} --jobs_;
--jobs_; throw e;
}); }
} --jobs_;
} });
}
/** }
* Remove a callback using its `Handle`. This is equivalent to allowing the
* `Handle` to be destroyed or cancelled. /**
*/ * Remove a callback using its `Handle`. This is equivalent to allowing the
void remove(const Handle &h) override { * `Handle` to be destroyed or cancelled.
{ */
std::unique_lock<std::mutex> lk(mutex_); void remove(const Handle &h) override {
callbacks_.erase(h.id()); {
} std::unique_lock<std::mutex> lk(mutex_);
// Make sure any possible call to removed callback has finished. callbacks_.erase(h.id());
while (jobs_ > 0 && ftl::pool.size() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2)); }
} // Make sure any possible call to removed callback has finished.
while (jobs_ > 0 && ftl::pool.size() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2));
void removeUnsafe(const Handle &h) override { }
callbacks_.erase(h.id());
// Make sure any possible call to removed callback has finished. void removeUnsafe(const Handle &h) override {
while (jobs_ > 0 && ftl::pool.size() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2)); callbacks_.erase(h.id());
} // Make sure any possible call to removed callback has finished.
while (jobs_ > 0 && ftl::pool.size() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2));
void clear() { }
callbacks_.clear();
} void clear() {
callbacks_.clear();
private: }
std::unordered_map<int, std::function<bool(ARGS...)>> callbacks_;
std::atomic_int jobs_=0; private:
std::unordered_map<int, std::function<bool(ARGS...)>> callbacks_;
std::atomic_int jobs_ = 0;
}; };
/** /**
...@@ -205,61 +218,58 @@ struct Handler : BaseHandler { ...@@ -205,61 +218,58 @@ struct Handler : BaseHandler {
*/ */
template <typename ...ARGS> template <typename ...ARGS>
struct SingletonHandler : BaseHandler { struct SingletonHandler : BaseHandler {
/** /**
* Add a new callback function. It returns a `Handle` object that must * Add a new callback function. It returns a `Handle` object that must
* remain in scope, the destructor of the `Handle` will remove the callback. * remain in scope, the destructor of the `Handle` will remove the callback.
*/ */
[[nodiscard]] Handle on(const std::function<bool(ARGS...)> &f) { [[nodiscard]] Handle on(const std::function<bool(ARGS...)> &f) {
std::unique_lock<std::mutex> lk(mutex_); std::unique_lock<std::mutex> lk(mutex_);
if (callback_) throw FTL_Error("Callback already bound"); if (callback_) throw FTL_Error("Callback already bound");
callback_ = f; callback_ = f;
return make_handle(this, id_++); return make_handle(this, id_++);
} }
/** /**
* Safely trigger all callbacks. Note that `Handler` is locked when * Safely trigger all callbacks. Note that `Handler` is locked when
* triggering so callbacks cannot make modifications to it or they will * triggering so callbacks cannot make modifications to it or they will
* lock up. To remove a callback, return false from the callback, else * lock up. To remove a callback, return false from the callback, else
* return true. * return true.
*/ */
bool trigger(ARGS ...args) { bool trigger(ARGS ...args) {
std::unique_lock<std::mutex> lk(mutex_); std::unique_lock<std::mutex> lk(mutex_);
if (callback_) { if (callback_) {
bool keep = callback_(std::forward<ARGS>(args)...); bool keep = callback_(std::forward<ARGS>(args)...);
if (!keep) callback_ = nullptr; if (!keep) callback_ = nullptr;
return keep; return keep;
} else { } else {
return false; return false;
} }
//} catch (const std::exception &e) { }
// LOG(ERROR) << "Exception in callback: " << e.what();
//} /**
} * Remove a callback using its `Handle`. This is equivalent to allowing the
* `Handle` to be destroyed or cancelled. If the handle does not match the
/** * currently bound callback then the callback is not removed.
* Remove a callback using its `Handle`. This is equivalent to allowing the */
* `Handle` to be destroyed or cancelled. If the handle does not match the void remove(const Handle &h) override {
* currently bound callback then the callback is not removed. std::unique_lock<std::mutex> lk(mutex_);
*/ if (h.id() == id_-1) callback_ = nullptr;
void remove(const Handle &h) override { }
std::unique_lock<std::mutex> lk(mutex_);
if (h.id() == id_-1) callback_ = nullptr; void removeUnsafe(const Handle &h) override {
} if (h.id() == id_-1) callback_ = nullptr;
}
void removeUnsafe(const Handle &h) override {
if (h.id() == id_-1) callback_ = nullptr; void reset() { callback_ = nullptr; }
}
operator bool() const { return static_cast<bool>(callback_); }
void reset() { callback_ = nullptr; }
private:
operator bool() const { return (bool)callback_; } std::function<bool(ARGS...)> callback_;
private:
std::function<bool(ARGS...)> callback_;
}; };
} } // namespace ftl
ftl::Handle ftl::BaseHandler::make_handle(BaseHandler *h, int id) { ftl::Handle ftl::BaseHandler::make_handle(BaseHandler *h, int id) {
return ftl::Handle(h, id); return ftl::Handle(h, id);
} }
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#pragma once #pragma once
#include <memory> #include <memory>
#include <string>
#include <ftl/uuid.hpp> #include <ftl/uuid.hpp>
namespace ftl { namespace ftl {
...@@ -20,13 +21,65 @@ class Service; ...@@ -20,13 +21,65 @@ class Service;
void reset(); void reset();
extern ftl::UUID id; extern ftl::UUID id;
} } // namespace protocol
/**
* @brief Get the Self object. This may initialise the internal system when
* first called. A Self object allows for the overall control of the network
* and general RPC functionality between hosts. If this is called multiple
* times then the same internal object is returned, it is a singleton.
*
* @return std::shared_ptr<ftl::protocol::Self>
*/
std::shared_ptr<ftl::protocol::Self> getSelf(); std::shared_ptr<ftl::protocol::Self> getSelf();
/**
* @brief Create a secondary Self object. Mostly for testing purposes, this
* allows additional instances to be created of the otherwise singleton class.
*
* @return std::shared_ptr<ftl::protocol::Self>
*/
std::shared_ptr<ftl::protocol::Self> createDummySelf(); std::shared_ptr<ftl::protocol::Self> createDummySelf();
/**
* @brief Set the web service URI to use. There should be a single connection
* to a web service that provides additional management functionality beyond
* a typical node. By calling this function the system is informed about where
* to ask about certain resources.
*
* @param uri A websocket URI, either WS or WSS protocol.
* @return A node instance for the service
*/
std::shared_ptr<ftl::protocol::Service> setServiceProvider(const std::string &uri); std::shared_ptr<ftl::protocol::Service> setServiceProvider(const std::string &uri);
/**
* @brief Connect to another machine. This uses the singleton Self instance, however,
* it is possible to also connect from another secondary Self instance by
* using a member function.
*
* @param uri A TCP URI with the address and port of another machine.
* @return std::shared_ptr<ftl::protocol::Node>
*/
std::shared_ptr<ftl::protocol::Node> connectNode(const std::string &uri); std::shared_ptr<ftl::protocol::Node> connectNode(const std::string &uri);
/**
* @brief Host a new stream. The URI must be either a file or an FTL protocol.
* A file stream opened by this function will be write only, and a network
* stream will broadcast itself as a newly available source.
*
* @param uri Either file:// or ftl://
* @return std::shared_ptr<ftl::protocol::Stream>
*/
std::shared_ptr<ftl::protocol::Stream> createStream(const std::string &uri); std::shared_ptr<ftl::protocol::Stream> createStream(const std::string &uri);
/**
* @brief Open an existing stream. This can be a file or a network stream.
* A file stream will be opened readonly, and a network stream will attempt
* to find the stream on the local network or using the web service.
*
* @param uri Either file:// or ftl://
* @return std::shared_ptr<ftl::protocol::Stream>
*/
std::shared_ptr<ftl::protocol::Stream> getStream(const std::string &uri); std::shared_ptr<ftl::protocol::Stream> getStream(const std::string &uri);
} } // namespace ftl
/**
* @file broadcaster.hpp
* @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope
*/
#pragma once #pragma once
#include <ftl/protocol/streams.hpp>
#include <list> #include <list>
#include <memory>
#include <ftl/protocol/streams.hpp>
namespace ftl { namespace ftl {
namespace protocol { namespace protocol {
...@@ -12,50 +19,49 @@ namespace protocol { ...@@ -12,50 +19,49 @@ namespace protocol {
* packets. * packets.
*/ */
class Broadcast : public Stream { class Broadcast : public Stream {
public: public:
explicit Broadcast(); Broadcast();
virtual ~Broadcast(); virtual ~Broadcast();
void add(const std::shared_ptr<Stream> &);
void remove(const std::shared_ptr<Stream> &);
void clear();
bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override; void add(const std::shared_ptr<Stream> &);
void remove(const std::shared_ptr<Stream> &);
void clear();
bool begin() override; bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override;
bool end() override;
bool active() override;
void reset() override; bool begin() override;
bool end() override;
bool active() override;
void refresh() override; void reset() override;
std::list<std::shared_ptr<Stream>> streams() const; void refresh() override;
void setProperty(ftl::protocol::StreamProperty opt, std::any value) override; std::list<std::shared_ptr<Stream>> streams() const;
std::any getProperty(ftl::protocol::StreamProperty opt) override; void setProperty(ftl::protocol::StreamProperty opt, std::any value) override;
bool supportsProperty(ftl::protocol::StreamProperty opt) override; std::any getProperty(ftl::protocol::StreamProperty opt) override;
bool enable(FrameID id) override; bool supportsProperty(ftl::protocol::StreamProperty opt) override;
bool enable(FrameID id, ftl::protocol::Channel channel) override; bool enable(FrameID id) override;
bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override; bool enable(FrameID id, ftl::protocol::Channel channel) override;
StreamType type() const override; bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override;
private: StreamType type() const override;
struct StreamEntry { private:
std::shared_ptr<Stream> stream; struct StreamEntry {
ftl::Handle handle; std::shared_ptr<Stream> stream;
ftl::Handle req_handle; ftl::Handle handle;
ftl::Handle avail_handle; ftl::Handle req_handle;
}; ftl::Handle avail_handle;
};
std::list<StreamEntry> streams_; std::list<StreamEntry> streams_;
}; };
} }
......
...@@ -6,8 +6,8 @@ ...@@ -6,8 +6,8 @@
#pragma once #pragma once
#include <ftl/protocol/channels.hpp>
#include <unordered_set> #include <unordered_set>
#include <ftl/protocol/channels.hpp>
namespace ftl { namespace ftl {
namespace protocol { namespace protocol {
...@@ -20,26 +20,26 @@ ftl::protocol::ChannelSet operator&(const ftl::protocol::ChannelSet &a, const ft ...@@ -20,26 +20,26 @@ ftl::protocol::ChannelSet operator&(const ftl::protocol::ChannelSet &a, const ft
ftl::protocol::ChannelSet operator-(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b); ftl::protocol::ChannelSet operator-(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b);
inline ftl::protocol::ChannelSet &operator+=(ftl::protocol::ChannelSet &t, ftl::protocol::Channel c) { inline ftl::protocol::ChannelSet &operator+=(ftl::protocol::ChannelSet &t, ftl::protocol::Channel c) {
t.insert(c); t.insert(c);
return t; return t;
} }
inline ftl::protocol::ChannelSet &operator-=(ftl::protocol::ChannelSet &t, ftl::protocol::Channel c) { inline ftl::protocol::ChannelSet &operator-=(ftl::protocol::ChannelSet &t, ftl::protocol::Channel c) {
t.erase(c); t.erase(c);
return t; return t;
} }
inline ftl::protocol::ChannelSet operator+(const ftl::protocol::ChannelSet &t, ftl::protocol::Channel c) { inline ftl::protocol::ChannelSet operator+(const ftl::protocol::ChannelSet &t, ftl::protocol::Channel c) {
auto r = t; auto r = t;
r.insert(c); r.insert(c);
return r; return r;
} }
inline ftl::protocol::ChannelSet operator+(ftl::protocol::Channel a, ftl::protocol::Channel b) { inline ftl::protocol::ChannelSet operator+(ftl::protocol::Channel a, ftl::protocol::Channel b) {
std::unordered_set<ftl::protocol::Channel> r; std::unordered_set<ftl::protocol::Channel> r;
r.insert(a); r.insert(a);
r.insert(b); r.insert(b);
return r; return r;
} }
bool operator!=(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b); bool operator!=(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b);
/**
* @file channelUtils.hpp
* @copyright Copyright (c) 2020 University of Turku, MIT License
* @author Nicolas Pope
*/
#pragma once #pragma once
#include <string>
#include <ftl/protocol/channels.hpp> #include <ftl/protocol/channels.hpp>
namespace ftl { namespace ftl {
namespace protocol { namespace protocol {
inline bool isVideo(Channel c) { return (int)c < 32; }; inline bool isVideo(Channel c) { return static_cast<int>(c) < 32; }
inline bool isAudio(Channel c) { return (int)c >= 32 && (int)c < 64; }; inline bool isAudio(Channel c) { return static_cast<int>(c) >= 32 && static_cast<int>(c) < 64; }
inline bool isData(Channel c) { return (int)c >= 64; }; inline bool isData(Channel c) { return static_cast<int>(c) >= 64; }
/** Obtain a string name for channel. */ /** Obtain a string name for channel. */
std::string name(Channel c); std::string name(Channel c);
...@@ -17,17 +24,16 @@ int type(Channel c); ...@@ -17,17 +24,16 @@ int type(Channel c);
/** @deprecated */ /** @deprecated */
inline bool isFloatChannel(ftl::codecs::Channel chan) { inline bool isFloatChannel(ftl::codecs::Channel chan) {
switch (chan) { switch (chan) {
case Channel::GroundTruth: case Channel::GroundTruth :
case Channel::Depth : case Channel::Depth :
//case Channel::Normals : case Channel::Confidence :
case Channel::Confidence: case Channel::Flow :
case Channel::Flow : case Channel::Density :
case Channel::Density: case Channel::Energy : return true;
case Channel::Energy : return true; default : return false;
default : return false; }
}
} }
} } // namespace protocol
} } // namespace ftl
...@@ -7,129 +7,126 @@ ...@@ -7,129 +7,126 @@
#pragma once #pragma once
#include <bitset> #include <bitset>
//#include <msgpack.hpp>
namespace ftl { namespace ftl {
namespace protocol { namespace protocol {
/** Frame channel identifier. */ /** Frame channel identifier. */
enum struct Channel : int { enum struct Channel : int {
/* Video Channels */ /* Video Channels */
kNone = -1, kNone = -1,
kColour = 0, // 8UC3 or 8UC4 kColour = 0, // 8UC3 or 8UC4
kLeft = 0, kLeft = 0,
kDepth = 1, // 32S or 32F kDepth = 1, // 32S or 32F
kRight = 2, // 8UC3 or 8UC4 kRight = 2, // 8UC3 or 8UC4
kColour2 = 2, kColour2 = 2,
kDepth2 = 3, kDepth2 = 3,
kDeviation = 4, kDeviation = 4,
kScreen = 4, // 16SC2 kScreen = 4, // 16SC2
kNormals = 5, // 16FC4 kNormals = 5, // 16FC4
kWeights = 6, // short kWeights = 6, // short
kConfidence = 7, // 32F kConfidence = 7, // 32F
kContribution = 7, // 32F kContribution = 7, // 32F
kEnergyVector = 8, // 32FC4 kEnergyVector = 8, // 32FC4
kFlow = 9, // 16SC2 kFlow = 9, // 16SC2
kFlow2 = 10, // 16SC2 kFlow2 = 10, // 16SC2
kEnergy = 10, // 32F kEnergy = 10, // 32F
kMask = 11, // 32U kMask = 11, // 32U
kDensity = 12, // 32F kDensity = 12, // 32F
kSupport1 = 13, // 8UC4 (currently) kSupport1 = 13, // 8UC4 (currently)
kSupport2 = 14, // 8UC4 (currently) kSupport2 = 14, // 8UC4 (currently)
kSegmentation = 15, // 32S? kSegmentation = 15, // 32S?
kNormals2 = 16, // 16FC4 kNormals2 = 16, // 16FC4
kUNUSED1 = 17, kUNUSED1 = 17,
kDisparity = 18, kDisparity = 18,
kSmoothing = 19, // 32F kSmoothing = 19, // 32F
kUNUSED2 = 20, kUNUSED2 = 20,
kOverlay = 21, // 8UC4 kOverlay = 21, // 8UC4
kGroundTruth = 22, // 32F kGroundTruth = 22, // 32F
/* Audio Channels */ /* Audio Channels */
kAudioMono = 32, // Deprecated, will always be stereo kAudioMono = 32, // Deprecated, will always be stereo
kAudioStereo = 33, kAudioStereo = 33,
kAudio = 33, kAudio = 33,
/* Special data channels */ /* Special data channels */
kConfiguration = 64, // JSON Data kConfiguration = 64, // JSON Data
kSettings1 = 65, kSettings1 = 65,
kCalibration = 65, // Camera Parameters Object kCalibration = 65, // Camera Parameters Object
kPose = 66, // Eigen::Matrix4d, camera transform kPose = 66, // Eigen::Matrix4d, camera transform
kSettings2 = 67, kSettings2 = 67,
kCalibration2 = 67, // Right camera parameters kCalibration2 = 67, // Right camera parameters
kIndex = 68, kIndex = 68,
kControl = 69, // For stream and encoder control kControl = 69, // For stream and encoder control
kSettings3 = 70, kSettings3 = 70,
kMetaData = 71, // Map of string pairs (key, value) kMetaData = 71, // Map of string pairs (key, value)
kCapabilities = 72, // Unordered set of int capabilities kCapabilities = 72, // Unordered set of int capabilities
kCalibrationData = 73, // Just for stereo intrinsics/extrinsics etc kCalibrationData = 73, // Just for stereo intrinsics/extrinsics etc
kThumbnail = 74, // Small JPG thumbnail, sometimes updated kThumbnail = 74, // Small JPG thumbnail, sometimes updated
kOverlaySelect = 75, // Choose what to have in the overlay channel kOverlaySelect = 75, // Choose what to have in the overlay channel
kStartTime = 76, // Stream start timestamp kStartTime = 76, // Stream start timestamp
kUser = 77, // User currently controlling the stream kUser = 77, // User currently controlling the stream
kAccelerometer = 90, // Eigen::Vector3f kAccelerometer = 90, // Eigen::Vector3f
kGyroscope = 91, // Eigen::Vector3f kGyroscope = 91, // Eigen::Vector3f
/* Camera Options */ /* Camera Options */
kBrightness = 100, kBrightness = 100,
kContrast = 101, kContrast = 101,
kExposure = 102, kExposure = 102,
kGain = 103, kGain = 103,
kWhiteBalance = 104, kWhiteBalance = 104,
kAutoExposure = 105, kAutoExposure = 105,
kAutoWhiteBalance = 106, kAutoWhiteBalance = 106,
kCameraTemperature = 107, kCameraTemperature = 107,
/* Realsense Options */ /* Realsense Options */
kRS2_LaserPower = 150, kRS2_LaserPower = 150,
kRS2_MinDistance = 151, kRS2_MinDistance = 151,
kRS2_MaxDistance = 152, kRS2_MaxDistance = 152,
kRS2_InterCamSync = 153, kRS2_InterCamSync = 153,
kRS2_PostSharpening = 154, kRS2_PostSharpening = 154,
/* Pylon Options 200 */ /* Pylon Options 200 */
/* Audio Settings 300 */ /* Audio Settings 300 */
/* Renderer Settings 400 */ /* Renderer Settings 400 */
kRenderer_CameraType = 400, // stereo, normal, tile kRenderer_CameraType = 400, // stereo, normal, tile
kRenderer_Visualisation = 401, // Pointcloud, mesh, other kRenderer_Visualisation = 401, // Pointcloud, mesh, other
kRenderer_Engine = 402, // OpenGL, CUDA, other kRenderer_Engine = 402, // OpenGL, CUDA, other
kRenderer_FPS = 403, // Frames per second kRenderer_FPS = 403, // Frames per second
kRenderer_View = 404, // Fixed viewpoint to one source kRenderer_View = 404, // Fixed viewpoint to one source
kRenderer_Channel = 405, // Select overlay channel, kRenderer_Channel = 405, // Select overlay channel,
kRenderer_Opacity = 406, // Opacity of overlay channel kRenderer_Opacity = 406, // Opacity of overlay channel
kRenderer_Sources = 407, // Which source devices to use kRenderer_Sources = 407, // Which source devices to use
kRenderer_Projection = 408, // 0 = normal, 1 = ortho, 2 = equirect kRenderer_Projection = 408, // 0 = normal, 1 = ortho, 2 = equirect
kRenderer_Background = 409, // Background colour kRenderer_Background = 409, // Background colour
kRenderer_ShowBadColour = 420, kRenderer_ShowBadColour = 420,
kRenderer_CoolEffect = 421, kRenderer_CoolEffect = 421,
kRenderer_EffectColour = 422, kRenderer_EffectColour = 422,
kRenderer_ShowColourWeights = 423, kRenderer_ShowColourWeights = 423,
kRenderer_TriangleLimit = 424, kRenderer_TriangleLimit = 424,
kRenderer_DisconDisparities = 425, kRenderer_DisconDisparities = 425,
kRenderer_NormalWeightColour = 426, kRenderer_NormalWeightColour = 426,
kRenderer_ChannelWeights = 427, kRenderer_ChannelWeights = 427,
kRenderer_AccumFunc = 428, kRenderer_AccumFunc = 428,
/* Pipeline Settings */ /* Pipeline Settings */
kPipeline_Enable = 500, kPipeline_Enable = 500,
kPipeline_EnableMVMLS = 501, kPipeline_EnableMVMLS = 501,
kPipeline_EnableAruco = 502, kPipeline_EnableAruco = 502,
/* Custom / user data channels */ /* Custom / user data channels */
kData = 2048, // Do not use kData = 2048, // Do not use
kEndFrame = 2048, // Signify the last packet kEndFrame = 2048, // Signify the last packet
kFaces = 2049, // Data about detected faces kFaces = 2049, // Data about detected faces
kTransforms = 2050, // Transformation matrices for framesets kTransforms = 2050, // Transformation matrices for framesets
kShapes3D = 2051, // Labeled 3D shapes kShapes3D = 2051, // Labeled 3D shapes
kMessages = 2052, // Vector of Strings kMessages = 2052, // Vector of Strings
kTouch = 2053, // List of touch data type (each touch point) kTouch = 2053, // List of touch data type (each touch point)
kPipelines = 2054, // List of pipline URIs that have been applied kPipelines = 2054, // List of pipline URIs that have been applied
}; };
} } // namespace protocol
} } // namespace ftl
//MSGPACK_ADD_ENUM(ftl::codecs::Channel);
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
#pragma once #pragma once
#include <cstdint> #include <cstdint>
//#include <msgpack.hpp> #include <utility>
namespace ftl { namespace ftl {
...@@ -17,42 +17,40 @@ namespace ftl { ...@@ -17,42 +17,40 @@ namespace ftl {
*/ */
namespace protocol { namespace protocol {
static constexpr uint8_t kFlagRequest = 0x01; ///< Used for empty data packets to mark a request for data static constexpr uint8_t kFlagRequest = 0x01; ///< Used for empty data packets to mark a request for data
static constexpr uint8_t kFlagCompleted = 0x02; ///< Last packet for timestamp static constexpr uint8_t kFlagCompleted = 0x02; ///< Last packet for timestamp
static constexpr uint8_t kFlagReset = 0x04; static constexpr uint8_t kFlagReset = 0x04;
/** /**
* Compression format used. * Compression format used.
*/ */
enum struct Codec : uint8_t { enum struct Codec : uint8_t {
/* Video (image) codecs */ /* Video (image) codecs */
kJPG = 0, kJPG = 0,
kPNG, kPNG,
kH264, kH264,
kHEVC, // H265 kHEVC, // H265
kH264Lossless, kH264Lossless,
kHEVCLossLess, kHEVCLossLess,
/* Audio codecs */ /* Audio codecs */
kWave=32, kWave = 32,
kOPUS, kOPUS,
/* Data "codecs" */ /* Data "codecs" */
kJSON = 100, // A JSON string kJSON = 100, // A JSON string
kCalibration, // Camera parameters object [deprecated] kCalibration, // Camera parameters object [deprecated]
kPose, // 4x4 eigen matrix [deprecated] kPose, // 4x4 eigen matrix [deprecated]
kMsgPack, kMsgPack,
kString, // Null terminated string kString, // Null terminated string
kRaw, // Some unknown binary format kRaw, // Some unknown binary format
kInvalid = 254, kInvalid = 254,
kAny = 255 kAny = 255
}; };
/** Given a frame count, return a width x height tile configuration. */ /** Given a frame count, return a width x height tile configuration. */
std::pair<int,int> chooseTileConfig(int size); std::pair<int, int> chooseTileConfig(int size);
} // namespace codecs } // namespace protocol
} // namespace ftl } // namespace ftl
//MSGPACK_ADD_ENUM(ftl::codecs::codec_t);
/**
* @file error.hpp
* @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope
*/
#pragma once #pragma once
namespace ftl { namespace ftl {
namespace protocol { namespace protocol {
/**
* @brief Error codes for asynchronous error events.
*
*/
enum struct Error { enum struct Error {
kNoError = 0, kNoError = 0,
kUnknown = 1, kUnknown = 1,
...@@ -22,5 +32,5 @@ enum struct Error { ...@@ -22,5 +32,5 @@ enum struct Error {
kBadURI kBadURI
}; };
} } // namespace protocol
} } // namespace ftl
/**
* @file frameid.hpp
* @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope
*/
#pragma once #pragma once
#include <cinttypes> #include <cinttypes>
...@@ -11,45 +17,46 @@ namespace protocol { ...@@ -11,45 +17,46 @@ namespace protocol {
* frames cannot be duplicated. * frames cannot be duplicated.
*/ */
struct FrameID { struct FrameID {
uint32_t id; uint32_t id;
/** /**
* Frameset ID for this frame. * Frameset ID for this frame.
*/ */
inline unsigned int frameset() const { return id >> 8; } inline unsigned int frameset() const { return id >> 8; }
/** /**
* Frame index within the frameset. This will correspond to the vector * Frame index within the frameset. This will correspond to the vector
* index in the frameset object. * index in the frameset object.
*/ */
inline unsigned int source() const { return id & 0xff; } inline unsigned int source() const { return id & 0xff; }
/** /**
* The packed int with both frameset ID and index. * The packed int with both frameset ID and index.
*/ */
operator uint32_t() const { return id; } operator uint32_t() const { return id; }
inline FrameID &operator=(int v) { id = v; return *this; } inline FrameID &operator=(int v) {
id = v;
/** return *this;
* Create a frame ID using a frameset id and a source number. }
* @param fs Frameset id
* @param s Source number inside frameset /**
*/ * Create a frame ID using a frameset id and a source number.
FrameID(unsigned int fs, unsigned int s) : id((fs << 8) + (s & 0xff) ) {} * @param fs Frameset id
explicit FrameID(uint32_t x) : id(x) {} * @param s Source number inside frameset
FrameID() : id(0) {} */
FrameID(unsigned int fs, unsigned int s) : id((fs << 8) + (s & 0xff) ) {}
explicit FrameID(uint32_t x) : id(x) {}
FrameID() : id(0) {}
}; };
} } // namespace protocol
} } // namespace ftl
// custom specialization of std::hash can be injected in namespace std // custom specialization of std::hash can be injected in namespace std
template<> template<>
struct std::hash<ftl::protocol::FrameID> struct std::hash<ftl::protocol::FrameID> {
{ std::size_t operator()(ftl::protocol::FrameID const& s) const noexcept {
std::size_t operator()(ftl::protocol::FrameID const& s) const noexcept
{
return std::hash<unsigned int>{}(s.id); return std::hash<unsigned int>{}(s.id);
} }
}; };
#pragma once /**
* @file muxer.hpp
* @copyright Copyright (c) 2020 University of Turku, MIT License
* @author Nicolas Pope
*/
#include <ftl/protocol/streams.hpp> #pragma once
#include <map> #include <map>
#include <list> #include <list>
#include <memory>
#include <unordered_map>
#include <utility>
#include <ftl/protocol/streams.hpp>
namespace ftl { namespace ftl {
namespace protocol { namespace protocol {
...@@ -17,65 +25,68 @@ static constexpr size_t kMaxStreams = 5; ...@@ -17,65 +25,68 @@ static constexpr size_t kMaxStreams = 5;
* stream mapping to be registered. * stream mapping to be registered.
*/ */
class Muxer : public Stream { class Muxer : public Stream {
public: public:
explicit Muxer(); Muxer();
virtual ~Muxer(); virtual ~Muxer();
void add(const std::shared_ptr<Stream> &, int fsid=-1);
void remove(const std::shared_ptr<Stream> &);
//bool onPacket(const StreamCallback &) override; void add(const std::shared_ptr<Stream> &, int fsid = -1);
void remove(const std::shared_ptr<Stream> &);
bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override; bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override;
bool begin() override; bool begin() override;
bool end() override; bool end() override;
bool active() override; bool active() override;
void reset() override; void reset() override;
bool enable(FrameID id) override; bool enable(FrameID id) override;
bool enable(FrameID id, ftl::protocol::Channel channel) override; bool enable(FrameID id, ftl::protocol::Channel channel) override;
bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override; bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override;
void setProperty(ftl::protocol::StreamProperty opt, std::any value) override; void setProperty(ftl::protocol::StreamProperty opt, std::any value) override;
std::any getProperty(ftl::protocol::StreamProperty opt) override; std::any getProperty(ftl::protocol::StreamProperty opt) override;
bool supportsProperty(ftl::protocol::StreamProperty opt) override; bool supportsProperty(ftl::protocol::StreamProperty opt) override;
StreamType type() const override; StreamType type() const override;
std::shared_ptr<Stream> originStream(FrameID) const; /**
* @brief Get the stream instance associated with an ID.
*
* @return std::shared_ptr<Stream>
*/
std::shared_ptr<Stream> originStream(FrameID) const;
private: private:
struct StreamEntry { struct StreamEntry {
std::shared_ptr<Stream> stream; std::shared_ptr<Stream> stream;
ftl::Handle handle; ftl::Handle handle;
ftl::Handle req_handle; ftl::Handle req_handle;
ftl::Handle avail_handle; ftl::Handle avail_handle;
ftl::Handle err_handle; ftl::Handle err_handle;
int id = 0; int id = 0;
int fixed_fs = -1; int fixed_fs = -1;
}; };
std::unordered_map<int, int> fsmap_; std::unordered_map<int, int> fsmap_;
std::unordered_map<int, int> sourcecount_; std::unordered_map<int, int> sourcecount_;
std::unordered_map<int64_t, FrameID> imap_; std::unordered_map<int64_t, FrameID> imap_;
std::unordered_map<FrameID, std::pair<FrameID, Muxer::StreamEntry*>> omap_; std::unordered_map<FrameID, std::pair<FrameID, Muxer::StreamEntry*>> omap_;
std::list<StreamEntry> streams_; std::list<StreamEntry> streams_;
mutable SHARED_MUTEX mutex_; mutable SHARED_MUTEX mutex_;
std::atomic_int stream_ids_ = 0; std::atomic_int stream_ids_ = 0;
std::atomic_int framesets_ = 0; std::atomic_int framesets_ = 0;
/* On packet receive, map to local ID */ /* On packet receive, map to local ID */
FrameID _mapFromInput(StreamEntry *, FrameID id); FrameID _mapFromInput(StreamEntry *, FrameID id);
/* On posting, map to output ID */ /* On posting, map to output ID */
std::pair<FrameID, StreamEntry*> _mapToOutput(FrameID id) const; std::pair<FrameID, StreamEntry*> _mapToOutput(FrameID id) const;
}; };
} } // namespace protocol
} } // namespace ftl
...@@ -6,9 +6,9 @@ ...@@ -6,9 +6,9 @@
#pragma once #pragma once
#include <ftl/uuid.hpp>
#include <memory> #include <memory>
#include <string>
#include <ftl/uuid.hpp>
namespace ftl { namespace ftl {
namespace net { namespace net {
...@@ -24,94 +24,96 @@ enum struct NodeType { ...@@ -24,94 +24,96 @@ enum struct NodeType {
}; };
enum struct NodeStatus { enum struct NodeStatus {
kInvalid, // no socket kInvalid, // no socket
kConnecting, // socket created, no handshake yet kConnecting, // socket created, no handshake yet
kConnected, // connection fully established kConnected, // connection fully established
kDisconnected, // socket closed, reconnect not possible kDisconnected, // socket closed, reconnect not possible
kReconnecting // socket closed, call reconnect() to try reconnecting kReconnecting // socket closed, call reconnect() to try reconnecting
}; };
/** /**
* To be constructed using the Universe::connect() method and not to be * @brief An API wrapper for a network connection. This object provides the
* created directly. * available RPC calls and connection status or control methods. Note that
* releasing the shared pointer will not result in connection termination,
* it must be closed and then released for the garbage collection to happen.
*
*/ */
class Node { class Node {
public: public:
/** Peer for outgoing connection: resolve address and connect */ /** Peer for outgoing connection: resolve address and connect */
explicit Node(const ftl::net::PeerPtr &impl); explicit Node(const ftl::net::PeerPtr &impl);
virtual ~Node(); virtual ~Node();
/** /**
* Close the peer if open. Setting retry parameter to true will initiate * Close the peer if open. Setting retry parameter to true will initiate
* backoff retry attempts. This is used to deliberately close a connection * backoff retry attempts. This is used to deliberately close a connection
* and not for error conditions where different close semantics apply. * and not for error conditions where different close semantics apply.
* *
* @param retry Should reconnection be attempted? * @param retry Should reconnection be attempted?
*/ */
void close(bool retry=false); void close(bool retry = false);
bool isConnected() const; bool isConnected() const;
/** /**
* Block until the connection and handshake has completed. You should use * Block until the connection and handshake has completed. You should use
* onConnect callbacks instead of blocking, mostly this is intended for * onConnect callbacks instead of blocking, mostly this is intended for
* the unit tests to keep them synchronous. * the unit tests to keep them synchronous.
* *
* @return True if all connections were successful, false if timeout or error. * @return True if all connections were successful, false if timeout or error.
*/ */
bool waitConnection(int seconds = 1); bool waitConnection(int seconds = 1);
/** /**
* Make a reconnect attempt. Called internally by Universe object. * Make a reconnect attempt. Called internally by Universe object.
*/ */
bool reconnect(); bool reconnect();
bool isOutgoing() const; bool isOutgoing() const;
/** /**
* Test if the connection is valid. This returns true in all conditions * Test if the connection is valid. This returns true in all conditions
* except where the socket has been disconnected permenantly, or was never * except where the socket has been disconnected permenantly, or was never
* able to connect, perhaps due to an invalid address, or is in middle of a * able to connect, perhaps due to an invalid address, or is in middle of a
* reconnect attempt. (Valid states: kConnecting, kConnected) * reconnect attempt. (Valid states: kConnecting, kConnected)
* *
* Should return true only in cases when valid OS socket exists. * Should return true only in cases when valid OS socket exists.
*/ */
bool isValid() const; bool isValid() const;
/** node type */ /** node type */
virtual NodeType getType() const { return NodeType::kNode; } virtual NodeType getType() const { return NodeType::kNode; }
NodeStatus status() const; NodeStatus status() const;
uint32_t getFTLVersion() const;
uint32_t getFTLVersion() const; uint8_t getFTLMajor() const { return getFTLVersion() >> 16; }
uint8_t getFTLMajor() const { return getFTLVersion() >> 16; } uint8_t getFTLMinor() const { return (getFTLVersion() >> 8) & 0xFF; }
uint8_t getFTLMinor() const { return (getFTLVersion() >> 8) & 0xFF; } uint8_t getFTLPatch() const { return getFTLVersion() & 0xFF; }
uint8_t getFTLPatch() const { return getFTLVersion() & 0xFF; }
/**
/** * Get the sockets protocol, address and port as a url string. This will be
* Get the sockets protocol, address and port as a url string. This will be * the same as the initial connection string on the client.
* the same as the initial connection string on the client. */
*/ std::string getURI() const;
std::string getURI() const;
/**
/** * Get the UUID for this peer.
* Get the UUID for this peer. */
*/ const ftl::UUID &id() const;
const ftl::UUID &id() const;
/**
/** * Get the peer id as a string.
* Get the peer id as a string. */
*/ std::string to_string() const;
std::string to_string() const;
void noReconnect();
void noReconnect();
unsigned int localID();
unsigned int localID();
int connectionCount() const;
int connectionCount() const;
protected:
protected: ftl::net::PeerPtr peer_;
ftl::net::PeerPtr peer_;
}; };
} } // namespace protocol
} } // namespace ftl
...@@ -8,9 +8,9 @@ ...@@ -8,9 +8,9 @@
#include <cstdint> #include <cstdint>
#include <vector> #include <vector>
#include <string>
#include <ftl/protocol/codecs.hpp> #include <ftl/protocol/codecs.hpp>
#include <ftl/protocol/channels.hpp> #include <ftl/protocol/channels.hpp>
//#include <msgpack.hpp>
namespace ftl { namespace ftl {
namespace protocol { namespace protocol {
...@@ -23,15 +23,15 @@ static constexpr uint8_t kCurrentFTLVersion = 5; ...@@ -23,15 +23,15 @@ static constexpr uint8_t kCurrentFTLVersion = 5;
* First bytes of our file format. * First bytes of our file format.
*/ */
struct Header { struct Header {
const char magic[4] = {'F','T','L','F'}; const char magic[4] = {'F', 'T', 'L', 'F'};
uint8_t version = kCurrentFTLVersion; uint8_t version = kCurrentFTLVersion;
}; };
/** /**
* Version 2 header padding for potential indexing use. * Version 2 header padding for potential indexing use.
*/ */
struct IndexHeader { struct IndexHeader {
int64_t reserved[8]; int64_t reserved[8];
}; };
/** /**
...@@ -41,19 +41,17 @@ struct IndexHeader { ...@@ -41,19 +41,17 @@ struct IndexHeader {
* an empty wrapper around that. It is used in the encoding callback. * an empty wrapper around that. It is used in the encoding callback.
*/ */
struct Packet { struct Packet {
ftl::protocol::Codec codec; ftl::protocol::Codec codec = ftl::protocol::Codec::kInvalid;
uint8_t reserved=0; uint8_t reserved = 0;
uint8_t frame_count=1; // v4+ Frames included in this packet uint8_t frame_count = 1; // v4+ Frames included in this packet
uint8_t bitrate=0; // v4+ For multi-bitrate encoding, 0=highest uint8_t bitrate = 0; // v4+ For multi-bitrate encoding, 0=highest
union { union {
uint8_t flags=0; // Codec dependent flags (eg. I-Frame or P-Frame) uint8_t flags = 0; // Codec dependent flags (eg. I-Frame or P-Frame)
uint8_t packet_count; uint8_t packet_count;
}; };
std::vector<uint8_t> data; std::vector<uint8_t> data;
//MSGPACK_DEFINE(codec, reserved, frame_count, bitrate, flags, data);
}; };
static constexpr unsigned int kStreamCap_Static = 0x01; static constexpr unsigned int kStreamCap_Static = 0x01;
...@@ -62,23 +60,21 @@ static constexpr unsigned int kStreamCap_NewConnection = 0x04; ...@@ -62,23 +60,21 @@ static constexpr unsigned int kStreamCap_NewConnection = 0x04;
/** V4 packets have no stream flags field */ /** V4 packets have no stream flags field */
struct StreamPacketV4 { struct StreamPacketV4 {
int version=4; // FTL version, Not encoded into stream int version = 4; // FTL version, Not encoded into stream
int64_t timestamp;
uint8_t streamID; // Source number [or v4 frameset id]
uint8_t frame_number; // v4+ First frame number (packet may include multiple frames)
ftl::protocol::Channel channel; // Actual channel of this current set of packets
inline int frameNumber() const { return (version >= 4) ? frame_number : streamID; } int64_t timestamp;
inline size_t frameSetID() const { return (version >= 4) ? streamID : 0; } uint8_t streamID; // Source number [or v4 frameset id]
uint8_t frame_number; // v4+ First frame number (packet may include multiple frames)
ftl::protocol::Channel channel; // Actual channel of this current set of packets
int64_t localTimestamp; // Not message packet / saved inline int frameNumber() const { return (version >= 4) ? frame_number : streamID; }
unsigned int hint_capability; // Is this a video stream, for example inline size_t frameSetID() const { return (version >= 4) ? streamID : 0; }
size_t hint_source_total; // Number of tracks per frame to expect
//MSGPACK_DEFINE(timestamp, streamID, frame_number, channel); int64_t localTimestamp; // Not message packet / saved
unsigned int hint_capability; // Is this a video stream, for example
size_t hint_source_total; // Number of tracks per frame to expect
operator std::string() const; operator std::string() const;
}; };
/** /**
...@@ -87,26 +83,24 @@ struct StreamPacketV4 { ...@@ -87,26 +83,24 @@ struct StreamPacketV4 {
* or included before a frame packet structure. * or included before a frame packet structure.
*/ */
struct StreamPacket { struct StreamPacket {
int version = kCurrentFTLVersion; // FTL version, Not encoded into stream int version = kCurrentFTLVersion; // FTL version, Not encoded into stream
int64_t timestamp;
uint8_t streamID; // Source number [or v4 frameset id]
uint8_t frame_number; // v4+ First frame number (packet may include multiple frames)
ftl::protocol::Channel channel; // Actual channel of this current set of packets
uint8_t flags=0;
inline int frameNumber() const { return (version >= 4) ? frame_number : streamID; } int64_t timestamp;
inline size_t frameSetID() const { return (version >= 4) ? streamID : 0; } uint8_t streamID; // Source number [or v4 frameset id]
uint8_t frame_number; // v4+ First frame number (packet may include multiple frames)
ftl::protocol::Channel channel; // Actual channel of this current set of packets
uint8_t flags = 0;
int64_t localTimestamp; // Not message packet / saved inline int frameNumber() const { return (version >= 4) ? frame_number : streamID; }
unsigned int hint_capability; // Is this a video stream, for example inline size_t frameSetID() const { return (version >= 4) ? streamID : 0; }
size_t hint_source_total; // Number of tracks per frame to expect
int retry_count = 0; // Decode retry count
unsigned int hint_peerid=0;
//MSGPACK_DEFINE(timestamp, streamID, frame_number, channel, flags); int64_t localTimestamp; // Not message packet / saved
unsigned int hint_capability; // Is this a video stream, for example
size_t hint_source_total; // Number of tracks per frame to expect
int retry_count = 0; // Decode retry count
unsigned int hint_peerid = 0;
operator std::string() const; operator std::string() const;
}; };
/** /**
...@@ -114,9 +108,9 @@ struct StreamPacket { ...@@ -114,9 +108,9 @@ struct StreamPacket {
* saved or transmitted in a stream together. * saved or transmitted in a stream together.
*/ */
struct PacketPair { struct PacketPair {
PacketPair(const StreamPacket &s, const Packet &p) : spkt(s), pkt(p) {} PacketPair(const StreamPacket &s, const Packet &p) : spkt(s), pkt(p) {}
const StreamPacket &spkt; const StreamPacket &spkt;
const Packet &pkt; const Packet &pkt;
}; };
} // namespace protocol } // namespace protocol
......
/** /**
* @file node.hpp * @file self.hpp
* @copyright Copyright (c) 2022 University of Turku, MIT License * @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope * @author Nicolas Pope
*/ */
#pragma once #pragma once
#include <memory>
#include <string>
#include <vector>
#include <list>
#include <ftl/uuid.hpp> #include <ftl/uuid.hpp>
#include <ftl/uri.hpp> #include <ftl/uri.hpp>
#include <ftl/handle.hpp> #include <ftl/handle.hpp>
#include <ftl/protocol/error.hpp> #include <ftl/protocol/error.hpp>
#include <memory>
#include <string>
namespace ftl { namespace ftl {
namespace net { namespace net {
class Universe; class Universe;
...@@ -24,63 +25,115 @@ namespace protocol { ...@@ -24,63 +25,115 @@ namespace protocol {
class Node; class Node;
class Stream; class Stream;
class Self { /**
public: * @brief A wrapper providing RPC API and local node management. Internally the
/** Peer for outgoing connection: resolve address and connect */ * Self instance is responsible for handling all network operations. Typically
explicit Self(const std::shared_ptr<ftl::net::Universe> &impl); * there is just a single instance of this class, but more can be created for
virtual ~Self(); * testing purposes.
*
std::shared_ptr<ftl::protocol::Node> connectNode(const std::string &uri); */
class Self {
std::shared_ptr<ftl::protocol::Stream> createStream(const std::string &uri); public:
/** Peer for outgoing connection: resolve address and connect */
std::shared_ptr<ftl::protocol::Stream> getStream(const std::string &uri); explicit Self(const std::shared_ptr<ftl::net::Universe> &impl);
virtual ~Self();
void start();
/**
/** * @brief Connect to another host from this Self instance. Usually the
* Open a new listening port on a given interfaces. * namespace method can be used instead.
* eg. "tcp://localhost:9000" *
* @param addr URI giving protocol, interface and port * @param uri A TCP URI.
*/ * @return std::shared_ptr<ftl::protocol::Node>
bool listen(const ftl::URI &addr); */
std::shared_ptr<ftl::protocol::Node> connectNode(const std::string &uri);
std::vector<ftl::URI> getListeningURIs();
/**
/** * @brief Create a new stream. Use the namespace method if possible.
* Essential to call this before destroying anything that registered *
* callbacks or binds for RPC. It will terminate all connections and * @param uri A file:// or ftl:// URI.
* stop any network activity but without deleting the net object. * @return std::shared_ptr<ftl::protocol::Stream>
*/ */
void shutdown(); std::shared_ptr<ftl::protocol::Stream> createStream(const std::string &uri);
bool isConnected(const ftl::URI &uri); /**
bool isConnected(const std::string &s); * @brief Open an existing stream. Use the namespace method if possible.
*
size_t numberOfNodes() const; * @param uri A file:// or ftl:// URI
* @return std::shared_ptr<ftl::protocol::Stream>
/** */
* Will block until all currently registered connnections have completed. std::shared_ptr<ftl::protocol::Stream> getStream(const std::string &uri);
* You should not use this, but rather use onConnect.
*/ void start();
int waitConnections(int seconds = 1);
/**
/** get peer pointer by peer UUID, returns nullptr if not found */ * Open a new listening port on a given interfaces.
std::shared_ptr<ftl::protocol::Node> getNode(const ftl::UUID &pid) const; * eg. "tcp://localhost:9000"
/** get webservice peer pointer, returns nullptr if not connected to webservice */ * @param addr URI giving protocol, interface and port
std::shared_ptr<ftl::protocol::Node> getWebService() const; */
std::list<std::shared_ptr<ftl::protocol::Node>> getNodes() const; bool listen(const ftl::URI &addr);
ftl::Handle onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); /**
ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); * @brief Get the list of all listening addresses and ports.
ftl::Handle onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, ftl::protocol::Error, const std::string & )>&); *
* @return std::vector<ftl::URI>
// Used for testing */
ftl::net::Universe *getUniverse() const { return universe_.get(); } std::vector<ftl::URI> getListeningURIs();
protected: /**
std::shared_ptr<ftl::net::Universe> universe_; * Essential to call this before destroying anything that registered
* callbacks or binds for RPC. It will terminate all connections and
* stop any network activity but without deleting the net object.
*/
void shutdown();
bool isConnected(const ftl::URI &uri);
bool isConnected(const std::string &s);
size_t numberOfNodes() const;
/**
* Will block until all currently registered connnections have completed.
* You should not use this, but rather use onConnect.
*/
int waitConnections(int seconds = 1);
/** get peer pointer by peer UUID, returns nullptr if not found */
std::shared_ptr<ftl::protocol::Node> getNode(const ftl::UUID &pid) const;
/** get webservice peer pointer, returns nullptr if not connected to webservice */
std::shared_ptr<ftl::protocol::Node> getWebService() const;
std::list<std::shared_ptr<ftl::protocol::Node>> getNodes() const;
/**
* @brief Register a callback for new node connections.
*
* @return ftl::Handle
*/
ftl::Handle onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&);
/**
* @brief Register a callback for node disconnects.
*
* @return ftl::Handle
*/
ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&);
/**
* @brief Register a callback for any node or network errors. Note that the node pointer can
* be null if the error was not associated with a specific node.
*
* @return ftl::Handle
*/
ftl::Handle onError(
const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&,
ftl::protocol::Error,
const std::string &)>&);
// Used for testing
ftl::net::Universe *getUniverse() const { return universe_.get(); }
protected:
std::shared_ptr<ftl::net::Universe> universe_;
}; };
} } // namespace protocol
} } // namespace ftl
...@@ -6,6 +6,12 @@ ...@@ -6,6 +6,12 @@
#pragma once #pragma once
#include <string>
#include <vector>
#include <unordered_set>
#include <any>
#include <unordered_map>
#include <memory>
#include <ftl/handle.hpp> #include <ftl/handle.hpp>
#include <ftl/threads.hpp> #include <ftl/threads.hpp>
#include <ftl/protocol/channels.hpp> #include <ftl/protocol/channels.hpp>
...@@ -13,51 +19,52 @@ ...@@ -13,51 +19,52 @@
#include <ftl/protocol/packet.hpp> #include <ftl/protocol/packet.hpp>
#include <ftl/protocol/frameid.hpp> #include <ftl/protocol/frameid.hpp>
#include <ftl/protocol/error.hpp> #include <ftl/protocol/error.hpp>
#include <string>
#include <vector>
#include <unordered_set>
#include <any>
namespace ftl { namespace ftl {
namespace protocol { namespace protocol {
/* Represents a request for data through a stream */ /** Represents a request for data through a stream */
struct Request { struct Request {
FrameID id; FrameID id;
ftl::protocol::Channel channel; ftl::protocol::Channel channel;
int bitrate; int bitrate;
int count; int count;
ftl::protocol::Codec codec; ftl::protocol::Codec codec;
}; };
using RequestCallback = std::function<bool(const ftl::protocol::Request&)>; using RequestCallback = std::function<bool(const ftl::protocol::Request&)>;
using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &)>; using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &)>;
/**
* @brief Enumeration of possible stream properties. Not all properties are supported
* by all stream types, but they allow additional control and data access.
*
*/
enum struct StreamProperty { enum struct StreamProperty {
kInvalid = 0, kInvalid = 0,
kLooping, kLooping,
kSpeed, kSpeed,
kBitrate, kBitrate,
kMaxBitrate, kMaxBitrate,
kAdaptiveBitrate, kAdaptiveBitrate,
kObservers, kObservers,
kURI, kURI,
kPaused, kPaused,
kBytesSent, kBytesSent,
kBytesReceived, kBytesReceived,
kLatency, kLatency,
kFrameRate, kFrameRate,
kName, kName,
kDescription, kDescription,
kTags, kTags,
kUser kUser
}; };
enum struct StreamType { enum struct StreamType {
kMixed, kMixed, // Multiple types of stream
kUnknown, kUnknown,
kLive, kLive, // Net stream
kRecorded kRecorded // File stream
}; };
/** /**
...@@ -68,158 +75,271 @@ enum struct StreamType { ...@@ -68,158 +75,271 @@ enum struct StreamType {
* Streams are bidirectional, frames can be both received and written. * Streams are bidirectional, frames can be both received and written.
*/ */
class Stream { class Stream {
public: public:
virtual ~Stream() {}; virtual ~Stream() {}
virtual std::string name() const; virtual std::string name() const;
/** /**
* Obtain all packets for next frame. The provided callback function is * Obtain all packets for next frame. The provided callback function is
* called once for every packet. This function might continue to call the * called once for every packet. This function might continue to call the
* callback even after the read function returns, for example with a * callback even after the read function returns, for example with a
* NetStream. * NetStream.
*/ */
ftl::Handle onPacket(const StreamCallback &cb) { return cb_.on(cb); } ftl::Handle onPacket(const StreamCallback &cb) { return cb_.on(cb); }
ftl::Handle onRequest(const std::function<bool(const Request &)> &cb) { return request_cb_.on(cb); } /**
* @brief Register a callback for frame and channel requests. Remote machines can send
virtual bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &)=0; * requests, at which point the data should be generated and sent properly.
*
// TODO: Add methods for: pause, paused, statistics * @param cb
* @return ftl::Handle
/** */
* Start the stream. Calls to the onPacket callback will only occur after ftl::Handle onRequest(const std::function<bool(const Request &)> &cb) { return request_cb_.on(cb); }
* a call to this function (and before a call to end()).
*/ /**
virtual bool begin()=0; * @brief Send packets, either to file or over the network. Packets should follow
* the overall protocol rules, detailed elsewhere.
virtual bool end()=0; *
* @return true if sent
/** * @return false if dropped
* Is the stream active? Generally true if begin() has been called, false */
* initially and after end(). However, it may go false for other reasons. virtual bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) = 0;
* If false, no calls to onPacket will occur and posts will be ignored.
*/ // TODO(Nick): Add methods for: pause, paused, statistics
virtual bool active()=0;
/**
/** * Start the stream. Calls to the onPacket callback will only occur after
* @brief Clear all state. This will remove all information about available * a call to this function (and before a call to end()).
* and enabled frames or channels. You will then need to enable frames and */
* channels again. If active the stream will remain active. virtual bool begin() = 0;
*
*/ virtual bool end() = 0;
virtual void reset();
/**
/** * Is the stream active? Generally true if begin() has been called, false
* @brief Re-request all channels and state. This will also cause video encoding * initially and after end(). However, it may go false for other reasons.
* to generate new I-frames as if a new connection is made. All persistent data * If false, no calls to onPacket will occur and posts will be ignored.
* channels would also become available. For file streams this would reset the */
* stream to the beginning of the file. virtual bool active() = 0;
*
*/ /**
virtual void refresh(); * @brief Clear all state. This will remove all information about available
* and enabled frames or channels. You will then need to enable frames and
/** * channels again. If active the stream will remain active.
* @brief Check if a frame is available. *
* */
* @param id Frameset and frame number virtual void reset();
* @return true if data is available for the frame
* @return false if no data has been seen /**
*/ * @brief Re-request all channels and state. This will also cause video encoding
bool available(FrameID id) const; * to generate new I-frames as if a new connection is made. All persistent data
* channels would also become available. For file streams this would reset the
bool available(FrameID id, ftl::protocol::Channel channel) const; * stream to the beginning of the file.
*
bool available(FrameID id, const ftl::protocol::ChannelSet channels) const; */
virtual void refresh();
ftl::Handle onAvailable(const std::function<bool(FrameID, ftl::protocol::Channel)> &cb) { return avail_cb_.on(cb); }
/**
/** * @brief Check if a frame is available.
* @brief Get all channels seen for a frame. If the frame does not exist then *
* an empty set is returned. * @param id Frameset and frame number
* * @return true if data is available for the frame
* @param id Frameset and frame number * @return false if no data has been seen
* @return Set of all seen channels, or empty. */
*/ bool available(FrameID id) const;
ftl::protocol::ChannelSet channels(FrameID id) const;
/**
ftl::protocol::ChannelSet enabledChannels(FrameID id) const; * @brief Check if a channel for a frame is available.
*
/** * @param id Frameset and frame number
* @brief Get all available frames in the stream. * @param channel
* * @return true if there is channel data
* @return Set of frame IDs * @return false if the channel does not exist
*/ */
std::unordered_set<FrameID> frames() const; bool available(FrameID id, ftl::protocol::Channel channel) const;
/** /**
* @brief Get all enabled frames in the stream. * @brief Check if all channels in a set are available.
* *
* @return Set of frame IDs * @param id Frameset and frame number
*/ * @param channels Set of channels to check
std::unordered_set<FrameID> enabled() const; * @return true if all channels exist
* @return false if one or more do not exist
/** */
* @brief Check if a frame is enabled. bool available(FrameID id, const ftl::protocol::ChannelSet channels) const;
*
* @param id Frameset and frame number /**
* @return true if data for this frame is enabled. * @brief Register a callback for when new frames and channels become available.
* @return false if data not enabled or frame does not exist. *
*/ * @param cb
bool enabled(FrameID id) const; * @return ftl::Handle
*/
bool enabled(FrameID id, ftl::protocol::Channel channel) const; ftl::Handle onAvailable(const std::function<bool(FrameID, ftl::protocol::Channel)> &cb) { return avail_cb_.on(cb); }
/** /**
* Number of framesets in stream. * @brief Get all channels seen for a frame. If the frame does not exist then
*/ * an empty set is returned.
inline size_t size() const { return state_.size(); } *
* @param id Frameset and frame number
virtual bool enable(FrameID id); * @return Set of all seen channels, or empty.
*/
virtual bool enable(FrameID id, ftl::protocol::Channel channel); ftl::protocol::ChannelSet channels(FrameID id) const;
virtual bool enable(FrameID id, const ftl::protocol::ChannelSet &channels); /**
* @brief Obtain a set of all active channels for a frame.
// TODO: Disable *
* @param id Frameset and frame number
virtual void setProperty(ftl::protocol::StreamProperty opt, std::any value)=0; * @return ftl::protocol::ChannelSet
*/
virtual std::any getProperty(ftl::protocol::StreamProperty opt)=0; ftl::protocol::ChannelSet enabledChannels(FrameID id) const;
virtual bool supportsProperty(ftl::protocol::StreamProperty opt)=0; /**
* @brief Get all available frames in the stream.
virtual StreamType type() const { return StreamType::kUnknown; } *
* @return Set of frame IDs
ftl::Handle onError(const std::function<bool(ftl::protocol::Error, const std::string &)> &cb) { return error_cb_.on(cb); } */
std::unordered_set<FrameID> frames() const;
protected:
void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt); /**
* @brief Get all enabled frames in the stream.
void seen(FrameID id, ftl::protocol::Channel channel); *
* @return Set of frame IDs
void request(const Request &req); */
std::unordered_set<FrameID> enabled() const;
void error(ftl::protocol::Error, const std::string &str);
/**
mutable SHARED_MUTEX mtx_; * @brief Check if a frame is enabled.
*
private: * @param id Frameset and frame number
struct FSState { * @return true if data for this frame is enabled.
bool enabled = false; * @return false if data not enabled or frame does not exist.
ftl::protocol::ChannelSet selected; */
ftl::protocol::ChannelSet available; bool enabled(FrameID id) const;
// TODO: Add a name and metadata
}; /**
* @brief Check if a specific channel is enabled for a frame.
ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::Packet&> cb_; *
ftl::Handler<const Request &> request_cb_; * @param id Frameset and frame number
ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_; * @param channel
ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_; * @return true if the channel is active
std::unordered_map<int, FSState> state_; * @return false if the channel is not active or does not exist
*/
bool enabled(FrameID id, ftl::protocol::Channel channel) const;
/**
* Number of framesets in stream.
*/
inline size_t size() const { return state_.size(); }
/**
* @brief Activate a frame. This allows availability information to be gathered
* for the frame which might not otherwise be available. However, data is likely
* missing unless a channel is enabled.
*
* @param id Frameset and frame number
* @return true if the frame could be enabled
* @return false if the frame could not be found or enabled
*/
virtual bool enable(FrameID id);
/**
* @brief Request a specific channel in a frame. Once the request is made data
* should become available if it exists. This will also enable the frame if
* not already enabled.
*
* @param id Frameset and frame number
* @param channel
* @return true if the channel is available and enabled
* @return false if the channel does not exist
*/
virtual bool enable(FrameID id, ftl::protocol::Channel channel);
/**
* @brief Activate a set of channels for a frame. Requests for data for each
* given channel are sent and the data should then become available.
*
* @param id Frameset and frame number
* @param channels a set of all channels to activate
* @return true if all channels could be enabled
* @return false if some channel could not be enabled
*/
virtual bool enable(FrameID id, const ftl::protocol::ChannelSet &channels);
// TODO(Nick): Disable
/**
* @brief Set a stream property to a new value. If the property is not supported,
* is readonly or an invalid value type is given, then an exception is thrown.
* Check if the property is supported first.
*
* @param opt
* @param value
*/
virtual void setProperty(ftl::protocol::StreamProperty opt, std::any value) = 0;
/**
* @brief Get the value of a stream property. If the property is not supported then
* an exception is thrown. The result is an `any` object that should be casted
* correctly by the user.
*
* @param opt
* @return std::any
*/
virtual std::any getProperty(ftl::protocol::StreamProperty opt) = 0;
/**
* @brief Check if a property is supported. No exceptions are thrown.
*
* @param opt
* @return true if the property is at least readable
* @return false if the property is unsupported
*/
virtual bool supportsProperty(ftl::protocol::StreamProperty opt) = 0;
virtual StreamType type() const { return StreamType::kUnknown; }
/**
* @brief Register a callback for asynchronous stream errors.
*
* @param cb
* @return ftl::Handle
*/
ftl::Handle onError(const std::function<bool(ftl::protocol::Error, const std::string &)> &cb) {
return error_cb_.on(cb);
}
protected:
/** Dispatch packets to callbacks */
void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt);
/** Mark the channel and frame as available */
void seen(FrameID id, ftl::protocol::Channel channel);
/** Dispatch a request */
void request(const Request &req);
/** Report a stream error */
void error(ftl::protocol::Error, const std::string &str);
mutable SHARED_MUTEX mtx_;
private:
struct FSState {
bool enabled = false;
ftl::protocol::ChannelSet selected;
ftl::protocol::ChannelSet available;
// TODO(Nick): Add a name and metadata
};
ftl::Handler<const ftl::protocol::StreamPacket&, const ftl::protocol::Packet&> cb_;
ftl::Handler<const Request &> request_cb_;
ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_;
ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_;
std::unordered_map<int, FSState> state_;
}; };
using StreamPtr = std::shared_ptr<Stream>; using StreamPtr = std::shared_ptr<Stream>;
} } // namespace protocol
} } // namespace ftl
...@@ -24,47 +24,47 @@ ...@@ -24,47 +24,47 @@
#define RECURSIVE_MUTEX std::recursive_timed_mutex #define RECURSIVE_MUTEX std::recursive_timed_mutex
#define SHARED_MUTEX std::shared_timed_mutex #define SHARED_MUTEX std::shared_timed_mutex
#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::milliseconds(MUTEX_TIMEOUT)); while (!L) { LOG(ERROR) << "Mutex timeout"; L.try_lock_for(std::chrono::milliseconds(MUTEX_TIMEOUT)); }; #define UNIQUE_LOCK(M, L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::milliseconds(MUTEX_TIMEOUT)); while (!L) { LOG(ERROR) << "Mutex timeout"; L.try_lock_for(std::chrono::milliseconds(MUTEX_TIMEOUT)); };
#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::milliseconds(MUTEX_TIMEOUT)); while (!L) { LOG(ERROR) << "Mutex timeout"; L.try_lock_for(std::chrono::milliseconds(MUTEX_TIMEOUT)); }; #define SHARED_LOCK(M, L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::milliseconds(MUTEX_TIMEOUT)); while (!L) { LOG(ERROR) << "Mutex timeout"; L.try_lock_for(std::chrono::milliseconds(MUTEX_TIMEOUT)); };
#else #else
#define MUTEX std::mutex #define MUTEX std::mutex
#define RECURSIVE_MUTEX std::recursive_mutex #define RECURSIVE_MUTEX std::recursive_mutex
#define SHARED_MUTEX std::shared_mutex #define SHARED_MUTEX std::shared_mutex
#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M); #define UNIQUE_LOCK(M, L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M);
#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M); #define SHARED_LOCK(M, L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M);
#endif // DEBUG_MUTEX #endif // DEBUG_MUTEX
#define SHARED_LOCK_TYPE(M) std::shared_lock<M> #define SHARED_LOCK_TYPE(M) std::shared_lock<M>
namespace ftl { namespace ftl {
extern ctpl::thread_pool pool; extern ctpl::thread_pool pool;
namespace threads { namespace threads {
/** Upgrade shared lock to exclusive lock and re-acquire shared lock at end of /** Upgrade shared lock to exclusive lock and re-acquire shared lock at end of
* scope. */ * scope. */
class _write_lock { class _write_lock {
public: public:
explicit _write_lock(std::shared_mutex& mtx) : mtx_(&mtx) { explicit _write_lock(std::shared_mutex& mtx) : mtx_(&mtx) {
mtx_->unlock_shared(); mtx_->unlock_shared();
mtx_->lock(); mtx_->lock();
} }
~_write_lock() { ~_write_lock() {
mtx_->unlock(); mtx_->unlock();
mtx_->lock_shared(); mtx_->lock_shared();
} }
private: private:
std::shared_mutex* const mtx_; std::shared_mutex* const mtx_;
}; };
/** Upgrade already acquired SHARED_LOCK to exclusive lock and re-acquire shared /** Upgrade already acquired SHARED_LOCK to exclusive lock and re-acquire shared
* lock at end of scope. Shared lock must be already acquired on mutex! If more * lock at end of scope. Shared lock must be already acquired on mutex! If more
* careful locking is required, use std::..._lock directly */ * careful locking is required, use std::..._lock directly */
#define WRITE_LOCK(M,L) ftl::threads::_write_lock L(M); #define WRITE_LOCK(M, L) ftl::threads::_write_lock L(M);
} } // namespace threads
} } // namespace ftl
/**
* @file time.hpp
* @copyright Copyright (c) 2020 University of Turku, MIT License
* @author Nicolas Pope
*/
#pragma once #pragma once
#include <cinttypes> #include <cinttypes>
...@@ -20,5 +26,5 @@ double get_time_seconds(); ...@@ -20,5 +26,5 @@ double get_time_seconds();
*/ */
void setClockAdjustment(int64_t ms); void setClockAdjustment(int64_t ms);
} } // namespace time
} } // namespace ftl
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment