Preface
This article provides some concepts and generalities to describe how to find concurrency in sequential (or serial code) to achieve data parallelism. The examples may appear complicated, but they don't have to be. We are merely looking at code that can be transformed from sequential into concurrent. In this quest for concurrency, patterns are established that form the criterion to then further write a parallel algorithm. We therefore must understand the difference between the terms concurrency and parallelism. After having done that, we will use a downloaded evaluation copy of the Intel Threading Building Blocks library to explain what particular benefits this library offers to achieve concurrency in parallel computing. This involves downloading an evaluation copy of the Intel C++ compiler and installing it to integrate with Microsoft Visual Studio 2008.
After a successful installation, we then install the Intel TBB, a C++ template-based intrinsic threading library for loop-level parallelism that concentrates on defining tasks rather than explicit threads. The components of TBB include generic parallel algorithms, concurrent containers, low-level synchronization primitives, and a task scheduler. TBB is available in both commercial and Open Source versions. At the time of this writing, TBB 2.2 is the most recent version. This is why we want to choose this library. The OpenMP API is an intrinsic threading library that is strong in C and FORTRAN. POSIX threads are an explicit, or raw API type of threading that are also effective for the C language. Both are powerful for writing algorithms and breaking up serial code to examine what can be made concurrent. Neither of them, however, are effective for C++. Having noted that, we'll use the Intel TBB, which directs its constructs towards C++. With Windows systems, you have to modify your Visual Studio Project Build configurations (debug, release) as follows: In the C++ properties General choice, add the include directory $(TBB22_INSTALL_DIR)\include. In the General choice of the Linker properties, add this additional library directory: $(TBB22_INSTALL_DIR)\ia32\vc8\lib. In the Input choice, add as an additional dependency: tbb_debug.lib or tbb.lib.
Abstract
Software developers must now take advantage of multicore processor technology. The term multicore processor technology refers to a microprocessor architecture designed to contain a sequence of logical cores (CPUs) on one physical chip. That is, rather than increasing the clock speed, improved performance, power savings, and system stability can be achieved by utilizing these cores to execute tasks simultaneously. Software design must both adhere to and obtain concurrency with this new technology. Concurrency is achieved when two or more tasks are executed simultaneously. This means that these tasks execute within the same time interval, nearly at the same second, but probably off by fractions of a second. A concurrent application will have two or more threads in progress at some time. In parallel execution, there must be multiple cores available within the computation platform. In that case, the two or more threads could each be assigned a separate core, and would be running simultaneously. This is how we distinguish between the terms parallel and concurrency.
Parallel is a subset of concurrent. That is, you can write a concurrent application that uses multiple threads or processes, but if you don't have multiple cores for execution, you won't be able to run your code in parallel. The difference between these two terms is that a system is said to be concurrent if it can support two or more actions in progress at the same time. A system is said to be in parallel if it can support two more actions executing simultaneously. The key to parallel computing is exploitable concurrency. Concurrency exists in a computational problem when the problem can be decomposed into sub problems that can safely execute at the same time. To be of any use, however, it must be possible to structure the code to expose and later exploit the concurrency and permit the sub problems to actually run concurrently; that is, the concurrency must be exploitable.
As a simple example, suppose part of a computation involves computing the summation of a large set of values. If multiple processors are available, instead of adding the values together sequentially, the set can be partitioned and the summations of the subsets computed simultaneously, each on a different processor. The partial sums are then combined to get the final answer. Thus, using multiple processors to compute in parallel may allow us to obtain a solution sooner. Also, if each processor has its own memory, partitioning the data between the processors may allow larger problems to be handled than could be handled on a single processor. Downtime starts to cost when a piece of software does not execute reliably in different environments: on the Internet or on an intranet. Software might execute concurrently in a distributed computing environment, but that is not always the case.
So Where do We Start?
Well, with a numerical computation. The following code calculates the Fibonacci series of a number that is passed as an argument. The code, however, is in the form of an algorithm that shows and compares the times of calculation, the multithreading measures taken, along with avoiding the headaches by using certain types of multithreading, the synchronization objects that are involved, and the queues. The important aspect to note is time savings in these numerical calculations. Those numerical calculations that can be performed simultaneously are done so using the parallel algorithms included in the Intel TBB:
#ifdef NDEBUG
#undef NDEBUG
#endif
#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <utility>
#include "tbb/task.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tick_count.h"
#include "tbb/blocked_range.h"
#include "tbb/concurrent_vector.h"
#include "tbb/concurrent_queue.h"
#include "tbb/concurrent_hash_map.h"
#include "tbb/parallel_while.h"
#include "tbb/parallel_for.h"
#include "tbb/parallel_reduce.h"
#include "tbb/parallel_scan.h"
#include "tbb/pipeline.h"
#include "tbb/atomic.h"
#include "tbb/mutex.h"
#include "tbb/spin_mutex.h"
#include "tbb/queuing_mutex.h"
#include "tbb/tbb_thread.h"
using namespace std;
using namespace tbb;
typedef long long value;
struct Matrix2x2
{
value v[2][2];
Matrix2x2() {}
Matrix2x2(value v00, value v01, value v10, value v11) {
v[0][0] = v00; v[0][1] = v01; v[1][0] = v10; v[1][1] = v11;
}
Matrix2x2 operator * (const Matrix2x2 &to) const;
};
static const Matrix2x2 Matrix1110(1, 1, 1, 0);
void Matrix2x2Multiply(const value a[2][2],
const value b[2][2], value c[2][2]);
value SerialFib(int n)
{
if(n < 2)
return n;
value a = 0, b = 1, sum; int i;
for( i = 2; i <= n; i++ )
{ sum = a + b; a = b; b = sum;
}
return sum;
}
value SerialMatrixFib(int n)
{
value c[2][2],
a[2][2] = {{1, 1}, {1, 0}}, b[2][2] = {{1, 1}, {1, 0}};
int i;
for(i = 2; i < n; i++)
{ if(i & 1) Matrix2x2Multiply(a, c, b);
else Matrix2x2Multiply(a, b, c);
}
return (i & 1) ? c[0][0] : b[0][0];
}
value SerialRecursiveFib(int n)
{
value result;
if(n < 2)
result = n;
else
result =
SerialRecursiveFib(n - 1) + SerialRecursiveFib(n - 2);
return result;
}
value SerialQueueFib(int n)
{
concurrent_queue<matrix2x2> Q;
for(int i = 1; i < n; i++)
Q.push(Matrix1110);
Matrix2x2 A, B;
while(true) {
while( !Q.try_pop(A) ) this_tbb_thread::yield();
if(Q.empty()) break;
while( !Q.try_pop(B) ) this_tbb_thread::yield();
Q.push(A * B);
}
return A.v[0][0];
}
value SerialVectorFib(int n)
{
concurrent_vector<value> A;
A.grow_by(2);
A[0] = 0; A[1] = 1;
for( int i = 2; i <= n; i++)
{
A.grow_to_at_least(i+1);
A[i] = A[i-1] + A[i-2];
}
return A[n];
}
value SharedA = 0, SharedB = 1; int SharedI = 1, SharedN;
template<typename>
class SharedSerialFibBody {
M &mutex;
public:
SharedSerialFibBody( M &m ) : mutex( m ) {}
void operator()( const blocked_range<int>& range ) const {
for(;;) {
typename M::scoped_lock lock( mutex );
if(SharedI >= SharedN) break;
value sum = SharedA + SharedB;
SharedA = SharedB; SharedB = sum;
++SharedI;
}
}
};
template<class>
value SharedSerialFib(int n)
{
SharedA = 0; SharedB = 1; SharedI = 1; SharedN = n; M mutex;
parallel_for( blocked_range<int>(0,4,1),
SharedSerialFibBody<m>( mutex ) );
return SharedB;
}
struct IntHashCompare {
bool equal( const int j, const int k ) const { return j == k; }
unsigned long hash( const int k ) const { return (unsigned long)k; }
};
typedef concurrent_hash_map<int,> NumbersTable;
class ConcurrentHashSerialFibTask: public task {
NumbersTable &Fib;
int my_n;
public:
ConcurrentHashSerialFibTask( NumbersTable &cht, int n ) : Fib(cht), my_n(n) { }
task* execute()
{
for( int i = 2; i <= my_n; ++i ) {
NumbersTable::const_accessor f1, f2; if( !Fib.find(f1, i-1) || !Fib.find(f2, i-2) ) {
assert(0);
}
value sum = f1->second + f2->second;
NumbersTable::const_accessor fsum;
Fib.insert(fsum, make_pair(i, sum)); assert( fsum->second == sum ); }
return 0;
}
};
value ConcurrentHashSerialFib(int n)
{
NumbersTable Fib;
bool okay;
okay = Fib.insert( make_pair(0, 0) ); assert(okay); okay = Fib.insert( make_pair(1, 1) ); assert(okay);
task_list list;
list.push_back(*new(task::allocate_root()) ConcurrentHashSerialFibTask(Fib, n));
list.push_back(*new(task::allocate_root()) ConcurrentHashSerialFibTask(Fib, n));
task::spawn_root_and_wait(list);
NumbersTable::const_accessor fresult;
okay = Fib.find( fresult, n );
assert(okay);
return fresult->second;
}
struct QueueStream {
volatile bool producer_is_done;
concurrent_queue<matrix2x2> Queue;
bool pop_if_present( pair<matrix2x2,> &mm ) {
if(!Queue.try_pop(mm.first)) return false;
if(!Queue.try_pop(mm.second)) {
Queue.push(mm.first); return false;
}
return true;
}
};
struct parallel_forFibBody {
QueueStream &my_stream;
parallel_forFibBody(QueueStream &s) : my_stream(s) { }
void operator()( const blocked_range<int> &range ) const {
int i_end = range.end();
for( int i = range.begin(); i != i_end; ++i ) {
my_stream.Queue.push( Matrix1110 ); }
}
};
class parallel_whileFibBody
{
QueueStream &my_stream;
parallel_while<parallel_whilefibbody> &my_while;
public:
typedef pair<matrix2x2,> argument_type;
parallel_whileFibBody(
parallel_while<parallel_whilefibbody> &w, QueueStream &s)
: my_while(w), my_stream(s) { }
void operator() (argument_type mm) const {
mm.first = mm.first * mm.second;
if(my_stream.Queue.try_pop(mm.second))
my_while.add( mm );
else my_stream.Queue.push( mm.first );
}
};
struct QueueInsertTask: public task {
QueueStream &my_stream;
int my_n;
QueueInsertTask( int n, QueueStream &s ) : my_n(n), my_stream(s) { }
task* execute() {
parallel_for( blocked_range<int>( 1, my_n, 10 ),
parallel_forFibBody(my_stream) );
my_stream.producer_is_done = true;
return 0;
}
};
struct QueueProcessTask: public task {
QueueStream &my_stream;
QueueProcessTask( QueueStream &s ) : my_stream(s) { }
task* execute() {
while( !my_stream.producer_is_done || my_stream.Queue.unsafe_size()>1 ) {
parallel_while<parallel_whilefibbody> w; w.run( my_stream, parallel_whileFibBody( w, my_stream ) );
}
return 0;
}
};
value ParallelQueueFib(int n)
{
QueueStream stream;
stream.producer_is_done = false;
task_list list;
list.push_back(*new(task::allocate_root()) QueueInsertTask( n, stream ));
list.push_back(*new(task::allocate_root()) QueueProcessTask( stream ));
task::spawn_root_and_wait(list);
assert(stream.Queue.unsafe_size() == 1); Matrix2x2 M;
bool result = stream.Queue.try_pop( M ); assert( result );
return M.v[0][0]; }
class InputFilter: public filter {
atomic<int /> N; public:
concurrent_queue<matrix2x2 /> Queue;
InputFilter( int n ) : filter(false ) { N = n; }
void* operator()(void*)
{
int n = --N;
if(n <= 0) return 0;
Queue.push( Matrix1110 );
return &Queue;
}
};
class MultiplyFilter: public filter {
public:
MultiplyFilter( ) : filter(false ) { }
void* operator()(void*p)
{
concurrent_queue<matrix2x2> &Queue =
*static_cast<concurrent_queue<matrix2x2> *>(p);
Matrix2x2 m1, m2;
while( !Queue.try_pop( m1 ) ) this_tbb_thread::yield();
while( !Queue.try_pop( m2 ) ) this_tbb_thread::yield();
m1 = m1 * m2; Queue.push( m1 ); return this; }
};
value ParallelPipeFib(int n)
{
InputFilter input( n-1 );
MultiplyFilter process;
pipeline pipeline;
pipeline.add_filter( input ); pipeline.add_filter( process );
input.Queue.push( Matrix1110 );
pipeline.run( n ); pipeline.clear();
assert( input.Queue.unsafe_size()==1 );
Matrix2x2 M;
bool result = input.Queue.try_pop( M ); assert( result );
return M.v[0][0]; }
struct parallel_reduceFibBody {
Matrix2x2 sum;
int splitted; parallel_reduceFibBody() : sum( Matrix1110 ), splitted(0) { }
parallel_reduceFibBody( parallel_reduceFibBody& other, split ) :
sum( Matrix1110 ), splitted(1) {}
void join( parallel_reduceFibBody &s ) {
sum = sum * s.sum;
}
void operator()( const blocked_range<int> &r ) {
for( int k = r.begin() + splitted; k < r.end(); ++k )
sum = sum * Matrix1110;
splitted = 0;
}
};
value parallel_reduceFib(int n)
{
parallel_reduceFibBody b;
parallel_reduce(blocked_range<int>(2, n, 3), b);
return b.sum.v[0][0];
}
struct parallel_scanFibBody {
Matrix2x2 sum;
int first; parallel_scanFibBody() : sum( Matrix1110 ), first(1) {}
parallel_scanFibBody( parallel_scanFibBody &b, split) :
sum( Matrix1110 ), first(1) {}
void reverse_join( parallel_scanFibBody &a ) {
sum = sum * a.sum;
}
void assign( parallel_scanFibBody &b ) {
sum = b.sum;
}
template<typename>
void operator()( const blocked_range<int> &r, T) {
for( int k = r.begin() + first; k < r.end(); ++k )
sum = sum * Matrix1110;
first = 0;
}
};
value parallel_scanFib(int n)
{
parallel_scanFibBody b;
parallel_scan(blocked_range<int>(1,
n, 3), b);
return b.sum.v[0][0];
}
struct FibTask: public task {
const int n;
value& sum;
value x, y;
bool second_phase; FibTask( int n_, value& sum_ ) :
n(n_), sum(sum_), second_phase(false)
{}
task* execute() {
if( second_phase ) { sum = n&1 ? x*x + y*y : x*x - y*y;
return NULL;
}
if( n <= 2 ) {
sum = n!=0;
return NULL;
} else {
recycle_as_continuation();
second_phase = true; FibTask& a =
*new( allocate_child() ) FibTask( n/2 + 1, x );
FibTask& b =
*new( allocate_child() ) FibTask( n/2 - 1 + (n&1), y );
set_ref_count(2);
spawn( a );
return &b;
}
}
};
value ParallelTaskFib(int n) {
value sum;
FibTask& a = *new(task::allocate_root()) FibTask(n, sum);
task::spawn_root_and_wait(a);
return sum;
}
struct IntRange {
int low;
int high;
void set_from_string( const char* s );
IntRange( int low_, int high_ ) : low(low_), high(high_) {}
};
void IntRange::set_from_string( const char* s ) {
char* end;
high = low = strtol(s,&end,0);
switch( *end ) {
case ':':
high = strtol(end+1,0,0);
break;
case '\0':
break;
default:
printf("unexpected character = %c\n",*end);
}
}
static tick_count t0;
static bool Verbose = false;
typedef value (*MeasureFunc)(int);
value Measure(const char *name, MeasureFunc func, int n)
{
value result;
if(Verbose) printf("%s",name);
t0 = tick_count::now();
for(int number = 2; number <= n; number++)
result = func(number);
if(Verbose) printf("\t- in %f msec\n",
(tick_count::now() - t0).seconds()*1000);
return result;
}
int main(int argc, char* argv[])
{
if(argc>1) Verbose = true;
int NumbersCount = argc>1 ? strtol(argv[1],0,0) : 500;
IntRange NThread(1,4); if(argc>2) NThread.set_from_string(argv[2]);
unsigned long ntrial = argc>3? (unsigned long)strtoul(argv[3],0,0) : 1;
value result, sum;
if(Verbose) printf("Fibonacci numbers example.
Generating %d numbers..\n", NumbersCount);
result = Measure("Serial loop", SerialFib, NumbersCount);
sum = Measure("Serial matrix", SerialMatrixFib, NumbersCount); assert(result == sum);
sum = Measure("Serial vector", SerialVectorFib, NumbersCount); assert(result == sum);
sum = Measure("Serial queue", SerialQueueFib, NumbersCount); assert(result == sum);
for( unsigned long i=0; i < ntrial; sum=Measure(Shared *=2) <= NThread.high;
threads=NThread.low; NumbersCount); assert(result == sum);
sum = Measure("Shared serial (spin_mutex)", SharedSerialFib<spin_mutex>,
NumbersCount); assert(result == sum);
sum = Measure("Shared serial (queuing_mutex)",
SharedSerialFib<queuing_mutex>, NumbersCount);
assert(result == sum);
sum = Measure("Shared serial (Conc.HashTable)",
ConcurrentHashSerialFib, NumbersCount);
assert(result == sum);
sum = Measure("Parallel while+for/queue",
ParallelQueueFib, NumbersCount); assert(result == sum);
sum = Measure("Parallel pipe/queue\t", ParallelPipeFib,
NumbersCount); assert(result == sum);
sum = Measure("Parallel reduce\t\t", parallel_reduceFib,
NumbersCount); assert(result == sum);
sum = Measure("Parallel scan\t\t", parallel_scanFib,
NumbersCount); assert(result == sum);
sum = Measure("Parallel tasks\t\t", ParallelTaskFib,
NumbersCount); assert(result == sum);
}
#ifdef __GNUC__
if(Verbose)
printf("Fibonacci number #%d modulo 2^64 is %lld\n\n",
NumbersCount, result);
#else
if(Verbose)
printf("Fibonacci number #%d modulo 2^64 is %I64d\n\n",
NumbersCount, result);
#endif
}
if(!Verbose) printf("TEST PASSED\n");
return 0;
}
void Matrix2x2Multiply(const value a[2][2], const value b[2][2], value c[2][2])
{
for( int i = 0; i <= 1; i++)
for( int j = 0; j <= 1; j++)
c[i][j] = a[i][0]*b[0][j] + a[i][1]*b[1][j];
}
Matrix2x2 Matrix2x2::operator *(const Matrix2x2 &to) const
{
Matrix2x2 result;
Matrix2x2Multiply(v, to.v, result.v);
return result;
}
Here is the output when passing the number 13 as an argument:
Fibonacci numbers example. Generating 13 numbers..
Serial loop - in 0.003283 msec
Serial matrix - in 0.031708 msec
Serial vector - in 5.112591 msec
Serial queue - in 0.138425 msec
Threads number is 1
Shared serial (mutex) - in 0.119568 msec
Shared serial (spin_mutex) - in 0.053498 msec
Shared serial (queuing_mutex) - in 0.083251 msec
Shared serial (Conc.HashTable) - in 0.320851 msec
Parallel while+for/queue - in 0.209454 msec
Parallel pipe/queue - in 0.207429 msec
Parallel reduce - in 0.046794 msec
Parallel scan - in 0.050216 msec
Parallel tasks - in 0.044978 msec
Threads number is 2
Shared serial (mutex) - in 0.151975 msec
Shared serial (spin_mutex) - in 0.070540 msec
Shared serial (queuing_mutex) - in 0.081854 msec
Shared serial (Conc.HashTable) - in 0.272870 msec
Parallel while+for/queue - in 0.217556 msec
Parallel pipe/queue - in 0.250521 msec
Parallel reduce - in 0.060063 msec
Parallel scan - in 0.077454 msec
Parallel tasks - in 0.058876 msec
Threads number is 4
Shared serial (mutex) - in 0.042114 msec
Shared serial (spin_mutex) - in 0.047562 msec
Shared serial (queuing_mutex) - in 0.063905 msec
Shared serial (Conc.HashTable) - in 0.125086 msec
Parallel while+for/queue - in 0.116356 msec
Parallel pipe/queue - in 0.091003 msec
Parallel reduce - in 0.025632 msec
Parallel scan - in 0.034152 msec
Parallel tasks - in 0.025213 msec
Fibonacci number #13 modulo 2^64 is 233
Applied Concurrency and Parallel Computing
Sound waves propagation, heat wave probation, and even seismic wave propagation are problems that applied scientists face daily. These waves form a chain until they form an angle of refraction once they hit something denser than them. Of all of the aforementioned, seismic waves are the most powerful. Here is a simple seismic wave simulation (wave propagation) based on parallel_for
and blocked_range
. The main program steps through the simulation of a seismic wave in a core loop that sets the impulse from the source of the disturbance, does the two tough computations of stress updates and velocities, and finally cleans up the edges of the simulation. First, we'll look at the stress algorithm, in a serial version, and then we'll look at an equivalent parallel version.
static void SerialUpdateStress( ) {
drawing_area drawing(0, 0, UniverseWidth, UniverseHeight);
for( int i=1; i<universeheight-1; index=(int)(V[i][j]*(ColorMapSize/2)) j=1;
c=ColorMap[Material[i]]; +=(V[i][j+1]-V[i][j]); =ColorMapSize )
index = ColorMapSize-1;
drawing.put_pixel(c[index]);
}
}
}
Here is the parallel version:
struct UpdateStressBody {
void operator( )( const tbb::blocked_range<int>& range ) const {
drawing_area drawing(0,
range.begin( ),
UniverseWidth,
range.end()-range.begin( ));
int i_end = range.end( );
for( int y = 0, i=range.begin( ); i!=i_end; ++i,y++ ) {
color_t* c = ColorMap[Material[i]];
drawing.set_pos(1, y);
#pragma ivdep
for( int j=1; j<universewidth-1;
index=(int)(V[i][j]*(ColorMapSize/2)) +=
(V[i][j+1]-V[i][j]); =ColorMapSize )
index = ColorMapSize-1;
drawing.put_pixel(c[index]);
}
}
}
};
The code is broken into parts that can be divvied up and run on separate cores simultaneously. Here is the entire program:
#define VIDEO_WINMAIN_ARGS
#include "../../common/gui/video.h"
#include <cstdlib>
#include <cstdio>
#include <cstring>
#include <cctype>
#include <cassert>
#include <math.h>
#include "tbb/task_scheduler_init.h"
#include "tbb/blocked_range.h"
#include "tbb/parallel_for.h"
#include "tbb/tick_count.h"
using namespace std;
#ifdef _MSC_VER
#pragma warning(disable: 4068)
#endif
#define DEFAULT_NUMBER_OF_FRAMES 100
int number_of_frames = -1;
const size_t MAX_WIDTH = 1024;
const size_t MAX_HEIGHT = 512;
int UniverseHeight=MAX_HEIGHT;
int UniverseWidth=MAX_WIDTH;
typedef float value;
static value V[MAX_HEIGHT][MAX_WIDTH];
static value S[MAX_HEIGHT][MAX_WIDTH];
static value T[MAX_HEIGHT][MAX_WIDTH];
static value M[MAX_HEIGHT][MAX_WIDTH];
static value L[MAX_HEIGHT][MAX_WIDTH];
static value D[MAX_HEIGHT][MAX_WIDTH];
static tbb::affinity_partitioner Affinity;
enum MaterialType {
WATER=0,
SANDSTONE=1,
SHALE=2
};
static unsigned char Material[MAX_HEIGHT][MAX_WIDTH];
static const colorcomp_t MaterialColor[4][3] = { {96,0,0}, {0,48,48}, {32,32,23} };
static const int DamperSize = 32;
static const int ColorMapSize = 1024;
static color_t ColorMap[4][ColorMapSize];
static int PulseTime = 100;
static int PulseCounter;
static int PulseX = UniverseWidth/3;
static int PulseY = UniverseHeight/4;
static bool InitIsParallel = true;
const char *titles[2] = {"Seismic Simulation: Serial", "Seismic Simulation: Parallel"};
int threads_low = 0, threads_high = tbb::task_scheduler_init::automatic;
static void UpdatePulse() {
if( PulseCounter>0 ) {
value t = (PulseCounter-PulseTime/2)*0.05f;
V[PulseY][PulseX] += 64*sqrt(M[PulseY][PulseX])*exp(-t*t);
--PulseCounter;
}
}
static void SerialUpdateStress() {
drawing_area drawing(0, 0, UniverseWidth, UniverseHeight);
for( int i=1; i<universeheight-1; index=(int)(V[i][j]*(ColorMapSize/2))
j=1; +=M[i][j]*(V[i][j+1]-V[i][j]); =ColorMapSize ) index = ColorMapSize-1;
color_t* c = ColorMap[Material[i][j]];
drawing.put_pixel(c[index]);
}
}
}
struct UpdateStressBody {
void operator()( const tbb::blocked_range<int>& range ) const {
drawing_area drawing(0, range.begin(), UniverseWidth, range.end()-range.begin());
int i_end = range.end();
for( int y = 0, i=range.begin(); i!=i_end; ++i,y++ ) {
drawing.set_pos(1, y);
#pragma ivdep
for( int j=1; j<universewidth-1; index="(int)(V[i][j]*(ColorMapSize/2))
+=M[i][j]*(V[i][j+1]-V[i][j]); =ColorMapSize ) index = ColorMapSize-1;
color_t* c = ColorMap[Material[i][j]];
drawing.put_pixel(c[index]);
}
}
}
};
static void ParallelUpdateStress() {
tbb::parallel_for(
tbb::blocked_range<int>( 1,
UniverseHeight-1 ), // Index space for loop
UpdateStressBody(), // Body of loop
Affinity ); // Affinity hint
}
static void SerialUpdateVelocity() {
for( int i=1; i<universeheight-1;
j="1;" v[i][j]="D[i][j]*(V[i][j]" />& range ) const {
int i_end = range.end();
for( int i=range.begin(); i!=i_end; ++i )
#pragma ivdep
for( int j=1; j<universewidth-1;
v[i][j]="D[i][j]*(V[i][j]" />( 1,
UniverseHeight-1 ), // Index space for loop
UpdateVelocityBody(), // Body of loop
Affinity ); // Affinity hint
}
void SerialUpdateUniverse() {
UpdatePulse();
SerialUpdateStress();
SerialUpdateVelocity();
}
void ParallelUpdateUniverse() {
UpdatePulse();
ParallelUpdateStress();
ParallelUpdateVelocity();
}
class seismic_video : public video
{
void on_mouse(int x, int y, int key) {
if(key == 1 && PulseCounter == 0) {
PulseCounter = PulseTime;
PulseX = x; PulseY = y;
}
}
void on_key(int key) {
key &= 0xff;
if(char(key) == ' ') InitIsParallel = !InitIsParallel;
else if(char(key) == 'p') InitIsParallel = true;
else if(char(key) == 's') InitIsParallel = false;
else if(char(key) == 'e') updating = true;
else if(char(key) == 'd') updating = false;
else if(key == 27) running = false;
title = InitIsParallel?titles[1]:titles[0];
}
void on_process() {
tbb::task_scheduler_init Init(threads_high);
do {
if( InitIsParallel )
ParallelUpdateUniverse();
else
SerialUpdateUniverse();
if( number_of_frames > 0 ) --number_of_frames;
} while(next_frame() && number_of_frames);
}
} video;
void InitializeUniverse() {
PulseCounter = PulseTime;
// Initialize V, S, and T to slightly non-zero values,
// in order to avoid denormal waves.
for( int i=0; i<universeheight; j="0;" r="t" i="1;"
t[i][j]="S[i][j]" t="(value)i/UniverseHeight;" k="0;"
scale="2.0f/ColorMapSize;" material[i][j]="m;"
l[i][j]="0.125;" m[i][j]="0.125;" m="WATER;"
fabs(t-0.7+0.2*exp(-8*x*x)+0.025*x)<="0.1" d[i][j]="1.0;"
x="float(j-UniverseWidth/2)/(UniverseWidth/2);"
i<universeheight-1;="V[i][j]" />0 ? t : 0;
value b = t<0 ? -t : 0;
value g = 0.5f*fabs(t);
memcpy(c, MaterialColor[k], sizeof(c));
c[2] = colorcomp_t(r*(255-c[2])+c[2]);
c[1] = colorcomp_t(g*(255-c[1])+c[1]);
c[0] = colorcomp_t(b*(255-c[0])+c[0]);
ColorMap[k][i] = video.get_color(c[2], c[1], c[0]);
}
}
// Set damping coefficients around border to reduce reflections from boundaries.
value d = 1.0;
for( int k=DamperSize-1; k>0; --k ) {
d *= 1-1.0f/(DamperSize*DamperSize);
for( int j=1; j < universewidth-1; *="d;" i="1;" /> 1 && isdigit(argv[1][0])) {
char* end; threads_high = threads_low = (int)strtol(argv[1],&end,0);
switch( *end ) {
case ':': threads_high = (int)strtol(end+1,0,0); break;
case '\0': break;
default: printf("unexpected character = %c\n",*end);
}
}
if (argc > 2 && isdigit(argv[2][0])){
number_of_frames = (int)strtol(argv[2],0,0);
}
// video layer init
video.title = InitIsParallel?titles[1]:titles[0];
#ifdef _WINDOWS
#define MAX_LOADSTRING 100
TCHAR szWindowClass[MAX_LOADSTRING]; // the main window class name
LoadStringA(video::win_hInstance, IDC_SEISMICSIMULATION,
szWindowClass, MAX_LOADSTRING);
LRESULT CALLBACK WndProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam);
WNDCLASSEX wcex; memset(&wcex, 0, sizeof(wcex));
wcex.lpfnWndProc = (WNDPROC)WndProc;
wcex.hIcon = LoadIcon(video::win_hInstance,
MAKEINTRESOURCE(IDI_SEISMICSIMULATION));
wcex.hCursor = LoadCursor(NULL, IDC_ARROW);
wcex.hbrBackground = (HBRUSH)(COLOR_WINDOW+1);
wcex.lpszMenuName = LPCTSTR(IDC_SEISMICSIMULATION);
wcex.lpszClassName = szWindowClass;
wcex.hIconSm = LoadIcon(video::win_hInstance,
MAKEINTRESOURCE(IDI_SMALL));
video.win_set_class(wcex); // ascii convention here
video.win_load_accelerators(IDC_SEISMICSIMULATION);
#endif
if(video.init_window(UniverseWidth, UniverseHeight)) {
video.calc_fps = true;
video.threaded = threads_low > 0;
// video is ok, init universe
InitializeUniverse();
// main loop
video.main_loop();
}
else if(video.init_console()) {
// do console mode
if(number_of_frames <= 0) number_of_frames = DEFAULT_NUMBER_OF_FRAMES;
if(threads_high == tbb::task_scheduler_init::automatic) threads_high = 4;
if(threads_high < threads_low) threads_high = threads_low;
for( int p = threads_low; p <= threads_high; ++p ) {
InitializeUniverse();
tbb::task_scheduler_init init(tbb::task_scheduler_init::deferred);
if( p > 0 )
init.initialize( p );
tbb::tick_count t0 = tbb::tick_count::now();
if( p > 0 )
for( int i=0; i<number_of_frames; i="0;" t1="tbb::tick_count::now();" > 0 )
printf(" with %d way parallelism\n",p);
else
printf(" with serial version\n");
}
}
video.terminate();
return 0;
}
#ifdef _WINDOWS
//
// FUNCTION: WndProc(HWND, unsigned, WORD, LONG)
//
// PURPOSE: Processes messages for the main window.
//
// WM_COMMAND - process the application menu
// WM_PAINT - Paint the main window
// WM_DESTROY - post a quit message and return
//
//
LRESULT CALLBACK About(HWND hDlg, UINT message, WPARAM wParam, LPARAM lParam)
{
switch (message)
{
case WM_INITDIALOG: return TRUE;
case WM_COMMAND:
if (LOWORD(wParam) == IDOK || LOWORD(wParam) == IDCANCEL) {
EndDialog(hDlg, LOWORD(wParam));
return TRUE;
}
break;
}
return FALSE;
}
LRESULT CALLBACK WndProc(HWND hWnd, UINT message,
WPARAM wParam, LPARAM lParam)
{
int wmId, wmEvent;
switch (message) {
case WM_COMMAND:
wmId = LOWORD(wParam);
wmEvent = HIWORD(wParam);
// Parse the menu selections:
switch (wmId)
{
case IDM_ABOUT:
DialogBox(video::win_hInstance,
MAKEINTRESOURCE(IDD_ABOUTBOX), hWnd, (DLGPROC)About);
break;
case IDM_EXIT:
PostQuitMessage(0);
break;
case ID_FILE_PARALLEL:
if( !InitIsParallel ) {
InitIsParallel = true;
video.title = titles[1];
}
break;
case ID_FILE_SERIAL:
if( InitIsParallel ) {
InitIsParallel = false;
video.title = titles[0];
}
break;
case ID_FILE_ENABLEGUI:
video.updating = true;
break;
case ID_FILE_DISABLEGUI:
video.updating = false;
break;
default:
return DefWindowProc(hWnd, message, wParam, lParam);
}
break;
default:
return DefWindowProc(hWnd, message, wParam, lParam);
}
return 0;
}
#endif
After building the code in Visual Studio 2008, here is the result shown in two images: one starts as a small circle, and the other shows the chain reaction:
Now that spherical ball will start to send a wave and depict the wave propagation:
For the Fibonacci series examples, you can just copy and paste the code into a self-made folder and build it using this Makefile:
# Common Makefile that builds and runs example.
# Just specify your program basename
PROG=Fibonacci
ARGS=
# The C++ compiler options
CXX = cl.exe
MYCXXFLAGS = /TP /EHsc /W3 /nologo $(TBB_SECURITY_SWITCH)
/D _CONSOLE /D _MBCS /D WIN32 $(CXXFLAGS)
MYLDFLAGS =/INCREMENTAL:NO /NOLOGO /DEBUG /FIXED:NO $(LDFLAGS)
all: release test
release:
$(CXX) *.cpp /MD /O2 /D NDEBUG $(MYCXXFLAGS)
/link tbb.lib $(LIBS) $(MYLDFLAGS) /OUT:$(PROG).exe
debug:
$(CXX) *.cpp /MDd /Od /Zi /D TBB_USE_DEBUG /D _DEBUG $(MYCXXFLAGS)
/link tbb_debug.lib $(LIBS) $(MYLDFLAGS) /OUT:$(PROG).exe
clean:
@cmd.exe /C del $(PROG).exe *.obj *.?db *.manifest
test:
$(PROG) $(ARGS)
Be sure and set your environmental paths by typing: set PATH=%PATH%;.;C:\Program Files\Microsoft Visual Studio 9.0\vc\bin
. Then, type 'vcvars32.bat' to set the environment.
For the Seismic example, download the zip file, extract it into your Projects directory, and click SeismicSimulation.vcproj. This particular example was referenced from Intel documentation.
Reference
- The Intel Threading Building Blocks
History
- 16th February, 2010: Initial post