ClickHouse学习笔记十之ClickHouse-Kafka Engine 正确的使用方式
简介
Kafka 是大数据领域非常流行的一款分布式消息中间件,是实时计算中必不可少的一环,同时一款 OLAP 系统能否对接 Kafka 也算是考量是否具备流批一体的衡量指标之一。ClickHouse 的 Kafka 表引擎能够直接与 Kafka 系统对接,进而订阅 Kafka 中的 Topic 并实时接受消息数据。
ClickHouse Kafka引擎是ClickHouse数据库管理系统提供的一种特殊表引擎,它允许ClickHouse直接从Apache Kafka中读取数据流。这种集成方式极大地提高了实时数据处理和分析的能力,使得ClickHouse能够实时地消费Kafka中的消息,并将其存储在ClickHouse的表中,以便进行快速查询和分析
ClickHouse Kafka引擎的工作原理
ClickHouse Kafka引擎的工作原理基于Kafka的消费者API。当创建了一个Kafka引擎表时,ClickHouse会作为Kafka的消费者订阅指定的主题(topic)。Kafka引擎表会不断地从Kafka中拉取新的消息,并将这些消息转换为ClickHouse能够理解的格式(如JSON、CSV等),然后存储在ClickHouse的表中。由于ClickHouse是列式存储数据库,它能够高效地处理这些结构化数据,提供快速的查询和分析能力。
ClickHouse Kafka引擎的使用场景
ClickHouse Kafka引擎的使用场景非常广泛,主要包括以下几个方面:
- 实时数据分析:将Kafka中的实时数据流导入ClickHouse,实现数据的实时查询和分析。
- 日志处理:将应用或系统的日志推送到Kafka,然后使用ClickHouse Kafka引擎进行日志的实时收集和处理,便于后续的日志分析和审计。
- 事件驱动系统:在事件驱动架构中,使用Kafka作为消息队列,ClickHouse Kafka引擎则负责消费这些事件,进行实时处理和响应。
- 数据集成:在数据集成过程中,将多个数据源的数据推送到Kafka,然后使用ClickHouse Kafka引擎进行统一处理和分析。
如何使用 Kafka Engine
Kafka 表引擎的声明方式如下所示:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
其中必填参数如下:
kafka_broker_list: Broker 服务的地址列表,多个地址之间使用逗号分割,例如: ‘kafka-1.data:9092, kafka-2.data:9092’
kafka_topic_list: 表示订阅消息的 topic 名称列表,多个 topic 之间使用逗号分割,例如: ‘topic01,topic02’
kafka_group_name: 表示消费者组名称,遵循 kafka 消费者组订阅逻辑
kafka_format: 表示用于解析消息的数据格式,所有 topic 中的数据应保持指定的数据格式否则无法解析且 format 必须是 ClickHouse 提供的格式之一,例如:TSV、CSV、JSONEachRow等。
Kafka Engine同步数据流程
创建消息
@Getter
@Setter
@ToString
public class ClickDTO implements DTO {
@JSONField(name = "id")
private Long id;
@JSONField(name = "product_type")
private Integer productType;
@JSONField(name = "channel_type")
private String channelType;
@JSONField(name = "agent_name")
private String agentName;
@JSONField(name = "advertiser_id")
private String advertiserId;
@JSONField(name = "plan_id")
private String planId;
@JSONField(name = "adgroup_id")
private String adgroupId;
@JSONField(name = "creative_id")
private String creativeId;
@JSONField(name = "request_id")
private String requestId;
/**
* 媒体投放系统获取的用户终端的公共IP地址
*/
@JSONField(name = "ip")
private String ip;
/**
* 用户代理(User Agent),一个特殊字符串头,使得服务器能够识别客户使用的操作系统及版本、CPU类型、浏览器及版本、浏览器渲染引擎、浏览器语言、浏览器插件等。
*/
@JSONField(name = "ua")
private String ua;
/**
* 客户端发生广告点击事件的时间,以毫秒为单位时间戳
*/
@JSONField(name = "ts")
private String ts;
/**
* 安卓的设备 ID 的 md5 摘要,32位
*/
@JSONField(name = "imei")
private String imei;
/**
* IOS 6+的设备id字段,32位
*/
@JSONField(name = "idfa")
private String idfa;
/**
* 安卓id原值的md5,32位
*/
@JSONField(name = "android_id")
private String androidId;
/**
* Android Q及更高版本的设备号,32位
*/
@JSONField(name = "oaid")
private String oaid;
/**
* 操作系统平台 安卓:0;iOS:1;其他:3
*/
@JSONField(name = "os")
private Integer os;
/**
* 移动设备mac地址,转换成大写字母,去掉“:”,并且取md5摘要后的结果
*/
@JSONField(name = "mac")
private String mac;
@JSONField(name = "model")
private String model;
@JSONField(name = "brand")
private String brand;
@JSONField(name = "os_version")
private String osVersion;
@JSONField(name = "content_Id")
private Long contentId;
@JSONField(name = "content_index")
private Long contentIndex;
@JSONField(name = "callback")
private String callback;
@JSONField(name = "params")
private String params;
@JSONField(name = "data")
private String data;
@JSONField(name = "create_time")
@JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
@JSONField(name = "update_time")
@JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
private Date updateTime;
@JSONField(name = "ipv6")
private String ipv6;
@JSONField(name = "click_id")
private String clickId;
}
创建消费表
CREATE TABLE dwd.dwd_click_kafka
(
`id` UInt64,
`product_type` UInt32,
`channel_type` String,
`agent_name` String,
`advertiser_id` String,
`plan_id` String,
`adgroup_id` String,
`creative_id` String,
`request_id` String,
`ip` String,
`ua` String,
`ts` String,
`imei` String,
`idfa` String,
`android_id` String,
`oaid` String,
`os` UInt32,
`mac` String,
`model` String,
`brand` String,
`os_version` String,
`content_Id` UInt64,
`content_index` UInt64,
`callback` String,
`params` String,
`data` String,
`create_time` DateTime,
`update_time` DateTime,
`ipv6` String,
`click_id` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '172.0.0.1:9092', kafka_topic_list = 'click_ck_topic', kafka_group_name = 'click_ck_group', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 100, kafka_num_consumers = 4
创建存储表
因为 Kafka 消费表不能直接作为结果表使用。Kafka 消费表只是用来消费Kafka数据,没有真正的存储所有数据,只要查询一次,数据就会清空。因此需要在 ClickHouse 中创建存储表保存数据。
CREATE TABLE dwd.click
(
`id` UInt64,
`product_type` UInt32,
`channel_type` String,
`agent_name` String,
`advertiser_id` String,
`plan_id` String,
`adgroup_id` String,
`creative_id` String,
`request_id` String,
`ip` String,
`ua` String,
`ts` String,
`imei` String,
`idfa` String,
`android_id` String,
`oaid` String,
`os` UInt32,
`mac` String,
`model` String,
`brand` String,
`os_version` String,
`content_Id` UInt64,
`content_index` UInt64,
`callback` String,
`params` String,
`data` String,
`create_time` DateTime,
`update_time` DateTime,
`ipv6` String,
`click_id` String
)
ENGINE = ReplicatedMergeTree
PARTITION BY toYYYYMMDD(create_time)
ORDER BY (product_type, advertiser_id, id)
primary key (id)
COMMENT '投放点击表'
创建数据同步视图【物化视图】
创建 view 把 kafka 消费表消费到的数据导入 ClickHouse 存储表:
CREATE MATERIALIZED VIEW dwd.click_view TO dwd.click
(
`id` UInt64,
`product_type` UInt32,
`channel_type` String,
`agent_name` String,
`advertiser_id` String,
`plan_id` String,
`adgroup_id` String,
`creative_id` String,
`request_id` String,
`ip` String,
`ua` String,
`ts` String,
`imei` String,
`idfa` String,
`android_id` String,
`oaid` String,
`os` UInt32,
`mac` String,
`model` String,
`brand` String,
`os_version` String,
`content_Id` UInt64,
`content_index` UInt64,
`callback` String,
`params` String,
`data` String,
`create_time` DateTime,
`update_time` DateTime,
`ipv6` String,
`click_id` String
) AS
SELECT *
FROM dwd.click_kafka
使用ClickHouse Kafka引擎时可能遇到的常见问题及解决方案
常见问题:
- 数据不一致:当Kafka中的数据与ClickHouse中的数据不一致时,可能是由于消息丢失、重复或处理错误等原因造成的。
- 性能瓶颈:在处理大量数据时,ClickHouse Kafka引擎可能会遇到性能瓶颈,导致数据处理速度变慢。
- 配置错误:错误的配置可能导致ClickHouse无法正确连接到Kafka或消费消息。
解决方案:
- 数据一致性:确保Kafka和ClickHouse之间的数据同步机制是可靠的,并且设置适当的数据重试和错误处理策略。
- 性能优化:根据实际需求调整Kafka和ClickHouse的配置参数,如增加消费者数量、优化查询语句等,以提高处理性能。
- 配置检查:仔细检查Kafka和ClickHouse的配置文件,确保所有配置项都是正确的,并且符合当前环境的需求。此外,还可以使用监控工具来实时监控Kafka和ClickHouse的性能指标,以便及时发现和解决问题。