https://clickhouse.tech/docs/en/engines/table-engines/integrations/kafka/
kafka引擎并不存储数据,只相当于一个Publish or subscribe,如果要真正存储消费后的数据,需要结合物化视图和MergeTree引擎系列使用。而且kafka引擎只会消费一次,所以你在select 一次后然后在select就会发现之前的数据没有了~。最麻烦的是kafka引擎在消费中千万不要出现无法解析数据类型,否则你就只能先DETACH TABLE kafka reset-offsets ATTACH TABLE这样的流程来恢复数据消费服务了。
1,新建kafka引擎表
CREATE TABLE ck_kafka_csv_01( `id` Int16,`name` String,`createDate` Date ) ENGINE = Kafka('xxxx:6667', 'ckkfktestcsv', 'ck_csv_001', 'CSV')
2,生产kafka数据
./kafka-console-producer.sh --broker-list xxxx:6667 --topic ckkfktestcsv
>1,www,2021-04-05
>2,wanghg11,2021-04-06
>3,test,2021-04-07
3,第一次select
4,第二次select
5,查看kafka引擎表
SELECT column AS `字段名`, any(type) AS `类型`, formatReadableSize(sum(column_data_uncompressed_bytes)) AS `原始大小`, formatReadableSize(sum(column_data_compressed_bytes)) AS `压缩大小`, sum(rows) AS `行数` FROM system.parts_columns WHERE (database = 'default') AND (table = 'ck_kafka_csv_01') GROUP BY column ORDER BY column ASC
使用物化视图触发写MergeTree引擎是主要的使用方案。
思路
1,新建topic
--测试引擎消费
./kafka-topics.sh --create --topic ckkfktest_origin --partitions 3 --zookeeper xxxx:2181 --replication-factor 1
--测试引擎生产
./kafka-topics.sh --create --topic ckkfktest_producer --partitions 3 --zookeeper xxxx:2181 --replication-factor 1
2,新建表
--新建kafka引擎消费表
CREATE TABLE kfk_ckkfktest_origin( q_date Date, level String, message String) ENGINE = Kafka('xxxx:6667', 'ckkfktest_origin', 'kfk_ckkfktest_groupid_001', 'CSV');
--新建kafka引擎生产表
CREATE TABLE kfk_ckkfktest_producer( day Date, level String, message String) ENGINE = Kafka SETTINGS kafka_broker_list = 'xxxx:6667',kafka_topic_list = 'ckkfktest_producer', kafka_group_name = 'kfk_ckkfktest_groupid_002',kafka_format = 'CSV';
--新建MergeTree表
CREATE TABLE t_daily ( day Date, level String, message String ) ENGINE = MergeTree(day, (day, level), 8192);
--新建物化视图表
CREATE MATERIALIZED VIEW mv_consumer TO t_daily AS SELECT q_date AS day, level, message FROM kfk_ckkfktest_origin;
CREATE MATERIALIZED VIEW mv_producer TO kfk_ckkfktest_producer AS SELECT q_date AS day, level, message FROM kfk_ckkfktest_origin;
3,写kafka数据
./kafka-console-producer.sh --broker-list xxxxx:6667 --topic ckkfktest_origin
>2020-04-07,1,www
>2020-04-06,2,aaaa
>2020-04-08,3,bbb
4,结果
MergeTree数据表中和kafka发送的数据同步。
kafka 引擎生产数据正常同步。
总结
一个数据库就应该做一个数据库做的事,应该尽量避免使用kafka引擎,如果是小数据量的数据同步,且数据格式严格准确,可以结合物化视图加MergeTree系列引擎的方式实现kafka的数据消费生产功能。
为啥不建议用kafka engine?
0,数据库就应该只做数据库的事
1,业务的可控性,如果是使用kafka engine 则需要在写之前就要做好计算逻辑,在写kafka,那还不如flink和spark或者代码直接写ck
2,kafka engine 之前有公司用过,不够稳定,很难维护,而且最近业内说好像还有bug,自己维护源码成本较高
3,kafka engine属于ck的引擎表,是db进程,使用这个会影响系统的稳定性,业内的的kafka导入好像是和db进程分开的,db就应该做db的事。
4,kafka engine 不支持自定义分区策略
5,kafka engine数据不能保证一致性
6,等等其他