Parallel code


Message Passing Model

Section contributed by IFIN-HH

In most distributed memory systems, parallelization is achieved by using one of the various implementations of the widely adopted Message Passing Interface (MPI) standard [mpis]. MPI defines a set of specifications for writing message-passing programs, that is, parallel programs in which processes communicate by exchanging messages. Two versions of the MPI standard are currently in use, MPI-1 and MPI-2, and various library implementations of these are available, tuned for specific target platforms.
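As a minimal illustration of the message-passing model (not drawn from any HP-SEE application), the following sketch passes a single integer from rank 0 to rank 1 with the blocking MPI_Send/MPI_Recv pair; it assumes the program is started with at least two MPI processes:

 #include <mpi.h>
 #include <stdio.h>
 
 /* Minimal message-passing sketch: rank 0 sends one integer to rank 1. */
 int main(int argc, char **argv) {
     int rank, value = 0;
     MPI_Status status;
 
     MPI_Init(&argc, &argv);
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 
     if (rank == 0) {
         value = 42;
         MPI_Send(&value, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
     } else if (rank == 1) {
         MPI_Recv(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);
         printf("Rank 1 received %d\n", value);
     }
 
     MPI_Finalize();
     return 0;
 }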

The standard specifications, and the implementations that are used in the framework of the HP-SEE ecosystem, have been briefly described in HP-SEE deliverable D8.1 [D8.1].

Here, a detailed discussion is presented of the message-passing issues that are relevant for the migration, adaptation and optimization of parallel applications on the HPC infrastructure, together with examples drawn from the developers' experience.

The discussion is restricted to the libraries that are effectively in use in HP-SEE: implementations of MPI-1 (MPICH1 and its derivatives MPICH-MX and MVAPICH), Open MPI (which implements MPI-2), and MPICH2 (together with its derivatives MVAPICH2, MPIX and Intel MPI), which implements both MPI-1 and MPI-2.

REFERENCES

[mpis] MPI standard, http://www.mcs.anl.gov/research/projects/mpi/

[D8.1] HP-SEE deliverable D8.1, Software Scalability Analysis and Interoperability Issues Assessment

MPICH implementations

Introduction contributed by IFIN-HH

Proposed as a freely available and portable implementation of the MPI standard, MPICH has evolved along with it, from the MPICH1 implementation [mpc1] (which fulfils the MPI-1 specifications and partially supports some MPI-2 features, such as parallel I/O) to MPICH2 [mpc2], which is fully compatible with the MPI-2 version of the standard. Although MPICH1 development was frozen in 2005 at version 1.2.7p1, with the intention of replacing it with MPICH2, it continues to be the most widely used MPI implementation.

REFERENCES

[mpc1] MPICH1, http://www.mcs.anl.gov/research/projects/mpi/mpich1-old/

[mpc2] MPICH2, http://www.mcs.anl.gov/research/projects/mpich2/

Open MPI

Section contributed by UPB

Introduction

The Open MPI Project (www.openmpi.org) is an open source MPI-2 implementation that is developed and maintained by a consortium of academic, research, and industry partners. Open MPI is therefore able to combine the expertise, technologies, and resources from all across the High Performance Computing community in order to build the best MPI library available. Open MPI offers advantages for system and software vendors, application developers and computer science researchers.

The compiler drivers are mpicc for C, mpif77 and mpif90 for Fortran, and mpicxx/mpiCC for C++. mpirun is used to start an MPI program; refer to the manual page for a detailed description ($ man mpirun). Several Open MPI builds are available. To use the one suitable for your programs, you must load the appropriate module (remember to also load the corresponding compiler module). For example, if you want to use the PGI build you should type the following:

 $ module list
 Currently Loaded Modulefiles:
 1) batch-system/sge-6.2u3 4) switcher/1.0.13
 2) compilers/sunstudio12.1 5) oscar-modules/1.0.5
 3) mpi/openmpi-1.3.2_sunstudio12.1
 $ module avail
 [...]
 -------------------- /opt/modules/modulefiles --------------------
 debuggers/totalview-8.6.2-2
 java/jdk1.6.0_13-32bit java/jdk1.6.0_13-64bit
 mpi/openmpi-1.3.2_gcc-4.1.2
 compilers/gcc-4.1.2 mpi/openmpi-1.3.2_gcc-4.4.0
 compilers/gcc-4.4.0 mpi/openmpi-1.3.2_intel-11.0_081
 compilers/intel-11.0_081 mpi/openmpi-1.3.2_pgi-7.0.7
 compilers/pgi-7.0.7 mpi/openmpi-1.3.2_sunstudio12.1

Load the PGI build of Open MPI:

 $ module switch mpi/openmpi-1.3.2_pgi-7.0.7

Implementations of Open MPI are used in the HP-SEE infrastructure by the NCIT-Cluster (RO).

EagleEye OpenMPI Usage

Section contributed by UPB

Given that the processing of a single image takes a fairly small amount of time (30-40 seconds using a single thread), we decided not to parallelize the actual image-processing functions, but rather to process multiple images at the same time, with a separate thread processing each image. This approach allows more flexibility in choosing the number of threads, since parallelizing the actual functions would have brought constraints in terms of the number of threads and memory consumption. We opted for a hybrid MPI + Pthreads approach.

We used gcc 4.6.0 with Open MPI 1.5.3 to compile the program.

The main processing function, process_maps, which couples hybrid MPI and Pthreads programming in EagleEye, follows here:

 #include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <sys/time.h>
 #include <dirent.h>
 #include <pthread.h>
 #include <mpi.h>
 #include <stdio.h>
 #include "eagleeye.h"
 //mpi tags
 #define WORK_TAG 0
 //mpi messages
 #define MPI_GET_MORE_WORK 0
 #define QUIT_MSG "*quit*"
 // ...
 int process_maps(int argc, char** argv) {
   config_data cd;
   DIR *dir;
   struct dirent *dp;
   char buffer[MAX_FILE_NAME_SIZE];
   int nr_procs, rank, nr_files, i, file_index, get_work, mpi_msg,
           nr_msg_sent, nr_slaves, res;
   char **file_names;
   MPI_Status mpi_stat;
   pthread_attr_t attr;
   thread_data thr_data_array[MAX_NR_THREADS];
   pthread_t threads[MAX_NR_THREADS];
   void *status;
   int mode = SATELLITE_MODE;
   //init MPI stuff
   MPI_Init(&argc, &argv);
   MPI_Comm_size(MPI_COMM_WORLD, &nr_procs);
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   //validate number of arguments
   if (argc != 6) {
       printf("%s:%d: Usage: %s mode config_file input_folder output_folder nr_slaves\n",
               __func__, __LINE__, argv[0]);
       exit(-1);
   }
   if (!strcmp(argv[1], "satellite"))
       mode = SATELLITE_MODE;
   else
       if (!strcmp(argv[1], "military"))
       mode = MILITARY_MODE;
   //parse the config file
   if (read_config_file(mode, argv[2], &cd) != 0) {
       exit(-1);
   }
   //validate the number of threads
   nr_slaves = atoi(argv[5]);
   if (nr_slaves < 1 || nr_slaves > MAX_NR_THREADS) {
       printf("%s:%d: Number of threads should be between %d and %d. Exiting.\n",
               __func__, __LINE__, 1, MAX_NR_THREADS);
       exit(-1);
   }
   if (rank != 0) {
       //init pthread_attr
       pthread_attr_init(&attr);
       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
       //init command queue
       init_command_queue(nr_slaves);
       //initialize thread data for each thread
       for (i = 0; i < nr_slaves; i++) {
           thr_data_array[i].tid = (long) i;
           memcpy(&thr_data_array[i].cd, &cd, sizeof (cd));
           thr_data_array[i].sd.nr_marked_segments = 0;
           strcpy(thr_data_array[i].output_folder, argv[4]);
           thr_data_array[i].mpi_rank = rank;
           thr_data_array[i].process_type = mode;
           thr_data_array[i].nr_pixels1 = -1;
           thr_data_array[i].nr_pixels2 = -1;
           thr_data_array[i].nr_pixels3 = -1;
       }
       //start all the slaves
       for (i = 0; i < nr_slaves; i++) {
           res = pthread_create(&threads[i], NULL, thread_function,
                   (void *) &thr_data_array[i]);
           if (res) {
               printf("pthread_create() returned %d. Exiting.\n", res);
               exit(-1);
           }
       }
   }
   if (rank == 0) {
       //coordinator
       //scan the input folder and load the file names into memory
       if ((dir = opendir(argv[3])) == NULL) {
           fprintf(stderr, "Cannot open %s. Exiting.\n", argv[3]);
           exit(1);
       }
       nr_files = 0;
       while ((dp = readdir(dir)) != NULL) {
           if (!strcmp(dp->d_name, "."))
               continue;
           if (!strcmp(dp->d_name, ".."))
               continue;
           nr_files++;
       }
       rewinddir(dir);
       file_names = (char**) malloc(nr_files * sizeof (char*));
       for (i = 0; i < nr_files; i++) {
           file_names[i] = (char*) malloc(MAX_FILE_NAME_SIZE * sizeof (char));
       }
       printf("Processing %d files.\n", nr_files);
       i = 0;
       while ((dp = readdir(dir)) != NULL) {
           if (!strcmp(dp->d_name, "."))
               continue;
           if (!strcmp(dp->d_name, ".."))
               continue;
           strcpy(buffer, argv[3]);
           strcat(buffer, "/");
           strcat(buffer, dp->d_name);
           strcpy(file_names[i++], buffer);
       }
       //start sending work to our slaves
       file_index = 0;
       nr_msg_sent = 0;
       while (1) {
           MPI_Recv(&mpi_msg, 1, MPI_INT, MPI_ANY_SOURCE, WORK_TAG,
                   MPI_COMM_WORLD, &mpi_stat);
           if (file_index < nr_files) {
               MPI_Send(file_names[file_index++], MAX_FILE_NAME_SIZE, MPI_CHAR,
                       mpi_stat.MPI_SOURCE, WORK_TAG, MPI_COMM_WORLD);
               nr_msg_sent++;
           }
           if (nr_msg_sent == nr_files)
               break;
       }
       //no more images to process - send quit command
       strcpy(buffer, QUIT_MSG);
       for (i = 1; i < nr_procs; i++) {
           MPI_Send(buffer, MAX_FILE_NAME_SIZE, MPI_CHAR, i, WORK_TAG,
                   MPI_COMM_WORLD);
       }
       //free all the memory!
       for (i = 0; i < nr_files; i++) {
           free(file_names[i]);
       }
       free(file_names);
   }
   if (rank != 0) {
       //masters
       get_work = MPI_GET_MORE_WORK;
       while (1) {
           //ask for work
           MPI_Send(&get_work, 1, MPI_INT, 0, WORK_TAG, MPI_COMM_WORLD);
           //receive work
           MPI_Recv(buffer, MAX_FILE_NAME_SIZE, MPI_CHAR, 0, WORK_TAG,
                   MPI_COMM_WORLD, &mpi_stat);
           if (!strcmp(buffer, QUIT_MSG)) {
               //received quit command - tell the slaves
               for (i = 0; i < nr_slaves; i++) {
                   add_command(buffer); //blocking
                   printf("[%d] Added work: %s.\n", rank, buffer);
                   fflush(stdout);
               }
               // I want to break free
               break;
           } else {
               add_command(buffer); //blocking
               printf("[%d] Added work: %s.\n", rank, buffer);
               fflush(stdout);
           }
       }
       pthread_attr_destroy(&attr);
       for (i = 0; i < nr_slaves; i++) {
           res = pthread_join(threads[i], &status);
           if (res) {
               printf("pthread_join() returned error code = %d\n", res);
           }
       }
       destroy_queue();
   }
   printf("[%d] Happily finishing. Bye.\n", rank);
   fflush(stdout);
   MPI_Finalize();
   return 0;
 }

NEURON ParallelContext

Section contributed by IMBB-FORTH (CMSLTM application)

The preferred MPI environment is Open MPI (openmpi_gcc-1.4.3). NEURON was compiled with parallel support (MPI) using the gcc compiler:

 ./configure --without-iv --with-paranrn --prefix=/home/gkastel/src/nrn-7.1

We used NEURON's ParallelContext to distribute the simulated neurons evenly across the available nodes.


Porting between MPI implementations

Section contributed by IICT-BAS

The MPI standard has been developed with the aim of simplifying the development of cross-platform applications that use the distributed memory model, as opposed to SMP. The first version of the MPI standard is well supported by the various MPI implementations, thus ensuring that a program tested with one such implementation will work correctly with another. Within the MPI specification there is some freedom of design choices, which is well documented and should serve as a warning to the user not to rely on specific implementation details. These considerations mostly affect the so-called asynchronous or non-blocking operations. For example, MPI_Isend is the non-blocking version of MPI_Send. When a process uses this function to send data, the call returns immediately, usually before the data has finished being sent. This means that the user must not change any of the data in the buffer that was passed as an argument until it is certain that the send has completed, which can be ensured by invoking MPI_Wait. Although the usage of non-blocking operations adds complexity to the program, it also enables overlap between communication and computation, thus increasing parallel efficiency.
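A minimal sketch of this rule (the function name and arguments are illustrative, not taken from any HP-SEE application): the send buffer must not be modified between MPI_Isend and the matching MPI_Wait, while unrelated computation may proceed in between.

 #include <mpi.h>
 
 /* Sketch of overlapping communication and computation with MPI_Isend/MPI_Wait.
    The buffer 'data' must not be modified until MPI_Wait confirms completion. */
 void exchange_example(double *data, int count, int dest, int tag) {
     MPI_Request request;
     MPI_Status status;
 
     MPI_Isend(data, count, MPI_DOUBLE, dest, tag, MPI_COMM_WORLD, &request);
 
     /* ... computation that does not touch 'data' can run here ... */
 
     MPI_Wait(&request, &status);   /* only now may 'data' be safely reused */
 }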

Version 2 of the MPI standard added new advanced features, like parallel I/O, dynamic process management and remote memory operations. The support for these features among MPI implementations is uneven and may lead to portability problems.
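As an illustration of one such MPI-2 feature, the following hedged sketch uses MPI parallel I/O to let each process write its own block of integers into a shared file; the file name and block layout are arbitrary choices for the example.

 #include <mpi.h>
 
 /* Illustrative MPI-2 parallel I/O: each rank writes its own block of a shared file. */
 void write_blocks(int *block, int block_len) {
     int rank;
     MPI_File fh;
     MPI_Offset offset;
 
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
     offset = (MPI_Offset) rank * block_len * sizeof(int);
 
     MPI_File_open(MPI_COMM_WORLD, "output.dat",
                   MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
     MPI_File_write_at(fh, offset, block, block_len, MPI_INT, MPI_STATUS_IGNORE);
     MPI_File_close(&fh);
 }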

Some MPI implementations offer a way of discovering the network topology, which may be extremely useful for achieving good parallel efficiency, especially when running on heterogeneous resources, but the usage of such information may also lead to portability problems for the application.
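When portability is a concern, a minimal and fully portable alternative, sketched below, is to query only the host name of each rank with MPI_Get_processor_name and to apply any implementation-specific topology hints separately.

 #include <mpi.h>
 #include <stdio.h>
 
 /* Portable sketch: each rank reports the node it runs on, which can be used
    to group ranks by host before applying implementation-specific topology hints. */
 void report_placement(void) {
     int rank, name_len;
     char node_name[MPI_MAX_PROCESSOR_NAME];
 
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
     MPI_Get_processor_name(node_name, &name_len);
     printf("Rank %d runs on node %s\n", rank, node_name);
 }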

Shared Memory Model

OpenMP

Pthreads

EagleEye PThreads Usage

Section contributed by UPB

The local work queues are modeled as fixed-length arrays. Two functions are exported to the masters and slaves. One function adds an element to the work queue and is used by the masters; if the queue is full, the function blocks until a slot becomes available. The other function removes the oldest element from the queue. When an element is added to the queue it gets a time stamp in the form of an integer, and the remove function searches the array linearly and removes the oldest work unit. If the queue is empty, the function blocks. Since the number of slave threads is closely related to the number of cores, the work queue is small and the penalty for scanning the array linearly is negligible. Blocking in these two functions is implemented with two POSIX semaphores: one that counts the free slots in the queue and another that counts the filled slots. At the beginning of the add function we wait on the semaphore that counts the free slots, while in the remove function we wait on the semaphore that counts the filled slots. Guarding against concurrent accesses from slave threads is done using a pthread_mutex_t object.
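The following is a possible sketch of such a queue, written to match the description above; the actual EagleEye routines (init_command_queue, add_command, get_remove_command) may differ in detail.

 #include <pthread.h>
 #include <semaphore.h>
 #include <string.h>
 
 #define QUEUE_SIZE 16
 #define MAX_FILE_NAME_SIZE 256
 
 /* Hypothetical sketch of the fixed-length work queue described above:
    'free_slots' counts empty slots, 'filled_slots' counts occupied slots,
    and a mutex guards the array. Slots carry an integer time stamp so that
    the remove function can return the oldest entry. */
 static char  queue[QUEUE_SIZE][MAX_FILE_NAME_SIZE];
 static int   stamp[QUEUE_SIZE];        /* 0 = slot unused */
 static int   next_stamp = 1;
 static sem_t free_slots, filled_slots;
 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 
 void init_command_queue_sketch(void) {
     sem_init(&free_slots, 0, QUEUE_SIZE);  /* all slots free initially */
     sem_init(&filled_slots, 0, 0);         /* no work yet */
 }
 
 void add_command_sketch(const char *cmd) {     /* used by the master */
     int i;
     sem_wait(&free_slots);             /* block while the queue is full */
     pthread_mutex_lock(&lock);
     for (i = 0; i < QUEUE_SIZE; i++)
         if (stamp[i] == 0) {           /* first free slot */
             strcpy(queue[i], cmd);
             stamp[i] = next_stamp++;
             break;
         }
     pthread_mutex_unlock(&lock);
     sem_post(&filled_slots);
 }
 
 void get_remove_command_sketch(char *out) {    /* used by the slave threads */
     int i, oldest = -1;
     sem_wait(&filled_slots);           /* block while the queue is empty */
     pthread_mutex_lock(&lock);
     for (i = 0; i < QUEUE_SIZE; i++)   /* linear search for the oldest entry */
         if (stamp[i] != 0 && (oldest < 0 || stamp[i] < stamp[oldest]))
             oldest = i;
     strcpy(out, queue[oldest]);
     stamp[oldest] = 0;
     pthread_mutex_unlock(&lock);
     sem_post(&free_slots);
 }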

The coordinator and the masters are modeled as MPI processes. Each master process spawns a number of pthreads that serve as slaves. There is at most one master per machine. The approach we used for testing is to place a master on each physical machine and then spawn a number of slave threads equal to the number of cores minus one (although the master thread isn't CPU intensive, we decided not to oversubscribe the machine).

The thread_function that exemplifies the usage of Pthreads in the EagleEye application is given below:

 #include <string.h>
 #include <unistd.h>
 #include <sys/time.h>
 #include <dirent.h>
 #include <unistd.h>
 #include <pthread.h>
 #include <semaphore.h>
 #include <stdio.h>
 #include "eagleeye.h"
 //mpi tags
 #define WORK_TAG 0
 //mpi messages
 #define MPI_GET_MORE_WORK 0
 #define QUIT_MSG "*quit*"
 // ...
 void *thread_function(void *param) {
   thread_data* tdp;
   char buffer[MAX_FILE_NAME_SIZE];
   int res;
   tdp = (thread_data*) param;
   while (1) {
       res = get_remove_command(buffer); //blocking
       switch (res) {
           case QUEUE_OP_OK:
               //queue isn't empty
               if (!strcmp(buffer, QUIT_MSG)) {
                   goto thread_exit;
               }
               if (tdp->process_type == SATELLITE_MODE) {
                   tdp->sd.nr_marked_segments = 0;
                   process_satellite_file(buffer, tdp);
               }
               if (tdp->process_type == MILITARY_MODE) {
                   process_military_file(buffer, tdp);
               }
               break;
           default:
               printf("%s:%d: Received unexpected error code %d\n",
                       __func__, __LINE__, res);
               goto thread_exit;
       }
   }
   thread_exit:
   pthread_exit(NULL);
 }

Hybrid Programming

Section contributed by IPB

Since general-purpose processors are no longer getting much faster, optimal HPC hardware has moved towards massively parallel computers. These consist of many compute nodes coupled by high-speed interconnects, where each compute node has several shared-memory sockets, and each socket holds a multi-core processor. Furthermore, today's hybrid architectures couple compute nodes with highly specialized computing nodes, such as Cell processors or general-purpose GPUs. On the other hand, there is a logical programming model that can be mapped to this hybrid hardware: OpenMP, which corresponds to one multi-core processor, and MPI, which corresponds to the massively parallel machine as a whole.

The combination of MPI and OpenMP comes naturally and matches the hardware trends. Hybrid MPI/OpenMP programming is especially suitable for applications that exhibit two levels of parallelism: coarse-grained (MPI) and fine-grained (OpenMP). In addition, some applications show an unbalanced workload at the MPI level, which can be mitigated with OpenMP by assigning a different number of threads to each MPI process.

Introducing OpenMP into an existing MPI code also brings in OpenMP's drawbacks, such as limitations in the control of work distribution and synchronization, the overhead introduced by thread creation, and the dependence on compiler quality and runtime support. For applications that exhibit only one level of parallelism there may be no benefit from the hybrid approach.

When development starts from a sequential code, the hybrid programming approach has to include a decomposition of the problem for MPI parallelization, followed by the insertion of OpenMP directives. There are two general hybrid programming models: non-overlapping and overlapping communication and computation. In the first case MPI is called by the master thread outside the parallel regions, while in the overlapping model some of the threads communicate while the rest execute other parts of the application.
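A minimal sketch of the non-overlapping model, assuming an MPI library that provides at least MPI_THREAD_FUNNELED: the OpenMP threads perform the fine-grained work, and only the master thread calls MPI, outside the parallel region.

 #include <mpi.h>
 #include <omp.h>
 #include <stdio.h>
 
 /* Non-overlapping hybrid sketch: OpenMP threads compute,
    only the master thread communicates, outside the parallel region. */
 int main(int argc, char **argv) {
     int provided, rank, i;
     double local[1000], partial = 0.0, global_sum = 0.0;
 
     MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 
     for (i = 0; i < 1000; i++)
         local[i] = rank + i * 1e-3;    /* arbitrary local data */
 
     /* fine-grained parallelism: OpenMP threads reduce the local array */
     #pragma omp parallel for reduction(+:partial)
     for (i = 0; i < 1000; i++)
         partial += local[i];
 
     /* coarse-grained parallelism: master thread combines partial results via MPI */
     MPI_Reduce(&partial, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
 
     if (rank == 0)
         printf("Global sum: %f\n", global_sum);
 
     MPI_Finalize();
     return 0;
 }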

In many cases, however, introducing threads is not straightforward and can even lead to a degradation of performance. The MPI standard and its implementations define four levels of thread support: MPI_THREAD_SINGLE, where only one thread of execution exists; MPI_THREAD_FUNNELED, where a process may be multithreaded but only the thread that initialized MPI makes MPI calls; MPI_THREAD_SERIALIZED, where multiple threads may make MPI calls, but not simultaneously; and MPI_THREAD_MULTIPLE, where multiple threads may call MPI at any time. The use of MPI_THREAD_FUNNELED is the easiest choice, but it can be far from optimal with a large number of threads per MPI process. On the other hand, performance issues plague implementations of the more natural MPI_THREAD_MULTIPLE mode and, while its use can be expected to benefit application performance, in practice it also requires significant work on refactoring the application's structure and data distribution. The programming model becomes even more complex if we add GPGPUs to this hierarchy, as they are now typically integrated into compute nodes in newly designed and procured HPC systems. Here, however, we will not consider this, and will focus only on the hybrid MPI/OpenMP programming model.

The hybrid approach will be illustrated here with the FFTW library, which is used for multi-dimensional DFTs in the NUQG application. The following pseudo-codes show the hybrid OpenMP/MPI usage of FFTW. Instead of MPI_Init, MPI_Init_thread is called, which is the initialization routine that indicates to MPI that the program will be multithreaded. MPI_THREAD_FUNNELED is passed to indicate that MPI routines will be called only from the main thread. The provided parameter returns the level of thread support actually offered by the available MPI implementation; this must be at least MPI_THREAD_FUNNELED, and the variable threads_ok is used to record this. The fftw_init_threads and fftw_plan_with_nthreads functions are called only if threads_ok is true. The other parts of these pseudo-codes are described in the Libraries usage / FFTW and Adapting code to memory architecture / Distributed memory sections of the deliverable.

 // FFTW3 hybrid MPI/OpenMP usage in 2D
 #include <fftw3-mpi.h>
 #include <fftw3-omp.h>
 #include <omp.h>
 
 int main(int argc, char **argv) {
    int nproc, procid, provided, threads_ok;
    long cnti;
    long Nx, Ny;
    ptrdiff_t larray, lNx, lstart;
    fftw_complex *array;
    fftw_plan plan_forward, plan_backward;
    
    MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
    threads_ok = provided >= MPI_THREAD_FUNNELED;
    MPI_Comm_size(MPI_COMM_WORLD, &nproc);
    MPI_Comm_rank(MPI_COMM_WORLD, &procid);
    if (threads_ok) threads_ok = fftw_init_threads();
    fftw_mpi_init();
    
    larray = fftw_mpi_local_size_2d(Nx, Ny, MPI_COMM_WORLD, &lNx, &lstart);
    array = fftw_alloc_complex(larray);
    
    if (threads_ok) fftw_plan_with_nthreads(omp_get_max_threads());
    plan_forward = fftw_mpi_plan_dft_2d(Nx, Ny, array, array, MPI_COMM_WORLD, FFTW_FORWARD, FFTW_ESTIMATE);
    plan_backward = fftw_mpi_plan_dft_2d(Nx, Ny, array, array, MPI_COMM_WORLD, FFTW_BACKWARD, FFTW_ESTIMATE);
    
    fftw_destroy_plan(plan_forward);
    fftw_destroy_plan(plan_backward);
    fftw_free(array);
    fftw_mpi_cleanup();
    fftw_cleanup_threads();
    MPI_Finalize();
 }
 // FFTW3 hybrid MPI/OpenMP usage in 3D
 #include <fftw3-mpi.h>
 #include <fftw3-omp.h>
 #include <omp.h>
 
 int main(int argc, char **argv) {
    int nproc, procid, provided, threads_ok;
    long cnti;
    long Nx, Ny, Nz;
    ptrdiff_t larray, lNx, lstart;
    fftw_complex *array;
    fftw_plan plan_forward, plan_backward;
    
    MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
    threads_ok = provided >= MPI_THREAD_FUNNELED;
    MPI_Comm_size(MPI_COMM_WORLD, &nproc);
    MPI_Comm_rank(MPI_COMM_WORLD, &procid);
    if (threads_ok) threads_ok = fftw_init_threads();
    fftw_mpi_init();
    
    larray = fftw_mpi_local_size_3d(Nx, Ny, Nz, MPI_COMM_WORLD, &lNx, &lstart);
    array = fftw_alloc_complex(larray);
    
    if (threads_ok) fftw_plan_with_nthreads(omp_get_max_threads());
    plan_forward = fftw_mpi_plan_dft_3d(Nx, Ny, Nz, array, array, MPI_COMM_WORLD, FFTW_FORWARD, FFTW_ESTIMATE);
    plan_backward = fftw_mpi_plan_dft_3d(Nx, Ny, Nz, array, array, MPI_COMM_WORLD, FFTW_BACKWARD, FFTW_ESTIMATE);
    
    fftw_destroy_plan(plan_forward);
    fftw_destroy_plan(plan_backward);
    fftw_free(array);
    fftw_mpi_cleanup();
    fftw_cleanup_threads();
    MPI_Finalize();
 }

CUDA