Spark On YARN(Yarn-Cluster模式)启动流程源码分析

琉璃若梦 2024-02-18 ⋅ 24 阅读

概述

在上一篇博客[^1^]中,我们介绍了Spark On YARN的启动流程,并对主要步骤进行了分析。在本篇博客中,我们将继续分析Spark On YARN的源码,并丰富内容,从而更好地理解Spark On YARN的运行机制。

Spark On YARN的启动流程

  1. 首先,当我们通过提交Spark应用程序的命令时,YARN的Client模式将会被触发。具体而言,我们可以通过以下命令提交应用程序:

    spark-submit --class <main-class> --master yarn --deploy-mode client <application-jar> <application-arguments>
    
  2. 在启动流程中的第一步,我们需要创建一个YarnClient实例,以与YARN资源管理器进行通信。该实例通过向资源管理器发送应用程序的启动请求的方式,将应用程序提交给YARN。

    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(yarnConf)
    yarnClient.start()
    
  3. 接下来,我们需要创建一个ApplicationSubmissionContext实例,用于设置和提交应用程序的上下文信息。在这个阶段,我们可以设置各种应用程序相关的属性,包括应用程序名称、队列等。

    val appContext = yarnClient.createApplication()
    val appSubmissionContext = appContext.getApplicationSubmissionContext()
    appSubmissionContext.setApplicationName(appName)
    appSubmissionContext.setQueue(queue)
    
  4. 紧接着,我们需要为应用程序指定资源需求,包括ResourceRequest用于指定驱动程序的资源需求和ContainerLaunchContext用于指定Executor的资源需求。

    // 设置驱动程序资源需求
    val resource = Records.newRecord(classOf[Resource])
    resource.setMemory(driverMemory)
    resource.setVirtualCores(driverCores)
    appSubmissionContext.setResource(resource)
    
    // 设置Executor资源需求
    val launchContext = Records.newRecord(classOf[ContainerLaunchContext])
    launchContext.setCommands(commands)
    launchContext.setLocalResources(localResources)
    val priority = Records.newRecord(classOf[Priority])
    priority.setPriority(0)
    val containerRequest = new AMRMClient.ContainerRequest(resource, null, null, priority)
    val containerRequests = Collections.singletonList(containerRequest)
    rmClient.addContainerRequest(containerRequests.get(0))
    
  5. 接下来,我们将应用程序的上下文信息提交给YARN资源管理器,并获取应用程序的ApplicationId,这个Id将作为后面与应用程序进行交互的标识。

    val appId = appContext.getApplicationId()
    appContext.submitApplication(appSubmissionContext)
    
  6. 在应用程序成功提交后,我们可以通过YarnClusterApplication获取应用程序的状态、输出日志等相关信息。

    val clusterApplication = yarnClient.createApplication().getSessionState(appId)
    val logUrls = clusterApplication.getAppAttemptLogUrls
    val state = clusterApplication.getAppMasterState
    
  7. 最后,我们可以停止YarnClient,释放资源,并关闭与YARN资源管理器的连接。

    yarnClient.stop()
    

结论

通过对Spark On YARN启动流程的源码分析,我们可以更加深入地了解Spark在YARN上的运行机制,以及在YARN上为应用程序分配资源的过程。同时,我们也可以通过修改相关配置,实现对应用程序的调优和性能优化。希望本篇博客能够对读者有所帮助。

参考资料

[^1^]: Spark On YARN(Yarn-Cluster模式)启动流程源码分析(一)


全部评论: 0

    我有话说: