Skip to content
Snippets Groups Projects
Commit 8bd34373 authored by Sebastian Hahta's avatar Sebastian Hahta
Browse files

return queue size

parent 7e2c2251
Branches
No related tags found
No related merge requests found
......@@ -78,20 +78,21 @@ namespace ftl {
extern ctpl::thread_pool pool;
/** FIFO task queue for thread pool. */
template<typename Task>
class TaskQueue {
public:
void queue(Task task) {
/** Add task to the queue. Returns number of tasks in queue before the new task.
* If queue does not accept new tasks, returns -1 (stopped). */
int queue(Task task) {
auto lk = std::unique_lock(mtx_);
if (stop_) { return; }
if (stop_) { return -1; }
queue_.push_back(std::move(task));
start_and_unlock(lk);
}
/** Try to queue new task*/
/** Try to queue new task if queue is not larger than max_queue_size.
* Returns true if task was queued. */
bool try_queue(Task task, int max_queue_size) {
auto lk = std::unique_lock(mtx_);
if (stop_) { return false; }
......@@ -101,34 +102,49 @@ public:
return true;
}
/** Wait until finished. Returns if not stopped */
void wait()
{
/** Wait for any running tasks. Returns immediately if not stopped. */
void wait() {
auto lk = std::unique_lock(mtx_);
if (!stop_) { return; }
if (queue_.size() == 0) { return; }
cv_.wait_for(lk, [&](){ return queue_.size() == 0; });
if (!busy_) { return; }
cv_.wait_for(lk, [&](){ busy_; });
}
/** Stop any further processing. Call wait() to wait for any running tasks to complete. */
void stop() {
auto lk = std::unique_lock(mtx_);
stop_ = true;
if (!run_) {
if (!busy_) {
lk.unlock();
cv_.notify_all();
}
}
/** Remove all queued tasks */
void clear() {
auto lk = std::unique_lock(mtx_);
queue_.clear();
}
/** Continue processing remaining queue and accept new tasks to the queue. */
void resume() {
auto lk = std::unique_lock(mtx_);
stop_ = false;
if ((queue_.size() > 0) && !busy_) {
start_and_unlock(lk);
}
}
private:
std::mutex mtx_;
std::condition_variable cv_;
std::deque<Task> queue_;
bool run_ = false;
bool busy_ = false;
bool stop_ = false;
void start_and_unlock(std::unique_lock<std::mutex>& lk) {
if (!run_ && !stop_) {
run_ = true;
if (!busy_ && !stop_) {
busy_ = true;
lk.unlock();
pool.push(std::bind(&TaskQueue<Task>::run, this));
}
......@@ -139,8 +155,8 @@ private:
Task task;
{
auto lk = std::unique_lock(mtx_);
if (!run_ || stop_ || (queue_.size() == 0)) {
run_ = false;
if (stop_ || (queue_.size() == 0)) {
busy_ = false;
break;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment