Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / programming / threads

Win32 Thread Pools and C++11 : A Quick Wrapper

4.79/5 (12 votes)
15 Apr 2019CPOL2 min read 22.1K  
Use Windows new ThreadPool through a single C++ 11 class
This document introduces a C++ class, tpool, which simplifies the usage of Windows' new Threadpool API. The class is available in two modes of operation.

Part of my multithreading library: https://github.com/WindowsNT/mt

Introduction

Windows has a new Threadpool API with a somewhat messy interface (see example code). Here is a class that simplifies its usage.

Terminology

  • A Work item is a worker thread that can run in the background.
  • An IO item is a worker thread that gets notified when I/O in a Handle occurs.
  • A Timer item is a worker thread that fires after a time set.
  • A Wait item is a worker thead that gets notified when some object is signaled.
  • A Cleanup Group is the API way to release all the handles after waiting for them.

Two Modes

C++
template <bool AutoDestruct = true>
class tpool

The class is available in two modes, one whose destruction is handled by the API, and the other whose destruction is handled through the class smart pointers.

  • Use with <true> when you want to use a Cleanup Group. This lets the API keep track of your objects, and with a simple Join(), you can wait for all of them.
  • Use with <false> when you want to manage the items yourself. This allows you to call Join() with the items you want to wait, cancel a specific item, etc.

Class Handle

C++
template <typename T,typename Destruction = destruction_policy<T>>
class handle

This smart pointer manages the items. If you instantiate tpool with true, these handles automatically destroy their items, with the aid of template specialization for each type:

C++
template<>    class destruction_policy<PTP_POOL>
{ public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
template<>    class destruction_policy<PTP_WORK>
{ public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
template<>    class destruction_policy<PTP_WAIT>
{ public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
template<>    class destruction_policy<PTP_TIMER>
{ public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
template<>    class destruction_policy<PTP_IO>
{ public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
template<>    class destruction_policy<PTP_CLEANUP_GROUP>
{ public: static void destruct(PTP_CLEANUP_GROUP h) { CloseThreadpoolCleanupGroup(h); } };

The handle implements move/copy semantics, etc. with the aid of a std::shared_ptr.

tpool::Create

C++
bool Create(unsigned long nmin = 1,unsigned long nmax = 1) 

Creates the interfaces needed. You can pass the minimum and maximum number of threads you need.

Item Creation

There are four specializations to create your items:

C++
template <> handle<PTP_WORK> 
  CreateItem<PTP_WORK,PTP_WORK_CALLBACK>(PTP_WORK_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_WAIT> 
  CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>(PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_TIMER> 
  CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>(PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_IO> 
  CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>(PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY);

Each call takes the callback to be executed, a parameter to pass to the callback. CreateItem<PTP_IO> also takes a HANDLE value for I/O operations.

Item Running

There are three specializations to run your items (A Wait object does not "run"). With the aid of std::tuple, we can use the same function signatures with different parameters:

C++
template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>);
template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,std::tuple<FILETIME*,DWORD,DWORD>t);
template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t);

RunItem<PTP_TIMER> also takes the timer parameters (See SetThreadpoolTimer) and RunItem<PTP_IO> takes a bool, true in order to cancel the I/O or false to start it.

Item Waiting

There are four specializations to wait for your items:

C++
template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel);
template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel);
template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel);
template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel); 

Passing true in Cancel forces the function to kill the object if it hasn't started yet.

Joining

With the aid of type traits, we define the join function in two ways, depending on if tpool is instantiated with true or false.

C++
template <bool Q = AutoDestruct>
typename std::enable_if<Q,void>::type
Join(bool Cancel = false); 

For the case that you have auto destruction, Join waits for all your current items to finish. If Cancel is true, then all items that haven't yet started are cancelled.

C++
template <bool Q = AutoDestruct>
typename std::enable_if<!Q,void>::type
    Join(bool Cancel = false,
    std::initializer_list<handle<PTP_WORK>> h1 = std::initializer_list<handle<PTP_WORK>>({}),
    std::initializer_list<handle<PTP_TIMER>> h2 = std::initializer_list<handle<PTP_TIMER>>({}),
    std::initializer_list<handle<PTP_WAIT>> h3 = std::initializer_list<handle<PTP_WAIT>>({}),
    std::initializer_list<handle<PTP_IO>> h4 = std::initializer_list<handle<PTP_IO>>({})
    );

For the case that you have manual destruction, Join waits for the items you specify to finish. If Cancel is true, then all items that haven't yet started are cancelled.

The Final Code

C++
#include <windows.h>
#include <functional>
#include <memory>
#include <type_traits>

// -------------------------
namespace tpoollib
    {
    // Handles template
    
    // Destruction Policy    
    template<typename T>
    class destruction_policy
        {
        public:
            static void destruct(T h)
                {
                static_assert(false,"Must define destructor");
                }
        };

    // Policies Specialization
    template<>    class destruction_policy<PTP_POOL> 
        { public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
    template<>    class destruction_policy<PTP_WORK> 
        { public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
    template<>    class destruction_policy<PTP_WAIT> 
        { public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
    template<>    class destruction_policy<PTP_TIMER> 
        { public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
    template<>    class destruction_policy<PTP_IO> 
        { public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
    template<>    class destruction_policy<PTP_CLEANUP_GROUP> 
        { public: static void destruct(PTP_CLEANUP_GROUP h) 
        { CloseThreadpoolCleanupGroup(h); } };
    
    // Template for Handles
    template <typename T,typename Destruction = destruction_policy<T>>
    class handle
        {
        private:
            T hX = 0;
            bool NoDestruct = true;
            std::shared_ptr<size_t> ptr = std::make_shared<size_t>();

        public:

            // Closing items
            void Close()
                {
                if (!ptr || !ptr.unique())
                    {
                    ptr.reset();
                    return;
                    }
                ptr.reset();
                if (hX != 0 && !NoDestruct)
                    Destruction::destruct(hX);
                hX = 0;
                }

            handle()
                {
                hX = 0;
                }
            ~handle()
                {
                Close();
                }
            handle(const handle& h)
                {
                Dup(h);
                }
            handle(handle&& h)
                {
                Move(std::forward<handle>(h));
                }
            handle(T hY,bool NoDestructOnClose)
                {
                hX = hY;
                NoDestruct = NoDestructOnClose;
                }

            handle& operator =(const handle& h)
                {
                Dup(h);
                return *this;
                }
            handle& operator =(handle&& h)
                {
                Move(std::forward<handle>(h));
                return *this;
                }

            void Dup(const handle& h)
                {
                Close();
                NoDestruct = h.NoDestruct;
                hX = h.hX;
                ptr = h.ptr;
                }
            void Move(handle&& h)
                {
                Close();
                hX = h.hX;
                ptr = h.ptr;
                NoDestruct = h.NoDestruct;
                h.ptr.reset();
                h.hX = 0;
                h.NoDestruct = false;
                }
            operator T() const
                {
                return hX;
                }

        };

    template <bool AutoDestruct = true>
    class tpool
        {
        private:
            handle<PTP_POOL> p;
            handle<PTP_CLEANUP_GROUP> pcg;
            TP_CALLBACK_ENVIRON env;

            tpool(const tpool&) = delete;
            tpool(tpool&&) = delete;
            void operator=(const tpool&) = delete;
            void operator=(tpool&&) = delete;

        public:

            tpool()
                {
                }

            ~tpool()
                {
                End();
                }

            void End()
                {
                Join();
                DestroyThreadpoolEnvironment(&env);
                p.Close();
                }

            // Creates the interfaces
            bool Create(unsigned long nmin = 1,unsigned long nmax = 1)
                { 
                bool jauto = AutoDestruct;

                // Env
                InitializeThreadpoolEnvironment(&env);

                // Pool and Min/Max
                handle<PTP_POOL> cx(CreateThreadpool(0),false);
                p = cx;
                if (!p)
                    {
                    End();
                    return false;
                    }
                if (!SetThreadpoolThreadMinimum(p,nmin))
                    {
                    End();
                    return false;
                    }
                SetThreadpoolThreadMaximum(p,nmax);

                // Cleanup Group
                if (jauto)
                    {
                    handle<PTP_CLEANUP_GROUP> cx(CreateThreadpoolCleanupGroup(),false);
                    pcg = cx;
                    if (!pcg)
                        {
                        End();
                        return false;
                        }
                    }

                // Sets
                SetThreadpoolCallbackPool(&env,p);
                SetThreadpoolCallbackCleanupGroup(&env,pcg,0);

                return true;
                }

            // Templates for each of the items, to be specialized later
            template <typename J>
            void Wait(handle<J> h,bool Cancel = false)
                {
                static_assert(false,"No Wait function is defined");
                }
            template <typename J,typename CB_J>
            handle<J> CreateItem(CB_J cb,PVOID opt = 0,HANDLE hX = 0)
                {
                static_assert(false,"No Create function is defined");
                }
            template <typename J,typename ...A>
            void RunItem(handle<J> h,std::tuple<A...> = std::make_tuple<>())
                {
                static_assert(false,"No Run function is defined");
                }

            // Work Stuff
            template <> handle<PTP_WORK> CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
                        (PTP_WORK_CALLBACK cb,PVOID opt,HANDLE)
                {
                handle<PTP_WORK> a(CreateThreadpoolWork(cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>)
                {
                SubmitThreadpoolWork(h);
                }
            template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel)
                {
                WaitForThreadpoolWorkCallbacks(h,Cancel);
                }

            // Wait  stuff
            template <> handle<PTP_WAIT> CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>
                        (PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE)
                {
                handle<PTP_WAIT> a(CreateThreadpoolWait(cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel)
                {
                WaitForThreadpoolWaitCallbacks(h,Cancel);
                }

            // Timer stuff
            template <> handle<PTP_TIMER> CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>
                        (PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE)
                {
                handle<PTP_TIMER> a(CreateThreadpoolTimer(cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,
                             std::tuple<FILETIME*,DWORD,DWORD>t)
                {
                SetThreadpoolTimer(h,std::get<0>(t),std::get<1>(t),std::get<2>(t));
                }
            template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel)
                {
                WaitForThreadpoolTimerCallbacks(h,Cancel);
                }

            // IO Stuff
            template <> handle<PTP_IO> CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>
                        (PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY)
                {
                handle<PTP_IO> a(CreateThreadpoolIo(hY,cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t)
                {
                bool Cancel = std::get<0>(t);
                if (Cancel)
                    CancelThreadpoolIo(h);
                else
                    StartThreadpoolIo(h);
                }
            template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel)
                {
                WaitForThreadpoolIoCallbacks(h,Cancel);
                }

            // Join functions, one for each option (AutoDestruct or not)
            template <bool Q = AutoDestruct>
            typename std::enable_if<Q,void>::type
            Join(bool Cancel = false)
                {
                if (pcg)
                    {
                    CloseThreadpoolCleanupGroupMembers(pcg,Cancel,0);
                    pcg.Close();
                    }
                }

            template <bool Q = AutoDestruct>
            typename std::enable_if<!Q,void>::type
                Join(bool Cancel = false,
                std::initializer_list<handle<PTP_WORK>> h1 = 
                                 std::initializer_list<handle<PTP_WORK>>({}),
                std::initializer_list<handle<PTP_TIMER>> h2 = 
                                 std::initializer_list<handle<PTP_TIMER>>({}),
                std::initializer_list<handle<PTP_WAIT>> h3 = 
                                 std::initializer_list<handle<PTP_WAIT>>({}),
                std::initializer_list<handle<PTP_IO>> h4 = 
                                 std::initializer_list<handle<PTP_IO>>({})
                )
                {
                for (auto a : h1)
                    Wait<PTP_WORK>(a,Cancel);
                for (auto a : h2)
                    Wait<PTP_TIMER>(a,Cancel);
                for (auto a : h3)
                    Wait<PTP_WAIT>(a,Cancel);
                for (auto a : h4)
                    Wait<PTP_IO>(a,Cancel);
                }
        };
    } 

Sample Usage

Error checking is removed for simplicity.

C++
using namespace tpoollib;
int __stdcall WinMain(HINSTANCE, HINSTANCE, LPSTR, int)
    {
    CoInitializeEx(0, COINIT_APARTMENTTHREADED);
    

    // Auto-Destruction of items
        {
        tpool<true> t;
        t.Create();
        auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
                      ([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
            {
            Sleep(((rand() % 5) + 1) * 1000);
            return;
            },0);
        for (int i = 0; i < 3; i++)
            t.RunItem(workit);
        t.Join();
        }

    // Manual Destruction of items
        {
        tpool<false> t;
        t.Create();
        auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
                      ([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
            {
            Sleep(((rand() % 5) + 1) * 1000);
            return;
            },0);
        for (int i = 0; i < 3; i++)
            t.RunItem(workit);
        t.Join(true,{workit});
        }

    return 0;
}

History

  • 26th July, 2015 - First release

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)