Skip to content
Snippets Groups Projects
Commit 20098df9 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/job-counters' into 'main'

Use a better thread job counter

See merge request beyondaka/beyond-protocol!72
parents 986a7b54 ca6395b7
No related branches found
Tags 0.6.15
No related merge requests found
/**
* @file counter.hpp
* @copyright Copyright (c) 2022 University of Turku, MIT License
* @author Nicolas Pope
*/
#pragma once
#include <atomic>
namespace ftl {

/**
 * RAII reference on an external atomic job counter.
 *
 * Construction increments the target counter and destruction decrements it,
 * so an in-flight job is counted for exactly as long as a live Counter
 * refers to it (e.g. a Counter captured by value in a thread-pool lambda).
 * Copying adds a reference; moving transfers it without touching the count.
 *
 * The pointed-to atomic is not owned and must outlive every Counter that
 * references it.
 */
class Counter {
 public:
    /// Start counting: increments *c. `c` must be non-null.
    inline explicit Counter(std::atomic_int *c) : counter_(c) {
        ++(*c);
    }

    Counter() = delete;

    /// Copy adds another reference to the same counter.
    inline Counter(const Counter &c) : counter_(c.counter_) {
        if (counter_) {
            ++(*counter_);
        }
    }

    /// Move transfers the reference; the source becomes inactive (null).
    inline Counter(Counter &&c) noexcept : counter_(c.counter_) {
        c.counter_ = nullptr;
    }

    /// Copy assignment: release the current reference, then acquire the
    /// source's counter. The implicit version would corrupt both counts.
    inline Counter &operator=(const Counter &c) {
        if (this != &c) {
            if (counter_) {
                --(*counter_);
            }
            counter_ = c.counter_;
            if (counter_) {
                ++(*counter_);
            }
        }
        return *this;
    }

    /// Move assignment: release the current reference and steal the
    /// source's, leaving the source inactive.
    inline Counter &operator=(Counter &&c) noexcept {
        if (this != &c) {
            if (counter_) {
                --(*counter_);
            }
            counter_ = c.counter_;
            c.counter_ = nullptr;
        }
        return *this;
    }

    /// Stop counting: decrements the counter unless this instance was
    /// moved-from.
    inline ~Counter() {
        if (counter_) {
            --(*counter_);
        }
    }

 private:
    std::atomic_int *counter_;  // non-owning; nullptr after being moved-from
};

}  // namespace ftl
......@@ -12,6 +12,7 @@
#include <string>
#include <ftl/threads.hpp>
#include <ftl/exception.hpp>
#include <ftl/counter.hpp>
namespace ftl {
......@@ -150,8 +151,7 @@ struct Handler : BaseHandler {
* single thread, not in parallel.
*/
void triggerAsync(ARGS ...args) {
++jobs_;
ftl::pool.push([this, args...](int id) {
ftl::pool.push([this, c = std::move(ftl::Counter(&jobs_)), args...](int id) {
bool hadFault = false;
std::string faultMsg;
std::unique_lock<std::shared_mutex> lk(mutex_);
......@@ -167,7 +167,6 @@ struct Handler : BaseHandler {
else
++i;
}
--jobs_;
if (hadFault) throw FTL_Error("Callback exception: " << faultMsg);
});
}
......@@ -179,16 +178,13 @@ struct Handler : BaseHandler {
*/
void triggerParallel(ARGS ...args) {
std::unique_lock<std::shared_mutex> lk(mutex_);
jobs_ += callbacks_.size();
for (auto i = callbacks_.begin(); i != callbacks_.end(); ++i) {
ftl::pool.push([this, f = i->second, args...](int id) {
ftl::pool.push([this, c = std::move(ftl::Counter(&jobs_)), f = i->second, args...](int id) {
try {
f(args...);
} catch (const ftl::exception &e) {
--jobs_;
throw e;
}
--jobs_;
});
}
}
......
......@@ -15,6 +15,7 @@
#include <ftl/lib/loguru.hpp>
#include <ftl/lib/ctpl_stl.hpp>
#include <ftl/counter.hpp>
#include "common.hpp"
......@@ -277,20 +278,13 @@ NodeType Peer::getType() const {
}
void Peer::_createJob() {
++job_count_;
ftl::pool.push([this](int id) {
ftl::pool.push([this, c = std::move(ftl::Counter(&job_count_))](int id) {
try {
while (_data());
} catch (const std::exception &e) {
net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what());
}
--job_count_;
});
if (ftl::pool.size() == 0) {
--job_count_;
}
}
void Peer::data() {
......
......@@ -15,6 +15,7 @@
#include <chrono>
#include "filestream.hpp"
#include <ftl/time.hpp>
#include <ftl/counter.hpp>
#include "packetMsgpack.hpp"
#define LOGURU_REPLACE_GLOG 1
......@@ -271,8 +272,6 @@ bool File::tick(int64_t ts) {
if (i->timestamp <= fsdata.timestamp) {
StreamPacket &spkt = *i;
++jobs_;
if (spkt.channel == Channel::kEndFrame) {
fsdata.needs_endframe = false;
}
......@@ -298,7 +297,7 @@ bool File::tick(int64_t ts) {
++i;
// TODO(Nick): Probably better not to do a thread per packet
ftl::pool.push([this, i = j](int id) {
ftl::pool.push([this, c = std::move(ftl::Counter(&jobs_)), i = j](int id) {
StreamPacket &spkt = *i;
Packet &pkt = *i;
......@@ -308,7 +307,6 @@ bool File::tick(int64_t ts) {
UNIQUE_LOCK(data_mutex_, dlk);
data_.erase(i);
--jobs_;
});
} else {
++complete_count;
......
......@@ -13,6 +13,7 @@
#include <chrono>
#include "netstream.hpp"
#include <ftl/time.hpp>
#include <ftl/counter.hpp>
#include "../uuidMSGPACK.hpp"
#include "packetMsgpack.hpp"
......@@ -359,15 +360,13 @@ void Net::_run() {
spkt = &current->packets.first;
pkt = &current->packets.second;
++state->active;
ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) {
ftl::pool.push([this, c = std::move(ftl::Counter(&state->active)), buf = &*current, spkt, pkt, state](int ix) {
try {
_processPacket(buf->peer, 0, *spkt, *pkt);
} catch (const std::exception &e) {
LOG(ERROR) << "Packet processing error: " << e.what();
}
buf->done = true;
--state->active;
});
if (spkt->channel == Channel::kEndFrame) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment