一 SQL操作
基本上来说传统关系型数据库(以MySQL为例)的SQL语句,基本支持但是也有不一样的地方。这里只介绍Clickhouse与标准SQL(MySQL)不一致的地方。
1 Insert
基本与标准SQL(MySQL)基本一致
包括标准 insert into [table_name] values(…),(….)
以及 从表到表的插入
insert into [table_name] select a,b,c from [table_name_2]
2 Update 和 Delete
ClickHouse提供了Delete 和Update的能力,这类操作被称为Mutation查询,它可以看做Alter 的一种。
虽然可以实现修改和删除,但是和一般的OLTP数据库不一样,Mutation语句是一种很“重”的操作,而且不支持事务。
“重”的原因主要是每次修改或者删除都会导致放弃目标数据的原有分区,重建新分区。所以尽量做批量的变更,不要进行频繁小数据的操作。
删除操作
alter table t_order_smt delete where sku_id ='sku_001';
修改操作
alter table t_order_smt
update total_amount=toDecimal32(2000.00,2)
where uid = 102;
由于操作比较“重”,所以 Mutation语句分两步执行,同步执行的部分其实只是进行新增数据新增分区和并把旧分区打上逻辑上的失效标记。直到触发分区合并的时候,才会删除旧数据释放磁盘空间。
3 查询操作
clickhouse基本上与标准SQL 差别不大。
模拟数据
insert into t_order_mt
values(101,'sku_001',1000.00,'2020-06-01 12:00:00') ,
(101,'sku_002',2000.00,'2020-06-01 12:00:00'),
(101,'sku_004',2500.00,'2020-06-01 12:00:00'),
(101,'sku_002',2000.00,'2020-06-01 12:00:00')
(101,'sku_003',600.00,'2020-06-02 12:00:00'),
(110,'sku_001',1000.00,'2020-06-04 12:00:00'),
(110,'sku_002',2000.00,'2020-06-04 12:00:00'),
(110,'sku_004',2500.00,'2020-06-04 12:00:00'),
(110,'sku_002',2000.00,'2020-06-04 12:00:00'),
(110,'sku_003',600.00,'2020-06-01 12:00:00')
select uid,sku_id,sum(total_amount) from t_order_mt group by uid,sku_id with rollup;
with rollup : 从右至左去掉维度进行小计【上卷】。
结果如图:

select uid,sku_id,sum(total_amount) from t_order_mt group by uid,sku_id with cube;
with cube : 从右至左去掉维度进行小计,再从左至右去掉维度进行小计。
结果如图:

select uid,sku_id,sum(total_amount) from t_order_mt group by uid,sku_id with totals;
with totals: 只计算合计。

4 alter操作
同mysql的修改字段基本一致,
新增字段
alter table tableName add column newcolname String after col1
after col1 是指在哪个字段后面加新字段
修改字段类型
alter table tableName modify column newcolname String ;
删除字段
alter table tableName drop column newcolname;
5 导出数据
即席查询,将数据导出为一个文件。
clickhouse-client --query "select toHour(create_time) hr ,count(*) from test1.order_wide where dt='2020-06-23' group by hr" --format CSVWithNames> ~/rs1.csv
如
[hzy@hadoop101 ~]$ clickhouse-client --query "select uid,sku_id,sum(total_amount) from test.t_order_mt group by uid,sku_id with cube" --format CSVWithNames> ~/rs1.csv
文件内如如下
"uid","sku_id","sum(total_amount)"
110,"sku_003",1200.00
101,"sku_003",600.00
101,"sku_002",4000.00
101,"sku_004",2500.00
101,"sku_001",1000.00
110,"",1200.00
101,"",8100.00
0,"sku_003",1800.00
0,"sku_004",2500.00
0,"sku_001",1000.00
0,"sku_002",4000.00
0,"",9300.00
二 副本(高可用)
副本的目的主要是保障数据的高可用性,即使一台clickhouse节点宕机,那么也可以从其他服务器获得相同的数据。
1 副本写入流程

2 副本配置
- 这时需要启动zookeeper集群 和另外一台clickhouse 服务器
- 另外一台clickhouse服务器的安装完全和第一台一直即可。
- 在两台服务器的/etc/clickhouse-server/config.d目录下创建一个名为metrika.xml的配置文件(声明zookeeper的集群):
<?xml version="1.0"?>
<yandex>
<zookeeper-servers>
<node index="1">
<host>hadoop101</host>
<port>2181</port>
</node>
<node index="2">
<host>hadoop102</host>
<port>2181</port>
</node>
<node index="3">
<host>hadoop103</host>
<port>2181</port>
</node>
</zookeeper-servers>
</yandex>
在 /etc/clickhouse-server/config.xml 中增加
<zookeeper incl="zookeeper-servers" optional="true" />
<include_from>/etc/clickhouse-server/config.d/metrika.xml</include_from>
启动zookeeper
zk.sh start
分别启动ClickHouse
sudo systemctl start clickhouse-server
只用是表引擎为ReplicatedMergeTree的表才能够进行同步,专用于主从复制。
在两台机器上分别建表
A机器
create table rep_t_order_mt_1214 (
uid UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
) engine =ReplicatedMergeTree('/clickhouse/tables/01/rep_t_order_mt_1214','rep_hdp1')
partition by toYYYYMMDD(create_time)
primary key (uid)
order by (uid ,sku_id );
B机器
create table rep_t_order_mt_1214 (
uid UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
) engine =ReplicatedMergeTree('/clickhouse/tables/01/rep_t_order_mt_1214','rep_hdp2')
partition by toYYYYMMDD(create_time)
primary key (uid)
order by (uid ,sku_id );
3 参数解释
ReplicatedMergeTree 中
- 第一参数是分片的zk_path,一般按照: /clickhouse/table/{shard}/{table_name} 的格式写,如果只有一个分片就写01即可。
- 第二个参数是副本名称,相同的分片副本名称不能相同。
现向任意一张表插入数据,在两张表中都可以查找到数据
insert into rep_t_order_mt_1214
values(101,'sku_001',1000.00,'2020-06-01 12:00:10') ,
(102,'sku_002',2000.00,'2020-06-01 12:00:10'),
(103,'sku_004',2500.00,'2020-06-01 12:00:10'),
(104,'sku_002',2000.00,'2020-06-01 12:00:20')
(105,'sku_003',600.00,'2020-06-02 12:00:10')
特别说明 :
ReplicatedMergeTree,为了防止数据复制过程中的重试操作造成数据重复,加入了幂等性处理。目前幂等性识别是以数据完成一致(两条语句一模一样)为准进行去重。
每次插入数据,以分区为单位,如果插入的一批数据中,有一条数据不同,就会将所有的数据全部插入进去。
三 分片集群
副本虽然能够提高数据的可用性,降低丢失风险,但是对数据的横向扩容(多台机器共同分担一份数据)没有解决。每台机子实际上必须容纳全量数据,对集群的性能并没有提升。
要解决数据水平切分的问题,需要引入分片的概念。通过分片把一份完整的数据进行切分,不同的分片分布到不同的节点上。在通过Distributed表引擎把数据拼接起来一同使用。
Distributed表引擎本身不存储数据,有点类似于MyCat之于MySql,成为一种中间件,通过分布式逻辑表来写入、分发、路由来操作多台节点不同分片的分布式数据。
1 写入原理
在ClickHouse中分摊数据需要一个中间结点,称为分布式表,有点类似于一个代理,分布式表负责数据的存储位置和如何分摊。
以后有请求,直接发送给分布式表,分布式表根据写入的算法(如哈希值)分发到不同的机器上,真实存储数据的表称为本地表,分布式表不存储数据,只是一个逻辑表,分布式表不用部署在多台机器上,在一台机器上就足以,如下图中,分布式表和A本地表可以在同一个机器上。
这里分为两种数据,一是分布式表和本地表同节点的数据,一是分布式表和本地表不同节点的数据,如果直接发送,远端到分布式表,分布式表再到不同节点,这样远端会阻塞这个请求。为了提高写入性能,不想让用户等请求转发完才算写入完成,会将这份数据缓存在分布式表的节点上,然后异步发送给其他节点,只要缓存完成了,这次写入操作就算完成了。
为了提高数据的稳定性,既要分片,又要高可用,所以有了下图的AA,BB,CC,通过zookeeper协调A和AA,B和BB的关系,这种方式称为内部拷贝;还可以在分布式表写入数据时,同时向A和AA分别写一份,官方推荐使用内部拷贝,不实用内部拷贝会给分布式表带来更大的负担。


2 读取原理
分布式表会把读取的请求分发到所有分片
同一分片的不同副本如何发送
- 先看副本的错误计数,优先选择小的
- 错误计算相同有随机、顺序、随机、host名称近似等四种选择方式

