How to use mpi4py for efficient parallel computing in Python?

2026-06-09 12:31


Setting up MPI and mpi4py, and basic usage

Building on the setup of MPI (the Message Passing Interface) and mpi4py (the Python interface to MPI), this article introduces some basic usage.

1. Setting up MPI and mpi4py

Setting up MPI and mpi4py was covered in a previous article, so it is not repeated here.

2. Basic usage of mpi4py

Here is a simple example:

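```python
from mpi4py import MPI

# Get the MPI environment (the global communicator)
comm = MPI.COMM_WORLD

# Get this process's rank
rank = comm.Get_rank()

# Print the rank
print(f"Hello from process {rank}")
```

Start it with the MPI launcher:

```
mpiexec -n 5 python3 x.py
```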

3. Point-to-point communication

Point-to-point communication in mpi4py is done with `send` and `recv`. Here is an example:

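```python
from mpi4py import MPI

# Get the MPI environment (run with at least 2 processes)
comm = MPI.COMM_WORLD

# Get this process's rank
rank = comm.Get_rank()

if rank == 0:
    # Sender process: send a message to rank 1 with tag 11
    comm.send("Hello", dest=1, tag=11)
    print("Sent message to process 1")
elif rank == 1:
    # Receiver process: receive the message from rank 0 with tag 11
    msg = comm.recv(source=0, tag=11)
    print(f"Received message from process 0: {msg}")
```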

Setting up MPI and mpi4py was covered in the previous article; here we introduce some basic usage.

mpi4py hello world

```python
from mpi4py import MPI
print("hello world")
```

```
mpiexec -n 5 python3 x.py
```

2. Point-to-point communication

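In mpi4py, how a point-to-point `send` behaves depends on the message size. For small messages the data is typically copied into an internal buffer and `send` returns without waiting for the matching `recv`, so it behaves as if it were non-blocking; for large messages `send` blocks until the receiving process posts the matching `recv`. (The size threshold depends on the MPI implementation.) This leads to the following: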

When sending a small amount of data:

```python
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

# point to point communication
data_send = [comm_rank]*5

# Every rank first sends to its right neighbor...
comm.send(data_send, dest=(comm_rank+1)%comm_size)

# ...and only then receives from its left neighbor
data_recv = comm.recv(source=(comm_rank-1)%comm_size)

print("my rank is %d, and I received:" % comm_rank)
print(data_recv)
```

With a large amount of data, for example when sending:

```python
# point to point communication
data_send = [comm_rank]*1000000
```

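the processes deadlock: every process is blocked in `send`, waiting for its neighbor to post the matching `recv`, but that neighbor is itself stuck in its own `send` and never reaches `recv`.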


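In the revised code the processes run one after another: process 0 sends to 1, process 1 receives and then sends to 2, and so on, until process 0 finally receives from the last process: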

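```python
import mpi4py.MPI as MPI

# Requires at least 2 processes
comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

data_send = [comm_rank]*1000000

# Rank 0 starts the chain by sending to rank 1
if comm_rank == 0:
    comm.send(data_send, dest=(comm_rank+1)%comm_size)

# Every other rank first receives from its left neighbor, then passes its own data on
if comm_rank > 0:
    data_recv = comm.recv(source=(comm_rank-1)%comm_size)
    comm.send(data_send, dest=(comm_rank+1)%comm_size)

# Finally rank 0 receives from the last rank, closing the ring
if comm_rank == 0:
    data_recv = comm.recv(source=(comm_rank-1)%comm_size)

print("my rank is %d, and I received:" % comm_rank)
print(data_recv)
```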



3 Collective communication

3.1 Broadcast: bcast

One process sends data to all processes.

```python
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)

# Only the root's value matters; the other ranks pass None
dat = comm.bcast(data if comm_rank == 0 else None, root=0)

print('rank %d, got:' % (comm_rank))
print(dat)
```


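The sender (the root) also gets this data as the return value of `bcast`; of course, its copy is not received over the network but already exists in its own memory.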

3.2 Scatter: scatter


```python
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
else:
    data = None

# Rank i receives data[i]
local_data = comm.scatter(data, root=0)

print('rank %d, got:' % comm_rank)
print(local_data)
```



3.3 Gather: gather

Collect the data from all processes back on the root.


```python
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
else:
    data = None

local_data = comm.scatter(data, root=0)
local_data = local_data * 2

print('rank %d, got and do:' % comm_rank)
print(local_data)

# The root receives a list with one entry per rank; other ranks get None
combine_data = comm.gather(local_data, root=0)

if comm_rank == 0:
    print("root recv {0}".format(combine_data))
```


3.4 Reduction: reduce

```python
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
else:
    data = None

local_data = comm.scatter(data, root=0)
local_data = local_data * 2

print('rank %d, got and do:' % comm_rank)
print(local_data)

# Combine the values of all ranks with SUM; only the root gets the result
all_sum = comm.reduce(local_data, root=0, op=MPI.SUM)

if comm_rank == 0:
    print('sum is:%d' % all_sum)
```

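With operations such as SUM, MAX and MIN, each process contributes its local value, and these values are combined with the chosen operation into a single result on the root process; the other ranks get `None` back from `reduce`.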

For example: `op=MPI.SUM`, `op=MPI.MAX`, `op=MPI.MIN`.


3.5 Processing the lines of one file in parallel


```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import mpi4py.MPI as MPI
import numpy as np

# Global variables for MPI
# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# rank of this process in the communicator
comm_rank = comm.Get_rank()
# total number of processes in the communicator
comm_size = comm.Get_size()


if __name__ == '__main__':
    if comm_rank == 0:
        sys.stderr.write("processor root starts reading data...\n")
        all_lines = sys.stdin.readlines()

    # Send the complete list of lines from rank 0 to every rank
    all_lines = comm.bcast(all_lines if comm_rank == 0 else None, root=0)

    # Split the line indices into comm_size contiguous blocks
    num_lines = len(all_lines)
    local_lines_offset = np.linspace(0, num_lines, comm_size + 1).astype('int')

    # This rank's block of lines
    local_lines = all_lines[local_lines_offset[comm_rank]:local_lines_offset[comm_rank + 1]]

    sys.stderr.write("%d/%d processor gets %d/%d data \n" % (comm_rank, comm_size, len(local_lines), num_lines))

    for line in local_lines:
        output = line.strip() + ' : process every line here'
        print(output)
```

3.6 Processing multiple files in parallel

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
import mpi4py.MPI as MPI
import numpy as np

# Global variables for MPI
# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# rank of this process in the communicator
comm_rank = comm.Get_rank()
# total number of processes in the communicator
comm_size = comm.Get_size()


if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write("Usage: python *.py directory_with_files\n")
        sys.exit(1)

    path = sys.argv[1]

    if comm_rank == 0:
        file_list = os.listdir(path)
        sys.stderr.write("......%d files......\n" % len(file_list))

    # Every rank gets the same file list, then takes its own contiguous block
    file_list = comm.bcast(file_list if comm_rank == 0 else None, root=0)
    num_files = len(file_list)
    local_files_offset = np.linspace(0, num_files, comm_size + 1).astype('int')
    local_files = file_list[local_files_offset[comm_rank]:local_files_offset[comm_rank + 1]]

    sys.stderr.write("%d/%d processor gets %d/%d data \n" % (comm_rank, comm_size, len(local_files), num_files))

    sys.stderr.write("processor %d has %s files \n" % (comm_rank, local_files))
```

3.7 Processing multiple rows or columns of a matrix in parallel with NumPy

```python
import numpy as np
import mpi4py.MPI as MPI

# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# rank of this process in the communicator
comm_rank = comm.Get_rank()
# total number of processes in the communicator
comm_size = comm.Get_size()

# test MPI
if __name__ == "__main__":
    # create a matrix
    if comm_rank == 0:
        all_data = np.arange(20).reshape(4, 5)
        print("************ data start******************")
        print(all_data)
        print("************ data end******************")

    # broadcast the data to all processors
    all_data = comm.bcast(all_data if comm_rank == 0 else None, root=0)

    # divide the data to each processor
    num_samples = all_data.shape[0]
    local_data_offset = np.linspace(0, num_samples, comm_size + 1).astype('int')

    # get the local data which will be processed in this processor
    local_data = all_data[local_data_offset[comm_rank]:local_data_offset[comm_rank + 1]]
    print("****** %d/%d processor gets local data ****" % (comm_rank, comm_size))
    print(local_data)

    # reduce to get sum of elements
    local_sum = local_data.sum()
    all_sum = comm.reduce(local_sum, root=0, op=MPI.SUM)

    # process in local
    local_result = local_data ** 2

    # gather the result from all processors; allgather delivers it to every rank
    result = comm.allgather(local_result)
    result = np.vstack(result)

    if comm_rank == 0:
        print("*** sum: ", all_sum)
        print("************ result ******************")
        print(result)
```



