Skip to content

feat: thread pool #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: universal_stream_join
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions benchmark/src/Benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <chrono>
#include <filesystem>
#include <memory>
#include <type_traits>

using namespace std;
using namespace AllianceDB;
Expand All @@ -49,6 +50,14 @@ DEFINE_uint32(num_workers, 2, "Number of workers");
*/
int main(int argc, char **argv)
{
string usage = "\n";
for (auto i = 0; i < extent<decltype(algo_names)>::value; ++i)
{
usage += "\t" + to_string(i) + ": ";
usage += algo_names[i];
usage += "\n";
}
gflags::SetUsageMessage(usage);
Param param;
gflags::ParseCommandLineFlags(&argc, &argv, true);

Expand Down Expand Up @@ -88,17 +97,8 @@ int main(int argc, char **argv)
break;
}
case AlgoType::HandshakeJoin:
{
auto engine = make_unique<EagerEngine>(param);
engine->Run(ctx);
break;
}
case AlgoType::HandshakeJoinOrigin:
case AlgoType::SplitJoin:
{
auto engine = make_unique<EagerEngine>(param);
engine->Run(ctx);
break;
}
case AlgoType::SplitJoinOrigin:
{
auto engine = make_unique<EagerEngine>(param);
Expand Down
6 changes: 5 additions & 1 deletion include/Common/Context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "Common/Param.hpp"
#include "Common/Result.hpp"
#include "Common/Stream.hpp"
#include "Utils/ThreadPool.hpp"

namespace AllianceDB
{
Expand All @@ -12,7 +13,10 @@ struct Context
const Param &param;
ResultPtr res;
StreamPtr sr, ss;
Context(const Param &param) : param(param), res(std::make_shared<JoinResult>(param)) {}
ThreadPool pool;
Context(const Param &param)
: param(param), res(std::make_shared<JoinResult>(param)), pool(param.num_workers)
{}
};

} // namespace AllianceDB
Expand Down
20 changes: 11 additions & 9 deletions include/Common/Types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,19 @@ enum class StreamType
}; /*!< Type of the stream, default R and S */
enum class AlgoType
{
Verify = 0,
LWJ = 1,
HandshakeJoin = 2,
SplitJoin = 3,
IBWJ = 4,
HashJoin = 5,
SplitJoinOrigin = 6,
Verify = 0,
LWJ = 1,
HandshakeJoin = 2,
SplitJoin = 3,
IBWJ = 4,
HashJoin = 5,
HandshakeJoinOrigin = 6,
SplitJoinOrigin = 7,
};

constexpr std::string_view algo_names[32] = {
"Verify", "LWJ", "HandshakeJoin", "SplitJoin", "IBWJ", "HashJoin", "SplitJoinOrigin",
constexpr std::string_view algo_names[] = {
"Verify", "LWJ", "HandshakeJoin", "SplitJoin",
"IBWJ", "HashJoin", "HandshakeJoinOrigin", "SplitJoinOrigin",
};

using ThreadPtr = std::shared_ptr<std::thread>; /*!< Type of the thread pointer */
Expand Down
23 changes: 23 additions & 0 deletions include/Utils/ThreadPool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#ifndef INCLUDE_UTILS_THREADPOOL_HPP_
#define INCLUDE_UTILS_THREADPOOL_HPP_

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>

class ThreadPool
{
public:
ThreadPool(size_t num_threads) : num_threads(num_threads), pool(num_threads) {}
template <typename T>
void Post(T func)
{
boost::asio::post(pool, func);
}
void Wait() { pool.join(); }

private:
size_t num_threads;
boost::asio::thread_pool pool;
};

#endif
9 changes: 7 additions & 2 deletions src/Engine/EagerEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ JoinPtr EagerEngine::New()
{
return make_shared<HandshakeJoin>(param, windows.size());
}
case AlgoType::HandshakeJoinOrigin:
{
return make_shared<HandshakeJoin>(param, windows.size());
}
case AlgoType::SplitJoin:
{
return make_shared<SplitJoin>(param, windows.size());
Expand All @@ -48,7 +52,7 @@ JoinPtr EagerEngine::New()
}
default:
{
FATAL("Unsupported algorithm %d", param.algo);
FATAL("Unsupported algorithm %d", static_cast<int>(param.algo));
}
}
}
Expand All @@ -61,7 +65,7 @@ void EagerEngine::Run(Context &ctx)
{
auto nextS = ss->Next(), nextR = sr->Next();
// no new joiner
if (param.algo == AlgoType::SplitJoinOrigin)
if (param.algo == AlgoType::SplitJoinOrigin || param.algo == AlgoType::HandshakeJoinOrigin)
{
if (nextR->ts == 0)
{
Expand Down Expand Up @@ -100,4 +104,5 @@ void EagerEngine::Run(Context &ctx)
windows[i]->Wait();
LOG("algo[%d/%d] joined", i, windows.size());
}
ctx.pool.Wait();
}
12 changes: 7 additions & 5 deletions src/Join/HandshakeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ void HandshakeJoin::Start(Context &ctx)
{
for (auto &w : workers)
{
w->Start(ctx);
// w->Start(ctx);
auto func = [&w, &ctx]() { w->Run(ctx); };
ctx.pool.Post(func);
}
}

Expand All @@ -82,12 +84,12 @@ void HandshakeJoin::Wait()

HandshakeJoin::Worker::Worker(const Param &param)
: param(param),
inputr(param.window * 2),
inputs(param.window * 2),
inputr(param.num_tuples),
inputs(param.num_tuples),
msgi(1),
msgo(1),
left_recv_queue(std::make_shared<spsc_queue<Msg>>(param.window * 2)),
right_recv_queue(std::make_shared<spsc_queue<Msg>>(param.window * 2))
left_recv_queue(std::make_shared<spsc_queue<Msg>>(param.num_tuples)),
right_recv_queue(std::make_shared<spsc_queue<Msg>>(param.num_tuples))
{}

void HandshakeJoin::Worker::Run(Context &ctx)
Expand Down
3 changes: 2 additions & 1 deletion src/Join/SplitJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ void SplitJoin::JoinCore::Run(Context &ctx)
void SplitJoin::JoinCore::Start(Context &ctx)
{
auto func = [this, &ctx]() { this->Run(ctx); };
t = make_shared<thread>(func);
// t = make_shared<thread>(func);
ctx.pool.Post(func);
}

void SplitJoin::JoinCore::Store(TuplePtr tuple)
Expand Down