《深入理解Spark:核心思想与源码分析》一书前言的内容请看链接
《深入理解Spark:核心思想与源码分析》一书第一章的内容请看链接
《深入理解Spark:核心思想与源码分析》一书第二章的内容请看链接
《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接
《深入理解Spark:核心思想与源码分析》一书第三章第二部分的内容请看链接
《深入理解Spark:核心思想与源码分析》一书第三章第三部分的内容请看链接
《深入理解Spark:核心思想与源码分析》一书第三章第四部分的内容请看链接
Akka简介
Scala认为Java线程通过共享数据以及通过锁来维护共享数据的一致性是糟糕的做法,容易引起锁的争用,而且线程的上下文切换会带来不少开销,降低并发程序的性能,甚至会引入死锁的问题。在Scala中只需要自定义类型继承Actor,并且提供act方法,就如同Java里实现Runnable接口,需要实现run方法一样。但是不能直接调用act方法,而是通过发送消息的方式(Scala发送消息是异步的),传递数据。如:
Actor ! message Akka是Actor编程模型的高级类库,类似于JDK 1.5之后越来越丰富的并发工具包,简化了程序员并发编程的难度。Akka是一款提供了用于构建高并发的、分布式的、可伸缩的、基于Java虚拟机的消息驱动应用的工具集和运行时环境。从下面Akka官网提供的一段代码示例,可以看出Akka并发编程的简约。case class Greeting(who: String)class GreetingActor extends Actor with ActorLogging { def receive = { case Greeting(who) ⇒ log.info("Hello " + who) }}val system = ActorSystem("MySystem")val greeter = system.actorOf(Props[GreetingActor], name = "greeter")greeter ! Greeting("Charlie Parker")
Akka提供了分布式的框架,意味着用户不需要考虑如何实现分布式部署,Akka官网提供了下面的示例演示如何获取远程Actor的引用。
// config on all machinesakka { actor { provider = akka.remote.RemoteActorRefProvider deployment { /greeter { remote = akka.tcp://MySystem@machine1:2552 } } }}// ------------------------------// define the greeting actor and the greeting messagecase class Greeting(who: String) extends Serializableclass GreetingActor extends Actor with ActorLogging { def receive = { case Greeting(who) ⇒ log.info("Hello " + who) }}// ------------------------------// on machine 1: empty system, target for deployment from machine 2val system = ActorSystem("MySystem")// ------------------------------// on machine 2: Remote Deployment - deploying on machine1val system = ActorSystem("MySystem")val greeter = system.actorOf(Props[GreetingActor], name = "greeter")// ------------------------------// on machine 3: Remote Lookup (logical home of “greeter” is machine2, remote deployment is transparent)val system = ActorSystem("MySystem")val greeter = system.actorSelection("akka.tcp://MySystem@machine2:2552/user/greeter")greeter ! Greeting("Sonny Rollins")
Actor之间最终会构成一棵树,作为父亲的Actor应当对所有儿子的异常失败进行处理(监管)Akka给出了简单的示例,代码如下。
class Supervisor extends Actor { override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: Exception ⇒ Escalate } val worker = context.actorOf(Props[Worker]) def receive = { case n: Int => worker forward n }}
Akka的更多信息请访问官方网站:http://akka.io/
基于Akka的分布式消息系统ActorSystem
Spark使用Akka提供的消息系统实现并发:ActorSystem是Spark中最基础的设施,Spark既使用它发送分布式消息,又用它实现并发编程。正是因为Actor轻量级的并发编程、消息发送以及ActorSystem支持分布式消息发送等特点,Spark选择了ActorSystem。
SparkEnv中创建ActorSystem时用到了AkkaUtils工具类,代码如下。val (actorSystem, boundPort) =Option(defaultActorSystem) match { case Some(as) => (as, port) case None => val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)}
AkkaUtils.createActorSystem方法用于启动ActorSystem,代码如下。
def createActorSystem( name: String, host: String, port: Int, conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { val startService: Int => (ActorSystem, Int) = { actualPort => doCreateActorSystem(name, host, actualPort, conf, securityManager) } Utils.startServiceOnPort(port, startService, conf, name)}
AkkaUtils使用了Utils的静态方法startServiceOnPort, startServiceOnPort最终会回调方法startService: Int=> (T, Int),此处的startService实际是方法doCreateActorSystem。真正启动ActorSystem是由doCreateActorSystem方法完成的,doCreateActorSystem的具体实现细节请见AkkaUtils的详细介绍。关于startServiceOnPort的实现,请参阅[《Spark中常用工具类Utils的简明介绍》](http://blog.csdn.net/beliefer/article/details/50904662)一文的内容。
AkkaUtils
AkkaUtils是Spark对Akka相关API的又一层封装,这里对其常用的功能进行介绍。
(1)doCreateActorSystem
功能描述:创建ActorSystem。
private def doCreateActorSystem( name: String, host: String, port: Int, conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) val akkaTimeout = conf.getInt("spark.akka.timeout", 100) val akkaFrameSize = maxFrameSizeBytes(conf) val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" if (!akkaLogLifecycleEvents) { Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL)) } val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off" val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000) val akkaFailureDetector = conf.getDouble("spark.akka.failure-detector.threshold", 300.0) val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000) val secretKey = securityManager.getSecretKey() val isAuthOn = securityManager.isAuthenticationEnabled() if (isAuthOn && secretKey == null) { throw new Exception("Secret key is null with authentication on") } val requireCookie = if (isAuthOn) "on" else "off" val secureCookie = if (isAuthOn) secretKey else "" logDebug("In createActorSystem, requireCookie is: " + requireCookie) val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback( ConfigFactory.parseString( s""" |akka.daemonic = on |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] |akka.stdout-loglevel = "ERROR" |akka.jvm-exit-on-fatal-error = off |akka.remote.require-cookie = "$requireCookie" |akka.remote.secure-cookie = "$secureCookie" |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = $port |akka.remote.netty.tcp.tcp-nodelay = on |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B |akka.remote.netty.tcp.execution-pool-size = $akkaThreads |akka.actor.default-dispatcher.throughput = $akkaBatchSize |akka.log-config-on-start = $logAkkaConfig |akka.remote.log-remote-lifecycle-events = $lifecycleEvents |akka.log-dead-letters = $lifecycleEvents |akka.log-dead-letters-during-shutdown = $lifecycleEvents """.stripMargin)) val actorSystem = ActorSystem(name, akkaConf) val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.getDefaultAddress.port.get (actorSystem, boundPort)}
(2)makeDriverRef
功能描述:从远端ActorSystem中查找已经注册的某个Actor。
def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = { val driverActorSystemName = SparkEnv.driverActorSystemName val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name" val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)}