3 分片配置
副本虽然能够提高数据的可用性,降低丢失风险,但是对数据的横向扩容没有解决。每台机子实际上必须容纳全量数据。
要解决数据水平切分的问题,需要引入分片的概念。通过分片把一份完整的数据进行切分,不同的分片分布到不同的节点上。在通过Distributed表引擎把数据拼接起来一同使用。
Distributed表引擎本身不存储数据,有点类似于MyCat之于MySql,成为一种中间件,通过分布式逻辑表来写入、分发、路由来操作多台节点不同分片的分布式数据。
<yandex>
<clickhouse_remote_servers>
<gmall_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>hdp1</host>
<port>9000</port>
</replica>
<replica>
<host>hdp2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>hdp3</host>
<port>9000</port>
</replica>
<replica> <!—该分片的第二个副本-->
<host>hdp4</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>hdp5</host>
<port>9000</port>
</replica>
<replica>
<host>hdp6</host>
<port>9000</port>
</replica>
</shard>
</gmall_cluster>
</clickhouse_remote_servers>
</yandex>
4 三节点版本配置
三主三从需要六台机器,现将集群规模进行精简,两个分片,其中一个分片有两个节点,一个分片有一个节点,如下图。

hdp1 | hdp2 | hdp3 |
---|
01 rep_1_1 | 01 rep_1_2 | 02 rep_2_1 |
(1)metrika-shards.xml 的配置
在三台机器配置
sudo vim /etc/clickhouse-server/config.d/metrika-shards.xml
metrika-shards.xml 的配置如下 :
<yandex>
<clickhouse_remote_servers>
<gmall_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>hadoop101</host>
<port>9000</port>
</replica>
<replica>
<host>hadoop102</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>hadoop103</host>
<port>9000</port>
</replica>
</shard>
</gmall_cluster>
</clickhouse_remote_servers>
<zookeeper-servers>
<node index="1">
<host>hadoop101</host>
<port>2181</port>
</node>
<node index="2">
<host>hadoop102</host>
<port>2181</port>
</node>
<node index="3">
<host>hadoop103</host>
<port>2181</port>
</node>
</zookeeper-servers>
<macros>
<shard>01</shard>
<replica>rep_1_1</replica>
</macros>
</yandex>
不同机器的此项配置不同
hadoop101
<macros>
<shard>01</shard> <!--不同机器放的分片数不一样-->
<replica>rep_1_1</replica> <!--不同机器放的副本数不一样-->
</macros>
hadoop102
<macros>
<shard>01</shard> <!--不同机器放的分片数不一样-->
<replica>rep_1_2</replica> <!--不同机器放的副本数不一样-->
</macros>
hadoop103
<macros>
<shard>01</shard> <!--不同机器放的分片数不一样-->
<replica>rep_2_1</replica> <!--不同机器放的副本数不一样-->
</macros>
(2)集群服务器的配置
在三台机器执行
sudo vim /etc/clickhouse-server/config.xml
需要在config.xml ,中找到remote_servers 标签, 补充
<remote_servers incl="clickhouse_remote_servers"> <!--将(1)中关于远端的配置引用过来-->
(3)zookeeper的配置
在config.xml中加入
<zookeeper incl="zookeeper-servers" optional="true" />
<include_from>/etc/clickhouse-server/config.d/metrika-shards.xml</include_from>
三台机器重启服务
sudo systemctl restart clickhouse-server
ps -ef|grep clickhouse
(4)创建本地表
必须创建于三个节点都有的同名数据库下,在集群中任一节点执行以下语句
create table st_order_mt_1214 on cluster gmall_cluster (
uid UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
) engine =ReplicatedMergeTree('/clickhouse/tables/{shard}/st_order_mt_1214','{replica}')
partition by toYYYYMMDD(create_time)
primary key (uid)
order by (uid ,sku_id );
完成之后会出现提示信息,并在其他节点也可以查询到该表。
(5)创建分布式表
create table st_order_mt_1214_all on cluster gmall_cluster
(
uid UInt32,
sku_id String,
total_amount Decimal(16,2),
create_time Datetime
)engine = Distributed(gmall_cluster,test, st_order_mt_1214,hiveHash(sku_id))
其中参数:
- Distributed( 集群名称,库名,本地表名,分片键)
- 分片键必须是整型数字 ,也可以rand()
插入数据
insert into st_order_mt_1214_all
values(201,'sku_001',1000.00,'2020-06-01 12:00:00') ,
(202,'sku_002',2000.00,'2020-06-01 12:00:00'),
(203,'sku_004',2500.00,'2020-06-01 12:00:00'),
(204,'sku_002',2000.00,'2020-06-01 12:00:00')
(205,'sku_003',600.00,'2020-06-02 12:00:00')
通过查询分布式表语句
SELECT * FROM st_order_mt_all
和本地表
select * from st_order_mt;
观察数据的分布是否正确。


- hadoop102:st_order_mt(与hadoop101是复制关系)

- hadoop103:st_order_mt(与hadoop101、102是互补关系)

|