IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark源码解析(六)Spark RPC样例 -> 正文阅读

[大数据]Spark源码解析(六)Spark RPC样例

1.RpcServer类:

object RpcServerTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .master("local[*]")
      .appName("test rpc").getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val sparkEnv: SparkEnv = sparkContext.env
    /**
      * 注释: 系统
      */
    val rpcEnv = RpcEnv
      .create(HelloRpcSettings.getName(), HelloRpcSettings.getHostname(), HelloRpcSettings.getHostname(), HelloRpcSettings.getPort(), conf,
        sparkEnv.securityManager, 1, false)

    // TODO 注释:创建endpoint
    // RpcEndPoint
    val helloEndpoint: RpcEndpoint = new HelloEndPoint(rpcEnv)

    // 启动endpoint

    rpcEnv.setupEndpoint(HelloRpcSettings.getName(), helloEndpoint)

    rpcEnv.awaitTermination()

  }
}

2.RpcClient类

object RpcClientTest {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()

    val sparkSession =
      SparkSession
      .builder()
      .config(conf)
        .master("local[*]")
        .appName("test rpc")
        .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val sparkEnv: SparkEnv = sparkContext.env

    /**
      * 注释: RpcEnv
      */
    val rpcEnv: RpcEnv = RpcEnv
      .create(HelloRpcSettings.getName(), HelloRpcSettings.getHostname(), HelloRpcSettings.getPort(), conf, sparkEnv.securityManager, false)

    /**
      * 注释: endPointRef
      */
    val endPointRef: RpcEndpointRef = rpcEnv
      .setupEndpointRef(RpcAddress(HelloRpcSettings.getHostname(), HelloRpcSettings.getPort()), HelloRpcSettings.getName())

    import scala.concurrent.ExecutionContext.Implicits.global

    // TODO 注释: 同步。无返回结果
    endPointRef.send(SayHi("test send"))

    // TODO 注释:同步,有返回结果
    val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
    future.onComplete { case scala.util.Success(value) => println(s"Got the result = $value")
    case scala.util.Failure(e) => println(s"Got error: $e")
    }
    Await.result(future, Duration.apply("30s"))

    // TODO 注释:异步,有返回结果
    val res = endPointRef.askSync[String](SayBye("test askSync"))
    println(res)

    sparkSession.stop()
  }
}

3.RpcServer处理类

class HelloEndPoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

    override def onStart(): Unit = {
        println(rpcEnv.address)
        println("start hello endpoint")
    }
    
    override def receive: PartialFunction[Any, Unit] = {
        case SayHi(msg) => println(s"receive $msg")
    }
    
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case SayHi(msg) => {
            println(s"receive $msg")
            context.reply(s"hi, $msg")
        }
        case SayBye(msg) => {
            println(s"receive $msg")
            context.reply(s"bye, $msg")
        }
    }
    
    override def onStop(): Unit = {
        println("stop hello endpoint")
    }
}

case class SayHi(msg: String)

case class SayBye(msg: String)

4.Util

object HelloRpcSettings {
    val rpcName = "hello-rpc-service"
    val port = 9527
    val hostname = "localhost"
    
    def getName() = {
        rpcName
    }
    
    def getPort(): Int = {
        port
    }
    
    def getHostname(): String = {
        hostname
    }
}

5.代码解析

1. new HelloEndPoint(rpcEnv): Endpoint的构造方法,初始化HelloEndPoint,则则个类中{}里面的所有能执行(属性赋值,代码块)的代码都会执行,不能执行的为其内部类,方法
????????rpcEnv.setupEndpoint(HelloRpcSettings.getName(), helloEndpoint) 调用其preStart()

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 12:08:54  更:2021-07-26 12:10:04 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 2:45:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码