diff --git a/include/ftl/counter.hpp b/include/ftl/counter.hpp new file mode 100644 index 0000000000000000000000000000000000000000..380e0e32d75d857d15af1a5e7ba7e96ec01530fc --- /dev/null +++ b/include/ftl/counter.hpp @@ -0,0 +1,37 @@ +/** + * @file counter.hpp + * @copyright Copyright (c) 2022 University of Turku, MIT License + * @author Nicolas Pope + */ + +#pragma once + +#include <atomic> + +namespace ftl { + +class Counter { + public: + inline explicit Counter(std::atomic_int *c): counter_(c) { + ++(*c); + } + Counter() = delete; + inline Counter(const Counter &c): counter_(c.counter_) { + if (counter_) { + ++(*counter_); + } + } + inline Counter(Counter &&c): counter_(c.counter_) { + c.counter_ = nullptr; + } + inline ~Counter() { + if (counter_) { + --(*counter_); + } + } + + private: + std::atomic_int *counter_; +}; + +} // namespace ftl diff --git a/include/ftl/handle.hpp b/include/ftl/handle.hpp index d59c6476c13385ba3cb6a9d83ddb874df87274af..86aeefc2e2357c8b10ad87b4f55c2c87b40ad103 100644 --- a/include/ftl/handle.hpp +++ b/include/ftl/handle.hpp @@ -12,6 +12,7 @@ #include <string> #include <ftl/threads.hpp> #include <ftl/exception.hpp> +#include <ftl/counter.hpp> namespace ftl { @@ -150,8 +151,7 @@ struct Handler : BaseHandler { * single thread, not in parallel. */ void triggerAsync(ARGS ...args) { - ++jobs_; - ftl::pool.push([this, args...](int id) { + ftl::pool.push([this, c = std::move(ftl::Counter(&jobs_)), args...](int id) { bool hadFault = false; std::string faultMsg; std::unique_lock<std::shared_mutex> lk(mutex_); @@ -167,7 +167,6 @@ struct Handler : BaseHandler { else ++i; } - --jobs_; if (hadFault) throw FTL_Error("Callback exception: " << faultMsg); }); } @@ -179,16 +178,13 @@ struct Handler : BaseHandler { */ void triggerParallel(ARGS ...args) { std::unique_lock<std::shared_mutex> lk(mutex_); - jobs_ += callbacks_.size(); for (auto i = callbacks_.begin(); i != callbacks_.end(); ++i) { - ftl::pool.push([this, f = i->second, args...](int id) { + ftl::pool.push([this, c = std::move(ftl::Counter(&jobs_)), f = i->second, args...](int id) { try { f(args...); } catch (const ftl::exception &e) { - --jobs_; throw e; } - --jobs_; }); } } diff --git a/src/peer.cpp b/src/peer.cpp index 5f6b1b60f5d09ee4551177af499acd785b6ae4b7..2ab4ca09f797831b5c3b101401c97c1a2520ba4b 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -15,6 +15,7 @@ #include <ftl/lib/loguru.hpp> #include <ftl/lib/ctpl_stl.hpp> +#include <ftl/counter.hpp> #include "common.hpp" @@ -277,20 +278,13 @@ NodeType Peer::getType() const { } void Peer::_createJob() { - ++job_count_; - - ftl::pool.push([this](int id) { + ftl::pool.push([this, c = std::move(ftl::Counter(&job_count_))](int id) { try { while (_data()); } catch (const std::exception &e) { net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what()); } - --job_count_; }); - - if (ftl::pool.size() == 0) { - --job_count_; - } } void Peer::data() { diff --git a/src/streams/filestream.cpp b/src/streams/filestream.cpp index 335e9f2e3395515e459b3991763a52772717073d..6f4e32ada8a3422b1440165ba99a803dae625df4 100644 --- a/src/streams/filestream.cpp +++ b/src/streams/filestream.cpp @@ -15,6 +15,7 @@ #include <chrono> #include "filestream.hpp" #include <ftl/time.hpp> +#include <ftl/counter.hpp> #include "packetMsgpack.hpp" #define LOGURU_REPLACE_GLOG 1 @@ -271,8 +272,6 @@ bool File::tick(int64_t ts) { if (i->timestamp <= fsdata.timestamp) { StreamPacket &spkt = *i; - ++jobs_; - if (spkt.channel == Channel::kEndFrame) { fsdata.needs_endframe = false; } @@ -298,7 +297,7 @@ bool File::tick(int64_t ts) { ++i; // TODO(Nick): Probably better not to do a thread per packet - ftl::pool.push([this, i = j](int id) { + ftl::pool.push([this, c = std::move(ftl::Counter(&jobs_)), i = j](int id) { StreamPacket &spkt = *i; Packet &pkt = *i; @@ -308,7 +307,6 @@ bool File::tick(int64_t ts) { UNIQUE_LOCK(data_mutex_, dlk); data_.erase(i); - --jobs_; }); } else { ++complete_count; diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index c551bfde86c20fb8e248238508d818a71b0e95ac..c6a546458614fa2405bf26e9023d734764eadb98 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -13,6 +13,7 @@ #include <chrono> #include "netstream.hpp" #include <ftl/time.hpp> +#include <ftl/counter.hpp> #include "../uuidMSGPACK.hpp" #include "packetMsgpack.hpp" @@ -359,15 +360,13 @@ void Net::_run() { spkt = ¤t->packets.first; pkt = ¤t->packets.second; - ++state->active; - ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) { + ftl::pool.push([this, c = std::move(ftl::Counter(&state->active)), buf = &*current, spkt, pkt, state](int ix) { try { _processPacket(buf->peer, 0, *spkt, *pkt); } catch (const std::exception &e) { LOG(ERROR) << "Packet processing error: " << e.what(); } buf->done = true; - --state->active; }); if (spkt->channel == Channel::kEndFrame) {