Adi Levin's Blog for programmers

August 29, 2009

Multithreading Patterns #4 – pipeline

Filed under: Multithreading — Adi Levin @ 9:01 pm
Tags: ,

The Pipeline pattern is suitable when there is a collection of items, and each item needs to go through the same sequence of filters. Each filter can run in parallel to other filters, but can only process one item at a time.

For example, imagine a military application that decrypts enemy messages and translates them to the language of the “good guys”. The input is a stream of messages. Each message is stored in a file, then decrypted, and the decrypted message is translated. Suppose that due to hardward limitations, the decryption unit can only handle one message at a time (e.g. if the decryption is done by special decryption hardware that has a single input and single output). Even though we can’t fully parallelize the work, we want to take advantage of the fact that saving messages to disk can be done in parallel to decrypting or translating previous messages.

One way to exploit this parallism is to allocate a thread for each of these actions. The first thread will intercept incoming messages and save them to disk. The second thread will be in charge of controlling the decryption hardware. The third thread will be responsible for translating decrypted messages from the enemy’s language.

More generally, a simple implementation of a pipeline goes as follows: There are N threads (N being the number of filters). Each thread runs a function that simply sleeps in alertable state, waiting to invoke incoming APC‘s (see the function pipeline_thread_func below). The filters are implemented as functions that get as input the pointer to the item to which they should apply. In each filter, the last instruction should be to place an APC to the thread responsible for the next filter in the pipeline, passing the item pointer as input (see the functions filter1 and filter2 below).

static void pipeline_thread_func(HANDLE exit_event)

{ // exit_event is used as a way to end this thread. When it is signaled, the thread will end.

while (true) {  if (WaitForSingleObjectEx(exit_event,INFINITE,TRUE)==WAIT_OBJECT_0)  return; }

}

static void filter1(void* item)

{

… do stuff on *item ….

QueueUserAPC(thread_handle2,filter2,item);

}

static void filter2(void* item)

{

… do stuff on *item ….

QueueUserAPC(thread_handle3,filter3,item);

}

For more reading material, I refer you to the documentation of the class pipeline  in Intel TBB (Thread Building Blocks).

Advertisements

Leave a Comment »

No comments yet.

RSS feed for comments on this post. TrackBack URI

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Create a free website or blog at WordPress.com.

%d bloggers like this: