A Case-Based Walkthrough of the Spark Streaming Framework's Runtime Source Code


Today we will walk through how the Spark Streaming framework runs, using a concrete case and its source code. Many readers may not be familiar with these internals, so the notes below summarize them; hopefully you will take something useful away from this article.

Part 1: The case

package com.dt.spark.sparkstreaming

import com.robinspark.utils.ConnectionPool
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Use Spark Streaming + Spark SQL to compute, online and continuously, the hottest items within each
 * e-commerce category, e.g. the three hottest phones in the "phone" category and the three hottest TVs
 * in the "TV" category. This example is of real significance in production environments.
 * Implementation: Spark Streaming + Spark SQL. The reason Spark Streaming can use ML, SQL, GraphX and
 * other Spark features is that it exposes interfaces such as foreachRDD and transform, which operate on
 * RDDs. With the RDD as the cornerstone, all other Spark functionality can be used directly, as simply
 * as calling an API.
 * The data format is assumed to be: user item category, e.g. Rocky Samsung Android
 */
object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]) {
    /**
     * Step 1: Create the Spark configuration object SparkConf and set the runtime configuration of the
     * Spark program. For example, setMaster sets the URL of the Spark cluster master the program connects
     * to; if it is set to "local", the program runs locally, which suits beginners with very limited
     * hardware (e.g. only 1 GB of memory).
     */
    val conf = new SparkConf() // create the SparkConf object
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") // set the application name, visible in the monitoring UI
    conf.setMaster("spark://Master:7077") // here the program runs on a Spark cluster
    //conf.setMaster("local[2]")
    // Set the batchDuration to control how often jobs are generated, and create the Spark Streaming entry point
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")

    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)
    // Format: (category_item, 1) => (K, V)
    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
      (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))
    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_ + _,
      _ - _, Seconds(60), Seconds(20))

    categoryUserClickLogsDStream.foreachRDD { rdd => {
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(category, item, click_count)
        })

        val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))

        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)
        categoryItemDF.registerTempTable("categoryItemTable")
        val resultDataFrame = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery" +
          " WHERE rank <= 3")
        resultDataFrame.show()

        val resultRowRDD = resultDataFrame.rdd

        resultRowRDD.foreachPartition { partitionOfRecords => {

          if (partitionOfRecords.isEmpty) {
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,click_count) values('" + record.getAs("category") + "','" +
                record.getAs("item") + "'," + record.getAs("click_count") + ")"
              val stmt = connection.createStatement()
              stmt.executeUpdate(sql)
            })
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse
          }
        }
        }
      }
    }
    }
    /**
     * Inside StreamingContext.start(), the start method of JobScheduler is invoked to run the message
     * loop. Inside JobScheduler.start(), a JobGenerator and a ReceiverTracker are constructed, and the
     * start methods of both are called:
     *   1. Once started, the JobGenerator keeps generating one Job after another according to the
     *      batchDuration;
     *   2. Once started, the ReceiverTracker first launches the Receivers in the Spark cluster (in fact
     *      it first starts a ReceiverSupervisor on the Executor). After a Receiver receives data, the
     *      ReceiverSupervisor stores it on the Executor and sends the data's metadata to the
     *      ReceiverTracker on the Driver, where the received metadata is managed by a ReceivedBlockTracker.
     * Each batch interval produces a concrete Job. This Job is not a Job in the Spark Core sense; it is
     * only the RDD DAG generated from the DStreamGraph. From a Java point of view it is like an instance
     * of the Runnable interface. To actually run the Job it must be submitted to the JobScheduler, which
     * uses a thread pool to find a separate thread in which to submit the Job to the cluster (the real
     * Spark job is triggered inside that thread by an action on the RDD). Why a thread pool?
     *   1. Jobs are generated continuously, so a thread pool improves efficiency; this is analogous to
     *      Executors running Tasks through a thread pool;
     *   2. The Jobs may be configured with FAIR scheduling, which also requires multi-threading support.
     */
    ssc.start()
    ssc.awaitTermination()
  }
}

Part 2: Source code analysis

1. A StreamingContext object is created from the SparkConf that is passed in; internally it creates a SparkContext.
/**
 * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}

This shows that a Spark Streaming program is itself just another application running on Spark.
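Since the constructor above simply delegates to a SparkContext, a streaming application can equally well be built on top of a SparkContext you already have. A minimal sketch (the object name and master URL are illustrative, not part of the case above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingOnExistingSparkContext {
  def main(args: Array[String]): Unit = {
    // Create (or obtain) an ordinary SparkContext first ...
    val sc = new SparkContext(new SparkConf().setAppName("StreamingOnExistingSC").setMaster("local[2]"))
    // ... and hand it to StreamingContext directly instead of letting it call createNewSparkContext.
    val ssc = new StreamingContext(sc, Seconds(5))
    // ssc.sparkContext is the same instance as sc, underlining that Spark Streaming runs as an ordinary Spark application.
  }
}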
2. Create the socket input stream. The socketTextStream method is defined as follows:

/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

As you can see, the last line calls socketStream, which is defined as follows:

/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes it interepreted as object using the given converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}

It actually creates a SocketInputDStream. The SocketInputDStream class looks like this:

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

SocketInputDStream extends ReceiverInputDStream and implements the getReceiver method, which returns a SocketReceiver object.

To summarize the inheritance chain: SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream. A DStream is a template for generating RDDs; it exists purely at the logical level. When each batch interval arrives, the template is instantiated with the batch's data into RDDs and a DAG. Here is a snippet of DStream's source:

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

And here is DStream's getOrCompute method:

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}

Its main work is to generate the RDD for the given time and put it into the generatedRDDs HashMap. The exact RDD generation process will be dissected later. So far we have roughly covered how the core concepts DStream and RDD are used in Spark Streaming.
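To make the "DStream is a template of RDDs" idea concrete, here is a minimal sketch (separate from the case above; the host and port are illustrative) in which foreachRDD hands us, for every batch, the concrete RDD that getOrCompute stored in generatedRDDs for that batch time:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamAsRddTemplate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DStreamAsRddTemplate").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9999)
    // Every 5 seconds the DStream template is instantiated into one concrete RDD for that batch time,
    // and ordinary Spark Core operations apply to it.
    lines.foreachRDD { (rdd, time) =>
      println(s"batch $time: ${rdd.getNumPartitions} partitions, ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}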
Next, let's look at StreamingContext's start(). The start() method launches the StreamingContext. Since a Spark application cannot have more than one active SparkContext instance, the Spark Streaming framework checks the context's state when it starts. The code is as follows:
/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // start the JobScheduler
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

From the INITIALIZED state, the JobScheduler is started. Let's look at JobScheduler's start(): it launches the EventLoop, the StreamingListenerBus, the ReceiverTracker and the JobGenerator, among other things.

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the message-loop thread, which handles the JobScheduler's various events.
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  // Start the listener bus, which updates the Streaming tab in the Spark UI.
  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  // Create the InputInfoTracker, which manages all input streams and statistics about the data
  // they receive; this information is exposed through StreamingListeners.
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the ReceiverTracker, which handles data reception, data caching and block generation.
  receiverTracker.start()
  // Start the JobGenerator, which initializes the DStreamGraph, converts DStreams into RDDs,
  // generates Jobs and submits them for execution.
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

JobScheduler's message-handling function processEvent deals with three kinds of messages: a Job has started, a Job has completed, and an error report.

private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable => reportError("Error in job scheduler", e)
  }
}

Let's now briefly go through the pieces of work started in JobScheduler.start(). The first is the EventLoop, which processes the JobScheduler's events. The EventLoop holds an event queue:
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

and a thread that processes the events in the queue:

private val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) => {
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
          }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}

The onReceive and onError used by this thread were defined when the EventLoop was instantiated inside JobScheduler.
The second piece of work started by JobScheduler.start() is the StreamingListenerBus, which asynchronously delivers StreamingListenerEvents to the registered StreamingListeners and is used to update the Streaming tab of the Spark UI. The following code dispatches the various events:

override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
  event match {
    case receiverStarted: StreamingListenerReceiverStarted =>
      listener.onReceiverStarted(receiverStarted)
    case receiverError: StreamingListenerReceiverError =>
      listener.onReceiverError(receiverError)
    case receiverStopped: StreamingListenerReceiverStopped =>
      listener.onReceiverStopped(receiverStopped)
    case batchSubmitted: StreamingListenerBatchSubmitted =>
      listener.onBatchSubmitted(batchSubmitted)
    case batchStarted: StreamingListenerBatchStarted =>
      listener.onBatchStarted(batchStarted)
    case batchCompleted: StreamingListenerBatchCompleted =>
      listener.onBatchCompleted(batchCompleted)
    case outputOperationStarted: StreamingListenerOutputOperationStarted =>
      listener.onOutputOperationStarted(outputOperationStarted)
    case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
      listener.onOutputOperationCompleted(outputOperationCompleted)
    case _ =>
  }
}
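The same bus is open to user code: an application can register its own StreamingListener through ssc.addStreamingListener and observe these events. A minimal, hypothetical sketch (the latency logging is only an illustration of what one might do with the information):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchLatencyLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // schedulingDelay and processingDelay are Option[Long] values in milliseconds
    println(s"batch ${info.batchTime}: scheduling delay = ${info.schedulingDelay.getOrElse(-1L)} ms, " +
      s"processing time = ${info.processingDelay.getOrElse(-1L)} ms")
  }
}

// Registration, given an existing StreamingContext ssc:
// ssc.addStreamingListener(new BatchLatencyLogger)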
The third piece of work started by JobScheduler.start() is the ReceiverTracker. Inside ReceiverTracker.start(), a ReceiverTrackerEndpoint RPC endpoint is instantiated.

def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

While starting, the ReceiverTracker calls its launchReceivers method:

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

launchReceivers calls runDummySparkJob, which runs the first Job that the Spark Streaming framework starts for us; the collect action inside it triggers the execution of a Spark job. This method exists to make sure that every slave has registered, so that the Receivers do not all end up on one node and the subsequent computation can be load balanced.

/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
ReceiverTracker.launchReceivers() also calls endpoint.send(StartAllReceivers(receivers)), sending a StartAllReceivers message to the RPC endpoint. When the ReceiverTrackerEndpoint receives this message, it first uses the scheduling policy to determine which Executor each Receiver should run on, and then calls startReceiver(receiver, executors) to start the Receiver.

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }

In the startReceiver method, ssc.sparkContext.submitJob is given the startReceiverFunc function, because startReceiverFunc is what gets executed on the Executor. startReceiverFunc instantiates a ReceiverSupervisorImpl object, which manages and monitors the Receiver. This is the second Job the Spark Streaming framework starts for us, and it keeps running, because supervisor.awaitTermination() blocks until exit.

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException("Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // instantiate the Receiver's supervisor
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}

Next, look at how ReceiverSupervisorImpl starts: it first starts all registered BlockGenerator objects, then sends a RegisterReceiver message to the ReceiverTrackerEndpoint, and finally calls the receiver's onStart method.

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

where onStart() is:

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}
and startReceiver() is:

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

When the ReceiverTrackerEndpoint running on the Driver receives the RegisterReceiver message, it wraps streamId, typ, host, executorId and receiverEndpoint into a ReceiverTrackingInfo and stores it in the in-memory HashMap receiverTrackingInfos.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }

/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }

  if (isTrackerStopping || isTrackerStopped) {
    return false
  }

  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
      // This receiver is registering and it's scheduled by
      // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
      scheduledLocations.get
    } else {
      // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
      // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
      scheduleReceiver(streamId)
    }

  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }

  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}
As for the Receiver's startup, take ssc.socketTextStream("localhost", 9999) as an example: the object created is a SocketReceiver. Internally it starts a thread that connects to the socket server, reads the socket data and stores it.

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while (!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}
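The Receiver API used by SocketReceiver is the same one available to user code. As a minimal, hypothetical sketch (not part of the article's case), a custom receiver only needs to extend Receiver, start its own daemon thread in onStart, push records to Spark with store, and ask the framework to reconnect with restart; this one simply emits a counter once per second instead of reading an external source:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CounterReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    // Like SocketReceiver, do the blocking work on a separate daemon thread.
    new Thread("Counter Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        var i = 0L
        try {
          while (!isStopped()) {
            store(s"tick-$i") // hand one record to the ReceiverSupervisor for storage
            i += 1
            Thread.sleep(1000)
          }
        } catch {
          case e: Throwable => restart("Error in counter receiver, restarting", e)
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up: the thread exits once isStopped() returns true.
  }
}

// Usage, given an existing StreamingContext ssc:
// val ticks = ssc.receiverStream(new CounterReceiver)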
Now for the fourth piece of work started in JobScheduler.start(): the JobGenerator. JobGenerator has a RecurringTimer member that drives its message system and timer; it periodically posts a GenerateJobs message at every batchInterval.

// Using the batchInterval passed in when the StreamingContext was created,
// periodically post the GenerateJobs message
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

JobGenerator's start() method:

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // Start the message-loop thread
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // Start the timer that periodically generates Jobs
    startFirstTime()
  }
}
The startFirstTime() called in JobGenerator.start() is defined as:

/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

And the processEvent() used by JobGenerator's EventLoop is defined as:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
Here is the definition of generateJobs:

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // allocate the data received up to this time to the batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // call DStreamGraph's generateJobs to generate the Jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
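Note that doCheckpoint writes a Checkpoint only when shouldCheckpoint is true (a checkpoint directory has been set, as the case code does with ssc.checkpoint(...)) and the batch time is a multiple of ssc.checkpointDuration. As a small sketch of how this is controlled from user code (the directory and the 60-second interval are illustrative assumptions; a common rule of thumb is 5 to 10 times the batch interval):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def enableCheckpointing(ssc: StreamingContext, windowedCounts: DStream[(String, Int)]): Unit = {
  ssc.checkpoint("/root/Documents/SparkApps/checkpoint") // makes shouldCheckpoint true in JobGenerator
  windowedCounts.checkpoint(Seconds(60)) // illustrative: checkpoint this DStream's RDDs every 60 seconds
}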
DStreamGraph's generateJobs method calls the generateJob method of each output stream to produce the collection of Jobs.

// output streams: the concrete output (action) operations
private val outputStreams = new ArrayBuffer[DStream[_]]()

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

Now look at DStream's generateJob method. It calls getOrCompute, which is where, at each interval, the DStreamGraph template is instantiated with the batch's data into RDDs. If an RDD is produced, it is wrapped in a jobFunc that contains context.sparkContext.runJob(rdd, emptyFunc), and the wrapped Job is returned.

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

Next, look at JobScheduler's submitJobSet method, which submits a JobHandler to the thread pool for each Job. JobHandler implements the Runnable interface and ultimately calls job.run(). In the Job class, the func invoked by run() is the jobFunc passed in when the Job was constructed; it contains the context.sparkContext.runJob(rdd, emptyFunc) call, which is what finally submits the job to the cluster.
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
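The jobExecutor that submitJobSet hands the JobHandlers to is a fixed-size daemon thread pool; in JobScheduler its size is read from the spark.streaming.concurrentJobs setting (default 1), which is what the earlier remark about multi-threading and FAIR scheduling alludes to. A hedged configuration sketch (the values are illustrative, and running batches concurrently is only safe when the output operations are independent):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("OnlineTheTop3ItemForEachCategory2DB")
  .set("spark.streaming.concurrentJobs", "2") // size of the streaming-job-executor thread pool
  .set("spark.scheduler.mode", "FAIR")        // optional: FAIR scheduling across concurrently submitted jobs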
Having read the above, do you now have a better understanding of how the Spark Streaming framework runs, as seen through this case and its source code? If you would like to learn more, follow the 开发云 industry news channel. Thank you for your support.
