2

MPI学习笔记(三):矩阵相乘的分块并行(行列划分法) - 惊小呆520

 1 year ago
source link: https://www.cnblogs.com/babyclass/p/16547487.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

mpi矩阵乘法:C=αAB+βC

一、主从模式的行列划分并行法

1、实现方法

将可用于计算的进程数comm_sz分解为a*b,然后将矩阵A全体行划分为a个部分,将矩阵B全体列划分为b个部分,从而将整个结果矩阵划分为size相同的comm_sz个块。每个子进程负责计算最终结果的一块,只需要接收A对应范围的行和B对应范围的列,而不需要把整个矩阵传过去。主进程负责分发和汇总结果。

进程数comm_sz分解为a*b的方法:
int a=comm_sz/(int)sqrt(comm_sz);
int b=(int)sqrt(comm_sz);

例如comm_sz=12时,a=3,b=4。
但当comm_sz=13时,a=3,b=4,此时进程总数comm_sz不等于a*b,这样就会导致有多余的进程不参与运算。

当总进程数为comm_sz=6时计算以下A*B矩阵,其中矩阵A划分了a=3块,B矩阵划分了b=2块。

2173981-20220828184137151-447036370.png
第一个进程计算C00=A0B0,第二个进程计算C01=A0B1,以此类推。那么如何保证传输到各个进程矩阵A的行数据和矩阵B的列数据是相对应的。

3、对进程总数没有公约数的处理

方法1:当有多余的进程不参与运算时,使用MPI_ABORT终止该进程。
方法2:在MPI开始时加一个判断语句来终止MPI运行,让用户重新选择进程总数。

4、完整代码

#include <stdio.h>
#include "mpi.h"
#include <stdlib.h>
#include <math.h>
#include "dgemm_1.h"
/*** 主从模式 行列划分 ***/
int main(int argc, char **argv)
{
int M=2000,N=3000,K=2500; // 矩阵维度
int my_rank,comm_sz;
double start, stop; //计时时间
double alpha=2,beta=2; // 系数C=aA*B+bC
MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
int a=comm_sz/(int)sqrt(comm_sz); // A矩阵行分多少块
int b=(int)sqrt(comm_sz); // B矩阵列分多少块
if(M<K){
int c=a;
a=b;
b=c;
}
if(comm_sz!=a*b || a>M || b>K){
if(my_rank==0)
printf("error process:%d\n",comm_sz);
MPI_Finalize();
return 0;
}
int saveM=M; // 为了使行数据平均分配每一个进程
if(M%a!=0){
M-=M%a;
M+=a;
}
int saveK=K;// 为了使列数据平均分配每一个进程
if(K%b!=0){
K-=K%b;
K+=b;
}
int each_row=M/a; // 矩阵A每块分多少行数据
int each_col=K/b; // 矩阵B每列分多少列数据
// 给矩阵A B,C赋值
if(my_rank==0){
double *Matrix_A,*Matrix_B,*Matrix_C,*result_Matrix;
Matrix_A=(double*)malloc(M*N*sizeof(double));
Matrix_B=(double*)malloc(N*K*sizeof(double));
Matrix_C=(double*)malloc(M*K*sizeof(double));
result_Matrix=(double*)malloc(M*K*sizeof(double)); // 保存数据计算结果
for(int i=0;i<M;i++){
for(int j=0;j<N;j++)
if(i<saveM)
Matrix_A[i*N+j]=i+1;
else
Matrix_A[i*N+j]=0;
for(int p=0;p<K;p++)
Matrix_C[i*K+p]=1;
}
for(int i=0;i<N;i++){
for(int j=0;j<K;j++)
if(j<saveK)
Matrix_B[i*K+j]=j+1;
else
Matrix_B[i*K+j]=0;
}
// 输出A,B,C
//Matrix_print(Matrix_A,saveM,N);
//Matrix_print2(Matrix_B,N,saveK,K);
//Matrix_print2(Matrix_C,saveM,saveK,K);
printf("a=%d b=%d row=%d col=%d\n",a,b,each_row,each_col);
start=MPI_Wtime();
// 主进程计算第1块
for(int i=0;i<each_row;i++){
for(int j=0;j<each_col;j++){
double temp=0;
for(int p=0;p<N;p++){
temp+=Matrix_A[i*N+p]*Matrix_B[p*K+j];
}
result_Matrix[i*K+j]=alpha*temp+beta*Matrix_C[i*K+j];
}
}
// 向每个进程发送其它块数据
for(int i=1;i<comm_sz;i++){
int beginRow=(i/b)*each_row; // 每个块的行列起始位置(坐标/偏移量)
int beginCol=(i%b)*each_col;
MPI_Send(Matrix_A+beginRow*N,each_row*N,MPI_DOUBLE,i,1,MPI_COMM_WORLD);
for(int j=0;j<N;j++)
MPI_Send(Matrix_B+j*K+beginCol,each_col,MPI_DOUBLE,i,j+2,MPI_COMM_WORLD);
for(int p=0;p<each_row;p++)
MPI_Send(Matrix_C+(beginRow+p)*K+beginCol,each_col,MPI_DOUBLE,i,p+2+N,MPI_COMM_WORLD);
}
// 接收从进程的计算结果
for(int i=1;i<comm_sz;i++){
int beginRow=(i/b)*each_row;
int endRow=beginRow+each_row;
int beginCol=(i%b)*each_col;
for(int j=beginRow;j<endRow;j++)
MPI_Recv(result_Matrix+j*K+beginCol,each_col,MPI_DOUBLE,i,j-beginRow+2+N+each_row,MPI_COMM_WORLD,&status);
}
//Matrix_print2(result_Matrix,saveM,saveK,K);
stop=MPI_Wtime();
printf("rank:%d time:%lfs\n",my_rank,stop-start);
free(Matrix_A);
free(Matrix_B);
free(Matrix_C);
free(result_Matrix);
}
else {
double *buffer_A,*buffer_B,*buffer_C,*ans;
buffer_A=(double*)malloc(each_row*N*sizeof(double)); // A的均分行的数据
buffer_B=(double*)malloc(N*each_col*sizeof(double)); // B的均分列的数据
buffer_C=(double*)malloc(each_row*each_col*sizeof(double)); // C的均分行的数据
//接收A行B列的数据
MPI_Recv(buffer_A,each_row*N,MPI_DOUBLE,0,1,MPI_COMM_WORLD,&status);
for(int j=0;j<N;j++){
MPI_Recv(buffer_B+j*each_col,each_col,MPI_DOUBLE,0,j+2,MPI_COMM_WORLD,&status);
}
for(int p=0;p<each_row;p++){
MPI_Recv(buffer_C+p*each_col,each_col,MPI_DOUBLE,0,p+2+N,MPI_COMM_WORLD,&status);
}
//计算乘积结果,并将结果发送给主进程
for(int i=0;i<each_row;i++){
for(int j=0;j<each_col;j++){
double temp=0;
for(int p=0;p<N;p++){
temp+=buffer_A[i*N+p]*buffer_B[p*each_col+j];
}
buffer_C[i*each_col+j]=alpha*temp+beta*buffer_C[i*each_col+j];
}
}
//matMulti(buffer_A,buffer_B,buffer_C,each_row,N,each_col,alpha,beta);
for(int j=0;j<each_row;j++){
MPI_Send(buffer_C+j*each_col,each_col,MPI_DOUBLE,0,j+2+N+each_row,MPI_COMM_WORLD);
}
free(buffer_A);
free(buffer_B);
free(buffer_C);
}
MPI_Finalize();
return 0;
}

