drmaa Python包如何实现集群任务高效管理?

2026-06-10 21:358阅读0评论SEO问题
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2363个文字,预计阅读时间需要10分钟。

drmaa Python包如何实现集群任务高效管理?

目录

drmaa Python包如何实现集群任务高效管理?

1. DRMAA简介

2.安装和配置

3.示例

3.1. 开始和终止会话 3.2. 运行工作 3.3. 等待工作 3.4. 控制工作 3.5. 查询工作状态

目录

  • ​​1. drmaa简介​​
  • ​​2. 安装和配置​​
  • ​​3. 示例​​
  • ​​3.1 开始和终止会话​​
  • ​​3.2 运行工作​​
  • ​​3.3 等待工作​​
  • ​​3.4 控制工作​​
  • ​​3.5 查询工作状态​​
  • ​​4. 应用​​
  • ​​4.1 写一个简单应用​​
  • ​​4.2 应用示例2​​


搭建流程时,我们把各个模块脚本都写好了,现在通过编写主程序将模块串起来,那么怎么样依次(或者并行)将任务自动投递到集群呢?就是说这一步运行完之后,下一步自动运行。我们当然可以在脚本中设一个标志,反复检查这一个标志是否出现来决定是否运行下一步,但这种方法太原始,太多弊端了,耗内存,无法并行,且不可预料的出错。那么,有没有相应的工具来管理集群任务投递?有,python的drmaa包可以实现。

1. drmaa简介

​​Distributed Resource Management Application API​​(DRMAA),即分布式资源管理应用程序API,是一种高级 开放网格论坛(Open_Grid_Forum)应用程序接口规范,用于向分布式资源管理(DRM)系统(例如集群或网格计算提交和控制作业)。API的范围涵盖了应用程序提交,控制和监视DRM系统中执行资源上的作业所需的所有高级功能。DRMAA API已在Sun的Grid Engine(SGE)和Condor等作业管理调度系统中实现。关于SGE可参考我的推文:​​集群SGE作业调度系统​​

C、C++、Perl、Python等程序语言都开发有相应的drmaa包来实现SGE集群的任务管理。这里记录下drmaa-python:
Github:​​​drmaa-python​​​ PyPi:​​pypi.org/project/drmaa/​​

2. 安装和配置

要求:Python2.7+;与DRMAA兼容的集群,如SGE。

#安装
pipinstalldrmaa

#设置路径
exportSGE_ROOT=/path/to/gridengine#SGE安装的路径
exportSGE_CELL=default

#设置库
exportDRMAA_LIBRARY_PATH=/usr/lib/libdrmaa.so.1.0
#libdrmaa.so.1.0C动态库,是libdrmaa-dev包的一部分

3. 示例

3.1 开始和终止会话

Session

#!/usr/bin/envpython

importdrmaa

defmain():
"""Createadrmaasessionandexit"""
withdrmaa.Session()ass:#自动初始化,组织工作提交
print('Asessionwasstartedsuccessfully')
#with结束自动exit(),大部分函数都要在exit()前执行,如runJob/wait,getContact可在exit()后。
if__name__=='__main__':
main()

使用可重新连接的会话,可以将DRMAA库初始化为上一个会话,从而允许该库访问该会话的作业列表.

#!/usr/bin/envpython

importdrmaa

defmain():
"""
Createasession,showthateachsessionhasanID,usesessionIDto
disconnect,thenreconnect.Finally,exit.
"""
s=drmaa.Session()
s.initialize()
print('Asessionwasstartedsuccessfully')
response=s.contact
print('sessioncontactreturns:%s'%response)
s.exit()
print('Exitedfromsession')

s.initialize(response)#初始化上个session
print('Sessionwasrestartedsuccessfullly')
s.exit()


if__name__=='__main__':
main()

3.2 运行工作

假设已知当前目录有一个​​sleeper.sh​​脚本,后接两个参数:

#!/bin/bash
echo"Helloworld,theansweris$1"
sleep3s
echo"$2Byeworld!"

drmaa将sleeper.sh提交到SGE:

#!/usr/bin/envpython

importdrmaa
importos

