MPI 中的点对点通信

Posted by zhangxiaojian on July 28, 2015

使用MPI作为集群通信的工具已经有一段时间,一开始只是粗略的了解了一下它提供的消息传递接口,虽然可以解决大部分问题,但是使用不便,效率较低。这几天仔细看了看官方的文档,在MPI中提供了很多有用的函数接口,使用者完全可以在其上建立灵活的通信层。本文主要介绍集群点对点的通信。

首先看一个简单的例子,从一个节点发送一条message到另一个节点:

#include <mpi.h>
#include <iostream>
#include <cstring>

using namespace std;

int main(int argc, char** argv)
{
    int rank;
    MPI_Init(&argc,&argv);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);

    char message[20];

    if(rank == 0)
    {
        strcpy(message,"show me the data");
        MPI_Send(message,strlen(message) + 1,MPI_CHAR,1,55,MPI_COMM_WORLD);
    }else if(rank == 1)
    {
        MPI_Status status;
        MPI_Recv(message,20,MPI_CHAR,0,55,MPI_COMM_WORLD,&status);
        cout << "received : " << message << endl;
    }

    MPI_Finalize();
    return 0;
}

首先初始化MPI环境并获得每个节点自己的rank,有两个节点,那么rank就是0和1。相同一份代码,在不同的节点上运行,将会根据rank值而执行不同的程序流。上述程序在0节点初始化消息然后发送到1节点上,1节点接收到消息然后打印出来。下来详细介绍。

 阻塞的send和receive

例子中的两个函数MPI_Send和MPI_Recv是阻塞调用,当message发送完毕和接收完毕这两个函数才会返回,否则将会一直等待发送或接收成功。发送成功之后,可以修改message的值而不会影响到刚刚发送的数据,接收成功后,在接收端的buffer里将包含接收到的数据。函数参数如下:

int MPI_Send(
        const void* buf,                    //要发送数据的起始位置
        int count,                          //数据的数量
        MPI_Datatype datatype,              //数据类型
        int dest,                           //目标节点
        int tag,                            //类似于端口号
        MPI_Comm comm                       //MPI通信的环境
        )

其中count是根据第三个参数的数据类型计算的,比如例子中发送多少个char,count就是多少,而不是根据字节数计算。MPI提供了与基本类型相对应的类型,比如MPI_CHAR就表示c或c++中的char类型,当然除了基本类型之外,MPI还提供了基于基本类型的复合类型扩充,单独作为一个章节来介绍,可以表示各种复杂的类型(比如说一个类)。dest表示目标节点的rank,指明要发送到哪个节点。tag类似于网络的端口号,发送端和接收端之后tag相等,才能够匹配。comm是MPI的通信环境,MPI可以将集群中的部分节点看成一个子集群,或者一个组,每个节点的rank是根据它在组里的顺序标识的,例子中MPI_COMM_WORLD表示所有节点为一个组。

int MPI_Recv(
        void* buf,
        int count,      
        MPI_Datatype datatype,
        int source,                       //发送节点的rank
        int tag,
        MPI_Comm comm, 
        MPI_Status *status                //接收完毕的状态信息
        )

MPI_Recv中的参数与MPI_Send类似,只是dest变成了source,表示消息从哪个节点发送过来。而count标识的是接收端用于接收数据的缓冲池有多大,接收到的数据可以少于这个值,但不能超过,否则就会报错。多出来一个参数MPI_Status,在函数返回后,其中记录了接收状态的信息,比如实际接收到的数据大小等。相比于MPI_Send需要确定的dest和tag,MPI_Recv可以使用通配符来获得任意节点任意tag发送过来的消息。可以使用:

source = MPI_ANY_SOURCE;
tag = MPI_ANY_TAG;

当使用通配符接收后,可以使用MPI_Status这个类型的数据获得实际的source和tag。status至少包含三个可以直接访问的变量。结构如下:

struct MPI_Status{
    int MPI_SOURCE,
        int MPI_TAG,
        int MPI_ERROR
};

MPI_Status类型变量还可以用来获得实际接收的数据大小,不能直接访问到,而是需要使用MPI提供的函数:

int MPI_Get_count(
        const MPI_Status *status,
        MPI_Datatype datatype,
        int *count                              //返回值
        )

如果接收到的数据太大,超过了int值可以表示的范围,那么count就会被赋值为MPI_UNDEFINED这个宏。有时我们不需要获得status里面的值,于是每次执行的赋值操作都会是一种浪费,可以传递MPI_STATUS_IGNORE给MPI_Recv中的参数。这个值可以用在任何不需要status的参数变量里。

类型匹配和转换

首先是在同构的集群之中,集群中所有机器的环境都是一样的。这种情况下可以谨慎的使用send和receive之间的类型不匹配导致的转换。比如:

//发送
int a = 10;
MPI_Send(&a,1,MPI_INT,1,55,MPI_COMM_WORLD);

//接收
int b;
MPI_Recv(&b,4,MPI_BYTE,0,55,MPI_COMM_WORLD,&status);

b同样能够得到正确的值,b甚至可以为double等8Byte的类型,只不过这样是无法得到正确的值的。但是相反,如果send是一个double类型的值,但是recv用1个int变量来接,这样是会报错的。就像前面说的,因为接收端的大小不够发送端的数据,这样是不允许的。

当在异构的集群环境中,MPI保证相同类型的数据能够得到正确的解析,比如发送和接收都是MPI_INT,虽然两端int类型长度可能不一致,比如一个4Byte一个8Byte,MPI还是能够辨识这种类型的转换。但是使用诸如MPI_BTYE这种类型的数据,在异构环境下就要小心了。综上,MPI建议能够使用类型匹配的数据尽量使用提供的类型进行传输,这样移植起来更方便,出错的概率也小。

通信模式

MPI提供了四种不同的通信模式,功能基本相同,但是各有特点。

  1. MPI_Send  标准模式,nonlocal。一旦调用就start,所谓start就是将自己的调用信息发送给MPI调度匹配的管理进程,负责匹配合适的recv。不论MPI_Recv操作有没有出现。但是发送的数据是阻塞到Recv出现直接发过去,还是根据数据大小选择是否缓冲到System buffer里面,这取决于MPI。如果放到buffer里,那么不等Recv出现,Send也许就已经返回了,有点非阻塞调用的意思。

  2. MPI_Ssend 同步模式,nonlocal。同样可以start,不论Recv操作是否出现。但是绝对不会有buffer,它等到Recv操作出现后,就把数据同步的copy到Recv端。

  3. MPI_Rsend 就绪模式,nonlocal。这种模式必须要等到Recv操作出现才会start,执行调度匹配等工作。也没有缓冲,匹配好之后copy数据。这种模式可以无缝的被标准模式代替。唯一不同的是就绪模式性能会好一些。因为它没有开始,也就没有注册到MPI调度进程中去,这样或多或少会减少MPI的压力。

  4. MPI_Bsend 缓冲模式,local。如果调用send的时候还没有可匹配的recv出现,缓冲模式首先将数据发送到缓冲内存中,然后就返回。无论是否最终由Recv接收。和标准模式的缓冲略有不同,标准模式如果支持缓冲,就是缓冲到System buffer中的,但是缓冲模式需要我们自己指定缓冲的空间,和缓冲的大小。默认缓冲大小是0。如果缓冲内存大小不足以接收发送的数据,就会报错。

如果使用MPI_Bsend,就需要设置接收的buffer,同时可以取消设置的buffer:

int MPI_Buffer_attach(void* buffer, int size);        //size以Byte计算

int MPI_Buffer_detach(void* buffer_addr, int* size);  //这两个参数都是返回值

需要注意以下几点:1 用户需要分配好空间,并且负责释放空间。2 设置的buffer仅仅适用于MPI_Bsend,标准模式要看不同的厂商实现情况。3 只能设置一个buffer,并且对应一个进程。关于MPI如何使用Buffer,建议的方式是将buffer视为一个环状内存空间,这样可以充分利用。最后仍然是调用标准模式来将数据传输过去的。

非阻塞调用

如果采用多线程进行发送和接收,上述阻塞调用也能够表现的和非阻塞一样,但使用难度较大,MPI提供了非阻塞的Send和Recv,并且有对应的函数操作来检测是否发送或接受成功。函数如下:

int MPI_Isend(const void* buf, int count, MPI_Datatype datatype, int dest,
        int tag, MPI_Comm comm, MPI_Request *request);

int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source,
        int tag, MPI_Comm comm, MPI_Request *request)

和前面的对比,多了一个MPI_Request类型的的参数,这个参数包含了对应通信操作的很多信息,诸如:发送模式,buffer地址,上下文,tag,发送或者接收的rank,对应操作的状态等。非阻塞调用通常使用它作为参数,使用其它函数取检测操作是否完成。同样的,非阻塞调用也对应4中模式,命名方式如缓冲模式MPI_Ibsend,就绪模式MPI_Irsend,同步模式MPI_Issend。语义不变,只是不阻塞,可以当作调用之后就把操作交给一个独立的线程去执行。下面是等待和检测非阻塞调用是否成功的调用:

int MPI_Wait(MPI_Request *request, MPI_Status *status)
int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status);

MPI_Wait操作,根据操作返回的Request,等待对应的操作完成后返回,status保存操作完成的信息,与之前非阻塞调用相同。MPI_Test操作检测对应的操作是否完成,如果完成就返回flag为true,否则为false,不阻塞等待操作完成。这两个操作可以用在Send或者Recv操作返回的Request上,通常情况下,Send操作对应得到的status是没有用的,未定义的,只有在函数返回MPI_ERR_IN_STATUS,表示操作出错,status中的MPI_ERROR就会是有效的(这时的操作返回的通常是一个status数组,调用的如MPI_Waitall,等待一组操作,如果只返回一个status,就不会改变MPI_ERROR位,而是直接把错误状态return)。

在使用的时候还是需要小心,比如如果传进来的request参数是MPI_REQUEST_NULL,那么MPI_Test将会返回flag=true,status=empty。如果只判断flag的话,还以为成功完成了,因此需要对request检测或者判断status是否为空。

request参数是一个IN—OUT类型的参数,一旦操作成功,它就被标为释放(并不是释放内存空间),值为NULL。我们可以显示的释放一个request:

int MPI_Request_free(MPI_Request *request)

有时候程序需要对批量的request进行处理,比如等待一组request是否完成,或者其中有一个完成就马上处理,又或者需要其中的几个操作。MPI提供对应的函数,

int MPI_Waitany(int count, MPI_Request array_of_requests[],//输入参数 
        int *index,MPI_Status *status) //输出参数
int MPI_Testany(int count, MPI_Request array_of_requests[], int *index,
        int *flag, MPI_Status *status)

int MPI_Waitall(int count, MPI_Request array_of_requests[],//输入参数
        MPI_Status array_of_statuses[])//输出参数
int MPI_Testall(int count, MPI_Request array_of_requests[], int *flag,
        MPI_Status array_of_statuses[])

int MPI_Waitsome(int incount, MPI_Request array_of_requests[],//输入
int *outcount, int array_of_indices[],//输出
int MPI_Testsome(int incount, MPI_Request array_of_requests[],
int *outcount, int array_of_indices[],

上述输入参数都差不多,批量请求的request数组还有数组的大小。MPI_Waitany等待直到有一个完成的request,如果有超过一个的话,就任意的返回一个,并不保证有什么顺序性。对应的test操作类似,只是不阻塞等待,用flag参数判断是否有完成的request。MPI_Waitall就是阻塞等待所有的操作都完成。MPI_Waitsome与MPI_Waitany相似,但是当有超过一个request完成,它会将所有满足条件的都返回,而不是任意的返回一个。因此它的输出参数是数组形式。特别强调一点:当不断使用Wait操作选择成功的request数据进行处理时,由于Waitany是随机选择的,并不保证像OS中调度算法一样相对公平,所以可能会产生starvation,导致选择的不公平。这时使用Waitsome效果会好一些。

当调用Wait或者Test成功之后,request就会被释放掉,但是有时候只是单纯想看一下request的状态,而不释放它。这时调用:

int MPI_Request_get_status(MPI_Request request, int *flag,
    MPI_Status *status)

status标识状态信息,flag标识操作是否成功完成。

探测

回想开头的例子,发送过来的数据实际上大小是17,却开了一个大小为20的数组,因此这个空间只能大,不能小。但是在实际项目中,大部分情况是不知道将要接收的数据是多大的,就不知道到底需要开多大的空间去接比较好。MPI提供探测的功能,在实际Recv数据之前可以探测一下将要接收数据的MPI_Status,获得的status数据和正确执行Recv操作得到的保持一致。这样就可以提前知道数据大小,好做准备。在开头例子代码中执行MPI_Recv之前插入下面代码:

MPI_Probe(MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
MPI_Get_count(&status,MPI_BYTE,&count);

输出 status的值:

727

MPI_ERROR只有在MPI_Waitany这种批量操作中出错后,才会被置为有效值。

这是阻塞的调用,直到有一个符合条件的才会返回。当然对应的也有非阻塞的调用,由flag来看是否探测到可接收的数据:

int MPI_Iprobe(int source, int tag, MPI_Comm comm, int *flag,
        MPI_Status *status)

当遇到多线程的时候,又使用模糊的MPI_ANY_SOUECE , MPI_ANY_TAG情况就会变得有点复杂,探测到的可接收消息,并不一定是实际接收到的数据:

//发送端
if(rank == 0)
{    
    strcpy(message,"show me the data");
    int a = MPI_Send(message,strlen(message) + 1,MPI_CHAR,2,55,MPI_COMM_WORLD);
}
if(rank == 1)
{    
    strcpy(message,"show me the code");
    int a = MPI_Send(message,strlen(message) + 1,MPI_CHAR,2,55,MPI_COMM_WORLD);
}

//接收端
if(rank == 2)
{
    MPI_Probe(MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
    //线程1执行接收从0发来的message
    if(status.MPI_SOURCE == 0)
        MPI_Recv(message,20,MPI_CHAR,MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
    //线程2接收从1发来的message,代码类似
}

MPI中调度匹配的信息都是进程可见的,当探测到来自rank = 0发来的消息后,交给线程1去执行,但是在通过判断执行Recv的时候,1发来的message也符合接收条件,那么就无法确定是到底接收到的messge是“show me the data” 还是”show me the code”。这种情况下,MPI又提供了解决的接口:

int MPI_Mprobe(int source, int tag, MPI_Comm comm, MPI_Message *message,
        MPI_Status *status)

多了一个MPI_Message类型的输出参数,它提供了唯一确定此探测结果的功能,当然,也要用特殊的带有这种参数类型的Recv才能保证接收到探测得到的消息:

int MPI_Mrecv(void* buf, int count, MPI_Datatype datatype,
        MPI_Message *message, MPI_Status *status)

对应上述两个阻塞的操作,也有两个非阻塞的函数:MPI_Improbe MPI_Imrecv。

取消

在非阻塞调用,无论是send还是recv,在已经执行,但是还在pending状态,未找到匹配项进行实际数据传输的时候,可以取消这个操作。首先要提及的,这是一个非常昂贵的操作,一般用在发生异常的情况下:

int MPI_Cancel(MPI_Request *request)

request标识对应的操作,返回的reqeust可以被MPI_Wait等调用,但是会立刻返回。取消保证发送和接收要么全部完成,要不一点都不做,不会有发送一部分数据的情况。比如如果是buffer模式的send操作,取消成功后buffer就会被释放,但是如果已经开始发送,取消操作就会以失败告终。使用被取消的request进行wait操作返回的status中保存有被取消的信息,要使用相应的函数取判断:

int MPI_Test_cancelled(const MPI_Status *status, int *flag)

在一个可能被取消的操作中,使用前,一定要判断一下是否真的被取消了。因为被取消后statues的值都是未定义的。

 持久的Request

前面知道,如果一个异步通信操作完成(wait 返回),相应的request变量就会被释放,虽然并不是空间的释放,但其与通信操作的绑定联系就失去了。有时候需要使用相同参数的Send或者Recv不断循环调用。那么一次次循环就会不断绑定,解绑定request。但每次绑定后的值是一样的。就造成了某种程度的浪费。这时,可以使用持久化的Request值,使用结束后并不解绑定,而是标识为inactive,下次可以继续使用:

int MPI_Send_init(const void* buf, int count, MPI_Datatype datatype,
        int dest, int tag, MPI_Comm comm, MPI_Request *request)

上述参数只有request是输出参数,返回之后,它就标识了前面参数确定的发送消息。当然对应的还有其它几种通信模式MPI_Bsend_init,MPI_Ssend_init,MPI_Rsend_init。还有MPI_Recv_init,略有不同是Recv的buf参数是输出参数。

在初始化之后,返回的Request是inactve状态,还不能发送和接收。需要使用start来激活:

int MPI_Start(MPI_Request *request)
int MPI_Startall(int count, MPI_Request array_of_requests[])

那如何释放持久化的request呢?使用前面提到的显示释放函数:MPI_Request_free。只有当持久化request是inactive状态才能够显示释放。

Done