clickhouse-KAFKA引擎
2024-04-10 07:50:30  阅读数 2851

参考文档

https://clickhouse.tech/docs/en/engines/table-engines/integrations/kafka/

前言

kafka引擎并不存储数据,只相当于一个Publish or subscribe,如果要真正存储消费后的数据,需要结合物化视图和MergeTree引擎系列使用。而且kafka引擎只会消费一次,所以你在select 一次后然后在select就会发现之前的数据没有了~。最麻烦的是kafka引擎在消费中千万不要出现无法解析数据类型,否则你就只能先DETACH TABLE kafka reset-offsets ATTACH TABLE这样的流程来恢复数据消费服务了。

image2021-4-7_18-27-41.png

数据消费

1,新建kafka引擎表

CREATE TABLE ck_kafka_csv_01( `id` Int16,`name` String,`createDate` Date ) ENGINE = Kafka('xxxx:6667', 'ckkfktestcsv', 'ck_csv_001', 'CSV')

2,生产kafka数据

./kafka-console-producer.sh --broker-list xxxx:6667 --topic ckkfktestcsv
>1,www,2021-04-05
>2,wanghg11,2021-04-06
>3,test,2021-04-07

3,第一次select


1.png

4,第二次select

2.png

这说明kafka引擎只会消费一次数据

5,查看kafka引擎表

SELECT  column AS `字段名`, any(type) AS `类型`, formatReadableSize(sum(column_data_uncompressed_bytes)) AS `原始大小`, formatReadableSize(sum(column_data_compressed_bytes)) AS `压缩大小`, sum(rows) AS `行数` FROM system.parts_columns WHERE (database = 'default') AND (table = 'ck_kafka_csv_01') GROUP BY column ORDER BY column ASC

3.png

你会发下并没有任何数据。所以kafka引擎并不存储数据

结合物化视图

使用物化视图触发写MergeTree引擎是主要的使用方案。

思路


4.png

1,新建topic

--测试引擎消费
./kafka-topics.sh --create --topic ckkfktest_origin --partitions  3  --zookeeper xxxx:2181 --replication-factor 1
 
--测试引擎生产
./kafka-topics.sh --create --topic ckkfktest_producer --partitions  3  --zookeeper xxxx:2181 --replication-factor 1

2,新建表

--新建kafka引擎消费表
CREATE TABLE kfk_ckkfktest_origin( q_date Date, level String, message String) ENGINE =  Kafka('xxxx:6667', 'ckkfktest_origin', 'kfk_ckkfktest_groupid_001', 'CSV');
 
--新建kafka引擎生产表
CREATE TABLE kfk_ckkfktest_producer( day Date, level String,  message String) ENGINE = Kafka SETTINGS kafka_broker_list = 'xxxx:6667',kafka_topic_list = 'ckkfktest_producer', kafka_group_name = 'kfk_ckkfktest_groupid_002',kafka_format = 'CSV';
 
--新建MergeTree表
CREATE TABLE t_daily ( day Date, level String,  message String ) ENGINE = MergeTree(day, (day, level), 8192);
 
--新建物化视图表
CREATE MATERIALIZED VIEW mv_consumer TO t_daily AS SELECT q_date AS day, level, message FROM kfk_ckkfktest_origin;
CREATE MATERIALIZED VIEW mv_producer TO kfk_ckkfktest_producer AS SELECT q_date AS day, level, message FROM kfk_ckkfktest_origin;

3,写kafka数据

./kafka-console-producer.sh --broker-list xxxxx:6667 --topic ckkfktest_origin
>2020-04-07,1,www
>2020-04-06,2,aaaa
>2020-04-08,3,bbb

4,结果


5.png

MergeTree数据表中和kafka发送的数据同步。

6.png

kafka 引擎生产数据正常同步。

总结
一个数据库就应该做一个数据库做的事,应该尽量避免使用kafka引擎,如果是小数据量的数据同步,且数据格式严格准确,可以结合物化视图加MergeTree系列引擎的方式实现kafka的数据消费生产功能。

Tips

为啥不建议用kafka engine?
0,数据库就应该只做数据库的事
1,业务的可控性,如果是使用kafka engine 则需要在写之前就要做好计算逻辑,在写kafka,那还不如flink和spark或者代码直接写ck
2,kafka engine 之前有公司用过,不够稳定,很难维护,而且最近业内说好像还有bug,自己维护源码成本较高
3,kafka engine属于ck的引擎表,是db进程,使用这个会影响系统的稳定性,业内的的kafka导入好像是和db进程分开的,db就应该做db的事。
4,kafka engine 不支持自定义分区策略
5,kafka engine数据不能保证一致性
6,等等其他