defmain():
"""
Submitajob.
Note,needfilecalledsleeper.shincurrentdirectory.
"""
withdrmaa.Session()ass:
print('Creatingjobtemplate')
jt=s.createJobTemplate()#分配工作模板(存储提交作业的信息结构)
jt.remoteCommand=os.path.join(os.getcwd(),'sleeper.sh')#设置remoteCommand属性,找到要运行的程序。
#路径默认为用户的主目录,相对路径用workingDirectory属性
jt.args=['42','Simonsays:']#执行文件的参数
jt.joinFiles=True

jobid=s.runJob(jt)#将分配给作业的ID放入我们传递给的字符数组中runJob()
print('YourjobhasbeensubmittedwithID%s'%jobid)

#jobid=s.runBulkJobs(jt,1,30,2)#提交一个数组作业
#print('YourjobshavebeensubmittedwithIDs%s'%jobid)

print('Cleaningup')
s.deleteJobTemplate(jt)#删除作业模板,释放作业模板保留的DRMAA内存,但对提交的作业没有影响

if__name__=='__main__':
main()

3.3 等待工作

即等待任务完成

#!/usr/bin/envpython

importdrmaa
importos

defmain():
"""
Submitajobandwaitforittofinish.
Note,needfilecalledsleeper.shinhomedirectory.
"""
withdrmaa.Session()ass:
print('Creatingjobtemplate')
jt=s.createJobTemplate()
jt.remoteCommand=os.path.join(os.getcwd(),'sleeper.sh')
jt.args=['42','Simonsays:']
jt.joinFiles=True

jobid=s.runJob(jt)
print('YourjobhasbeensubmittedwithID%s'%jobid)

retval=s.wait(jobid,drmaa.Session.TIMEOUT_WAIT_FOREVER)#调用wait()等待作业结束
print('Job:{0}finishedwithstatus{1}'.format(retval.jobId,retval.hasExited))

#以下是提交多个作业的等待处理,synchronize替代wait
#joblist=s.runBulkJobs(jt,1,30,2)
#print('YourjobshavebeensubmittedwithIDs%s'%joblist)
#s.synchronize(joblist,drmaa.Session.TIMEOUT_WAIT_FOREVER,True)

print('Cleaningup')
s.deleteJobTemplate(jt)

if__name__=='__main__':
main()

wait()返回一个JobInfo元组,其具有下面的属性: ​​jobId,hasExited,hasSignal,terminatedSignal,hasCoreDump, wasAborted,exitStatus,resourceUsage​​

synchronize()的第3个参数是该synchronize()的调用是否在工作后清除。工作完成后,它会留下一些统计信息,如退出状态和用途,直到wait() 或synchronize()的处理状态变为True。确保每一项任务对这两个函数之一调用是很有必要的,否则可能引起内存泄漏。如果想要每一项任务恢复统计信息,可将synchronize()设置False。如下:

joblist=s.runBulkJobs(jt,1,30,2)
print('YourjobshavebeensubmittedwithIDs%s'%joblist)

s.synchronize(joblist,drmaa.Session.TIMEOUT_WAIT_FOREVER,False)#False,每一项工作等待一次
forcurjobinjoblist:
print('Collectingjob'+curjob)
retval=s.wait(curjob,drmaa.Session.TIMEOUT_WAIT_FOREVER)
print('Job:{0}finishedwithstatus{1}'.format(retval.jobId,retval.hasExited))

3.4 控制工作

#!/usr/bin/envpython

importdrmaa
importos

defmain():
"""Submitajob,thenkillit.
Note,needfilecalledsleeper.shinhomedirectory.
"""
withdrmaa.Session()ass:
print('Creatingjobtemplate')
jt=s.createJobTemplate()
jt.remoteCommand=os.path.join(os.getcwd(),'sleeper.sh')
jt.args=['42','Simonsays:']
jt.joinFiles=True

jobid=s.runJob(jt)
print('YourjobhasbeensubmittedwithID%s'%jobid)
#optionsare:SUSPEND,RESUME,HOLD,RELEASE,TERMINATE
s.control(jobid,drmaa.JobControlAction.TERMINATE)#删除刚提交的作业

print('Cleaningup')
s.deleteJobTemplate(jt)

if__name__=='__main__':
main()

还可以用control()来暂停,恢复,保留或释放工作。control()还可用于控制未通过DRMAA提交的作业,可以将任何有效的SGE作业ID传递control()为要删除的作业ID。