二、对等模式的行列划分并行法

1、实现方法

i、将可用于计算的进程数为comm_sz,然后将矩阵A全体行划分为comm_sz个部分,将矩阵B全体列划分为comm_sz个部分,从而将整个结果矩阵C划分为size相同的comm_sz*comm_sz个块。每个子进程负责计算最终结果的一块,只需要接收A对应范围的行和B对应范围的列,而不需要把整个矩阵传过去。主进程负责分发和汇总结果。
ii、主从模式里是每个进程所需的块数据都是由主进程提供的,而对等模式里每个进程里只存储一块数据,所需的其它块数据要从其它进程交换。
iii、之后开始计算后所需的Bi数据从下一个进程接收,当前进程的Bi数据发向前一个进程,每个进程计算结果矩阵的一行的块矩阵。
2173981-20220828193323058-1315956373.png

2、实现步骤

(1)将Ai,Bi和Cij(j=0,1,...,p-1)存放在第i个处理器中(这样处理的方式是为了使得数据在处理器中不重复);
(2)Pi负责计算Cij(j=0,1,...,p-1);
(3)由于使用P个处理器,每次每个处理器只计算一个Cij,故计算出整个C需要p次完成。
(4)Cij计算是按对角线进行的。

3、完整代码

#include <stdio.h>
#include "mpi.h"
#include <stdlib.h>
#include <math.h>
#include "dgemm_1.h"
/*** 对等模式 行列划分 ***/
int main(int argc, char **argv)
{
int M=10000,N=10000,K=10000; // 矩阵维度
int my_rank,comm_sz;
double start, stop; //计时时间
double alpha=2,beta=2; // 系数C=aA*B+bC
MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);
MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
if(comm_sz>M || comm_sz>K){
if(my_rank==0)
printf("error process:%d\n",comm_sz);
MPI_Finalize();
return 0;
}
int saveM=M; // 为了使行数据平均分配每一个进程
if(M%comm_sz!=0){
M-=M%comm_sz;
M+=comm_sz;
}
int saveK=K;// 为了使列数据平均分配每一个进程
if(K%comm_sz!=0){
K-=K%comm_sz;
K+=comm_sz;
}
int each_row=M/comm_sz; // 矩阵A每块分多少行数据
int each_col=K/comm_sz; // 矩阵B每列分多少列数据
double *Matrix_A,*Matrix_C,*buffer_A,*buffer_B,*buffer_C,*ans,*result_Matrix;
buffer_A=(double*)malloc(each_row*N*sizeof(double)); // A的块数据
buffer_B=(double*)malloc(N*each_col*sizeof(double)); // B的块数据
buffer_C=(double*)malloc(each_row*K*sizeof(double)); // C的块数据
ans=(double*)malloc(each_row*K*sizeof(double)); // 临时存储每块的结果数据
result_Matrix=(double*)malloc(M*K*sizeof(double)); // 保存数据计算结果
if(my_rank==0){
double *Matrix_B;
Matrix_A=(double*)malloc(M*N*sizeof(double));
Matrix_B=(double*)malloc(N*K*sizeof(double));
Matrix_C=(double*)malloc(M*K*sizeof(double));
// 给矩阵A B,C赋值
for(int i=0;i<M;i++){
for(int j=0;j<N;j++)
if(i<saveM)
Matrix_A[i*N+j]=j-i;
else
Matrix_A[i*N+j]=0;
for(int p=0;p<K;p++)
Matrix_C[i*K+p]=i*p;
}
for(int i=0;i<N;i++){
for(int j=0;j<K;j++)
if(j<saveK)
Matrix_B[i*K+j]=j+i;
else
Matrix_B[i*K+j]=0;
}
// 输出A,B,C
//Matrix_print(Matrix_A,saveM,N);
//Matrix_print2(Matrix_B,N,saveK,K);
//Matrix_print2(Matrix_C,saveM,saveK,K);
printf("row=%d col=%d\n",each_row,each_col);
start=MPI_Wtime();
// 0进程存储第1块 B0数据
for(int i=0;i<N;i++)
for(int j=0;j<each_col;j++)
buffer_B[i*each_col+j]=Matrix_B[i*K+j];
// 发送B1 B2 ... B(comm_sz-1)列数据
for(int p=1;p<comm_sz;p++){
int beginCol=(p%comm_sz)*each_col; // 每个块的列起始位置(坐标/偏移量)
for(int i=0;i<N;i++)
MPI_Send(Matrix_B+i*K+beginCol,each_col,MPI_DOUBLE,p,i,MPI_COMM_WORLD);
}
free(Matrix_B);
}
//  分发A0 A1 A2 ... A(comm_sz)行数据
MPI_Scatter(Matrix_A,each_row*N,MPI_DOUBLE,buffer_A,each_row*N,MPI_DOUBLE,0,MPI_COMM_WORLD);
MPI_Scatter(Matrix_C,each_row*K,MPI_DOUBLE,buffer_C,each_row*K,MPI_DOUBLE,0,MPI_COMM_WORLD);
if(my_rank==0){
free(Matrix_A);
free(Matrix_C);
}
// 接收B列数据
if(my_rank!=0){
for(int i=0;i<N;i++)
MPI_Recv(buffer_B+i*each_col,each_col,MPI_DOUBLE,0,i,MPI_COMM_WORLD,&status);
}
//交换(comm_sz-1)次数据B 计算乘积结果
for(int k=0;k<comm_sz;k++){
//int beginRow=my_rank*each_row; // 每个块的行列起始位置(坐标/偏移量)
int beginCol=((k+my_rank)%comm_sz)*each_col;
for(int i=0;i<each_row;i++){
for(int j=0;j<each_col;j++){
double temp=0;
for(int p=0;p<N;p++){
temp+=buffer_A[i*N+p]*buffer_B[p*each_col+j];
}
ans[i*K+beginCol+j]=alpha*temp+beta*buffer_C[i*K+beginCol+j];
}
}
int dest=(my_rank-1+comm_sz)%comm_sz; // 向上一个进程发送数据块B
int src=(my_rank+1)%comm_sz; // 接收位置
MPI_Sendrecv_replace(buffer_B,N*each_col,MPI_DOUBLE,dest,N,src,N,MPI_COMM_WORLD,&status);
}
// 结果聚集
MPI_Gather(ans,each_row*K,MPI_DOUBLE,result_Matrix,each_row*K,MPI_DOUBLE,0,MPI_COMM_WORLD);
if(my_rank==0){
//Matrix_print2(result_Matrix,saveM,saveK,K);
stop=MPI_Wtime();
double E=(double)(5.957/(stop-start))/comm_sz;
printf("time:%lfs 并行效率:%.2lf%\n",stop-start,100*E);
}
free(buffer_A);
free(buffer_B);
free(buffer_C);
free(ans);
free(result_Matrix);
MPI_Finalize();
return 0;
}

本文作者:惊小呆520

本文链接:https://www.cnblogs.com/babyclass/p/16547487.html

版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 2.5 中国大陆许可协议进行许可。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK