Adi Levin's Blog for programmers

June 13, 2009

Asynchronous Procedure Call

Filed under: Multithreading — Adi Levin @ 8:26 pm
Tags: , , ,

There are two ways to invoke a function on a different thread – the first is by calling CreateThread. The disadvantage here is that thread creation has a large overhead. It is much more efficient to invoke a function on an existing thread. This is done by the mechanism of Asynchronous Procedure Call (APC).

Every thread has a queue of asynchronous procedure calls attached to it. Another thread can queue a function to be invoked in that thread, using the API QueueUserAPC.

DWORD WINAPI QueueUserAPC(
  __in  PAPCFUNC pfnAPC,
  __in  HANDLE hThread,
  __in  ULONG_PTR dwData
);

A call to QueueUserAPC is a request from the thread whose handle is hThread, to run the function pfnAPC with the parameter pwData. The function pfnAPC has the following prototype:

VOID CALLBACK APCProc(
  __in  ULONG_PTR dwParam
);
 

Alertable state

A thread will invoke the queued APC function only if it the thread is in alertable state. After the thread is in an alertable state, the thread handles all pending APCs in first in – first out (FIFO) order, and the wait operation returns WAIT_IO_COMPLETION.

A thread enters an alertable state by using SleepEx, WaitForSingleObjectEx, WaitForMultipleObjectsEx to perform an alertable wait operation. For examaple, by calling SleepEx(10000,TRUE), a thread enters alertable state for 10 seconds. If, during these 10 seconds, APCs are queued, it will invoke the APCs and stop sleeping.

If an application queues an APC before the thread begins running, the thread begins by calling the APC function. After the thread calls an APC function, it calls the APC functions for all APCs in its APC queue.

A trivial example

#define _WIN32_WINNT 0x0400
#include <windows.h>

DWORD WINAPI thread_function(LPVOID lpParameter) {  while (true) { SleepEx(INFINITE,TRUE); } }
VOID CALLBACK apc_function_1(ULONG_PTR dwParam) {  C* obj = (C*)dwParam; obj->do_something(); }

void main() {
   C obj;
   WORD thread_id;
   HANDLE thread_handle = CreateThread(NULL,0,thread_function,NULL,0,&thread_id);
   Sleep(1000);
   QueueUserAPC(apc_function_1, thread_handle, (ULONG_PTR)&obj);
   Sleep(1000);
}

In the above program, the main thread creates a thread that runs thread_function, and then queues apc_function_1 to run on that thread. thread_function is specially designed to run APC functions, as it enters alertable state by calling SleepEx. Notice that QueueUserAPC requires to define _WIN32_WINNT as 0x0400 or higher.

Usage example: thread_team

The following source code demonstrates how APCs can be used to efficiently run many tasks in a number of threads. We use light-weight interlocked functions for synchronization and a single event for signaling that the tasks have all been invoked: (download the source code)

thread_team.h

class thread_team
{
public:
 thread_team(int num_of_threads);
 virtual ~thread_team();
 void invoke_tasks(task_collection& tc);

private:
 void thread_function();
 friend static DWORD WINAPI thread_func(void* param);

private:
 HANDLE m_thread_handles[1024];
 HANDLE m_exit_threads_event;
 int m_num_of_threads;
};

thread_team.cpp

#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0400
#endif

#include “thread_team.h”
#include “task_collection.h”

static VOID CALLBACK apc_proc_run(ULONG_PTR dwParam)
{
 task_collection* tc = (task_collection*)dwParam;
 tc->run();
}
 
static DWORD WINAPI thread_func(void* param)
{
 thread_team* owner = (thread_team*)param;
 owner->thread_function();
 return 0;
}

void thread_team::thread_function()
{
 while (true) {
  if (WaitForSingleObjectEx(m_exit_threads_event,INFINITE,TRUE)==WAIT_OBJECT_0)
   return;
 }
}

thread_team::thread_team(int num_of_threads) :
m_num_of_threads(num_of_threads)
{
 m_exit_threads_event = CreateEvent(NULL,TRUE,FALSE,NULL);
 DWORD thread_id;
 for(int i=0;i<num_of_threads;++i)
  m_thread_handles[i] = CreateThread(NULL,1<<24,thread_func,this,0,&thread_id);
}

thread_team::~thread_team()
{
 SetEvent(m_exit_threads_event);
 WaitForMultipleObjects(m_num_of_threads,m_thread_handles,TRUE,INFINITE);
 for(int i=0;i<m_num_of_threads;++i)
  CloseHandle(m_thread_handles[i]);
 CloseHandle(m_exit_threads_event);
}

void thread_team::invoke_tasks(task_collection& tc)
{
 for(int i=0;i<m_num_of_threads;++i)
  QueueUserAPC(apc_proc_run,m_thread_handles[i],(ULONG_PTR)&tc);
}

task_collection.h:

class task_collection
{
public:
 task_collection();
 virtual ~task_collection();

 virtual void perform_task(int index) = 0;
 virtual int  get_num_of_tasks() const = 0;

 void wait_for_completion() const;

private:

 void run();
 friend VOID CALLBACK apc_proc_run(ULONG_PTR dwParam);

private:

 volatile LONG m_next_task_index;
 volatile LONG m_number_of_completed_tasks;
 HANDLE m_finish_event;
};

task_collection.cpp

#include “task_collection.h”
#include “thread_team.h”

task_collection::task_collection() : m_next_task_index(0), m_number_of_completed_tasks(0)
{
 m_finish_event = CreateEvent(NULL,FALSE,FALSE,NULL);
}

task_collection::~task_collection()
{
 CloseHandle(m_finish_event);
}

void task_collection::wait_for_completion() const
{
 WaitForSingleObject(m_finish_event,INFINITE);
}

void task_collection::run() {
 int num_of_tasks = get_num_of_tasks();
 while (m_number_of_completed_tasks<num_of_tasks) {
  LONG task_index = InterlockedIncrement(&m_next_task_index) – 1;
  if (task_index>=num_of_tasks)
   break;
  perform_task(task_index);
  LONG num_of_completed_tasks = InterlockedIncrement(&m_number_of_completed_tasks);
  if (num_of_completed_tasks == num_of_tasks)
   SetEvent(m_finish_event);
 }
}

main.cpp

#include “thread_team.h”
#include “task_collection.h”

class apply_function_to_array : public task_collection
{
public:
 apply_function_to_array(double* x, int n, int num_of_tasks) : m_x(x), m_n(n), m_num_of_tasks(num_of_tasks){}
 virtual void perform_task(int index) {
  int nx_per_task = (m_n+1) / m_num_of_tasks;
  int i0 = index * nx_per_task;
  int i1 = (index+1) * nx_per_task;
  if (i1>m_n)
   i1 = m_n;
  for(int i=i0;i<i1;++i)
   m_x[i] = func(m_x[i]);
 }
 virtual int  get_num_of_tasks() const { return m_num_of_tasks; }

private:
 double func(double x) const { return 2*x; }
 int m_n;
 int m_num_of_tasks;
 double* m_x;
};

void main()
{
#define NX 10000000
 double* x = new double[NX];
 for(int i=0;i<NX;++i)
  x[i] = 1.0;

 apply_function_to_array tc(x,NX,1000);
 thread_team tt(4);
 tt.invoke_tasks(tc);
 tc.wait_for_completion();

 delete [] x;
}

What the function main() does, is to allocate an array of 10,000,000 double-precision numbers, set them all to 1.0, and then multiply all of them by 2.0. The multiplication is subdivided into 1000 tasks (each task performs the multiplication of 10,000 numbers in the array), and the 1000 tasks are invoked by 4 threads in parallel.

The class task_collection respresents an abstract collection of tasks. To use it, you need to inherit from the class and then overload the functions peform_task(int index) and get_num_of_tasks() to define what each task does and how many tasks there are. The class apply_function_to_array is an example of such inheritance.