3.5 查询工作状态

#!/usr/bin/envpython

importdrmaa
importtime
importos

defmain():
"""
Submitajob,andcheckitsprogress.
Note,needfilecalledsleeper.shinhomedirectory.
"""
withdrmaa.Session()ass:
print('Creatingjobtemplate')
jt=s.createJobTemplate()
jt.remoteCommand=os.path.join(os.getcwd(),'sleeper.sh')
jt.args=['42','Simonsays:']
jt.joinFiles=True

jobid=s.runJob(jt)
print('YourjobhasbeensubmittedwithID%s'%jobid)

#Whoneedsacasestatementwhenyouhavedictionaries?
decodestatus={drmaa.JobState.UNDETERMINED:'processstatuscannotbedetermined',
drmaa.JobState.QUEUED_ACTIVE:'jobisqueuedandactive',
drmaa.JobState.SYSTEM_ON_HOLD:'jobisqueuedandinsystemhold',
drmaa.JobState.USER_ON_HOLD:'jobisqueuedandinuserhold',
drmaa.JobState.USER_SYSTEM_ON_HOLD:'jobisqueuedandinuserandsystemhold',
drmaa.JobState.RUNNING:'jobisrunning',
drmaa.JobState.SYSTEM_SUSPENDED:'jobissystemsuspended',
drmaa.JobState.USER_SUSPENDED:'jobisusersuspended',
drmaa.JobState.DONE:'jobfinishednormally',
drmaa.JobState.FAILED:'jobfinished,butfailed'}

forixinrange(10):
print('Checking%sof10times'%ix)
printdecodestatus(s.jobStatus(jobid))#jobStatus()获取作业的状态
time.sleep(5)

print('Cleaningup')
s.deleteJobTemplate(jt)

if__name__=='__main__':
main()#确定工作状态并报告

其他更多关于JobInfo,JobTemplate,Session等方法的属性可参考:​​drmaa-python.readthedocs.io/en/latest/drmaa.html​​

4. 应用

4.1 写一个简单应用

#!/usr/bin/envpython

importdrmaa
importos

classSGE():
def__init__(self):
self.__sgeProject="Test"
self.__sgeQueue="test.q"
self.__maxvmen="1G"
self.__proc="1"
self.__script=""
self.__workdir=""
self.__session=""
defsetSgeProject(self,p):
self.__sgeProject=p
defgetSgeProject(self):
returnself.__sgeProject
defsetSgeQueue(self,q):
self.__sgeQueue=q
defgetSgeQueue(self):
returnself.__sgeQueue
defsetMaxvmem(self,m):
self.__maxvmem=m
defsetNumproc(self,proc):
self.__proc=proc
defgetMaxvmem(self):
returnself.__maxvmem
defsetScript(self,s):
self.__script=s
defgetScript(self):
returnself.__script
defsetWorkDir(self,w):
self.__workdir=w
defgetWorkDir(self):
returnself.__workdir
defsetSession(self,ss):
self.__session=ss
defgetSession(self):
returnself.__session

defsubmit(self):
st=os.stat(self.__script)#系统stat的调用,返回stat结构
os.chmod(self.__script,st.st_mode|stat.S_IEXEC|stat.S_IXGRP)#S_IEXEC是S_IXUSR同义词,所有者具有执行权限;S_IXGRP,组具有执行权限
jt=self.__session.createJobTemplate()##分配工作模板
jt.remoteCommand=self.__script#remoteCommand属性找到要执行的脚本
jt.workingDirectory=self.__workdir#设定当前工作目录
par4qsub="".join(["-bindinglinear:",self.__proc,"-P",self.__sgeProject,"-q",self.__sgeQueue,"-cwd-l","vf=",self.__maxvmem,"-lp=",self.__proc])
print('qsub{0}{1}'.format(par4qsub,self.__script))
jt.nativeSpecification=par4qsub#传递给jt的指令
jobid=self.__session.runJob(jt)#将分配给作业的ID传递给的字符数组
self.__session.deleteJobTemplate(jt)
returnjobid

