This document introduces a C++ class, tpool, which simplifies the usage of Windows' new Threadpool API. The class is available in two modes of operation.
Part of my multithreading library: https://github.com/WindowsNT/mt
Introduction
Windows has a new Threadpool API with a somewhat messy interface (see example code). Here is a class that simplifies its usage.
Terminology
- A Work item is a callback that a pool thread runs in the background once it is submitted.
- An IO item is a callback that runs when an overlapped I/O operation on a handle completes.
- A Timer item is a callback that fires after a set time.
- A Wait item is a callback that runs when some object is signaled.
- A Cleanup Group is the API's way to wait for all items and then release them together.
Two Modes
template <bool AutoDestruct = true>
class tpool
The class is available in two modes: one in which the API destroys the items, and one in which the class's smart pointers destroy them.
- Use with <true> when you want to use a Cleanup Group. This lets the API keep track of your objects, and with a simple Join(), you can wait for all of them.
- Use with <false> when you want to manage the items yourself. This allows you to call Join() with the items you want to wait for, cancel a specific item, etc.
Class Handle
template <typename T,typename Destruction = destruction_policy<T>>
class handle
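This smart pointer manages the items. If you instantiate tpool with false, these handles automatically destroy their items when the last copy goes away (with true, the Cleanup Group releases them instead). The destruction call is selected through a template specialization for each type: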
template<> class destruction_policy<PTP_POOL>
{ public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
template<> class destruction_policy<PTP_WORK>
{ public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
template<> class destruction_policy<PTP_WAIT>
{ public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
template<> class destruction_policy<PTP_TIMER>
{ public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
template<> class destruction_policy<PTP_IO>
{ public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
template<> class destruction_policy<PTP_CLEANUP_GROUP>
{ public: static void destruct(PTP_CLEANUP_GROUP h) { CloseThreadpoolCleanupGroup(h); } };
The handle implements copy and move semantics with the aid of a std::shared_ptr, which tracks how many copies of the handle are still alive.
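The ownership idea can be tried without the Windows headers. The following is a minimal sketch, not the class from this article: fake_item, FAKE_HANDLE, close_count and handle_sketch are hypothetical names, and the policy is run from a std::shared_ptr deleter instead of the reference-count check used by handle.

```cpp
#include <memory>

// Hypothetical stand-in for an API item such as PTP_WORK (not part of the article's code).
struct fake_item { int id; };
using FAKE_HANDLE = fake_item*;

// Counts how often the stand-in "close" function has run.
inline int& close_count() { static int n = 0; return n; }

// Primary template is only declared: using an unsupported type fails to compile.
template <typename T> struct destruction_policy;

template <> struct destruction_policy<FAKE_HANDLE>
{
    static void destruct(FAKE_HANDLE h) { ++close_count(); delete h; }
};

template <typename T, typename Destruction = destruction_policy<T>>
class handle_sketch
{
    T raw{};                  // the value handed to API calls
    std::shared_ptr<T> owner; // its deleter runs the policy once, after the last copy dies
public:
    handle_sketch() = default;
    handle_sketch(T h, bool no_destruct) : raw(h)
    {
        // Own the value only when it is valid and the caller asked for destruction.
        if (h != T{} && !no_destruct)
            owner.reset(new T(h), [](T* p) { Destruction::destruct(*p); delete p; });
    }
    operator T() const { return raw; }
};
```

Copies of a handle_sketch share the same owner, so the policy runs exactly once. Passing true as the second argument mirrors the Cleanup Group case, where something else is responsible for the release.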
tpool::Create
bool Create(unsigned long nmin = 1,unsigned long nmax = 1)
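Creates the pool, the callback environment and, for tpool<true>, the Cleanup Group. You can pass the minimum and maximum number of threads you need. It returns false if the pool or the Cleanup Group cannot be created, or if the thread minimum cannot be set.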
Item Creation
There are four specializations to create your items:
template <> handle<PTP_WORK>
CreateItem<PTP_WORK,PTP_WORK_CALLBACK>(PTP_WORK_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_WAIT>
CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>(PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_TIMER>
CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>(PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_IO>
CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>(PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY);
Each call takes the callback to be executed and a parameter to pass to the callback. CreateItem<PTP_IO> also takes a HANDLE value for I/O operations.
Item Running
There are three specializations to run your items (a Wait object does not "run"; to arm one, call SetThreadpoolWait on its handle yourself). With the aid of std::tuple, we can use the same function signature with different parameters:
template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>);
template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,std::tuple<FILETIME*,DWORD,DWORD>t);
template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t);
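The tuple trick can be illustrated in portable C++. This is only a sketch under stated assumptions: work_tag, timer_tag, io_tag and run_item are hypothetical names, and plain overloads on tag types are used here in place of the explicit specializations shown above.

```cpp
#include <string>
#include <tuple>

// Hypothetical tags standing in for PTP_WORK, PTP_TIMER and PTP_IO.
struct work_tag {};
struct timer_tag {};
struct io_tag {};

// Every overload has the shape run_item(tag, tuple); only the tuple's contents differ.
inline std::string run_item(work_tag, std::tuple<> = std::tuple<>())
{
    return "work"; // nothing to unpack
}

inline std::string run_item(timer_tag, std::tuple<int, int> t)
{
    // Unpack the per-kind parameters with std::get.
    return "timer " + std::to_string(std::get<0>(t)) + "/" + std::to_string(std::get<1>(t));
}

inline std::string run_item(io_tag, std::tuple<bool> t)
{
    return std::get<0>(t) ? "io cancel" : "io start";
}
```

A caller builds the argument pack with std::make_tuple, for example run_item(timer_tag{}, std::make_tuple(5, 0)), while the work variant needs no tuple at all because of the default argument.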
RunItem<PTP_TIMER> takes the timer parameters (see SetThreadpoolTimer): the due time, the period in milliseconds and the window length in milliseconds. RunItem<PTP_IO> takes a bool: true to cancel the I/O, or false to start it.
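The due time passed to SetThreadpoolTimer is a FILETIME counted in 100-nanosecond units, and a negative value means "relative to now". The helper below is a hedged sketch of that arithmetic only: filetime_sketch is a hypothetical portable stand-in that mirrors FILETIME's two 32-bit members, and relative_due_time is not part of the class.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical stand-in mirroring the two DWORD members of FILETIME.
struct filetime_sketch
{
    std::uint32_t dwLowDateTime;
    std::uint32_t dwHighDateTime;
};

// Converts a delay into a negative (relative) count of 100 ns ticks,
// split into low and high 32-bit halves.
inline filetime_sketch relative_due_time(std::chrono::milliseconds delay)
{
    // 1 ms equals 10,000 ticks of 100 ns; the minus sign marks the value as relative.
    const std::int64_t ticks = -static_cast<std::int64_t>(delay.count()) * 10000;
    const std::uint64_t bits = static_cast<std::uint64_t>(ticks);
    return { static_cast<std::uint32_t>(bits & 0xFFFFFFFFu),
             static_cast<std::uint32_t>(bits >> 32) };
}
```

On Windows, the two halves would be copied into a real FILETIME whose address becomes the first element of the tuple passed to RunItem<PTP_TIMER>.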
Item Waiting
There are four specializations to wait for your items:
template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel);
template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel);
template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel);
template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel);
Passing true in Cancel cancels queued callbacks that have not started yet; callbacks that are already running are still waited for.
Joining
With the aid of type traits, we define the Join function in two ways, depending on whether tpool is instantiated with true or false.
template <bool Q = AutoDestruct>
typename std::enable_if<Q,void>::type
Join(bool Cancel = false);
With auto destruction, Join waits for all your current items to finish. If Cancel is true, all callbacks that haven't started yet are cancelled. Join also releases the items and closes the Cleanup Group, so handles created earlier must not be used afterwards.
template <bool Q = AutoDestruct>
typename std::enable_if<!Q,void>::type
Join(bool Cancel = false,
std::initializer_list<handle<PTP_WORK>> h1 = std::initializer_list<handle<PTP_WORK>>({}),
std::initializer_list<handle<PTP_TIMER>> h2 = std::initializer_list<handle<PTP_TIMER>>({}),
std::initializer_list<handle<PTP_WAIT>> h3 = std::initializer_list<handle<PTP_WAIT>>({}),
std::initializer_list<handle<PTP_IO>> h4 = std::initializer_list<handle<PTP_IO>>({})
);
With manual destruction, Join waits for the items you specify to finish. If Cancel is true, callbacks of those items that haven't started yet are cancelled.
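The selection between the two Join overloads can be reproduced in a few portable lines. This is a sketch of the std::enable_if technique only: pool_sketch and its int return values are hypothetical and exist purely to make the chosen overload observable.

```cpp
#include <initializer_list>
#include <type_traits>

template <bool AutoDestruct = true>
class pool_sketch
{
public:
    // Enabled only when AutoDestruct is true: there is nothing to list.
    template <bool Q = AutoDestruct>
    typename std::enable_if<Q, int>::type
    join(bool cancel = false)
    {
        (void)cancel;
        return -1; // stands for "waited for everything"
    }

    // Enabled only when AutoDestruct is false: the caller lists the items.
    template <bool Q = AutoDestruct>
    typename std::enable_if<!Q, int>::type
    join(bool cancel = false, std::initializer_list<int> items = {})
    {
        (void)cancel;
        return static_cast<int>(items.size()); // number of items waited for
    }
};
```

Because the condition depends on the member template's own parameter Q, the overload that does not apply is silently removed from overload resolution instead of causing a compile error. Calling join(true, {1, 2}) on a pool_sketch<true> therefore fails to compile, which is the intended effect.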
The Final Code
#include <windows.h>
#include <functional>
#include <initializer_list>
#include <memory>
#include <tuple>
#include <type_traits>
#include <utility>
namespace tpoollib
{
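template<typename T>
class destruction_policy
{
public:
static void destruct(T h)
{
// The condition depends on T, so the assertion fires only if this primary template is actually used.
static_assert(sizeof(T) == 0,"Must define destructor");
}
};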
template<> class destruction_policy<PTP_POOL>
{ public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
template<> class destruction_policy<PTP_WORK>
{ public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
template<> class destruction_policy<PTP_WAIT>
{ public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
template<> class destruction_policy<PTP_TIMER>
{ public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
template<> class destruction_policy<PTP_IO>
{ public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
template<> class destruction_policy<PTP_CLEANUP_GROUP>
{ public: static void destruct(PTP_CLEANUP_GROUP h)
{ CloseThreadpoolCleanupGroup(h); } };
template <typename T,typename Destruction = destruction_policy<T>>
class handle
{
private:
T hX = 0;
bool NoDestruct = true;
std::shared_ptr<size_t> ptr = std::make_shared<size_t>();
public:
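void Close()
{
// Only the last remaining copy may destroy the underlying item.
// shared_ptr::unique() was removed in C++20, so use_count() is compared instead.
if (!ptr || ptr.use_count() != 1)
{
ptr.reset();
return;
}
ptr.reset();
if (hX != 0 && !NoDestruct)
Destruction::destruct(hX);
hX = 0;
}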
handle()
{
hX = 0;
}
~handle()
{
Close();
}
handle(const handle& h)
{
Dup(h);
}
handle(handle&& h)
{
Move(std::move(h));
}
handle(T hY,bool NoDestructOnClose)
{
hX = hY;
NoDestruct = NoDestructOnClose;
}
handle& operator =(const handle& h)
{
Dup(h);
return *this;
}
handle& operator =(handle&& h)
{
Move(std::move(h));
return *this;
}
void Dup(const handle& h)
{
// Guard against self-assignment: Close() would otherwise destroy the item we are about to copy.
if (this == &h)
return;
Close();
NoDestruct = h.NoDestruct;
hX = h.hX;
ptr = h.ptr;
}
void Move(handle&& h)
{
// Guard against self-move for the same reason.
if (this == &h)
return;
Close();
hX = h.hX;
ptr = h.ptr;
NoDestruct = h.NoDestruct;
// Leave the source empty so that its destructor does nothing.
h.ptr.reset();
h.hX = 0;
h.NoDestruct = false;
}
operator T() const
{
return hX;
}
};
template <bool AutoDestruct = true>
class tpool
{
private:
handle<PTP_POOL> p;
handle<PTP_CLEANUP_GROUP> pcg;
TP_CALLBACK_ENVIRON env;
tpool(const tpool&) = delete;
tpool(tpool&&) = delete;
void operator=(const tpool&) = delete;
void operator=(tpool&&) = delete;
public:
tpool()
{
}
~tpool()
{
End();
}
void End()
{
Join();
DestroyThreadpoolEnvironment(&env);
p.Close();
}
bool Create(unsigned long nmin = 1,unsigned long nmax = 1)
{
bool jauto = AutoDestruct;
InitializeThreadpoolEnvironment(&env);
handle<PTP_POOL> cx(CreateThreadpool(0),false);
p = cx;
if (!p)
{
End();
return false;
}
if (!SetThreadpoolThreadMinimum(p,nmin))
{
End();
return false;
}
SetThreadpoolThreadMaximum(p,nmax);
if (jauto)
{
handle<PTP_CLEANUP_GROUP> cx(CreateThreadpoolCleanupGroup(),false);
pcg = cx;
if (!pcg)
{
End();
return false;
}
}
SetThreadpoolCallbackPool(&env,p);
SetThreadpoolCallbackCleanupGroup(&env,pcg,0);
return true;
}
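// Primary templates: only the explicit specializations below are meant to be used.
// The assertions depend on J, so they fire only when a primary template is instantiated.
// Note: the specializations are declared at class scope, which Visual C++ accepts; other compilers may reject them.
template <typename J>
void Wait(handle<J> h,bool Cancel = false)
{
static_assert(sizeof(J) == 0,"No Wait function is defined");
}
template <typename J,typename CB_J>
handle<J> CreateItem(CB_J cb,PVOID opt = 0,HANDLE hX = 0)
{
static_assert(sizeof(J) == 0,"No Create function is defined");
}
template <typename J,typename ...A>
void RunItem(handle<J> h,std::tuple<A...> = std::make_tuple<>())
{
static_assert(sizeof(J) == 0,"No Run function is defined");
}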
template <> handle<PTP_WORK> CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
(PTP_WORK_CALLBACK cb,PVOID opt,HANDLE)
{
handle<PTP_WORK> a(CreateThreadpoolWork(cb,opt,&env),AutoDestruct);
return a;
}
template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>)
{
SubmitThreadpoolWork(h);
}
template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel)
{
WaitForThreadpoolWorkCallbacks(h,Cancel);
}
template <> handle<PTP_WAIT> CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>
(PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE)
{
handle<PTP_WAIT> a(CreateThreadpoolWait(cb,opt,&env),AutoDestruct);
return a;
}
template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel)
{
WaitForThreadpoolWaitCallbacks(h,Cancel);
}
template <> handle<PTP_TIMER> CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>
(PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE)
{
handle<PTP_TIMER> a(CreateThreadpoolTimer(cb,opt,&env),AutoDestruct);
return a;
}
template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,
std::tuple<FILETIME*,DWORD,DWORD>t)
{
SetThreadpoolTimer(h,std::get<0>(t),std::get<1>(t),std::get<2>(t));
}
template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel)
{
WaitForThreadpoolTimerCallbacks(h,Cancel);
}
template <> handle<PTP_IO> CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>
(PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY)
{
handle<PTP_IO> a(CreateThreadpoolIo(hY,cb,opt,&env),AutoDestruct);
return a;
}
template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t)
{
bool Cancel = std::get<0>(t);
if (Cancel)
CancelThreadpoolIo(h);
else
StartThreadpoolIo(h);
}
template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel)
{
WaitForThreadpoolIoCallbacks(h,Cancel);
}
template <bool Q = AutoDestruct>
typename std::enable_if<Q,void>::type
Join(bool Cancel = false)
{
if (pcg)
{
CloseThreadpoolCleanupGroupMembers(pcg,Cancel,0);
pcg.Close();
}
}
template <bool Q = AutoDestruct>
typename std::enable_if<!Q,void>::type
Join(bool Cancel = false,
std::initializer_list<handle<PTP_WORK>> h1 =
std::initializer_list<handle<PTP_WORK>>({}),
std::initializer_list<handle<PTP_TIMER>> h2 =
std::initializer_list<handle<PTP_TIMER>>({}),
std::initializer_list<handle<PTP_WAIT>> h3 =
std::initializer_list<handle<PTP_WAIT>>({}),
std::initializer_list<handle<PTP_IO>> h4 =
std::initializer_list<handle<PTP_IO>>({})
)
{
for (auto a : h1)
Wait<PTP_WORK>(a,Cancel);
for (auto a : h2)
Wait<PTP_TIMER>(a,Cancel);
for (auto a : h3)
Wait<PTP_WAIT>(a,Cancel);
for (auto a : h4)
Wait<PTP_IO>(a,Cancel);
}
};
}
Sample Usage
Error checking is removed for simplicity.
using namespace tpoollib;
int __stdcall WinMain(HINSTANCE, HINSTANCE, LPSTR, int)
{
CoInitializeEx(0, COINIT_APARTMENTTHREADED);
{
tpool<true> t;
t.Create();
auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
{
Sleep(((rand() % 5) + 1) * 1000);
return;
},0);
for (int i = 0; i < 3; i++)
t.RunItem(workit);
t.Join();
}
{
tpool<false> t;
t.Create();
auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
{
Sleep(((rand() % 5) + 1) * 1000);
return;
},0);
for (int i = 0; i < 3; i++)
t.RunItem(workit);
t.Join(true,{workit});
}
return 0;
}
History
- 26th July, 2015 - First release