A Brief Introduction to How Spark Uses Akka for Inter-Process and Inter-Node Communication
Published: 2019-06-15

Earlier installments from the book 《深入理解Spark:核心思想与源码分析》 (Deep Understanding of Spark: Core Ideas and Source Code Analysis):

- Preface: see link
- Chapter 1: see link
- Chapter 2: see link
- Chapter 3, part 1: see link
- Chapter 3, part 2: see link
- Chapter 3, part 3: see link
- Chapter 3, part 4: see link

A Brief Introduction to Akka

Scala regards the Java approach, in which threads share data and guard that shared data with locks, as poor practice: it easily leads to lock contention, thread context switches add considerable overhead that degrades the performance of concurrent programs, and it can even introduce deadlocks. In Scala you simply define a type that extends Actor and provide an act method, much as in Java you implement the Runnable interface and provide a run method. You never call act directly, however; instead you pass data by sending messages (message sends in Scala are asynchronous), for example:

Actor ! message
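As a minimal sketch of this style, using the old scala.actors library that the act method described above belongs to (the class name EchoActor and the messages are illustrative, not from Spark):

```scala
import scala.actors.Actor

// Illustrative actor: extends Actor and provides act(), the analogue of
// Runnable.run(). It is driven by messages; act() is never called directly.
class EchoActor extends Actor {
  def act(): Unit = {
    loop {
      react {
        case msg: String => println("echo: " + msg)
      }
    }
  }
}

// Usage: start the actor, then send it a message asynchronously with `!`.
val echo = new EchoActor
echo.start()
echo ! "hello"
```

Note that `!` returns immediately; the actor processes the message on its own schedule, which is exactly the asynchronous delivery the paragraph above describes.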
Akka is a high-level library for the Actor programming model, much like the concurrency utilities that the JDK has steadily grown since version 1.5; it lowers the barrier to concurrent programming. Akka is a toolkit and runtime for building highly concurrent, distributed, scalable, message-driven applications on the JVM. The following sample from the Akka website shows how concise Akka concurrent programming can be.

```scala
case class Greeting(who: String)

class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) ⇒ log.info("Hello " + who)
  }
}

val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
greeter ! Greeting("Charlie Parker")
```

Akka also provides a distributed framework, which means users need not work out distributed deployment themselves. The Akka website provides the following example showing how to obtain a reference to a remote Actor.

```scala
// config on all machines
akka {
  actor {
    provider = akka.remote.RemoteActorRefProvider
    deployment {
      /greeter {
        remote = akka.tcp://MySystem@machine1:2552
      }
    }
  }
}

// ------------------------------
// define the greeting actor and the greeting message
case class Greeting(who: String) extends Serializable

class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) ⇒ log.info("Hello " + who)
  }
}

// ------------------------------
// on machine 1: empty system, target for deployment from machine 2
val system = ActorSystem("MySystem")

// ------------------------------
// on machine 2: Remote Deployment - deploying on machine1
val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")

// ------------------------------
// on machine 3: Remote Lookup (logical home of "greeter" is machine2, remote deployment is transparent)
val system = ActorSystem("MySystem")
val greeter = system.actorSelection("akka.tcp://MySystem@machine2:2552/user/greeter")
greeter ! Greeting("Sonny Rollins")
```

Actors ultimately form a tree, and a parent Actor is expected to handle (supervise) the failures of all of its children. Akka gives a simple example, shown below.

```scala
class Supervisor extends Actor {
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException  ⇒ Resume
      case _: NullPointerException ⇒ Restart
      case _: Exception            ⇒ Escalate
    }

  val worker = context.actorOf(Props[Worker])

  def receive = {
    case n: Int => worker forward n
  }
}
```
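The snippet forwards messages to a Worker actor that the excerpt does not define. A minimal hypothetical Worker that can exercise the strategy's branches might look like this (the class name Worker comes from the snippet above; its failure behavior is invented purely for illustration):

```scala
import akka.actor.Actor

// Hypothetical Worker for the Supervisor example. A zero input throws
// ArithmeticException, which the parent's strategy answers with Resume;
// any other failure type would be Restarted or Escalated per the strategy.
class Worker extends Actor {
  def receive = {
    case 0      => throw new ArithmeticException("bad input") // parent Resumes
    case n: Int => sender() ! (100 / n)                       // normal reply
  }
}
```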

For more on Akka, visit the official website: http://akka.io/

ActorSystem, the Akka-Based Distributed Message System

Spark uses the message system provided by Akka to implement concurrency: ActorSystem is one of Spark's most fundamental facilities, and Spark uses it both to send distributed messages and for concurrent programming. Spark chose ActorSystem precisely because of the Actor model's lightweight concurrency, its message passing, and ActorSystem's support for distributed message delivery.

When SparkEnv creates the ActorSystem, it uses the AkkaUtils utility class, as follows.

```scala
val (actorSystem, boundPort) = Option(defaultActorSystem) match {
  case Some(as) => (as, port)
  case None =>
    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
    AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
}
```

The AkkaUtils.createActorSystem method starts the ActorSystem, as follows.

```scala
def createActorSystem(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager): (ActorSystem, Int) = {
  val startService: Int => (ActorSystem, Int) = { actualPort =>
    doCreateActorSystem(name, host, actualPort, conf, securityManager)
  }
  Utils.startServiceOnPort(port, startService, conf, name)
}
```

AkkaUtils delegates to Utils' static method startServiceOnPort, which eventually calls back the function startService: Int => (T, Int); here startService is in effect the method doCreateActorSystem. The ActorSystem is actually started by doCreateActorSystem, whose implementation details are covered in the detailed introduction to AkkaUtils below. For the implementation of startServiceOnPort, see [《Spark中常用工具类Utils的简明介绍》](http://blog.csdn.net/beliefer/article/details/50904662) (A Brief Introduction to Spark's Common Utility Class Utils).
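The retry-on-port behavior of startServiceOnPort can be sketched as follows. This is a simplified stand-in, not the actual Spark source: the name startOnPort and the exact retry policy are assumptions made for illustration.

```scala
import java.net.BindException

// Simplified sketch of the pattern behind Utils.startServiceOnPort:
// try startService on startPort, startPort + 1, ... until one binds,
// giving up after maxRetries additional attempts.
def startOnPort[T](startPort: Int, maxRetries: Int)(
    startService: Int => (T, Int)): (T, Int) = {
  var result: Option[(T, Int)] = None
  var offset = 0
  while (result.isEmpty && offset <= maxRetries) {
    // Port 0 means "let the OS pick", so retrying 0 itself is fine.
    val tryPort = if (startPort == 0) 0 else startPort + offset
    try {
      result = Some(startService(tryPort)) // success: (service, bound port)
    } catch {
      case _: BindException if offset < maxRetries =>
        // Port already in use: fall through and try the next candidate.
    }
    offset += 1
  }
  result.getOrElse(
    throw new BindException(s"Failed to start service after $maxRetries retries"))
}
```

In the real createActorSystem call, the `startService` closure is doCreateActorSystem, and the `(T, Int)` it returns is the `(ActorSystem, boundPort)` pair seen above.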

AkkaUtils

AkkaUtils is another layer that Spark wraps around the Akka APIs. Its most commonly used functions are introduced here.

(1) doCreateActorSystem

Purpose: creates an ActorSystem.

```scala
private def doCreateActorSystem(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager): (ActorSystem, Int) = {

  val akkaThreads = conf.getInt("spark.akka.threads", 4)
  val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
  val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
  val akkaFrameSize = maxFrameSizeBytes(conf)
  val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
  val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
  if (!akkaLogLifecycleEvents) {
    Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
  }

  val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"

  val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
  val akkaFailureDetector =
    conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
  val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)

  val secretKey = securityManager.getSecretKey()
  val isAuthOn = securityManager.isAuthenticationEnabled()
  if (isAuthOn && secretKey == null) {
    throw new Exception("Secret key is null with authentication on")
  }
  val requireCookie = if (isAuthOn) "on" else "off"
  val secureCookie = if (isAuthOn) secretKey else ""
  logDebug("In createActorSystem, requireCookie is: " + requireCookie)

  val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
    ConfigFactory.parseString(
    s"""
    |akka.daemonic = on
    |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
    |akka.stdout-loglevel = "ERROR"
    |akka.jvm-exit-on-fatal-error = off
    |akka.remote.require-cookie = "$requireCookie"
    |akka.remote.secure-cookie = "$secureCookie"
    |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
    |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
    |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
    |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
    |akka.remote.netty.tcp.hostname = "$host"
    |akka.remote.netty.tcp.port = $port
    |akka.remote.netty.tcp.tcp-nodelay = on
    |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
    |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
    |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
    |akka.actor.default-dispatcher.throughput = $akkaBatchSize
    |akka.log-config-on-start = $logAkkaConfig
    |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
    |akka.log-dead-letters = $lifecycleEvents
    |akka.log-dead-letters-during-shutdown = $lifecycleEvents
    """.stripMargin))

  val actorSystem = ActorSystem(name, akkaConf)
  val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
  val boundPort = provider.getDefaultAddress.port.get
  (actorSystem, boundPort)
}
```

(2) makeDriverRef

Purpose: looks up an already-registered Actor in a remote ActorSystem.

```scala
def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
  val driverActorSystemName = SparkEnv.driverActorSystemName
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost, "Expected hostname")
  val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
  val timeout = AkkaUtils.lookupTimeout(conf)
  logInfo(s"Connecting to $name: $url")
  Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
```
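A hypothetical call site illustrates the lookup; the actor name "MapOutputTracker" and the message string below are made up for this example, not taken from the Spark source:

```scala
// Assuming a running driver ActorSystem with an actor registered under the
// given name, an executor-side component could resolve and message it so:
val tracker = AkkaUtils.makeDriverRef("MapOutputTracker", conf, actorSystem)
tracker ! "GetMapOutputStatuses"
```

Because resolveOne returns a Future, makeDriverRef blocks via Await.result until the remote actor is found or the lookup timeout expires.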

Reposted from: https://www.cnblogs.com/jiaan-geng/p/5385993.html
