```bash
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
```
The exec line at the end of the script hands control to the org.apache.spark.deploy.SparkSubmit class (launched through spark-class). Its main method simply builds a SparkSubmit instance and calls doSubmit:

```scala
def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit()
  submit.doSubmit(args)
}
```
Now that execution has reached doSubmit, let's see what it actually does: it parses the arguments and then pattern matches on the requested action, and for an ordinary submission that means calling the submit method:
```scala
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
```
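The dispatch here is just an exhaustive match over a small, closed set of actions, and SUBMIT is the branch we care about. As a side note, the same "parse once, then branch on an action" shape can be sketched in isolation; everything below is a made-up illustration, not Spark's own types:

```scala
// Standalone sketch of the dispatch pattern doSubmit uses; all names are hypothetical.
object ActionDispatchSketch {
  sealed trait Action
  case object Submit extends Action
  case object Kill extends Action
  case object RequestStatus extends Action
  case object PrintVersion extends Action

  // Because Action is sealed, the compiler can warn if a new action is ever left unhandled.
  def dispatch(action: Action): String = action match {
    case Submit        => "run the user application"
    case Kill          => "kill a running submission"
    case RequestStatus => "query a submission's status"
    case PrintVersion  => "print the Spark version"
  }

  def main(args: Array[String]): Unit =
    println(dispatch(Submit)) // prints "run the user application"
}
```

Next we follow the SUBMIT branch into submit: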
```scala
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {

  def doRunMain(): Unit = {
    // First check whether a proxyUser was specified. That argument names the user the Spark
    // application should run as, and Hadoop verifies that the real user is allowed to submit
    // applications on that user's behalf; it is normally used together with a security-enabled
    // cluster. Our submit script earlier did not set a proxy user, so execution falls straight
    // through to runMain. (A standalone sketch of this doAs pattern follows the listing.)
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          // Either way we end up in runMain, which resolves the entry class's main method.
          override def run(): Unit = {
            runMain(args, uninitLog)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          } else {
            throw e
          }
      }
    } else {
      runMain(args, uninitLog)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      logInfo("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        logWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
```
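To make the proxy-user branch above concrete, here is a minimal, self-contained sketch of Hadoop's createProxyUser + doAs pattern, assuming hadoop-common is on the classpath. runAs, the "alice" user name, and the printed message are hypothetical, and on a real secured cluster the impersonation also has to be allowed by the hadoop.proxyuser.* settings.

```scala
// Sketch only: the same UserGroupInformation shape doRunMain uses when --proxy-user is set.
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserSketch {
  // Run the given block of work under the identity of proxyUser.
  def runAs(proxyUser: String)(work: => Unit): Unit = {
    // Build a UGI that impersonates proxyUser on top of the currently logged-in real user.
    val ugi = UserGroupInformation.createProxyUser(
      proxyUser, UserGroupInformation.getCurrentUser())
    // Everything inside run() executes with the proxy user's identity.
    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = work
    })
  }

  def main(args: Array[String]): Unit = {
    runAs("alice") {
      println(s"Running as: ${UserGroupInformation.getCurrentUser().getUserName}")
    }
  }
}
```

Since our submission did not set a proxy user, doRunMain goes straight to runMain, which is where the entry class is actually loaded and started: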
```scala
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // First, resolve and validate all the submission parameters.
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // Let the main class re-initialize the logging system once it starts.
  if (uninitLog) {
    Logging.uninitialize()
  }

  if (args.verbose) {
    logInfo(s"Main class:\n$childMainClass")
    logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
    // sysProps may contain sensitive information, so redact before printing
    logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
    logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    logInfo("\n")
  }
  val loader = getSubmitClassLoader(sparkConf)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  var mainClass: Class[_] = null

  try {
    // Load the entry class -- the class you asked spark-submit to run -- by its name.
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      logError(s"Failed to load class $childMainClass.")
      if (childMainClass.contains("thriftserver")) {
        logInfo(s"Failed to load main class $childMainClass.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      logError(s"Failed to load $childMainClass: ${e.getMessage()}")
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        logInfo(s"Failed to load hive class.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  // Instantiate the class reflectively and adapt it to the SparkApplication interface.
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }

  @tailrec
  def findCause(t: Throwable): Throwable = t match {
    case e: UndeclaredThrowableException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: InvocationTargetException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: Throwable =>
      e
  }
```
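The essence of this last part: put the user's jars on a classloader, load childMainClass by name, and wrap anything that is not already a SparkApplication in JavaMainApplication, which simply invokes the class's static main method reflectively (in the full source, runMain then starts the app and uses findCause to unwrap reflection exceptions). Below is a minimal, self-contained sketch of that idea, not Spark's actual implementation; runUserMain, the jar path, and the class name are made-up placeholders.

```scala
// Sketch of "load a user class from its jars and call its static main reflectively".
import java.net.{URL, URLClassLoader}

object ReflectiveMainSketch {
  def runUserMain(jars: Seq[String], mainClassName: String, appArgs: Array[String]): Unit = {
    // Classloader that can see the user's jars, delegating to our own loader as parent.
    val urls: Array[URL] = jars.map(j => new java.io.File(j).toURI.toURL).toArray
    val loader = new URLClassLoader(urls, getClass.getClassLoader)
    // Also install it as the thread context classloader so classes resolved
    // dynamically by user code see the same jars.
    Thread.currentThread().setContextClassLoader(loader)

    val mainClass = Class.forName(mainClassName, true, loader)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, appArgs) // null receiver because main is static
  }

  // Hypothetical usage: ReflectiveMainSketch /path/to/app.jar com.example.Main --input data.txt
  def main(args: Array[String]): Unit =
    runUserMain(Seq(args(0)), args(1), args.drop(2))
}
```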