IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 实现flink自定义幂等写入ClickHouse,并封装成通用工具类 -> 正文阅读

[大数据]实现flink自定义幂等写入ClickHouse,并封装成通用工具类

**

实现flink自定义幂等写入ClickHouse,并封装成通用工具类

**

ClickHouse建表语句(按user分区,一个用户一个区,重复写入,只会改变url和timestamp,user的值不会发生改变,通过调整order by 后的字段,可以调整幂等写入时值不会发生改变的字段)

create table  Event(
        user  String ,
        url  String ,
        timestamp UInt64
        ) engine =ReplacingMergeTree(timestamp)
        partition by user
        order by  (user);

自定义操作ClickHouse的通用工具类,传参String字符串(一条sql语句)

import com.atguigu.flinktest.selfdemo.bean.TransientSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * 操作ClickHouse的工具类
 */

public class ClickHouseUtil {
    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        SinkFunction<T> sinkFuction = JdbcSink.<T>sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    @Override
                    public void accept(PreparedStatement ps, T obj) throws SQLException {
//                        获取流中对象的类型
//                        Class<?> clz = obj.getClass();

                        //获取流中对象所属类型中定义的属性
                        Field[] fieds = obj.getClass().getDeclaredFields();

                        //对属性进行遍历
                        int skipNum = 0;
                        for (int i = 0; i < fieds.length; i++) {
                            //每次遍历得到类中的一个属性对象
                            Field fied = fieds[i];
                            //设置访问权限  true代表私有变量也可以操作
                            fied.setAccessible(true);

                            //判断当前属性是否需要保存到CK中 通过当前属性获取TransientSink,判断其上有没有TransientSink注解
                            TransientSink transientSink = fied.getAnnotation(TransientSink.class);//该属性从自定义注解中获取值(对象)
                            //当transientSink为空时,其上没有TransientSink注解,允许写入Clickhouse
                            if (transientSink != null) {
                                skipNum++;
                                continue;
                            }

                            try {
                                //获取属性的值
                                Object value = fied.get(obj);
                                //将属性的值赋值给(sql中)对应的问号占位符
                                ps.setObject(i + 1 - skipNum, value);

                            } catch (IllegalAccessException e) {
                                e.printStackTrace();
                            }

                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUrl("jdbc:clickhouse://hadoop102:8123/test")
                        .build()

        );
        return sinkFuction;
    }
}

自定义数据源

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Random;

// 自定义数据源
public class Demo1 {


    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new com.atguigu.flinktest.day02.Example1.ClickSource()).print();
        env.execute();
    }

    public static class Event{
        public String user;
        public String url;
        public Long timestamp;

        public Event() {
        }

        public Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "Event{" +
                    "user='" + user + '\'' +
                    ", url='" + url + '\'' +
                    ", timestamp=" + new Timestamp(timestamp) +
                    '}';
        }
    }

    // 自定义数据源
    // SourceFunction<T>, T是数据流中的元素类型
    // SourceFunction只能用来产生并行度为1的数据源
    // ParallelSourceFunction可以产生并行数据源
    public static class ClickSource implements SourceFunction<Event> {
        private boolean running=true;
        private Random random=new Random();
        private String[] userArr={"Mary", "Bob", "Alice", "Liz"};
        private String[] urlArr={"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

        // 任务开始时触发run的调用
        @Override
        public void run(SourceContext<Event> sourceContext) throws Exception {
            while (running){
                String user=userArr[random.nextInt(userArr.length)];
                String url=urlArr[random.nextInt(urlArr.length)];
                long timestamp = Calendar.getInstance().getTimeInMillis();// 毫秒时间戳(机器时间)
                Event event=new Event(user,url,timestamp);
                // 使用collect方法发射数据
                sourceContext.collect(event);
                Thread.sleep(100L);

            }
        }

        // 在取消任务时触发调用cancel,例如在web ui点击任务的cancel按钮
        @Override
        public void cancel() {
            running=false;
        }
    }

}

自定义注解(本文对应数据库的实体类是,Demo1.Event,如果不想让其内属性插入到数据库中,可以在属性上加自定义注解)

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 自定义注解,用该注解标注的属性 不需要保存到ClinkHoouse中
 */
@Target(ElementType.FIELD) //元注解 => 指明这个自定义注解加的位置 ,这里是放到属性上
@Retention(RetentionPolicy.RUNTIME) //元注解 => 指明有效时刻(运行、编译、源码) ,这里是运行时有效
public @interface TransientSink {

}

测试类

import com.atguigu.flinktest.selfdemo.bean.Demo1;
import com.atguigu.flinktest.selfdemo.util.ClickHouseUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IntoClickhouse {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //获取数据源
        DataStreamSource<Demo1.Event> eventDs = env.addSource(new Demo1.ClickSource());
        //测试输出
        eventDs.print(">>>>>");
        //写入clickhouse
        eventDs.addSink(ClickHouseUtil.getJdbcSink("insert into Event values(?,?,?)"));
        env.execute();
    }
}

用到的flink依赖在pom.xml里

    <properties>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <!-->flink-connector-jdbc flink版本需在1.11.0之后<!-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
<!--   clickhouse的连接     ======-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-->clickhouse jdbc连接<!-->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.1.55</version>
        </dependency>

<!--        ==========-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
<!--   json的相关依赖     -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

<!--     阿里巴巴 fast json   -->

        <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.9</version>
        </dependency>
    </dependencies>
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 12:08:54  更:2021-07-26 12:12:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 5:17:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码