IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark之创建Rdd、DataFrame、Dataset -> 正文阅读

[大数据]Spark之创建Rdd、DataFrame、Dataset

一、RDD

1.1 通过本地集合创建RDD

  val seq1 = Seq(1001,"liming",24,95)
  val seq2 = Seq(1,2,3)
  //可以不指定分区数
  val rdd1: RDD[Any] = sc.parallelize(seq1,2)
  println(rdd1.collect().mkString(","))
  val rdd2: RDD[Int] = sc.makeRDD(Seq(1,2,3,4),2)
  //也可以使用Array、List
  val rdd3 = sc.parallelize(List(1, 2, 3, 4))
  val rdd4 = sc.parallelize(Array(1, 2, 3, 4))
  rdd3.take(10).foreach(println)

1.2 通过外部数据创建RDD

//外部数据(文件)创建RDD
  val rdd1 = sc.textFile("file_path")
  //1、textFile传入的是一个路径
  //2、分区是由HDFS中的block决定的

1.3 通过RDD衍生新的RDD

//RDD衍生RDD
  val rdd1 = sc.parallelize(Seq("zhangsan","lisi","wangwu"))
  //通过RDD执行算子操作会产生RDD
  val rdd2 = rdd1.map(item => (item, 1))

二、DataFrame

1.1 通过Seq生成

val df = spark.createDataFrame(Seq(
  ("ming", 20, 15552211521L),
  ("hong", 19, 13287994007L),
  ("zhi", 21, 15552211523L)
)).toDF("name", "age", "phone")

df.show()

1.2 通过读取外部结构化数据

1.2.1 读取Json文件生成

json文件内容:

{"name":"ming","age":20,"phone":15552211521}
{"name":"hong", "age":19,"phone":13287994007}
{"name":"zhi", "age":21,"phone":15552211523}

代码:

    val dfJson = spark.read.format("json").load("/Users/hadoop/sparkLearn/data/student.json")
    dfJson.show()

1.2.2 读取csv文件生成

csv文件:

name,age,phone
ming,20,15552211521
hong,19,13287994007
zhi,21,15552211523

代码:

val dfCsv = spark.read.format("csv").option("header", true).load("/Users/hadoop/sparkLearn/data/students.csv")
dfCsv.show()

1.2.3 读取parquet文件

val dfparquet = spark.read.parquet("/Users/hadoop/sparkLearn/data/students.parquet")

1.3 通过jdbc创建

    //读取数据库(mysql)
    val options = new util.HashMap[String,String]()
    options.put("url", "jdbc:mysql://localhost:3306/spark")
    options.put("driver","com.mysql.jdbc.Driver")
    options.put("user","root")
    options.put("password","hollysys")
    options.put("dbtable","user")

    spark.read.format("jdbc").options(options).load().show()

1.4 动态创建schema

1.4.1 ArrayList + Schema

    val schema = StructType(List(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("phone", LongType, true)
    ))
    val dataList = new util.ArrayList[Row]()
    dataList.add(Row("ming",20,15552211521L))
    dataList.add(Row("hong",19,13287994007L))
    dataList.add(Row("zhi",21,15552211523L))
    spark.createDataFrame(dataList,schema).show()

1.4.2 RDD + Schema

文件内容:

54655326,44039606001410150001,2020-11-28 20:51:04,2020-11-28 20:51:04,2020-11-28 18:45:16,61,http://190.176.35.157/photos/sensord
oor_face/20201128/6c33fe6e2215e74997a57a11916ece43/614374_1.jpg,szgj_img/20201128-ac8a6897-000a580ae00068-07f7ca68-00003eb0
54655326,44039606001410150001,2020-11-28 20:51:04,2020-11-28 20:51:04,2020-11-28 18:45:16,61,http://190.176.35.157/photos/sensord
oor_face/20201128/6c33fe6e2215e74997a57a11916ece43/614374_1.jpg,szgj_img/20201128-ac8a6897-000a580ae00068-07f7ca68-00003eb0
54655327,44039603011410050002,2020-11-28 20:51:04,2020-11-28 20:51:04,2020-11-28 19:37:49,272,http://190.176.35.157/photos/sensor
door_face/20201128/0ff9f28494461eafca57e569d62493f7/9578502_0.jpg,szgj_img/20201128-a96baad0-000a580ae01a5d-0885e0e0-000022b0
 //从原始RDD中创建一个包含Row的RDD
    val FacialProfileRDD: RDD[Row] = linesRDD.map(line => {
      val strs: Array[String] = line.split(",")
      Row(strs(0), strs(1), strs(2), strs(3), strs(4), strs(5), strs(6), strs(7))
    })

//创建由StructType表示的模式,该模式与上述步骤中创建的RDD中的行结构相匹配
    val structType: StructType = StructType(
      StructField("aid", StringType, true) ::
        StructField("gbNO", StringType, true) ::
        StructField("create_time", StringType, true) ::
        StructField("update_time", StringType, true) ::
        StructField("acquisition_time", StringType, true) ::
        StructField("recordId", StringType, true) ::
        StructField("bigcaptureFaceUrl", StringType, true) ::
        StructField("smallcaptureFaceUrl", StringType, true) ::
        Nil
    )

    //DataFrame = RDD + Schema
    //通过SparkSession提供的createDataFrame方法,将模式应用到Row的RDD中
    val FacialProfileDF: DataFrame = spark.createDataFrame(FacialProfileRDD, structType)

1.5 通过样例类case class 创建

文件内容同1.4.2

创建样例类:

package com.sibat.applications.part0

//样例类facialprofile:"aid","gbNo","create_time","update_time","acquisition_time","recordId","bigcaptureFaceUrl","smallcaptureFaceUrl"
case class FacialProfile(aid:String, gbNO:String, create_time:String, update_time:String, acquisition_time:String, recordId:String, bigcaptureFaceUrl:String, smallcaptureFaceUrl:String)

组装:

//profile泛型是FacialProfile ,FacialProfile既有数据,又有字段
    //数据 + Scheme
    val FacialProfileRDD: RDD[FacialProfile] = linesRDD.map(line => {
      val column: Array[String] = line.split(",")
      FacialProfile(column(0), column(1), column(2), column(3),column(4), column(5), column(6), column(7))
    })
    //将RDD转成DF  需要导入隐式转换
    //该隐式转换在SparkSession中
    import spark.implicits._
    val FacialProfileDF: DataFrame = FacialProfileRDD.toDF()

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-28 00:16:32  更:2021-07-28 00:16:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 0:17:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码