Skip to content
Snippets Groups Projects
Commit cd542d4b authored by Sebastian Hahta's avatar Sebastian Hahta
Browse files

Feed and Filter (works in GUI)

parent b8b99d82
No related branches found
No related tags found
1 merge request!316Resolves #343 GUI and Frame Refactor
Pipeline #27634 failed
......@@ -35,13 +35,5 @@ InputOutput::InputOutput(ftl::Configurable *root, ftl::net::Universe *net) :
});
feed_ = std::unique_ptr<ftl::stream::Feed>(ftl::create<ftl::stream::Feed>(root, "feed", net));
}
void InputOutput::processFrameSet_(const std::shared_ptr<ftl::data::FrameSet>& fs) {
}
ftl::Handle InputOutput::addCallback(const std::function<bool(const std::shared_ptr<ftl::data::FrameSet>&)> &f) {
return feed_->onFrameSet(f);
}
......@@ -6,21 +6,22 @@ using ftl::gui2::Camera;
void Camera::activate() {
auto view = new ftl::gui2::CameraView(screen);
view->setHandle(
io->addCallback([this, view](const std::shared_ptr<ftl::data::FrameSet>& fs){
filter = io->feed()->filter({ftl::codecs::Channel::Colour});
filter->on(
[this, view](const std::shared_ptr<ftl::data::FrameSet>& fs){
view->update(fs, source_idx);
screen->redraw();
return true;
}));
}
);
view->onClose([this](){
this->io->feed()->removeFilter(filter);
});
screen->setView(view);
}
/*void Camera::deactivate() {
io->removeCallbackVideo();
io->removeCallbackAudio();
}*/
void Camera::setSource(int idx) {
source_idx = idx;
}
......@@ -15,7 +15,9 @@ public:
void setSource(int);
private:
//ftl::stream::Filter filter_;
int source_idx = -1;
ftl::stream::Feed::Filter *filter;
};
}
......
......@@ -14,10 +14,9 @@ public:
CameraView(nanogui::Widget* parent);
virtual void draw(NVGcontext *ctx) override;
void update(const std::shared_ptr<ftl::data::FrameSet>& fs, int fid);
void setHandle(ftl::Handle&& handle) { handle_ = std::move(handle); }
private:
ftl::gui2::FrameView *fview = nullptr;
ftl::Handle handle_;
};
}
......
......@@ -7,6 +7,7 @@
#include <nanogui/entypo.h>
using ftl::codecs::Channel;
using ftl::gui2::ThumbnailsController;
void ThumbnailsController::init() {
......@@ -31,12 +32,17 @@ ThumbnailsController::~ThumbnailsController() {
void ThumbnailsController::show_thumbnails() {
auto thumb_view = new ftl::gui2::Thumbnails(screen, this);
thumb_view->setHandle(
io->addCallback([this, thumb_view](const std::shared_ptr<ftl::data::FrameSet>& fs){
auto* filter = io->feed()->filter({Channel::Colour});
filter->on(
[this, thumb_view](const std::shared_ptr<ftl::data::FrameSet>& fs){
thumb_view->update(fs);
screen->redraw();
return true;
}));
});
thumb_view->onClose([this, filter](){
io->feed()->removeFilter(filter);
});
screen->setView(thumb_view);
}
......
......@@ -20,8 +20,6 @@ public:
void update(const std::shared_ptr<ftl::data::FrameSet>& fs);
virtual void draw(NVGcontext *ctx) override;
void setHandle(ftl::Handle&& handle) { handle_ = std::move(handle); }
private:
ftl::gui2::ThumbnailsController *control;
nanogui::Widget* panel;
......@@ -30,7 +28,6 @@ private:
std::shared_ptr<ftl::rgbd::FrameSet> fs_;
nanogui::Vector2i thumbsize_ = nanogui::Vector2i(320,180);
ftl::Handle handle_;
};
}
......
......@@ -12,9 +12,19 @@ class View : public nanogui::Widget {
public:
using nanogui::Widget::Widget;
virtual ~View() {}
virtual ~View() {
if(cb_close_) {
cb_close_();
}
}
virtual void render() {}// TODO remove if VR works?
/** onClose callback; view closed (destroyed) */
virtual void onClose(const std::function<void()> &cb) { cb_close_ = cb; }
private:
std::function<void()> cb_close_;
};
};
......
......@@ -70,9 +70,13 @@ struct [[nodiscard]] Handle {
* This class is used to manage callbacks. The template parameters are the
* arguments to be passed to the callback when triggered. This class is already
* thread-safe.
*
* POSSIBLE BUG: On destruction any remaining handles will be left with
* dangling pointer to Handler.
*/
template <typename ...ARGS>
struct Handler : BaseHandler {
/**
* Add a new callback function. It returns a `Handle` object that must
* remain in scope, the destructor of the `Handle` will remove the callback.
......
#ifndef _FTL_STREAMS_FEED_HPP_
#define _FTL_STREAMS_FEED_HPP_
#ifndef _FTL_STREAM_FEED_HPP_
#define _FTL_STREAM_FEED_HPP_
#include <ftl/configurable.hpp>
#include <ftl/net/universe.hpp>
......@@ -21,51 +21,89 @@
namespace ftl {
namespace stream {
using FrameSetHandler = std::function<bool(const std::shared_ptr<ftl::data::FrameSet>&)>;
using FrameSetPtr = std::shared_ptr<ftl::data::FrameSet>;
using FrameSetHandler = std::function<bool(const FrameSetPtr&)>;
class Feed : public ftl::Configurable {
public:
Feed(nlohmann::json &config, ftl::net::Universe *net);
~Feed();
/** Add source (file path, device path or URI) */
void add(const std::string &str);
void add(const ftl::URI &uri);
void setPrePipeline(std::function<void(ftl::operators::Graph&)> f);
ftl::Handle onFrameSet(const FrameSetHandler &f);
uint32_t allocateFrameSetId();
/**
* "Filtered feed"
*/
class Filter {
friend Feed;
public:
~Filter();
void createFrame(uint32_t id, ftl::data::Frame &f);
void createFrameSet(uint32_t id, int size, ftl::data::FrameSet &fs);
const std::unordered_set<ftl::codecs::Channel>& channels() const { return channels_; };
const std::unordered_set<uint32_t>& sources() const { return sources_; };
void on(const FrameSetHandler &cb);
private:
Filter(Feed* feed, const std::unordered_set<uint32_t>& sources, const std::unordered_set<ftl::codecs::Channel>& channels);
Feed* feed_;
std::unordered_set<ftl::codecs::Channel> channels_;
std::unordered_set<uint32_t> sources_;
ftl::Handler<const FrameSetPtr&> handler_;
std::vector<ftl::Handle> handles_;
};
void add(uint32_t fsid, ftl::stream::Stream *s);
//void add(ftl::rgbd::Source *s);
void updateNetSources();
void addPipeline(uint32_t fsid);
private:
// public methods acquire lock if necessary, private methods assume locking
// managed by caller
std::mutex mtx_;
ftl::net::Universe* const net_;
std::unique_ptr<ftl::data::Pool> pool_;
std::unordered_map<uint32_t, ftl::data::Session> stores_;
ftl::Handler<const std::shared_ptr<ftl::data::FrameSet>&> frameset_cb_;
std::unique_ptr<ftl::stream::Muxer> stream_;
std::unique_ptr<ftl::stream::Intercept> interceptor_;
std::unique_ptr<ftl::stream::Receiver> receiver_;
std::unique_ptr<ftl::stream::Sender> sender_;
std::mutex mtx_;
std::unordered_map<std::string, ftl::stream::Stream*> available_;
//ftl::Handler<const FrameSetPtr&> frameset_cb_;
std::unordered_map<std::string, uint32_t> fsid_lookup_;
std::unordered_map<uint32_t, ftl::stream::Stream*> streams_;
std::unordered_map<uint32_t, ftl::operators::Graph*> pre_pipelines_;
std::unordered_map<uint32_t, ftl::render::Renderer*> renderers_;
std::function<void(ftl::operators::Graph&)> pre_pipeline_builder_;
ftl::Handle handle_receiver_;
std::vector<Filter*> filters_;
ftl::Handler<ftl::stream::FrameSetPtr> handler_;
uint32_t fs_counter_ = 0;
ftl::Handle handle_receiver_;
uint32_t allocateFrameSetId();
void add(uint32_t fsid, const std::string &uri, ftl::stream::Stream *s);
/** callback for network (adds new sorces on connect/...) */
void updateNetSources();
/** select channels and sources based on current filters_; */
void select();
public:
/**
* "Filtered feed"
*/
Feed(nlohmann::json &config, ftl::net::Universe *net);
~Feed();
/** Add source (file path, device path or URI) */
uint32_t add(const std::string &str);
uint32_t add(const ftl::URI &uri);
std::vector<std::string> listSources();
ftl::operators::Graph* addPipeline(const std::string &name);
ftl::operators::Graph* addPipeline(uint32_t fsid);
/** Returns pointer to filter object. Pointers will be invalid after Feed
* is destroyed */
Filter* filter(const std::unordered_set<uint32_t> &sources, const std::unordered_set<ftl::codecs::Channel> &channels);
Filter* filter(const std::unordered_set<std::string> &sources, const std::unordered_set<ftl::codecs::Channel> &channels);
Filter* filter(const std::unordered_set<ftl::codecs::Channel> &channels);
void removeFilter(Filter* filter);
};
}
......
......@@ -6,6 +6,30 @@
#include <ftl/streams/filestream.hpp>
using ftl::stream::Feed;
using ftl::codecs::Channel;
////////////////////////////////////////////////////////////////////////////////
Feed::Filter::Filter(Feed* feed, const std::unordered_set<uint32_t>& sources, const std::unordered_set<ftl::codecs::Channel>& channels) :
feed_(feed), channels_(channels), sources_(sources) {
};
Feed::Filter::~Filter() {
}
void Feed::Filter::on(const FrameSetHandler &cb) {
std::unique_lock<std::mutex> lk(feed_->mtx_);
if (std::find(feed_->filters_.begin(), feed_->filters_.end(),this) == feed_->filters_.end()) {
throw ftl::exception("Filter does not belong to Feed; This should never happen!");
}
handles_.push_back(std::move(handler_.on(cb)));
}
////////////////////////////////////////////////////////////////////////////////
Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
ftl::Configurable(config), net_(net) {
......@@ -42,44 +66,110 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
net_->onDisconnect([this](ftl::net::Peer *) {
// TODO: maintain map between peer and sources, on disconnect remove all
// peer's source. Also map between Peers and fsids?
std::unique_lock<std::mutex> lk(mtx_);
});
handle_receiver_ = receiver_->onFrameSet(
[this](const std::shared_ptr<ftl::data::FrameSet>& fs) {
frameset_cb_.trigger(fs);
if (pre_pipelines_.count(fs->frameset()) == 1) {
pre_pipelines_[fs->frameset()]->apply(*fs, *fs, 0);
}
std::unique_lock<std::mutex> lk(mtx_);
for (auto* filter : filters_) {
if (filter->sources().empty()) {
filter->handler_.trigger(fs);
}
else {
// TODO: process partial/complete sets here (drop), that is
// intersection filter->sources() and fs->sources() is
// same as filter->sources().
// TODO: reverse map source ids required here?
for (const auto& src : filter->sources()) {
if (fs->hasFrame(src)) {
filter->handler_.trigger(fs);
break;
}
}
}
}
return true;
});
// add receiver->onFrameset() callbacks and apply pipelines there
// add callbacks for different stages
// configuration
stream_->begin();
}
Feed::~Feed() {
std::unique_lock<std::mutex> lk(mtx_);
for (auto* filter : filters_) {
delete filter;
}
// TODO stop everything and clean up
// delete pre_pipelines_
// delete
}
void Feed::setPrePipeline(std::function<void(ftl::operators::Graph&)> f) {
std::unique_lock<std::mutex> lk(mtx_);
pre_pipeline_builder_ = f;
void Feed::select() {
std::map<uint32_t, std::unordered_set<ftl::codecs::Channel>> selected_channels;
for (auto &filter : filters_) {
const auto& selected = filter->channels();
for(auto [fsid, pipeline]: pre_pipelines_) {
delete pipeline;
addPipeline(fsid);
if (filter->sources().empty()) {
// no sources: select all sources with selected channels
for (const auto& [uri, fsid] : fsid_lookup_) {
std::ignore = uri;
selected_channels[fsid].insert(selected.begin(), selected.end());
}
}
else {
// sources given
for (const auto& fsid : filter->sources()) {
if (selected_channels.count(fsid) == 0) {
selected_channels.try_emplace(fsid);
}
selected_channels[fsid].insert(selected.begin(), selected.end());
}
}
}
for (auto& [fsid, channels] : selected_channels) {
stream_->select(fsid, channels);
}
}
Feed::Filter* Feed::filter(const std::unordered_set<uint32_t> &sources,
const std::unordered_set<ftl::codecs::Channel> &channels) {
auto* filter = new Filter(this, sources, channels);
std::unique_lock<std::mutex> lk(mtx_);
filters_.push_back(filter);
select();
return filter;
}
Feed::Filter* Feed::filter(const std::unordered_set<ftl::codecs::Channel> &channels) {
return filter(std::unordered_set<uint32_t>{}, channels);
}
void Feed::addPipeline(uint32_t fsid) {
ftl::operators::Graph* Feed::addPipeline(uint32_t fsid) {
std::unique_lock<std::mutex> lk(mtx_);
if (pre_pipelines_.count(fsid) != 0) {
return;
delete pre_pipelines_[fsid];
}
pre_pipelines_[fsid] = ftl::config::create<ftl::operators::Graph>
(this, std::string("pre_filters") + std::to_string(fsid));
if (pre_pipeline_builder_) {
pre_pipeline_builder_(*pre_pipelines_[fsid]);
return pre_pipelines_[fsid];
}
void Feed::removeFilter(Feed::Filter* filter) {
std::unique_lock<std::mutex> lk(mtx_);
auto iter = std::find(filters_.begin(), filters_.end(), filter);
if (iter != filters_.end()) {
filters_.erase(iter);
delete filter;
}
}
......@@ -89,14 +179,14 @@ void Feed::updateNetSources() {
int n = 0;
for (auto s : netcams) {
if (available_.count(s) == 0) {
if (fsid_lookup_.count(s) == 0) {
auto *stream = ftl::create<ftl::stream::Net>
(this, std::string("netstream")+std::to_string(available_.size()), net_);
(this, std::string("netstream")+std::to_string(fsid_lookup_.size()), net_);
int fsid = allocateFrameSetId();
available_[s] = stream;
stream->set("uri", s);
add(fsid, stream);
add(fsid, s, stream);
n++;
......@@ -115,14 +205,14 @@ void Feed::updateNetSources() {
}*/
}
void Feed::add(uint32_t fsid, ftl::stream::Stream* stream) {
stores_.try_emplace(fsid);
addPipeline(fsid);
void Feed::add(uint32_t fsid, const std::string &uri, ftl::stream::Stream* stream) {
fsid_lookup_[uri] = fsid;
streams_[fsid] = stream;
stream_->add(stream, fsid);
stream->begin();
}
void Feed::add(const std::string &path) {
uint32_t Feed::add(const std::string &path) {
std::unique_lock<std::mutex> lk(mtx_);
ftl::URI uri(path);
......@@ -150,7 +240,10 @@ void Feed::add(const std::string &path) {
fstream->set("filename", uri.getPath());
}
add(fsid, fstream);
// TODO: URI normalization; should happen in add(,,) or add(,,,) take
// ftl::URI instead of std::string as argument. Note the bug above.
add(fsid, path, fstream);
return fsid;
}
else if (scheme == ftl::URI::SCHEME_DEVICE) {
//auto &config = getConfig()["sources"].emplace_back();
......@@ -165,25 +258,20 @@ void Feed::add(const std::string &path) {
// just connect, onConnect callback will add the stream
// TODO: should save URIs and avoid connecting more than once?
// TODO: not possible to return fsid
net_->connect(path);
}
else{
throw ftl::exception("bad uri");
}
}
void Feed::createFrameSet(uint32_t fsid, int size, ftl::data::FrameSet &fs) {
}
void Feed::createFrame(uint32_t fsid, ftl::data::Frame &f) {
//f = ftl::data::Frame(pool_.get(), &stores_[fsid], ftl::data::FrameID(fsid, 0) ,0);
return -1;
}
uint32_t Feed::allocateFrameSetId() {
return fs_counter_++;
}
ftl::Handle Feed::onFrameSet(const ftl::stream::FrameSetHandler &f) {
/*ftl::Handle Feed::onFrameSet(const ftl::stream::FrameSetHandler &f) {
return frameset_cb_.on(f);
}
}*/
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment