How Spark Task Submission and Cluster Startup Work in Big Data

This article explains in detail how Spark tasks are submitted and how a Spark cluster starts up, and then reproduces the cluster startup flow with a small Akka-based simulation. It is shared as a reference; after reading it you should have a reasonable understanding of the process.

Spark cluster startup flow

1. The start-all.sh script is invoked and starts the Master.
2. After the Master starts, its preStart method schedules a timer that periodically checks for timed-out Workers and removes them.
3. The startup script parses the slaves configuration file to find the nodes that should run Workers, and starts a Worker on each of them.
4. Once a Worker service starts, its preStart method registers the Worker with all of the Masters.
5. The Master receives the registration information sent by the Worker, saves it, and responds to the Worker with its own URL.
6. The Worker receives and stores the Master's URL, then schedules a timer that periodically sends heartbeat messages to the Master.

Task submission flow

1. On the Driver side, the spark-submit script launches a SparkSubmit process, which creates a very important object, the SparkContext, and begins sending messages to the Master.
2. After receiving the message, the Master generates the task information and puts it into a queue.
3. The Master filters out all of the valid Workers and sorts them by free resources (a hypothetical sketch of this filter-and-sort step appears after the message definitions below).
4. The Master notifies the valid Workers to fetch the task information and launch the corresponding Executors.
5. The Workers launch the Executors, which register back with the Driver.
6. The Driver sends the generated tasks to the corresponding Executors, and the Executors start running them.

Cluster startup flow, simulated with Akka

1. First, create the Master class

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.mutable
import scala.concurrent.duration._

class Master(val masterHost: String, val masterPort: Int) extends Actor {

  // Stores the Workers' registration information, keyed by worker ID
  val idToWorker = new mutable.HashMap[String, WorkerInfo]()

  // Stores the registered Workers
  val workers = new mutable.HashSet[WorkerInfo]()

  // Worker timeout interval (milliseconds)
  val checkInterval: Long = 15000

  // Lifecycle method: called exactly once, after the constructor and before receive
  override def preStart(): Unit = {
    // Start a timer that periodically checks for timed-out Workers
    import context.dispatcher
    context.system.scheduler.schedule(0 millis, checkInterval millis, self, CheckTimeOutWorker)
  }

  // Called repeatedly after preStart
  override def receive: Receive = {
    // Worker -> Master
    case RegisterWorker(id, host, port, memory, cores) => {
      if (!idToWorker.contains(id)) {
        val workerInfo = new WorkerInfo(id, host, port, memory, cores)
        idToWorker += (id -> workerInfo)
        workers += workerInfo
        println("a worker registered")
        sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" +
          s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")
      }
    }

    case HeartBeat(workerId) => {
      // Look up the WorkerInfo for the workerId that was sent over
      val workerInfo: WorkerInfo = idToWorker(workerId)
      // Get the current time
      val currentTime = System.currentTimeMillis()
      // Update the last heartbeat time
      workerInfo.lastHeartbeatTime = currentTime
    }

    case CheckTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      val toRemove: mutable.HashSet[WorkerInfo] =
        workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval)
      // Remove the timed-out Workers from idToWorker and workers
      toRemove.foreach(deadWorker => {
        idToWorker -= deadWorker.id
        workers -= deadWorker
      })
      println(s"num of workers: ${workers.size}")
    }
  }
}

object Master {
  val MASTER_SYSTEM = "MasterSystem"
  val MASTER_ACTOR = "Master"

  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    // Configuration needed to create the actor system
    val config: Config = ConfigFactory.parseString(configStr)
    // Create the ActorSystem
    val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config)
    // Use the actorSystem instance to create the Actor
    actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR)
    actorSystem.awaitTermination()
  }
}
```

2. Create the RemoteMsg trait

```scala
trait RemoteMsg extends Serializable

// Master -> self (Master)
case object CheckTimeOutWorker

// Worker -> Master
case class RegisterWorker(id: String, host: String,
                          port: Int, memory: Int, cores: Int) extends RemoteMsg

// Master -> Worker
case class RegisteredWorker(masterUrl: String) extends RemoteMsg

// Worker -> self
case object SendHeartBeat

// Worker -> Master (HeartBeat)
case class HeartBeat(workerId: String) extends RemoteMsg
```
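The simulation in this walkthrough covers registration and heartbeats only; it does not implement the resource-based scheduling described in steps 3 and 4 of the task submission flow above. As a rough illustration of that idea, here is a minimal sketch (not part of the original code) of how a master might filter out live workers and sort them by free resources. The SchedulableWorker fields and the "most free cores first" policy are assumptions made purely for illustration.

```scala
// Hypothetical, simplified view of a worker's resources; the real Spark Master
// tracks far more state than this.
case class SchedulableWorker(id: String, freeMemory: Int, freeCores: Int, alive: Boolean)

object SimpleScheduler {

  // Step 3 of the task submission flow: keep only valid (alive) workers and
  // sort them by free resources, most free cores first.
  // Step 4 (simplified): take workers until the requested number of cores is covered.
  def pickWorkers(workers: Seq[SchedulableWorker], coresNeeded: Int): Seq[SchedulableWorker] = {
    val usable = workers
      .filter(w => w.alive && w.freeCores > 0)
      .sortBy(w => (-w.freeCores, -w.freeMemory))

    var remaining = coresNeeded
    usable.takeWhile { w =>
      val stillNeeded = remaining > 0
      remaining -= w.freeCores
      stillNeeded
    }
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(
      SchedulableWorker("w1", freeMemory = 4096, freeCores = 4, alive = true),
      SchedulableWorker("w2", freeMemory = 8192, freeCores = 8, alive = true),
      SchedulableWorker("w3", freeMemory = 2048, freeCores = 2, alive = false)
    )
    // Prints w2 then w1; w3 is skipped because it is not alive.
    pickWorkers(workers, coresNeeded = 10).foreach(w => println(w.id))
  }
}
```

The real Master also has to track which Executors have already been launched on each Worker and update the free resources accordingly; the sketch only conveys the filter-and-sort idea.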
3. Create the Worker class

```scala
import java.util.UUID

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.duration._

class Worker(val host: String, val port: Int, val masterHost: String,
             val masterPort: Int, val memory: Int, val cores: Int) extends Actor {

  // Generate a Worker ID
  val workerId = UUID.randomUUID().toString

  // Stores the Master URL
  var masterUrl: String = _

  // Heartbeat interval (milliseconds)
  val heartBeat_interval: Long = 10000

  // The Master's actor reference
  var master: ActorSelection = _

  override def preStart(): Unit = {
    // Look up the Master's Actor
    master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" +
      s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")
    master ! RegisterWorker(workerId, host, port, memory, cores)
  }

  override def receive: Receive = {
    // The Worker receives the registration-success message (masterUrl) sent back by the Master
    case RegisteredWorker(masterUrl) => {
      this.masterUrl = masterUrl
      // Start a timer that periodically sends heartbeats to the Master
      import context.dispatcher
      context.system.scheduler.schedule(0 millis, heartBeat_interval millis, self, SendHeartBeat)
    }

    case SendHeartBeat => {
      // Send a heartbeat to the Master
      master ! HeartBeat(workerId)
    }
  }
}

object Worker {
  val WORKER_SYSTEM = "WorkerSystem"
  val WORKER_ACTOR = "Worker"

  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val memory = args(4).toInt
    val cores = args(5).toInt
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    // Configuration needed to create the actor system
    val config: Config = ConfigFactory.parseString(configStr)
    // Create the ActorSystem
    val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config)
    // Use the actorSystem instance to create the Actor
    val worker: ActorRef = actorSystem.actorOf(Props(
      new Worker(host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR)
    actorSystem.awaitTermination()
  }
}
```

4. Create the WorkerInfo class used by the Master

```scala
class WorkerInfo(val id: String, val host: String, val port: Int,
                 val memory: Int, val cores: Int) {
  // Last heartbeat time; initialized to the registration time so that a freshly
  // registered Worker is not removed before its first heartbeat arrives
  var lastHeartbeatTime: Long = System.currentTimeMillis()
}
```

5. For a local test, each process needs its program arguments passed in; they correspond to the parameters read by the two main methods above. A minimal launch sketch follows.
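The launcher below is not part of the original article; it is a minimal sketch that starts the Master and one Worker in the same JVM by calling the two main methods defined above. The host, port, memory, and core values are placeholder examples only; the argument order is taken directly from how each main method reads args.

```scala
// Minimal local-test launcher (sketch only). Runs the Master on one thread and a
// single Worker on the main thread; both block in awaitTermination once started.
object LocalTest {
  def main(args: Array[String]): Unit = {
    // Master arguments: <host> <port>
    new Thread(new Runnable {
      override def run(): Unit = Master.main(Array("127.0.0.1", "8888"))
    }).start()

    // Give the Master a moment to bind its port before the Worker tries to register
    Thread.sleep(3000)

    // Worker arguments: <host> <port> <masterHost> <masterPort> <memory> <cores>
    Worker.main(Array("127.0.0.1", "9999", "127.0.0.1", "8888", "1024", "4"))
  }
}
```

When registration succeeds, the Master prints "a worker registered" and then periodically prints the current worker count from the timeout check. The same arguments can also be supplied to two separate run configurations in an IDE instead of using this launcher.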
That is all for how Spark task submission and cluster startup work. Hopefully the content above is helpful and gives you something to build on. If you found the article useful, feel free to share it so more people can see it.