A Gentle Introduction to the Message Passing Interface (MPI)

18 Apr 2015, CPOL, 12 min read
This article provides an overview of MPI development for C programmers.

For some applications, particularly those involving large-scale mathematics, a single computer won't provide adequate performance. In this case, you need a framework that makes it possible to run applications across multiple systems. You could look at cloud-based solutions like Amazon's Elastic Compute Cloud (EC2) or the Google Compute Engine, but if you want a free solution that you can install and configure yourself, I recommend the Message Passing Interface, or MPI.

MPI is a standard that defines C routines and tools for running applications across connected computers. The first version of the standard was released in 1994, and today, there are two main implementations: MPICH and OpenMPI. MPICH is older and more widely used, but I've read accounts claiming that OpenMPI is faster. Many corporations and universities have released derivatives of MPICH, including the following:

  • MVAPICH - A free implementation of MPICH released under the BSD license
  • Microsoft MPI - An implementation of MPICH designed for the Windows operating system
  • Intel MPI - An Intel-specific implementation of MPICH targeting Intel processors 

This article doesn't discuss the differences between MPI implementations. Instead, my goal is to explain the basics of MPICH programming and show how applications can be compiled and executed. In particular, this article focuses on the different methods of transferring data. But first, I'd like to present an analogy that compares an MPI application to a soccer game.

1. Analogy: MPI and Soccer

Despite a complete lack of coordination, I played soccer for many years. Before a match, the coach would assemble the team and discuss his plan. Most of the plan was non-specific: each player had to play a portion of the field and cover opposing players. Some instructions were position-specific: the forwards should take shots on goal and the goalie should prevent opposing players from scoring.

If the coach wrote his plan in pseudocode, it might look like the following:

C++
if(position == COACH) {
  present_game_plan();
  keep_score();
}
else if(position == FORWARD) {
  take_shots_on_goal();
}
else if(position == GOALIE) {
  block_shots_on_goal();
}
else {
  play_section_of_field(position);
  cover_opposing_players();
}

Everyone on the team receives the same game plan, but they perform different roles according to their position. For regular players, the portion of the field they play depends on their position.

The purpose of MPI is to convert a set of networked computers into a united, functional unit similar to a soccer team. But MPI uses a different set of terms:

  • The set of instructions is called an executable instead of a gameplan.
  • The set of networked computers is called a communicator instead of a team.
  • Each computer is called a processor instead of a player (or a coach).
  • Each role performed by a processor is called a process instead of a position.
  • Each process has a rank instead of a jersey number.
  • Communication between processes is called message passing.

This analogy is fine at a high level, but it has (at least) four flaws:

  1. In a soccer game, each player can only have one position. In an MPI application, a processor can have multiple processes assigned to it.
  2. Soccer games don't permit sub-teams, but MPI applications can create communicators within the overall communicator.
  3. In a soccer game, coaches never play. In an MPI application, every process runs the executable.
  4. The number on a soccer jersey is essentially random. When MPI assigns ranks to processes, it starts at 0 and increments each successive rank by 1.
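
Because ranks run from 0 upward with no gaps, a process can work out its share of a job from its rank alone, much as a player works out a section of the field from a position. The following is a minimal sketch of that arithmetic in plain C, with no MPI calls. The function block_range and its parameter names are hypothetical and not part of MPI; size stands for the number of processes.

```c
#include <assert.h>

/* Hypothetical helper (not an MPI function): splits n items among `size`
 * processes as evenly as possible and reports the half-open range
 * [*start, *start + *count) owned by the process with the given rank.
 * The first (n % size) ranks receive one extra item. */
void block_range(int rank, int size, int n, int *start, int *count) {
  assert(size > 0 && rank >= 0 && rank < size && n >= 0);
  int base = n / size;    /* items that every rank receives */
  int extra = n % size;   /* leftover items, one each for the lowest ranks */
  *count = base + (rank < extra ? 1 : 0);
  *start = rank * base + (rank < extra ? rank : extra);
}
```

With 10 items and 4 processes, ranks 0 and 1 own three items each (starting at items 0 and 3) and ranks 2 and 3 own two items each (starting at items 6 and 8).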

If you're still unclear on MPI's operation or terminology, don't be concerned. There are many helpful resources available, including a tutorial from Lawrence Livermore National Laboratory and a presentation from the Argonne National Laboratory.

2. A Simple Application

The MPI standard defines many functions and data structures, but four functions are particularly important:

  1. MPI_Init(int *argc, char ***argv) - Receives command line arguments and initializes the execution environment (required)
  2. MPI_Comm_size(MPI_Comm comm, int *size) - Provides the number of processes in the communicator at the memory location identified by size. If the first argument is set to MPI_COMM_WORLD, it provides the total number of processes.
  3. MPI_Comm_rank(MPI_Comm comm, int *rank) - Provides the rank of the process executing the function at the memory location identified by rank. If the first argument is set to MPI_COMM_WORLD, it provides the rank of the process among all the processes.
  4. MPI_Finalize() - Halts MPI execution (required)

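These functions, like all MPI functions, return an integer that represents the completion status. A function returns MPI_SUCCESS (0) if it completed successfully; any other value is an error code. By default, though, MPI aborts the application when an error occurs (the default error handler is MPI_ERRORS_ARE_FATAL), so a return value can only be inspected after the handler has been changed to MPI_ERRORS_RETURN.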

The code in hello_mpi.c shows how these four functions can be used in practice:

C++
#include <stdio.h>
#include <mpi.h>

int main (int argc, char *argv[]) {

  int rank, size;

  // Initialize the MPI environment
  MPI_Init(&argc, &argv);      

  // Determine the size of the communicator
  MPI_Comm_size(MPI_COMM_WORLD, &size);   

  // Determine the rank of the current process
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // Print a message from the process
  printf("This is process %d of %d\n", rank, size);

  MPI_Finalize();
  return 0;
}

To compile a normal C application, you need to identify the locations of headers and libraries. But implementations of MPI provide a compiler wrapper called mpicc (MPI C Compiler). The wrapper invokes the underlying C compiler and knows where to find the MPI dependencies, so hello_mpi.c can be compiled with the following command:

mpicc -o hello_mpi hello_mpi.c

In addition to mpicc, implementations provide a tool called mpiexec, which the MPI standard recommends for launching MPI executables. mpiexec has two important flags:

  • -n - the number of processes to generate
  • -host - comma-separated list of hosts on which to run the executable

For example, the following command executes hello_mpi with two processes. This execution will be performed on the systems named castor and pollux.

mpiexec -n 2 -host castor,pollux hello_mpi

For this example, output like the following will be printed, although the order of the lines may vary from run to run:

This is process 0 of 2
This is process 1 of 2

When the executable runs, it's likely that one process will run on castor and the other will run on pollux. But different MPI implementations have different ways of assigning processes to hosts. As an example, OpenMPI uses a rank file to assign processes to processors.

Many implementations provide another tool for launching executables called mpirun. mpirun isn't defined by the MPI standard, so its usage depends on the implementation. For example, OpenMPI's mpirun has a loadbalance flag that uniformly distributes processes among the processors. Intel's mpirun has an iface flag that defines which network interface to use.

Note: Before an executable can be run across multiple systems, it must be copied to each system at the same location. If the executable relies on data/configuration files, the files must also be copied to the same locations on each system.

3. Point-to-Point Communication

In many MPI applications, one process distributes data to other processes and then collects the processed data. For example, the process with rank 0 might read an array of floats from a file and send the array to other processes (ranks > 0) for computation. When processing is finished, Process 0 collects the output floats and saves them to a file.

In MPI, the transfer of data from one process to another is called point-to-point communication. This works in two steps:

  1. The sending process calls MPI_Send to express its intent to transfer data to a receiving process.
  2. The receiving process calls MPI_Recv to express its intent to receive data from a sending process.

The data transfer isn't finished until both functions complete their operations. The parameters of the two functions have a lot in common, as shown by their signatures:

C++
int MPI_Send(const void* buff, int count, MPI_Datatype type, int dest, int tag, MPI_Comm comm)

int MPI_Recv(void* buff, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Status *status)

In each case, buff points to the data to be sent or the memory location where the received data should be stored. count and type specify the nature of the transferred data. That is, type identifies the primitive type of the transmitted data (MPI_FLOAT, MPI_DOUBLE, MPI_INT, MPI_UNSIGNED_SHORT, MPI_BYTE, and so on) and count identifies how many primitives should be transferred. For example, the following function sends 100 floats stored in float_array.

C++
MPI_Send(float_array, 100, MPI_FLOAT, 3, 2, MPI_COMM_WORLD);

In this case, dest is set to 3 because the data is intended to be sent to the process whose rank is 3. The tag value is an integer label that identifies the data transfer. If Process 3 wants to receive this data, it must pass the same tag (2) to MPI_Recv and set source to the rank of the sending process. Assuming the sender is Process 0, the call is given as follows:

C++
MPI_Recv(myfloats, 100, MPI_FLOAT, 0, 2, MPI_COMM_WORLD, &status);

The last argument points to an MPI_Status structure whose content is initialized by the function. This structure contains the sender's rank (MPI_SOURCE), the message's tag (MPI_TAG), and the error code for the data transfer (MPI_ERROR). If the argument is set to MPI_STATUS_IGNORE, the function won't provide the MPI_Status structure.

The code in p2p_example.c demonstrates how MPI_Send and MPI_Recv can be used. This application starts with Process 0 sending a float to every other process. The other processes add their rank to the float and send it back to Process 0, which collects the results into an array and prints its elements.

C++
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define INPUT_TAG 0
#define OUTPUT_TAG 1

int main(int argc, char **argv) {

  int rank, size, i;
  float* data;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Allocate, initialize, and send data
  if(rank == 0) {
    data = (float*)malloc((size-1) * sizeof(float));
    for(i=1; i<size; i++) {
      data[i-1] = 0.1 * i;
      MPI_Send(&data[i-1], 1, MPI_FLOAT, i, INPUT_TAG, MPI_COMM_WORLD);
    }
  }

  // Receive, process, and resend data
  if(rank > 0) {
    data = (float*)malloc(sizeof(float));
    MPI_Recv(data, 1, MPI_FLOAT, 0, INPUT_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    data[0] += rank;
    MPI_Send(data, 1, MPI_FLOAT, 0, OUTPUT_TAG, MPI_COMM_WORLD);
  }

  // Receive processed data and print results
  if(rank == 0) {
    for(i=1; i<size; i++) {
      MPI_Recv(&data[i-1], 1, MPI_FLOAT, i, OUTPUT_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }
    printf("Output: ");
    for(i=1; i<size; i++) {
      printf("%.1f ", data[i-1]);
    }
    printf("\n");
  }

  free(data);
  MPI_Finalize();
  return 0;
}

This application uses tags to distinguish transfers of data. When Process 0 distributes the input data, the transfer is identified with INPUT_TAG. When the other processes send the processed data, the transfer is identified with OUTPUT_TAG.
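
The matching rule behind this can be modelled in a few lines of plain C. This is only a sketch of the idea, with no MPI calls: the Message structure and the find_match function are hypothetical, and MPI's wildcard values for source and tag are left out. A receive that names a source and a tag takes the earliest pending message that carries both values.

```c
#include <assert.h>

/* Hypothetical model of a pending message. Real MPI keeps this bookkeeping
 * internally and does not expose such a structure. */
typedef struct {
  int source;   /* rank of the sender */
  int tag;      /* tag passed to MPI_Send */
  float value;  /* payload (a single float, as in p2p_example.c) */
} Message;

/* Returns the index of the first pending message whose source and tag both
 * equal the values requested by the receiver, or -1 if none matches. */
int find_match(const Message *pending, int n, int source, int tag) {
  assert(n >= 0);
  for (int i = 0; i < n; i++) {
    if (pending[i].source == source && pending[i].tag == tag) {
      return i;
    }
  }
  return -1;
}
```

In this model, a message from Process 2 tagged OUTPUT_TAG is never handed to a receive that asks for INPUT_TAG, even if it arrived first. That is the property p2p_example.c relies on to keep its two phases apart.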

A drawback of using MPI_Send and MPI_Recv is that they are blocking calls: they can force a process to wait for another process. To make up for this, MPI provides alternative functions for point-to-point communication. For example, MPI_Isend and MPI_Irecv start a send or receive operation and return immediately; the operation is completed later with a call such as MPI_Wait. These functions are beyond the scope of this article.

4. Collective Communication

In the preceding example, MPI_Send and MPI_Recv were called inside loops to transfer data between multiple processes. Communication involving multiple processes is called collective communication and MPI provides functions specifically for this purpose. Table 1 lists five of them and provides a description of each.

Table 1: Collective Communication Functions

| Function Signature | Description |
| --- | --- |
| MPI_Bcast(void *buff, int count, MPI_Datatype type, int root, MPI_Comm comm) | Transfers a chunk of data from one process to every process in a communicator |
| MPI_Scatter(const void* src_buff, int src_count, MPI_Datatype src_type, void* dst_buff, int dst_count, MPI_Datatype dst_type, int root, MPI_Comm comm) | Distributes successive chunks of data from one process to every process in a communicator |
| MPI_Gather(const void* src_buff, int src_count, MPI_Datatype src_type, void* dst_buff, int dst_count, MPI_Datatype dst_type, int root, MPI_Comm comm) | Collects chunks of data from multiple processes into an array on one process |
| MPI_Allgather(const void* src_buff, int src_count, MPI_Datatype src_type, void* dst_buff, int dst_count, MPI_Datatype dst_type, MPI_Comm comm) | Collects chunks of data from multiple processes into an identical array on every process |
| MPI_Alltoall(const void* src_buff, int src_count, MPI_Datatype src_type, void* dst_buff, int dst_count, MPI_Datatype dst_type, MPI_Comm comm) | Transfers data between multiple processes in a transpose-like manner |

These functions not only simplify the source code, but usually perform better than loops of point-to-point transfers. This section discusses each of these five in detail.

4.1 Broadcast

Consider the following code:

C++
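if(rank == root) {
  // The root already holds the data, so it only sends to the other processes
  for(i=0; i<size; i++) {
    if(i != root)
      MPI_Send(&data, 1, MPI_FLOAT, i, tag, MPI_COMM_WORLD);
  }
}
else {
  MPI_Recv(&data, 1, MPI_FLOAT, root, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}

The for loop sends a float from the root process to every other process in the communicator. Each of those processes calls MPI_Recv to read the float from the root. The root skips itself because a blocking MPI_Send to its own rank, issued before the matching MPI_Recv, can deadlock.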

When one process transfers data to every process in the communicator, the operation is called a broadcast. This is performed by MPI_Bcast, the simplest of MPI's collective communication functions. For example, the following function call performs the same broadcast as the example code presented earlier.

C++
MPI_Bcast(&data, 1, MPI_FLOAT, root, MPI_COMM_WORLD);

When you use MPI_Bcast, no calls to MPI_Recv are necessary, but every process in the communicator (including the root) must call MPI_Bcast with the same root argument. Collective calls on a communicator are matched by the order in which they are made, which explains why the function doesn't use a tag value to identify the transfer.

4.2 Scatter and Gather

A scatter operation is similar to a broadcast, but there are two important differences:

  1. A broadcast transfers the same data from the root to each process. A scatter divides the root's data into chunks and transfers different chunks to different processes according to their rank. The first chunk goes to Process 0, the second to Process 1, and so on.
  2. In a broadcast, the received data always has the same type, number, and memory reference as the sent data. For a scatter, these characteristics can be changed.

For example, suppose you want to transfer the first two ints of root's array to Process 0, the next two ints to Process 1, and so on. This can be accomplished with the following call to MPI_Scatter:

C++
MPI_Scatter(input_array, 2, MPI_INT, output_array, 2, MPI_INT, root, MPI_COMM_WORLD);

The left side of Figure 1 clarifies how MPI_Scatter works by illustrating the preceding data transfer:

Figure 1: The MPI_Scatter and MPI_Gather Functions

As shown on the right side of the figure, a gather operation performs the inverse of a scatter operation. That is, instead of distributing chunks of data from one process to many, it combines chunks from multiple processes into an array on one process. As an example, the following function call collects three floats from each process into output_array on the root process:

C++
MPI_Gather(input_array, 3, MPI_FLOAT, output_array, 3, MPI_FLOAT, root, MPI_COMM_WORLD);

As with MPI_Bcast, the receiving process or processes don't need to call MPI_Recv to complete the transfer. When MPI_Scatter or MPI_Gather completes, the data transfer has been accomplished.
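
The chunk layout used by both functions can be modelled in plain C. The sketch below makes no MPI calls and handles ints only; model_scatter and model_gather are hypothetical names chosen for illustration. Each function plays the part of a single rank, and chunk corresponds to the count passed to the real functions.

```c
#include <assert.h>
#include <string.h>

/* Plain C model of MPI_Scatter for ints (hypothetical name, no MPI calls).
 * root_buf is the root's array; the process with rank r receives the chunk
 * that starts at root_buf[r * chunk]. */
void model_scatter(const int *root_buf, int chunk, int rank, int *local_buf) {
  assert(chunk >= 0 && rank >= 0);
  memcpy(local_buf, root_buf + rank * chunk, chunk * sizeof(int));
}

/* Plain C model of MPI_Gather: the chunk from rank r is stored at
 * root_buf[r * chunk], so the gathered array is ordered by rank. */
void model_gather(const int *local_buf, int chunk, int rank, int *root_buf) {
  assert(chunk >= 0 && rank >= 0);
  memcpy(root_buf + rank * chunk, local_buf, chunk * sizeof(int));
}
```

With a chunk of 2, rank 1 receives elements 2 and 3 of the root's array. Scattering an array and gathering the chunks back with the same count reproduces the original array, which also shows why the root's buffers must hold chunk times the number of processes elements.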

4.3 Allgather

As discussed earlier, MPI_Gather collects chunks of data from multiple processes and transfers them to a single process as an array. The MPI_Allgather function performs a similar operation, but now the collected array is delivered to each process. In effect, MPI_Allgather performs multiple calls to MPI_Gather, using each process as root.

For example, the following code collects two ints from each process and combines them into an array. Then it delivers the array to every process in the communicator.

C++
MPI_Allgather(input_array, 2, MPI_INT, output_array, 2, MPI_INT, MPI_COMM_WORLD);

It's important to note that each process receives an identical array containing the collected elements.
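
As a rough illustration, the result can be modelled in plain C without any MPI calls. The function model_allgather is a hypothetical name, and the model stores every process's arrays in two flat buffers so that the whole communicator can be seen at once, which a real MPI program cannot do.

```c
#include <assert.h>
#include <string.h>

/* Plain C model of MPI_Allgather for ints (hypothetical name, no MPI calls).
 * inputs  holds nprocs rows of chunk ints; row r is rank r's input array.
 * outputs holds nprocs rows of nprocs * chunk ints; row r is rank r's
 * output array. */
void model_allgather(int nprocs, int chunk, const int *inputs, int *outputs) {
  assert(nprocs > 0 && chunk >= 0);
  int total = nprocs * chunk;   /* length of each process's output array */
  for (int dst = 0; dst < nprocs; dst++) {
    for (int src = 0; src < nprocs; src++) {
      /* The chunk from rank src lands at offset src * chunk on every rank */
      memcpy(outputs + dst * total + src * chunk,
             inputs + src * chunk, chunk * sizeof(int));
    }
  }
}
```

Every row of outputs ends up as the same rank-ordered concatenation of the input chunks, so each output array needs room for chunk times the number of processes elements.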

4.4 Alltoall

MPI_Alltoall accepts the same parameters as MPI_Allgather and performs a similar operation—it collects data from every process and transfers data to every process. But MPI_Alltoall doesn't transfer an identical array to each process. Instead, Process k receives the kth element from each process. That is, Process 0 receives an array containing each process's first element, Process 1 receives an array containing each process's second element, and so on.

If you think of Process k's data as the kth row of a matrix, then the operation performed by MPI_Alltoall is similar to a matrix transpose, also called a corner turn. Figure 2 shows what this looks like:

Image 2

The a2a_example.c source file shows how MPI_Alltoall can be used in practice. Each process has an input array whose length equals the number of processes. The MPI_Alltoall function rearranges these values so that Process k receives the kth element from each process.

C++
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

int main(int argc, char **argv) {

  int rank, size, i;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Allocate and initialize data
  int* input_array = (int*)malloc(size * sizeof(int));
  int* output_array  = (int*)malloc(size * sizeof(int));
  for(i=0; i<size; i++) {
    input_array[i] = rank * size + i;
  }

  // Rearrange the data
  MPI_Alltoall(input_array, 1, MPI_INT, output_array, 1, MPI_INT, MPI_COMM_WORLD);

  printf("Process %d: ", rank);
  for(i=0; i<size; i++) {
    printf("%d ", output_array[i]);
  }
  printf("\n");

  // Deallocate and finalize MPI
  free(input_array);
  free(output_array);
  MPI_Finalize();
  return 0;
}

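In this code, the number of processes always equals the number of elements in each array. But this doesn't have to be the case. Each process sends src_count elements to every process and receives dst_count elements from every process, so each array must hold at least its count multiplied by the number of processes. MPI doesn't pad missing data with zeros: if an array is too short, MPI_Alltoall reads or writes past its end and the behavior is undefined.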
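
The rearrangement performed by a2a_example.c can be checked against a plain C model of the transpose. This is a sketch with no MPI calls that covers only a count of 1; model_alltoall is a hypothetical name, and the two matrices stand in for the arrays that would live on separate processes.

```c
#include <assert.h>

/* Plain C model of MPI_Alltoall with a count of 1 (hypothetical name, no MPI
 * calls). inputs and outputs are nprocs x nprocs matrices stored row by row,
 * where row r is the array owned by rank r. Element j of rank k's output is
 * element k of rank j's input, which is a matrix transpose. */
void model_alltoall(int nprocs, const int *inputs, int *outputs) {
  assert(nprocs > 0);
  for (int k = 0; k < nprocs; k++) {
    for (int j = 0; j < nprocs; j++) {
      outputs[k * nprocs + j] = inputs[j * nprocs + k];
    }
  }
}
```

Filling the inputs as a2a_example.c does (rank * size + i) for three processes gives the output rows 0 3 6, 1 4 7, and 2 5 8, which are the values Processes 0, 1, and 2 should print when the example is launched with three processes.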

5. Concluding Remarks

This article has discussed the basics of MPI coding and interprocess communication. But MPI provides many more capabilities than those discussed here. If you look through the list of functions, you'll see that MPI makes it possible to work with files, threads, windows, barriers and much more.

Using the Code

The code archive contains three files: hello_mpi.c, p2p_example.c, and a2a_example.c. After you've installed an implementation of MPI, you can compile them with mpicc, which works in much the same way as the popular GNU C Compiler (gcc). After an MPI executable is compiled and linked, you can launch it with mpiexec or mpirun.

History

  • 4/18/2015: Submitted

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)