defmain():
withdrmaa.Session()ass:
sgeObj=SGE()
sgeObj.setSession(session)
sgeObj.setSgeProject("SGEProject")
sgeObj.setSgeQueue("SGEQueue")
dict_qsub_id={}
joblist=[]
cwdir=os.path.join(getcwd())
sgeObj.setWorkDir(cwdir)
sgeObj.setScript(os.path.join(cwdir,"test.sh"))
sgeObj.setMaxvmem("Memory")
sgeObj.setNumproc("1")
jobid=sgeObj.submit()
dict_qsub_id[jobid]=os.path.join(cwdir,"test.sh")
joblist.append(jobid)

s.synchronize(joblist,drmaa.Session.TIMEOUT_WAIT_FOREVER,False)#设为false
forcurjobinjoblist:
retval=session.wait(curjob,drmaa.Session.TIMEOUT_WAIT_FOREVER)
print('Job:{0}finishedwithstatus{1}'.format(retval.jobId,retval.hasExited))

if__name__=="__main__":
main()

4.2 应用示例2

说明:用MEGAN做微生物物种注释时,blast nr得到的结果太多,一次性注释太久,因此将其拆分开来。Linux环境中使用MEGAN注释需要调用xvfb-run(相当于一个wrapper, 给应用程序提供虚拟的 X server),但xvfb不能并行,当我同时运行多个注释时,MEGAN生成的临时文件rma会发生冲突,因而无法同时得到注释结果。不能并行就只能串行,但我拆分了上百份文件,不可能手动一个个投递,如何一个个任务依次运行呢?可以用drmaa写个循环。

#继承上面的SGE类
defcheck_status(retval,running_log,path,email):
if(retval.exitStatus!=0):#出错的要发邮件通知
running_log.write('{0}\nErrorjob:{1}\nexitStatus:{2}\nwasAborted:{3}\nmaxvmem:{4}Gb\nQsub_id:{5}\n\n'.format("="*40,path,retval.exitStatus,retval.wasAborted,str(float(retval.resourceUsage['maxvmem'])/1000000000),retval.jobId))
running_log.close()
emailObj=Email()
emailObj.setReceiver(email)
emailObj.sendMail('<html><head><metadrmaa-python.readthedocs.io/en/latest/tutorials.html#starting-and-stopping-a-session​​



作者:Bioinfarmer,若要及时了解动态信息,请关注同名微信公众号:Bioinfarmer。

本文共计2363个文字,预计阅读时间需要10分钟。

drmaa Python包如何实现集群任务高效管理?

目录

drmaa Python包如何实现集群任务高效管理?

1. DRMAA简介

2.安装和配置

3.示例

3.1. 开始和终止会话 3.2. 运行工作 3.3. 等待工作 3.4. 控制工作 3.5. 查询工作状态

目录

  • ​​1. drmaa简介​​
  • ​​2. 安装和配置​​
  • ​​3. 示例​​
  • ​​3.1 开始和终止会话​​
  • ​​3.2 运行工作​​
  • ​​3.3 等待工作​​
  • ​​3.4 控制工作​​
  • ​​3.5 查询工作状态​​
  • ​​4. 应用​​
  • ​​4.1 写一个简单应用​​
  • ​​4.2 应用示例2​​


搭建流程时,我们把各个模块脚本都写好了,现在通过编写主程序将模块串起来,那么怎么样依次(或者并行)将任务自动投递到集群呢?就是说这一步运行完之后,下一步自动运行。我们当然可以在脚本中设一个标志,反复检查这一个标志是否出现来决定是否运行下一步,但这种方法太原始,太多弊端了,耗内存,无法并行,且不可预料的出错。那么,有没有相应的工具来管理集群任务投递?有,python的drmaa包可以实现。

1. drmaa简介

​​Distributed Resource Management Application API​​(DRMAA),即分布式资源管理应用程序API,是一种高级 开放网格论坛(Open_Grid_Forum)应用程序接口规范,用于向分布式资源管理(DRM)系统(例如集群或网格计算提交和控制作业)。API的范围涵盖了应用程序提交,控制和监视DRM系统中执行资源上的作业所需的所有高级功能。DRMAA API已在Sun的Grid Engine(SGE)和Condor等作业管理调度系统中实现。关于SGE可参考我的推文:​​集群SGE作业调度系统​​

C、C++、Perl、Python等程序语言都开发有相应的drmaa包来实现SGE集群的任务管理。这里记录下drmaa-python:
Github:​​​drmaa-python​​​ PyPi:​​pypi.org/project/drmaa/​​

2. 安装和配置

要求:Python2.7+;与DRMAA兼容的集群,如SGE。

#安装
pipinstalldrmaa

#设置路径
exportSGE_ROOT=/path/to/gridengine#SGE安装的路径
exportSGE_CELL=default

#设置库
exportDRMAA_LIBRARY_PATH=/usr/lib/libdrmaa.so.1.0
#libdrmaa.so.1.0C动态库,是libdrmaa-dev包的一部分

3. 示例

3.1 开始和终止会话

Session

#!/usr/bin/envpython

importdrmaa

defmain():
"""Createadrmaasessionandexit"""
withdrmaa.Session()ass:#自动初始化,组织工作提交
print('Asessionwasstartedsuccessfully')
#with结束自动exit(),大部分函数都要在exit()前执行,如runJob/wait,getContact可在exit()后。
if__name__=='__main__':
main()

使用可重新连接的会话,可以将DRMAA库初始化为上一个会话,从而允许该库访问该会话的作业列表.

#!/usr/bin/envpython

importdrmaa

defmain():
"""
Createasession,showthateachsessionhasanID,usesessionIDto
disconnect,thenreconnect.Finally,exit.
"""
s=drmaa.Session()
s.initialize()
print('Asessionwasstartedsuccessfully')
response=s.contact
print('sessioncontactreturns:%s'%response)
s.exit()
print('Exitedfromsession')

s.initialize(response)#初始化上个session
print('Sessionwasrestartedsuccessfullly')
s.exit()


if__name__=='__main__':
main()

3.2 运行工作

假设已知当前目录有一个​​sleeper.sh​​脚本,后接两个参数:

#!/bin/bash
echo"Helloworld,theansweris$1"
sleep3s
echo"$2Byeworld!"

drmaa将sleeper.sh提交到SGE:

#!/usr/bin/envpython

importdrmaa
importos

defmain():
"""
Submitajob.
Note,needfilecalledsleeper.shincurrentdirectory.
"""
withdrmaa.Session()ass:
print('Creatingjobtemplate')
jt=s.createJobTemplate()#分配工作模板(存储提交作业的信息结构)
jt.remoteCommand=os.path.join(os.getcwd(),'sleeper.sh')#设置remoteCommand属性,找到要运行的程序。
#路径默认为用户的主目录,相对路径用workingDirectory属性
jt.args=['42','Simonsays:']#执行文件的参数
jt.joinFiles=True

jobid=s.runJob(jt)#将分配给作业的ID放入我们传递给的字符数组中runJob()
print('YourjobhasbeensubmittedwithID%s'%jobid)

#jobid=s.runBulkJobs(jt,1,30,2)#提交一个数组作业
#print('YourjobshavebeensubmittedwithIDs%s'%jobid)

print('Cleaningup')
s.deleteJobTemplate(jt)#删除作业模板,释放作业模板保留的DRMAA内存,但对提交的作业没有影响

if__name__=='__main__':
main()

3.3 等待工作

即等待任务完成

#!/usr/bin/envpython

importdrmaa
importos

defmain():
"""
Submitajobandwaitforittofinish.
Note,needfilecalledsleeper.shinhomedirectory.
"""
withdrmaa.Session()ass:
print('Creatingjobtemplate')
jt=s.createJobTemplate()
jt.remoteCommand=os.path.join(os.getcwd(),'sleeper.sh')
jt.args=['42','Simonsays:']
jt.joinFiles=True

jobid=s.runJob(jt)
print('YourjobhasbeensubmittedwithID%s'%jobid)

retval=s.wait(jobid,drmaa.Session.TIMEOUT_WAIT_FOREVER)#调用wait()等待作业结束
print('Job:{0}finishedwithstatus{1}'.format(retval.jobId,retval.hasExited))

#以下是提交多个作业的等待处理,synchronize替代wait
#joblist=s.runBulkJobs(jt,1,30,2)
#print('YourjobshavebeensubmittedwithIDs%s'%joblist)
#s.synchronize(joblist,drmaa.Session.TIMEOUT_WAIT_FOREVER,True)

print('Cleaningup')
s.deleteJobTemplate(jt)

if__name__=='__main__':
main()

wait()返回一个JobInfo元组,其具有下面的属性: ​​jobId,hasExited,hasSignal,terminatedSignal,hasCoreDump, wasAborted,exitStatus,resourceUsage​​

synchronize()的第3个参数是该synchronize()的调用是否在工作后清除。工作完成后,它会留下一些统计信息,如退出状态和用途,直到wait() 或synchronize()的处理状态变为True。确保每一项任务对这两个函数之一调用是很有必要的,否则可能引起内存泄漏。如果想要每一项任务恢复统计信息,可将synchronize()设置False。如下:

joblist=s.runBulkJobs(jt,1,30,2)
print('YourjobshavebeensubmittedwithIDs%s'%joblist)

s.synchronize(joblist,drmaa.Session.TIMEOUT_WAIT_FOREVER,False)#False,每一项工作等待一次
forcurjobinjoblist:
print('Collectingjob'+curjob)
retval=s.wait(curjob,drmaa.Session.TIMEOUT_WAIT_FOREVER)
print('Job:{0}finishedwithstatus{1}'.format(retval.jobId,retval.hasExited))

3.4 控制工作

#!/usr/bin/envpython

importdrmaa
importos

defmain():
"""Submitajob,thenkillit.
Note,needfilecalledsleeper.shinhomedirectory.
"""
withdrmaa.Session()ass:
print('Creatingjobtemplate')
jt=s.createJobTemplate()
jt.remoteCommand=os.path.join(os.getcwd(),'sleeper.sh')
jt.args=['42','Simonsays:']
jt.joinFiles=True

jobid=s.runJob(jt)
print('YourjobhasbeensubmittedwithID%s'%jobid)
#optionsare:SUSPEND,RESUME,HOLD,RELEASE,TERMINATE
s.control(jobid,drmaa.JobControlAction.TERMINATE)#删除刚提交的作业

print('Cleaningup')
s.deleteJobTemplate(jt)

if__name__=='__main__':
main()

还可以用control()来暂停,恢复,保留或释放工作。control()还可用于控制未通过DRMAA提交的作业,可以将任何有效的SGE作业ID传递control()为要删除的作业ID。

3.5 查询工作状态

#!/usr/bin/envpython

importdrmaa
importtime
importos

defmain():
"""
Submitajob,andcheckitsprogress.
Note,needfilecalledsleeper.shinhomedirectory.
"""
withdrmaa.Session()ass:
print('Creatingjobtemplate')
jt=s.createJobTemplate()
jt.remoteCommand=os.path.join(os.getcwd(),'sleeper.sh')
jt.args=['42','Simonsays:']
jt.joinFiles=True

jobid=s.runJob(jt)
print('YourjobhasbeensubmittedwithID%s'%jobid)

#Whoneedsacasestatementwhenyouhavedictionaries?
decodestatus={drmaa.JobState.UNDETERMINED:'processstatuscannotbedetermined',
drmaa.JobState.QUEUED_ACTIVE:'jobisqueuedandactive',
drmaa.JobState.SYSTEM_ON_HOLD:'jobisqueuedandinsystemhold',
drmaa.JobState.USER_ON_HOLD:'jobisqueuedandinuserhold',
drmaa.JobState.USER_SYSTEM_ON_HOLD:'jobisqueuedandinuserandsystemhold',
drmaa.JobState.RUNNING:'jobisrunning',
drmaa.JobState.SYSTEM_SUSPENDED:'jobissystemsuspended',
drmaa.JobState.USER_SUSPENDED:'jobisusersuspended',
drmaa.JobState.DONE:'jobfinishednormally',
drmaa.JobState.FAILED:'jobfinished,butfailed'}

forixinrange(10):
print('Checking%sof10times'%ix)
printdecodestatus(s.jobStatus(jobid))#jobStatus()获取作业的状态
time.sleep(5)

print('Cleaningup')
s.deleteJobTemplate(jt)

if__name__=='__main__':
main()#确定工作状态并报告

其他更多关于JobInfo,JobTemplate,Session等方法的属性可参考:​​drmaa-python.readthedocs.io/en/latest/drmaa.html​​

4. 应用

4.1 写一个简单应用

#!/usr/bin/envpython

importdrmaa
importos

classSGE():
def__init__(self):
self.__sgeProject="Test"
self.__sgeQueue="test.q"
self.__maxvmen="1G"
self.__proc="1"
self.__script=""
self.__workdir=""
self.__session=""
defsetSgeProject(self,p):
self.__sgeProject=p
defgetSgeProject(self):
returnself.__sgeProject
defsetSgeQueue(self,q):
self.__sgeQueue=q
defgetSgeQueue(self):
returnself.__sgeQueue
defsetMaxvmem(self,m):
self.__maxvmem=m
defsetNumproc(self,proc):
self.__proc=proc
defgetMaxvmem(self):
returnself.__maxvmem
defsetScript(self,s):
self.__script=s
defgetScript(self):
returnself.__script
defsetWorkDir(self,w):
self.__workdir=w
defgetWorkDir(self):
returnself.__workdir
defsetSession(self,ss):
self.__session=ss
defgetSession(self):
returnself.__session

defsubmit(self):
st=os.stat(self.__script)#系统stat的调用,返回stat结构
os.chmod(self.__script,st.st_mode|stat.S_IEXEC|stat.S_IXGRP)#S_IEXEC是S_IXUSR同义词,所有者具有执行权限;S_IXGRP,组具有执行权限
jt=self.__session.createJobTemplate()##分配工作模板
jt.remoteCommand=self.__script#remoteCommand属性找到要执行的脚本
jt.workingDirectory=self.__workdir#设定当前工作目录
par4qsub="".join(["-bindinglinear:",self.__proc,"-P",self.__sgeProject,"-q",self.__sgeQueue,"-cwd-l","vf=",self.__maxvmem,"-lp=",self.__proc])
print('qsub{0}{1}'.format(par4qsub,self.__script))
jt.nativeSpecification=par4qsub#传递给jt的指令
jobid=self.__session.runJob(jt)#将分配给作业的ID传递给的字符数组
self.__session.deleteJobTemplate(jt)
returnjobid

defmain():
withdrmaa.Session()ass:
sgeObj=SGE()
sgeObj.setSession(session)
sgeObj.setSgeProject("SGEProject")
sgeObj.setSgeQueue("SGEQueue")
dict_qsub_id={}
joblist=[]
cwdir=os.path.join(getcwd())
sgeObj.setWorkDir(cwdir)
sgeObj.setScript(os.path.join(cwdir,"test.sh"))
sgeObj.setMaxvmem("Memory")
sgeObj.setNumproc("1")
jobid=sgeObj.submit()
dict_qsub_id[jobid]=os.path.join(cwdir,"test.sh")
joblist.append(jobid)

s.synchronize(joblist,drmaa.Session.TIMEOUT_WAIT_FOREVER,False)#设为false
forcurjobinjoblist:
retval=session.wait(curjob,drmaa.Session.TIMEOUT_WAIT_FOREVER)
print('Job:{0}finishedwithstatus{1}'.format(retval.jobId,retval.hasExited))

if__name__=="__main__":
main()

4.2 应用示例2

说明:用MEGAN做微生物物种注释时,blast nr得到的结果太多,一次性注释太久,因此将其拆分开来。Linux环境中使用MEGAN注释需要调用xvfb-run(相当于一个wrapper, 给应用程序提供虚拟的 X server),但xvfb不能并行,当我同时运行多个注释时,MEGAN生成的临时文件rma会发生冲突,因而无法同时得到注释结果。不能并行就只能串行,但我拆分了上百份文件,不可能手动一个个投递,如何一个个任务依次运行呢?可以用drmaa写个循环。

#继承上面的SGE类
defcheck_status(retval,running_log,path,email):
if(retval.exitStatus!=0):#出错的要发邮件通知
running_log.write('{0}\nErrorjob:{1}\nexitStatus:{2}\nwasAborted:{3}\nmaxvmem:{4}Gb\nQsub_id:{5}\n\n'.format("="*40,path,retval.exitStatus,retval.wasAborted,str(float(retval.resourceUsage['maxvmem'])/1000000000),retval.jobId))
running_log.close()
emailObj=Email()
emailObj.setReceiver(email)
emailObj.sendMail('<html><head><metadrmaa-python.readthedocs.io/en/latest/tutorials.html#starting-and-stopping-a-session​​



作者:Bioinfarmer,若要及时了解动态信息,请关注同名微信公众号:Bioinfarmer。