-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy paththreadpool.hpp
110 lines (92 loc) · 2.37 KB
/
threadpool.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#pragma once
#include "queue.hpp"
#include "thread.hpp"
class ThreadPool : noncopyable
{
public:
using Task = Thread::Task;
explicit ThreadPool(uint64_t maxTasks = static_cast<uint64_t>(Thread::hardwareConcurrency()) * 4,
uint32_t maxThreads = Thread::hardwareConcurrency())
: m_maxThreads(maxThreads)
, m_queue(maxTasks)
{
assert(maxTasks > 0);
assert(maxThreads > 0);
}
~ThreadPool()
{
if (isRunning()) {
stop();
}
}
void start()
{
assert(!isRunning());
m_queue.start();
for (uint32_t i = 0; i < m_maxThreads; ++i) {
auto *thread = new Thread([this](std::stop_token token) { runInThread(token); });
m_threads.emplace_back(thread);
thread->start();
}
}
void stop()
{
m_queue.stop();
m_threads.clear();
}
void waitForDone()
{
if (!isRunning()) {
return;
}
while (!m_allDone.load()) {
m_allDone.wait(false);
}
}
bool addTask(Task task)
{
if (!isRunning()) {
return false;
}
return m_queue.push(std::move(task), [&]() {
m_allDone.store(false);
m_taskCount.fetch_add(1);
});
}
void clearTasks()
{
m_queue.clear();
m_taskCount.store(0);
notifyAllDone();
}
[[nodiscard]] auto activeThreadCount() const -> std::size_t { return m_threads.size(); }
[[nodiscard]] auto queuedTaskCount() const -> std::size_t { return m_queue.size(); }
[[nodiscard]] auto isRunning() const -> bool
{
return m_threads.empty() ? false : m_threads[0]->isRunning();
}
private:
void runInThread(std::stop_token token)
{
while (!token.stop_requested()) {
Task task;
if (m_queue.pop(task)) {
task(token);
m_taskCount.fetch_sub(1);
notifyAllDone();
}
}
}
void notifyAllDone()
{
if (m_taskCount.load() == 0) {
m_allDone.store(true);
m_allDone.notify_all();
}
}
uint32_t m_maxThreads;
Queue<Task> m_queue;
std::atomic_uint64_t m_taskCount{0};
std::atomic_bool m_allDone{true};
std::vector<std::unique_ptr<Thread>> m_threads;
};