IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【实战】Flink批量写数据到kudu -> 正文阅读

[大数据]【实战】Flink批量写数据到kudu

package risen.source

import java.util
import java.util.Properties
import com.google.gson.JsonObject
import com.google.gson.JsonParser
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.client._
import risen.util.ParameterToolUtil
import scala.collection.JavaConversions._

object KafkaSource {

  def main(args: Array[String]): Unit = {
    //配置环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //全局参数
    val parameterTool: ParameterTool = ParameterToolUtil.getParameter
    env.getConfig.setGlobalJobParameters(parameterTool)
    enableCheckPoint(env)
    //kafka消费者
    val consumer: FlinkKafkaConsumer[String] = buildFlinkKafkaConsumer(parameterTool)
    //consumer.setStartFromLatest()
    consumer.setStartFromLatest()
    val kafkaStream: DataStreamSource[String] = env.addSource(consumer)
    kafkaStream.map(new MapFunction[String, JsonObject] {
      override def map(t: String): JsonObject = {
        println(t)
        JsonParser.parseString(t).getAsJsonObject
      }
    }).addSink(new RichSinkFunction[JsonObject] {
      var client: KuduClient = null
      var session: KuduSession = null
      var count: Int = 0

      override def open(parameters: Configuration): Unit = {
        client = new KuduClient.KuduClientBuilder(parameterTool.get("kudu.master")).build
        session = client.newSession
        session.setFlushMode(FlushMode.MANUAL_FLUSH)
        session.setMutationBufferSpace(parameterTool.getInt("kudu.flush.buffer.bytes"))
        session.setFlushInterval(parameterTool.getInt("kudu.flush.ms"))
      }

      override def invoke(value: JsonObject, context: SinkFunction.Context): Unit = {
        println("开始写数据")
        val source: JsonObject = JsonParser.parseString(value.get("source").toString).getAsJsonObject
        val after: JsonObject = JsonParser.parseString(value.get("after").toString).getAsJsonObject
        val tableName = source.get("table").getAsString
        //val table: KuduTable = client.openTable("impala::ld_yq_bigdata.ods_" + tableName)
        val table: KuduTable = client.openTable(parameterTool.get("mysql.table.prefix") + tableName)
        println(parameterTool.get("mysql.table.prefix") + tableName)

        val upsert: Upsert = table.newUpsert()
        // 设置字段内容
        val afterValue: util.Iterator[String] = after.keySet().iterator()
        count += 1
        while (afterValue.hasNext) {
          val key = afterValue.next()
          var com = key.toLowerCase()
          //正则表达式可能有误差,直接查kudu的类型即可
          //if (Pattern.compile("\\d*").matcher(after.get(key).toString).matches())
          if (after.get(key).isJsonNull)
            upsert.getRow.isNull(com)
          else {
            if (table.getSchema.getColumn(com).getType.toString.split(" ")(1) == "string")
              upsert.getRow.addString(com, after.get(key).getAsString)
            else {
              if (table.getSchema.getColumn(com).getType.toString.split(" ")(1) == "int64")
                upsert.getRow.addLong(com, after.get(key).getAsLong)
              else
                upsert.getRow.addInt(com, after.get(key).getAsInt)
            }
          }
        }
        session.apply(upsert)
        if (count >= parameterTool.getInt("kudu.flush.buffer.bytes") / 2) {
          session.flush
        }
      }

      override def close(): Unit = {
        session.close()
        client.close()
      }
    }
    ).setParallelism(5)
    env.execute()
  }


  def buildFlinkKafkaConsumer(parameter: ParameterTool) = {
    val properties = new Properties
    properties.setProperty("bootstrap.servers", parameter.get("kafka.bootstrap.servers"))
    properties.setProperty("group.id", parameter.get("kafka.group.id"))
    properties.setProperty("request.timeout.ms", parameter.get("kafka.request.timeout.ms"))
    properties.setProperty("fetch.max.bytes", parameter.get("kafka.fetch.max.bytes"))
    val topic: String = parameter.get("kafka.bootstrap.topic")
    //new FlinkKafkaConsumer[String](java.util.regex.Pattern.compile(topic + ".*"), new SimpleStringSchema, properties)
    new FlinkKafkaConsumer[String](topic.split(",").toList, new SimpleStringSchema, properties)
  }

  def enableCheckPoint(env: StreamExecutionEnvironment): Unit = {
    env.enableCheckpointing(1000 * 60 * 3)
    val chkConfig: CheckpointConfig = env.getCheckpointConfig
    chkConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    chkConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    chkConfig.setMinPauseBetweenCheckpoints(1000 * 60 * 2)
    chkConfig.setCheckpointTimeout(1000 * 60 * 15)
    chkConfig.setMaxConcurrentCheckpoints(5)
    chkConfig.setTolerableCheckpointFailureNumber(500)
  }

}

读配置文件的工具类

package risen.util;


import org.apache.flink.api.java.utils.ParameterTool;

import java.io.IOException;

import static org.apache.kafka.common.requests.DeleteAclsResponse.log;


public class ParameterToolUtil {
    private static final String PROPERTIES_FILE_PATH = "/application.properties";

    /**
     * ParameterTool全局参数
     *
     * @return
     */
    public static ParameterTool getParameter() {
        try {
            return ParameterTool
                    .fromPropertiesFile(ParameterToolUtil.class.getResourceAsStream(PROPERTIES_FILE_PATH))
                    .mergeWith(ParameterTool.fromSystemProperties());

        } catch (IOException e) {
            log.error("获取ParameterTool全局参数异常");
        }
        return ParameterTool.fromSystemProperties();
    }

}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-17 11:59:21  更:2021-07-17 12:01:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 4:21:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码