The class thread_team represents a simple collection of threads. Upon creation of a thread_team object, the threads are created (in this example – 4 threads), and they all run the function thread_func that immediately enters alertable state.

When thread_team::invoke_tasks is called, it simply invokes the function apc_proc_run on each of the threads, using the APC mechanism. All apc_proc_run is doing, is to execute task_collection::run().

So, the same function, task_collection::run(), is invoked by 4 concurrent threads. What it does, is to grab (repeatedly) a task that has not been invoked yet, out of the 1000 tasks, and invoke it. In order to make sure that we don’t run the same task twice by two different threads, we use InterlockedIncrement, to increment the member m_next_task_index.

We use another counter (m_number_of_completed_tasks) with an interlocked function, to count how many tasks have been completed. This is needed in order to set an event when all tasks have been completed, which allows main() to wait for the completion of all tasks, by calling task_collection::wait_for_completion().

In order to exit the threads (which is done upon destruction of the thread_team object), the destructor of thread_team signals the event m_exit_threads_event which causes the threads to exit the while(true) loop, because WaitForSingleObjectEx returns WAIT_OBJECT_0. When an APC is called, WaitForSingleObjectEx returns WAIT_IO_COMPLETION.

Advertisements

8 Comments »

  1. […] 1. Parallel “for” loop. Each iteration of the loop becomes a task, or a range of iterations of the loop become a task. For an example, see my post on Asynchronous Procedure Call. […]

    Pingback by Multithreading Patterns #1 – Fork-Join « Adi Levin's Blog for programmers — August 28, 2009 @ 12:52 pm | Reply

  2. […] to creating a thread is to invoke the function on an existing thread – either by an Asynchronous Procedure Call (APC) or by using the Windows Thread […]

    Pingback by CreateThread: An example « Adi Levin's Blog for programmers — September 7, 2009 @ 9:46 am | Reply

  3. I got lost!
    Can anyone help me finding my way home ?
    Please…

    Adi
    I’ll try it agian tomorrow. I have to put some more effort in order to cross so many c++ functions.
    I understand the motivation, but the technique is a bit complicated for me – at least for the moment.
    I think that this is the most important chapter (for tasks like toolpath calculation) because the overhead issue is a big problem when you deal with a simple operation that has to be calculated many times.

    Comment by Doron Osovlanski — September 8, 2009 @ 1:26 pm | Reply

    • Doron – I added a trivial example. I hope it is clearer now.

      Comment by adilevin — September 18, 2009 @ 9:10 am | Reply

  4. […] purpose of simplifying the code. For example, if the thread is in alertable state (i.e. waits for APC functions), other threads can queue an APC function that calls ExitThread, thereby causing the thread to […]

    Pingback by Ending a thread « Adi Levin's Blog for programmers — September 19, 2009 @ 2:28 pm | Reply

  5. Adi,
    Thanks, it’s now clear – although I didn’t go through all the source lines.

    1. In the sentence “A thread will invoke the queued APC function only if it is in alertable state.” you cannot understand who should be in alertable-state, the thread or the function. This issue becomes clear only after you continue reading.

    2. I would replace “first in, first out” with “first in – first out”.

    3. I have downloaded the files with the source code. It would be nice to have the project file with the proper definitions, so we can try it at home easily.

    Comment by Doron Osovlanski — September 22, 2009 @ 12:16 pm | Reply

    • Doron – thanks again for the comments. I made the corrections in response to the first 2 comments. About the third one, what is the problem with the project files? Are they not defined well? I thought they were o.k.?

      Comment by adilevin — September 22, 2009 @ 2:16 pm | Reply

  6. Really nice article/blog thanks a lot!

    Comment by roulio — December 16, 2009 @ 3:00 pm | Reply


RSS feed for comments on this post. TrackBack URI

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Blog at WordPress.com.

%d bloggers like this: