How does SparkSubmit submit a job to YARN?



SparkSubmit submission flow walkthrough, with a few tips. The analysis is based on launching with the following command: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster ./examples/jars/spark-examples_2.12-3.0.0.jar 10

The spark-submit command submits a Spark application. --class names the main class, here org.apache.spark.examples.SparkPi, an example program that estimates Pi. --master selects the cluster manager, here yarn, meaning YARN acts as the resource manager. --deploy-mode selects where the driver runs; cluster means the driver runs inside the cluster (in the YARN ApplicationMaster) rather than in the client process. ./examples/jars/spark-examples_2.12-3.0.0.jar is the path to the application JAR.

The trailing 10 is not a Spark option and not the number of executors: it is an application argument passed straight to SparkPi's main method, where it sets the number of partitions (slices) used to estimate Pi.
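For context, here is a minimal sketch of what the SparkPi example roughly does with that trailing argument. This is an illustrative reconstruction, not the exact source of org.apache.spark.examples.SparkPi:

import org.apache.spark.sql.SparkSession
import scala.math.random

object SparkPiSketch {
  def main(args: Array[String]): Unit = {
    val spark  = SparkSession.builder.appName("Spark Pi").getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2   // the "10" from the command line ends up here
    val n      = 100000L * slices
    val count = spark.sparkContext
      .parallelize(1L to n, slices)                          // slices = number of partitions
      .map { _ =>
        val x = random * 2 - 1
        val y = random * 2 - 1
        if (x * x + y * y <= 1) 1 else 0                     // count points falling inside the unit circle
      }
      .reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / n}")
    spark.stop()
  }
}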


SparkSubmit submission flow analysis

Tip: the analysis starts from the following command:

./spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  ./examples/jars/spark-examples_2.12-3.0.0.jar \
  10

The first thing that runs is the spark-submit script itself, so let's look at its code:

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# exec the spark-class script, passing it the SparkSubmit class plus all of the
# arguments that were given to spark-submit
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Next, the spark-class script (only the important parts shown):

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# 3. $RUNNER is "${JAVA_HOME}/bin/java". It runs org.apache.spark.launcher.Main from
#    the launch classpath with every argument spark-submit was given; the launcher
#    builds and prints the JVM command shown further below.
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    # 2. CMD accumulates here: the loop collects the tokens that build_command produced
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")

# 1. We end up exec-ing a command -- where does it come from? The last element of CMD
#    is the launcher's exit status appended by build_command, so it is stripped off
#    before the exec. (The LAUNCHER_EXIT_CODE error handling is omitted here.)
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
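The while loop above implements a small protocol: anything build_command prints before a lone NUL marker is echoed through as ordinary output, and after the marker every NUL-terminated token becomes one element of CMD, so arguments containing spaces or newlines survive intact (the launcher's exit status comes last and is stripped via $LAST). As a purely illustrative counterpart, here is a hypothetical Scala producer of that protocol; it is not the real org.apache.spark.launcher.Main:

// Hypothetical producer of the protocol consumed by the spark-class loop above.
// The real JVM command is assembled inside org.apache.spark.launcher.Main.
object LauncherOutputSketch {
  def main(args: Array[String]): Unit = {
    val NUL = 0.toChar
    // Illustrative JVM command; the real one is built from SPARK_HOME, the classpath
    // and the spark-submit arguments.
    val cmd = Seq(
      "/usr/bin/java",
      "-cp", "/opt/spark/conf/:/opt/spark/jars/*",
      "org.apache.spark.deploy.SparkSubmit") ++ args

    val out = System.out
    out.print(NUL); out.print('\n')                           // start-of-command marker the loop waits for
    cmd.foreach { tok => out.print(tok); out.print(NUL) }     // NUL-terminated tokens survive spaces/newlines
    out.flush()
  }
}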

The command that ends up being executed:

/usr/lib/java/jdk1.8.0_144/bin/java \
  -cp /home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/conf/:/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/jars/* \
  -Xmx1g \
  org.apache.spark.deploy.SparkSubmit \
  --master yarn \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  ./examples/jars/spark-examples_2.12-3.0.0.jar \
  10

So the crux of the submit scripts is how the org.apache.spark.deploy.SparkSubmit class works; everything else is just arguments being passed along. Let's start with that class's code:

// A runnable class must have a main method, so we start from main.
override def main(args: Array[String]): Unit = {
  // Instantiate an anonymous subclass of SparkSubmit
  val submit = new SparkSubmit() {
    self =>

    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
        override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
        override protected def logError(msg: => String): Unit = self.logError(msg)
      }
    }

    override protected def logInfo(msg: => String): Unit = printMessage(msg)
    override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
    override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")

    // This is the method that actually runs; it delegates to the parent's doSubmit(args)
    override def doSubmit(args: Array[String]): Unit = {
      try {
        super.doSubmit(args)
      } catch {
        case e: SparkUserAppException => exitFn(e.exitCode)
      }
    }
  }

  // Call doSubmit on the anonymous subclass; the method is overridden there
  submit.doSubmit(args)
}
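One Scala detail worth pausing on: the `self =>` alias on the anonymous SparkSubmit subclass is what lets the nested anonymous SparkSubmitArguments forward logInfo/logWarning/logError to the outer overrides instead of recursing into its own. A minimal standalone demo of the pattern (the Logger class below is made up for illustration):

object SelfAliasDemo extends App {
  class Logger { def log(msg: String): Unit = println(s"default: $msg") }

  val outer: Logger = new Logger { self =>            // `self` names this anonymous instance
    override def log(msg: String): Unit = println(s"outer override: $msg")

    // A nested anonymous subclass: a plain log(msg) here would resolve to its own
    // override and recurse; self.log delegates to the outer one, exactly like the
    // logInfo/logWarning/logError forwarding in SparkSubmit.main.
    val nested: Logger = new Logger {
      override def log(msg: String): Unit = self.log(msg)
    }
    nested.log("hello")   // prints "outer override: hello"
  }
}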

1. The submission flow, starting from super.doSubmit(args)

def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  // This is just logging setup; we can skip over it for now
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  // * parseArguments returns appArgs; its job is to parse the command-line arguments
  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }

  // Pattern match: appArgs.action must be one of the four cases below,
  // so we start by looking at parseArguments
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}

1.1 parseArguments(args)

protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
  // The SparkSubmitArguments constructor runs the two key pieces covered
  // in 1.1.1 (parse) and 1.1.2 (loadEnvironmentArguments) below
  new SparkSubmitArguments(args)
}

1.1.1 SparkSubmitArguments(args)

// Initializer block inside the SparkSubmitArguments constructor
try {
  parse(args.asJava)
} catch {
  case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}

Besides parse(), the constructor also calls loadEnvironmentArguments(), covered in 1.1.2 further down; first we follow parse().

1.1.1.1 parse(args.asJava)

// Clearly, this is where the input arguments get parsed and normalized
protected final void parse(List<String> args) {
  // Regex that splits "--name=value" style options into name and value
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

  int idx = 0;
  for (idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;

    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      arg = m.group(1);
      value = m.group(2);
    }

    // Look for options with a value.
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        if (idx == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", arg));
        }
        idx++;
        value = args.get(idx);
      }
      if (!handle(name, value)) {
        break;
      }
      continue;
    }

    // Look for a switch.
    name = findCliOption(arg, switches);
    if (name != null) {
      // * handle() is the key function in the argument parsing
      if (!handle(name, null)) {
        break;
      }
      continue;
    }

    if (!handleUnknown(arg)) {
      break;
    }
  }

  if (idx < args.size()) {
    idx++;
  }
  handleExtraArgs(args.subList(idx, args.size()));
}
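As a quick standalone check of that eqSeparatedOpt regex (illustrative snippet, not Spark code): an argument of the form --opt=value is split into name and value within a single token, while --opt value leaves value null and is picked up from the following token by the idx++ branch.

object EqOptRegexDemo extends App {
  // Same pattern as in parse(): the option name up to the first '=', then the value.
  val eqSeparatedOpt = "(--[^=]+)=(.+)".r

  Seq("--master=yarn", "--deploy-mode=cluster", "--master", "yarn").foreach {
    case eqSeparatedOpt(name, value) => println(s"inline value:  $name -> $value")
    case other                       => println(s"no '=' form:   $other")
  }
  // inline value:  --master -> yarn
  // inline value:  --deploy-mode -> cluster
  // no '=' form:   --master
  // no '=' form:   yarn
}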

1.1.1.1.1 handle(name, null)

// The pattern match makes it obvious: find the matching option and assign the field
override protected def handle(opt: String, value: String): Boolean = {
  opt match {
    // protected final String NAME = "--name";
    case NAME =>
      name = value
    // protected final String MASTER = "--master";
    case MASTER =>
      master = value
    // protected final String CLASS = "--class";
    case CLASS =>
      mainClass = value
    case NUM_EXECUTORS =>
      numExecutors = value
    case TOTAL_EXECUTOR_CORES =>
      totalExecutorCores = value
    case EXECUTOR_CORES =>
      executorCores = value
    case EXECUTOR_MEMORY =>
      executorMemory = value
    case DRIVER_MEMORY =>
      driverMemory = value
    case DRIVER_CORES =>
      driverCores = value
    case _ =>
      throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
  }
  true
}

1.1.2 loadEnvironmentArguments()

// Action should be SUBMIT unless otherwise specified
// On the first pass action is null, so it is defaulted to SUBMIT here
action = Option(action).getOrElse(SUBMIT)
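The defaulting works because Option(null) is None, so getOrElse supplies SUBMIT on the first pass, while an action set earlier (for example by --kill or --status) is preserved. A tiny illustration, with SparkSubmitAction simplified to a plain Enumeration:

object ActionDefaultDemo extends App {
  object SparkSubmitAction extends Enumeration {
    val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
  }

  var action: SparkSubmitAction.Value = null
  action = Option(action).getOrElse(SparkSubmitAction.SUBMIT)
  println(action)   // SUBMIT: Option(null) is None, so the default wins

  action = SparkSubmitAction.KILL
  action = Option(action).getOrElse(SparkSubmitAction.SUBMIT)
  println(action)   // KILL: an already-set value is kept
}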

1.2 submit(appArgs, uninitLog)

// submit() ultimately boils down to calling:
runMain(args, uninitLog)

1.2.1 runMain(args, uninitLog) (with the unimportant logging removed)

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // prepareSubmitEnvironment(args) returns (childArgs, childClasspath, sparkConf, childMainClass);
  // in yarn-cluster mode childMainClass is "org.apache.spark.deploy.yarn.YarnClusterApplication"
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

  // Load the class by name
  var mainClass: Class[_] = Utils.classForName(childMainClass)

  // If the class implements SparkApplication, instantiate it via reflection (a);
  // otherwise wrap its plain main() in a JavaMainApplication (b)
  val app: SparkApplication =
    if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      // a)
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      // b)
      new JavaMainApplication(mainClass)
    }

  // Here app is "org.apache.spark.deploy.yarn.YarnClusterApplication"
  app.start(childArgs.toArray, sparkConf)
}
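The a) / b) branches use a standard reflection pattern: load childMainClass by name, and if it already implements SparkApplication instantiate it directly, otherwise wrap its plain main() in an adapter. A simplified, self-contained sketch of that pattern follows; the trait and adapter names are stand-ins, not Spark's actual types:

object AppLoaderSketch {
  // Stand-in for org.apache.spark.deploy.SparkApplication.
  trait SparkAppLike { def start(args: Array[String]): Unit }

  // Stand-in for JavaMainApplication: adapts a class that only has a static main().
  class MainMethodAdapter(klass: Class[_]) extends SparkAppLike {
    def start(args: Array[String]): Unit =
      klass.getMethod("main", classOf[Array[String]]).invoke(null, args)
  }

  def loadApp(childMainClass: String): SparkAppLike = {
    val mainClass = Class.forName(childMainClass)            // Utils.classForName in runMain
    if (classOf[SparkAppLike].isAssignableFrom(mainClass))   // branch a)
      mainClass.getConstructor().newInstance().asInstanceOf[SparkAppLike]
    else                                                     // branch b)
      new MainMethodAdapter(mainClass)
  }
}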

1.2.1.1 prepareSubmitEnvironment(args) (abridged)

// In yarn-cluster mode, childMainClass is set to YARN_CLUSTER_SUBMIT_CLASS,
// i.e. "org.apache.spark.deploy.yarn.YarnClusterApplication"
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
}

1.2.1.2 app.start(childArgs.toArray, sparkConf) (abridged)

override def start(args: Array[String], conf: SparkConf): Unit = {
  // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
  // so remove them from sparkConf here for yarn mode.
  conf.remove(JARS)
  conf.remove(FILES)

  // new ClientArguments(args) parses the arguments passed in; for --class,
  // userClass = value, i.e. the class we asked to run.
  // Inside Client:
  //   private val yarnClient = YarnClient.createYarnClient
  //   YarnClient client = new YarnClientImpl();
  //   protected ApplicationClientProtocol rmClient;
  // rmClient is a ResourceManager client, so this Client talks to the ResourceManager.
  // Now that we know what the object is, let's see what run() does.
  new Client(new ClientArguments(args), conf, null).run()
}

1.2.1.2.1 Client.run() (abridged)

def run(): Unit = {
  this.appId = submitApplication()
}

def submitApplication(): ApplicationId = {
  try {
    // Open the connection to the launcher backend
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()

    // Set up the appropriate contexts to launch our AM process: build the container
    // launch context, whose commands boil down to
    // JAVA_HOME/bin/java org.apache.spark.deploy.yarn.ApplicationMaster
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    ...
  }
}
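Stripped of all the Spark-specific container setup, the calls above map onto the plain hadoop-yarn-client API. Below is a hedged, minimal sketch of that raw sequence; the application name, AM command and resource sizes are illustrative, not what Spark's Client actually builds:

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object MinimalYarnSubmitSketch {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()
    val yarnClient = YarnClient.createYarnClient()   // same factory call Spark's Client uses
    yarnClient.init(conf)
    yarnClient.start()

    // Ask the ResourceManager for a new application (yarnClient.createApplication()).
    val newApp     = yarnClient.createApplication()
    val appContext = newApp.getApplicationSubmissionContext

    // Describe the ApplicationMaster container; Spark's createContainerLaunchContext
    // builds a java command that starts org.apache.spark.deploy.yarn.ApplicationMaster.
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(
      List("$JAVA_HOME/bin/java org.apache.spark.deploy.yarn.ApplicationMaster").asJava)

    val amResource = Records.newRecord(classOf[Resource])
    amResource.setMemorySize(1024)   // illustrative: 1 GiB for the AM container
    amResource.setVirtualCores(1)

    appContext.setApplicationName("minimal-yarn-submit-sketch")
    appContext.setAMContainerSpec(amContainer)
    appContext.setResource(amResource)

    // Hand the application over to the ResourceManager; from here on, YARN starts the
    // AM container, which is where the Spark driver lives in cluster mode.
    val appId = yarnClient.submitApplication(appContext)
    println(s"Submitted application $appId")
  }
}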
