
Communicating Data with MPI

Overview

Teaching: 30 min
Exercises: 30 min
Questions
  • How do we perform point-to-point communications in MPI?

  • How do we perform collective communications in MPI?

Objectives
  • Learn how to communicate between processes using MPI (specifically mpi4py).

Messages

What is a message? A message is any piece of information passed from one process to another; in mpi4py it can be any Python object type. We need a structure to handle the exchange of messages between processes, and that structure is our MPI.COMM_WORLD object comm. comm can give us information such as the rank of the calling process or the size of the process pool, but it can also send and receive messages. Messages passed through comm are transferred over the network between nodes, or over a bus between CPU cores within the same node.
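
To make this concrete, here is a minimal sketch (no messages yet) that just queries the communicator; run it with any number of processes:

#!/usr/bin/env python3
# Minimal sketch: inspect the communicator before exchanging any messages.
from mpi4py import MPI

comm = MPI.COMM_WORLD    # the communicator containing all processes
rank = comm.Get_rank()   # unique ID of the calling process (0 .. size-1)
size = comm.Get_size()   # total number of processes in the pool

print("I am rank", rank, "of", size)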

Types of Point-to-Point Communication

Unidirectional Messages

How do you send or receive a message with mpi4py? Use comm.send to send a message from one process (the source) to another (the dest). It is paired with a comm.recv on the other process.


Figure 1: Unidirectional messaging.

message_to_send is sent from rank 0 to rank 1. Rank 0 sends the message with destination dest=1, and rank 1 receives it with source=0.

This and all following code files will be located in handson/mpi4py/. Save this unidirectional messaging example as send_recv.py and run it with exactly two processes.

#!/usr/bin/env python3
from mpi4py import MPI
import socket
import sys

comm = MPI.COMM_WORLD #initialize the communicator
size = comm.Get_size() #number of processes
rank = comm.Get_rank() #unique ID for the calling process
host_name = socket.gethostname()
message_to_send = "SENT_FROM_RANK_" + str(rank)
if rank == 0:
    comm.send(message_to_send, dest=1)
else:
    received_message = comm.recv(source=0)
    print("I am rank ", rank, " running on host ", host_name, \
         " I received from the other rank running on host ", received_message)
I am rank  1  running on host  d4-w6420b-08  I received from the other rank running on host  SENT_FROM_RANK_0

Bidirectional Sending and Receiving Messages


Figure 2: Bidirectional messaging.

Messages can also be exchanged bidirectionally, which requires proper ordering of the sends and receives.

The send and recv commands are the blocking versions of message passing. This means that the program waits/blocks the process that calls the blocking communication until the data buffers involved can be safely reused. Blocking communication can increase the total run time, since it prevents the process from doing anything else until the message is completely sent or received. It also provides a built-in synchronization of messages, but can lead to deadlock.

Deadlock happens when no process can continue: one process waits on another process (or itself) to complete an action, while that process is in turn waiting on it, so no change of state is possible. Deadlock can happen with blocking sends of very large messages, which cannot be buffered; this can be mitigated by asynchronous sending and receiving (in other words, using non-blocking versions of message passing).
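
As a sketch of how deadlock can arise, consider two ranks that both send before they receive. For small messages MPI may buffer the sends and the program completes anyway, but for sufficiently large messages both sends can block and neither rank ever reaches its recv (the payload size here is just an arbitrary illustration):

#!/usr/bin/env python3
# Deadlock-prone sketch: both ranks send first. Run with exactly two
# processes. With a large enough payload, both sends may block forever,
# because the matching recv on the other side is never reached.
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

other = 1 - rank                # rank 0 talks to rank 1 and vice versa
payload = [rank] * 10_000_000   # large message; small ones may be buffered

comm.send(payload, dest=other)      # may block waiting for a receiver
received = comm.recv(source=other)  # never reached if both sends block
print("Rank", rank, "received", len(received), "items")

The send_recv2.py example below avoids this by having rank 0 send first while rank 1 receives first.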

MPI (and mpi4py) allows both blocking and non-blocking communication. The non-blocking versions of message passing are comm.Isend and comm.Irecv. They allow overlap of computation and communication, and are most beneficial when there is a dedicated communication controller. However, we will focus on the blocking versions of communication.

In the example below, rank 0 will (send, receive) and rank 1 will (receive, send). This example sends one message from rank 0 to rank 1, and another message from rank 1 to rank 0. (Here, rank 0 is just like any other rank in the MPI process world; it does not represent anything special such as a “master” rank.) Notice the reversal of send and receive on the two ranks, reflecting the direction of the two messages. Make sure to run this with exactly two processes.

This is send_recv2.py.

#!/usr/bin/env python3

# This program demonstrates bi-directional communication using exactly two processes

from mpi4py import MPI
import socket
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Check that we have exactly two processes otherwise terminate
if size != 2:
    sys.exit("This program requires exactly two processes. Please re-run with two process.")

host_name = socket.gethostname()
message_to_send = host_name
received_message = ""

if rank == 0:
    comm.send(message_to_send, dest = 1)
    received_message = comm.recv(source = 1)
else:
    received_message = comm.recv(source = 0)
    comm.send(message_to_send, dest = 0)


print("I am rank ", rank, " running on host ", host_name, \
         " I received from the other rank running on host ", received_message)
I am rank  1  running on host  d4-w6420b-08  I received from the other rank running on host  d4-w6420b-07
I am rank  0  running on host  d4-w6420b-07  I received from the other rank running on host  d4-w6420b-08

Sending and Receiving Many Messages (Tags)

Many messages can be sent using comm.send and comm.recv, but the user must keep track of the correct order of the messages (as well as the destination and source processes). Alternatively, mpi4py lets the user attach tags to messages to distinguish them and route them correctly. This file is send_multi_tagged.py.

#!/usr/bin/env python3

# Send multiple messages using tags.
# This program runs with 2 processes.

from mpi4py import MPI
import socket
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Check that we have exactly two processes otherwise terminate
if size != 2:
    sys.exit("This program requires exactly two processes. Please re-run with two processes.")

host_name = socket.gethostname()
msg_send1 = "hello"
msg_send2 = 3728
msg_send3 = [1, 2, 3, 5, 8, 13, 21]

if rank == 0:
    comm.send(msg_send1, dest=1, tag=1)
    comm.send(msg_send2, dest=1, tag=2)
    comm.send(msg_send3, dest=1, tag=3)

    print("Rank", rank, ": sent 3 messages:")
    print("msg_send1 =", msg_send1)
    print("msg_send2 =", msg_send2)
    print("msg_send3 =", msg_send3)

else:
    # Receiving in a different order than the messages were sent works
    # here because of the tags. This is ok for small messages, but for
    # large messages it may create deadlock:
    msg_recv3 = comm.recv(source=0, tag=3)
    msg_recv1 = comm.recv(source=0, tag=1)
    msg_recv2 = comm.recv(source=0, tag=2)

    print("Rank", rank, ": received 3 messages:")
    print("msg_send1 =", msg_recv1)
    print("msg_recv2 =", msg_recv2)
    print("msg_recv3 =", msg_recv3)
Rank 0 : sent 3 messages:
msg_send1 = hello
msg_send2 = 3728
msg_send3 = [1, 2, 3, 5, 8, 13, 21]
Rank 1 : received 3 messages:
msg_recv1 = hello
msg_recv2 = 3728
msg_recv3 = [1, 2, 3, 5, 8, 13, 21]

Multiple Messages Exercise

Using tags is the best practice for sending multiple messages, but it can also be done without them if more care is taken. Use comm.send and comm.recv to send three messages, keeping track of their order. Write a program for two processes: one sends three messages, and the other receives them: “hello,” 3728, and [1, 2, 3, 5, 8, 13, 21]. The output should look like the following. Name this file send_multi.py.

Rank 0 : sent 3 messages:
msg_send1 = hello
msg_send2 = 3728
msg_send3 = [1, 2, 3, 5, 8, 13, 21]
Rank 1 : received 3 messages:
msg_recv1 = hello
msg_recv2 = 3728
msg_recv3 = [1, 2, 3, 5, 8, 13, 21]

Solution

#!/usr/bin/env python3

# This program sends multiple messages.
# This program runs with 2 processes.

from mpi4py import MPI
import socket
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Check that we have exactly two processes otherwise terminate
if size != 2:
    sys.exit("This program requires exactly two processes. 
Please re-run with two processes.")

host_name = socket.gethostname()
msg_send1 = "hello"
msg_send2 = 3728
msg_send3 = [1, 2, 3, 5, 8, 13, 21]

if rank == 0:
    comm.send(msg_send1, dest=1)
    comm.send(msg_send2, dest=1)
    comm.send(msg_send3, dest=1)

    print("Rank", rank, ": sent 3 messages:")
    print("msg_send1 =", msg_send1)
    print("msg_send2 =", msg_send2)
    print("msg_send3 =", msg_send3)

else:
    msg_recv1 = comm.recv(source=0)
    msg_recv2 = comm.recv(source=0)
    msg_recv3 = comm.recv(source=0)

    print("Rank", rank, ": received 3 messages:")
    print("msg_send1 =", msg_recv1)
    print("msg_recv2 =", msg_recv2)
    print("msg_recv3 =", msg_recv3)

Asynchronous Multiple Messages

Asynchronous (non-blocking) messaging allows the receiving process to perform other tasks while waiting on a message, whereas with synchronous messages the receiving process must wait for the message to be received before performing other tasks. However, asynchronous messaging requires more user oversight: as seen in the first print statement below, process 1 (the sending process) changes result, so the user needs to pay attention to whether a variable’s value is changed before, during, or after the message is received.

Create a program with two processes that send three messages (result, result1, result2) asynchronously. This file is called nonblocking.py.

#!/usr/bin/env python3

# Send multiple messages asynchronously between
# the sending (rank 1) and receiving (rank 0) processes.

import numpy
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# initial messages
result = numpy.array([0.])
result1 = numpy.array([2.7, 3.9])
result2 = numpy.array([0.0, 1.7, 8.0])

if rank == 1:
    result = numpy.array([6.7])
    print("Process", rank, " now set the variable to:", result)
    req = comm.Isend(result, dest=0)
    req.Wait()

    print("Process", rank, " now set the variable to:", result1)
    req = comm.Isend(result1, dest=0)
    req.Wait()

    print("Process", rank, " now set the variable to:", result2)
    req = comm.Isend(result2, dest=0)
    req.Wait()

if rank == 0:
    print("Process", rank, "before receiving has the number", result)
    req = comm.Irecv(result, source=1)
    req.Wait()
    print("Process", rank, "received", result)

    req = comm.Irecv(result1, source=1)
    req.Wait()
    print("Process", rank, "received", result1)

    req = comm.Irecv(result2, source=1)
    req.Wait()
    print("Process", rank, "received", result2)
Process 1  now set the variable to: [6.7]
Process 1  now set the variable to: [2.7 3.9]
Process 1  now set the variable to: [0.  1.7 8. ]
Process 0 before receiving has the number [0.]
Process 0 received [6.7]
Process 0 received [2.7 3.9]
Process 0 received [0.  1.7 8. ]
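
Note that the example above calls Wait() immediately after each Isend or Irecv, so it behaves much like blocking communication. The benefit of non-blocking calls comes from doing useful work between posting a request and waiting on it. The following sketch illustrates that pattern; the local computation is just a stand-in for any work that does not depend on the incoming message:

#!/usr/bin/env python3
# Sketch: overlap communication and computation with Irecv/Wait.
# Run with exactly two processes.
import numpy
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

buf = numpy.zeros(1000)

if rank == 1:
    buf[:] = 42.0
    req = comm.Isend(buf, dest=0)
    # ... rank 1 could compute here while the send is in flight ...
    req.Wait()
elif rank == 0:
    req = comm.Irecv(buf, source=1)
    # Do work that does not need the message while it is in flight:
    local = sum(i * i for i in range(100000))
    req.Wait()   # only now do we need the contents of buf
    print("Local work gave", local, "; buf[0] =", buf[0])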

Collective Communication

Collective communication routines let the user efficiently transfer data from all processes to one, or from one process to all, at once. Instead of individually sending to, or receiving from, every other process, collective routines accomplish this more efficiently and with minimal code. Collective communications include:

Broadcast

Broadcasting sends a message from a source process (the root) to all other processes, and is often used to replicate data across all processes. It is equivalent to the following (using simple send and receive):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

root = 0
message = ""

if rank == root:
    message = "This message was broadcast by rank "+ str(root)
    for i in range(1, size):
        comm.send(message, dest=i)
else:
    message = comm.recv(source=root)

print(message)

The following code is the proper way to broadcast. This file is called bcast.py. Make sure that the SLURM script uses 4 processes.

#!/usr/bin/env python3

# Broadcast message from rank 0 to the other 3 processes.

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    message = "SENT_FROM_RANK_0"
else:
    message = "Empty"

print("Content of message in rank ", rank, " before broadcast: ", message,
      flush=True)

message = comm.bcast(message, root=0)

print("Content of message in rank ", rank, " after broadcast:  ", message,
      flush=True)

Content of message in rank  2  before broadcast:  Empty
Content of message in rank  3  before broadcast:  Empty
Content of message in rank  0  before broadcast:  SENT_FROM_RANK_0
Content of message in rank  1  before broadcast:  Empty
Content of message in rank  2  after broadcast:   SENT_FROM_RANK_0
Content of message in rank  1  after broadcast:   SENT_FROM_RANK_0
Content of message in rank  0  after broadcast:   SENT_FROM_RANK_0
Content of message in rank  3  after broadcast:   SENT_FROM_RANK_0

To broadcast a message, you provide bcast with two things: the message to transfer and the root, the rank that broadcasts the message.


Figure 3: Broadcasting. Process r0 broadcasts message m to r1, r2, and r3.

Gather

Gathering (comm.gather) is not exactly the opposite of broadcasting: broadcasting replicates a single message across processes, whereas gathering collects a distinct message from every process onto one single process (the root). It is useful for collecting pieces of data (or intermediate results) from the processes. The file is called gather.py.


Figure 4: Gathering. Process r0 gathers the distinct messages m0 from r0, m1 from r1, m2 from r2, and m3 from r3.

#!/usr/bin/env python3
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# the message to be gathered:
msg = rank * (rank + 1)
print("message from rank", rank, ":", msg)
#We gather all `msg` to process 0
gather_messages = comm.gather(msg, root=0)

if rank == 0:
    print("Gathered messages:", gather_messages)
message from rank 2 : 6
message from rank 1 : 2
message from rank 3 : 12
message from rank 0 : 0
Gathered messages: [0, 2, 6, 12]

Similarly to bcast, gather needs two arguments: the data to send and the root, the rank where all messages are collected.

Scatter

Scatter (comm.scatter) takes a message, divides it into chunks, and sends one chunk to each process. The difference from broadcast is that each receiving process gets only a part of the original message, namely the chunk sent to it. For example, if the message is an array, scatter divides it in order into equal sub-arrays and sends them to the processes in rank order. Note that the sending process (the root) also keeps a chunk for itself.


Figure 5: Scatter. The array containing the data m0, m1, m2, and m3 before the scatter is on r0. r0 scatters the array amongst itself (r0), r1, r2, and r3. m0 is scattered to r0, m1 to r1, m2 to r2, and m3 to r3.

The file is scatter.py.

#!/usr/bin/env python3

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

data = None
recv_message = None 

if rank == 0:
    data = ["m"+str(i) for i in range(size)]
    print("Data to scatter:", data)

recv_message = comm.scatter(data, root=0)

print("After scatter, rank", rank ,"has recv_message:", recv_message)
Data to scatter: ['m0', 'm1', 'm2', 'm3']
After scatter, rank 0 has recv_message: m0
After scatter, rank 1 has recv_message: m1
After scatter, rank 2 has recv_message: m2
After scatter, rank 3 has recv_message: m3

scatter takes arguments similar to the other collective routines: the data to scatter and the root process.

Reduce

Reduce operations apply an operation, such as a sum, across data held by multiple processes and deliver the result to the root. For example, to compute the sum of numbers held in parallel, instead of having every process send its value to rank 0 and implementing the sum manually, you apply a reduction operation. Other common reduction operations return the maximum, minimum, and product, as well as bitwise and logical combinations (such as AND and OR).

This is a useful and efficient way to compute such values in parallel. Each process computes its own intermediate value, and MPI combines the intermediate values (often along a tree of processes) so that the root ends up with the overall result.
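
Just as the broadcast section showed a hand-written equivalent of bcast, a reduction can be sketched by hand with send and recv. The sketch below sums one value per rank at the root, which is what a single reduce call does for us (and usually more efficiently):

#!/usr/bin/env python3
# Hand-written equivalent of comm.reduce(value, op=MPI.SUM, root=0).
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

value = rank + 1    # each rank contributes one number

if rank == 0:
    total = value
    for i in range(1, size):
        total += comm.recv(source=i)   # collect and accumulate
    print("Sum over all ranks:", total)
else:
    comm.send(value, dest=0)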

Some examples are included in reduce.py:

#!/usr/bin/env python3

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Create some np arrays on each process:
# For this demo, the arrays have only one
# entry that is assigned to be the rank + 1 of the processor
value = np.array(rank+1,'d')

print(' Rank: ',rank, ' value = ', value)

# initialize the np arrays that will store the results:
value_sum      = np.array(0.0,'d')
value_max      = np.array(0.0,'d')
value_min      = np.array(0.0,'d')
value_prod     = np.array(0.0,'d')

# perform the reductions:
comm.Reduce(value, value_sum, op=MPI.SUM, root=0)
comm.Reduce(value, value_max, op=MPI.MAX, root=0)
comm.Reduce(value, value_min, op=MPI.MIN, root=0)
comm.Reduce(value, value_prod, op=MPI.PROD, root=0)

if rank == 0:
    print(' Rank 0: value_sum =    ',value_sum)
    print(' Rank 0: value_max =    ',value_max)
    print(' Rank 0: value_min =    ',value_min)
    print(' Rank 0: value_prod =    ',value_prod)

 Rank:  2  value =  3.0
 Rank:  1  value =  2.0
 Rank:  3  value =  4.0
 Rank:  0  value =  1.0
 Rank 0: value_sum =     10.0
 Rank 0: value_max =     4.0
 Rank 0: value_min =     1.0
 Rank 0: value_prod =     24.0

Reduce requires four parameters: the send buffer holding the data (value here), the receive buffer that stores the result (value_sum, value_max, value_min, value_prod), the aggregate operation to apply to the data (for example, op=MPI.SUM, MPI.MAX, MPI.MIN, MPI.PROD), and the root. The root is in charge of producing the final result from the intermediate results coming from the other processes.


Figure 6: Using Reduce to get the sum over the processes. Each process has its own locally saved data (M0, M1, M2, and M3). The values are sent to r0, which applies a summing function. The total is saved in the variable M_total.

Here is an example using the lowercase reduce, which works on general Python objects, to apply MPI.SUM. (The sample output below is from a run with six processes.)

#!/usr/bin/env python3

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

my_number = rank * rank

print("Content of my_number in rank", rank, " = ", my_number,
      flush=True)

SUM_NUMBER = comm.reduce(my_number, op=MPI.SUM, root=0)

if rank == 0:
    print("SUM_NUMBER after summation:", SUM_NUMBER,
         flush=True)
Content of my_number in rank 0  =  0
Content of my_number in rank 1  =  1
Content of my_number in rank 2  =  4
Content of my_number in rank 4  =  16
Content of my_number in rank 5  =  25
Content of my_number in rank 3  =  9
SUM_NUMBER after summation: 55

It is important to note that all processes in the communicator participate when using collective communications.
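
In practice this means every rank must make the same collective call. If, for example, only the root called bcast, the other ranks would never match the call and the program would hang. A small sketch of the pitfall:

#!/usr/bin/env python3
# Pitfall sketch: a collective routine must be called by every rank.
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

message = "hello" if rank == 0 else None

# Wrong (would hang): only the root participates in the collective.
# if rank == 0:
#     comm.bcast(message, root=0)

# Right: every rank calls bcast; non-root ranks receive the result.
message = comm.bcast(message, root=0)
print("Rank", rank, "has:", message)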

Barrier: Synchronizing Workers

comm.barrier prevents processes from proceeding until all of them have reached a particular point in the program. It allows the parallel program to reach a consistent global state: all data values are up to date, and all processes have reached the same point at the same time.


Figure 7: How barrier synchronizes the workers. The left side shows the states of the processes r0, r1, r2, and r3 before the barrier; the right side shows that after the barrier, all processes start in sync.

Run barrier.py.

#!/usr/bin/env python3

from mpi4py import MPI
import sys
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if size != 4:
    sys.exit("This program requires exactly four processes. Please re-run with four processes.")

# file path
file_name = "1000_num.txt"
data = None

t0 = time.time()
time0 = comm.bcast(t0, root=0)

if rank == 0:
    # Read the numbers from file into an array
    with open(file_name) as fd:
        lines = fd.readlines()
        data = [int(x.strip()) for x in lines]
        
elif rank == 1:
    # pretend this rank does some type of work for 5 seconds
    time.sleep(5)
    print("Rank 1 finished work, it took", '%.2f' %(time.time()-time0), "seconds. \n")
    
elif rank == 2:
    # pretend this rank does some type of work for 7 seconds
    time.sleep(7)
    print("Rank 2 finished work, it took", '%.2f' %(time.time()-time0), "seconds. \n")
else:
    # pretend this rank does some type of work for 15 seconds
    time.sleep(15)
    print("Rank 3 finished work, it took", '%.2f' %(time.time()-time0), "seconds. \n")

comm.barrier()
print("Rank", rank, "after barrier, start work after", '%.2f' %(time.time()-time0), "seconds.")
Rank 1 finished work, it took 5.01 seconds. 

Rank 1 after barrier, start work after 15.03 seconds.
Rank 2 finished work, it took 7.01 seconds. 

Rank 2 after barrier, start work after 15.03 seconds.
Rank 3 finished work, it took 15.03 seconds. 

Rank 3 after barrier, start work after 15.03 seconds.
Rank 0 after barrier, start work after 15.03 seconds.

Hands-on : Sum of numbers across MPI4PY processes

Problem

We are given a large file containing a number of integers, one per line. Our task is a simple computation on these numbers: calculate the mean of the squared values of the numbers in the given file. We have the formula:

squares_mean = (x_1^2 + x_2^2 + ... + x_n^2) / n

Figure 8: Mathematical equation for this hands-on problem.

The solution should be a parallel python program that reads in the given input file containing the numbers to use for the computation.

Solution

See parallel_square_mean.py and serial_square_mean.py.

We start by writing a serial version of our program:

#!/usr/bin/env python3

import time

# file path
file_name = "1000_num.txt"

# Read the numbers from file into array
data = None
with open(file_name) as fd:
    lines = fd.readlines()
    data = [int(x.strip()) for x in lines]

t0 = time.time()
# Number of values
n = len(data)
    
# Square values
squares = [x**2 for x in data]

# Sum of all squared numbers
sum_squares = 0
for i in range(n):
    sum_squares = sum_squares + squares[i]

# Calculate mean of squares
squares_mean = sum_squares / n
t1 = time.time()

# Print result
print("The calculated squares mean is : ", squares_mean, "\nComputation time : ", t1-t0)

Now that we have a working serial program, we need to add parallel constructs to make it work in parallel.

We first need to divide the data fairly so that each process has about the same amount of work to do. This is simple: since we have n numbers, we divide n by the total number of processes, say p. Now, what happens if the data does not divide evenly among the processes? If n/p is not an integer, there is a remainder and the workload cannot be split exactly evenly. In that case, we split the data into equal chunks among the processes and then distribute the remainder, starting from process 0, one element at a time.

# Number of values
n = len(data)

# Chunk size
chunk = int(n / size)
if n % size > 0:
    if rank < n % size:
        chunk = chunk + 1

Now that we know the chunk size each process will work on, we need to know where each process's chunk starts. This is the offset, which is simply the total number of elements assigned to all lower-ranked processes. Note the >= in the correction below: ranks below the remainder already account for their extra element through their larger chunk size, while ranks at or above the remainder must skip over all of the extra elements.

# Number of values
n = len(data)

# Chunk size
chunk = int(n / size)
if n % size > 0:
    if rank < n % size:
        chunk = chunk + 1

# Offset
offset = rank * chunk
if rank >= n % size:
    offset = offset + n % size

Each process now knows how much work it has to do and where its part of the work starts, so we can rewrite our program to incorporate these parallel programming concepts.

One way to solve this problem is the following. Rank 0 reads the text file that contains the numbers, and the data is broadcast to the other processes. Each process calculates its chunk size and offset (based on the number of processes) and computes the intermediate result for its chunk of data (the sum of the squared values). gather then collects the intermediate results on rank 0, which uses them to calculate the mean of the squares.

#!/usr/bin/env python3

from mpi4py import MPI
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# file path
file_name = "1000_num.txt"
data = None
if rank == 0:
    # Read the numbers from file into an array
    with open(file_name) as fd:
        lines = fd.readlines()
        data = [int(x.strip()) for x in lines]

# Broadcast data to all processes
data = comm.bcast(data, root=0)

t0 = time.time()
# Number of values
n = len(data)

# Chunk size
chunk = int(n / size)
if n % size > 0:
    if rank < n % size:
        chunk = chunk + 1

# Offset
offset = rank * chunk
if rank >= n % size:
    offset = offset + n % size
print("Rank ", rank, " offset ", offset)
 
# Square values
my_squares = [x**2 for x in data[offset:offset + chunk]]

# Sum of all squared numbers
my_sum_squares = 0
for i in range(chunk):
    my_sum_squares += my_squares[i]


# Now we gather all my_sum_squares in process 0 to calculate the overall mean
sums = comm.gather(my_sum_squares, root=0)

# Process 0 does the rest of the work
if rank == 0:
    # Overall sum 
    sum_squares = 0
    for i in range(size):
        sum_squares += sums[i]
        
    # Calculate mean of squares
    squares_mean = sum_squares / n
    
    t1 = time.time()
    # Print result
    print("The calculated squares mean is : ", squares_mean, "\nComputation time : ", t1-t0) 

Where to go from here?

For more information on mpi4py, consult the project’s documentation on Read the Docs: https://mpi4py.readthedocs.io

Key Points

  • A message can be any object type.

  • There are two main types of communication: point-to-point and collective.