ClickHouse学习笔记十之ClickHouse-Kafka Engine 正确的使用方式

ClickHouse学习笔记十之ClickHouse-Kafka Engine 正确的使用方式

解决方案goocz2025-03-26 10:54:5324A+A-

简介

Kafka 是大数据领域非常流行的一款分布式消息中间件,是实时计算中必不可少的一环,同时一款 OLAP 系统能否对接 Kafka 也算是考量是否具备流批一体的衡量指标之一。ClickHouse 的 Kafka 表引擎能够直接与 Kafka 系统对接,进而订阅 Kafka 中的 Topic 并实时接受消息数据。


ClickHouse Kafka引擎是ClickHouse数据库管理系统提供的一种特殊表引擎,它允许ClickHouse直接从Apache Kafka中读取数据流。这种集成方式极大地提高了实时数据处理和分析的能力,使得ClickHouse能够实时地消费Kafka中的消息,并将其存储在ClickHouse的表中,以便进行快速查询和分析

ClickHouse Kafka引擎的工作原理

ClickHouse Kafka引擎的工作原理基于Kafka的消费者API。当创建了一个Kafka引擎表时,ClickHouse会作为Kafka的消费者订阅指定的主题(topic)。Kafka引擎表会不断地从Kafka中拉取新的消息,并将这些消息转换为ClickHouse能够理解的格式(如JSON、CSV等),然后存储在ClickHouse的表中。由于ClickHouse是列式存储数据库,它能够高效地处理这些结构化数据,提供快速的查询和分析能力。

ClickHouse Kafka引擎的使用场景

ClickHouse Kafka引擎的使用场景非常广泛,主要包括以下几个方面:

  • 实时数据分析:将Kafka中的实时数据流导入ClickHouse,实现数据的实时查询和分析。
  • 日志处理:将应用或系统的日志推送到Kafka,然后使用ClickHouse Kafka引擎进行日志的实时收集和处理,便于后续的日志分析和审计。
  • 事件驱动系统:在事件驱动架构中,使用Kafka作为消息队列,ClickHouse Kafka引擎则负责消费这些事件,进行实时处理和响应。
  • 数据集成:在数据集成过程中,将多个数据源的数据推送到Kafka,然后使用ClickHouse Kafka引擎进行统一处理和分析。

如何使用 Kafka Engine

Kafka 表引擎的声明方式如下所示:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];

其中必填参数如下:

kafka_broker_list: Broker 服务的地址列表,多个地址之间使用逗号分割,例如: ‘kafka-1.data:9092, kafka-2.data:9092’

kafka_topic_list: 表示订阅消息的 topic 名称列表,多个 topic 之间使用逗号分割,例如: ‘topic01,topic02’

kafka_group_name: 表示消费者组名称,遵循 kafka 消费者组订阅逻辑

kafka_format: 表示用于解析消息的数据格式,所有 topic 中的数据应保持指定的数据格式否则无法解析且 format 必须是 ClickHouse 提供的格式之一,例如:TSV、CSV、JSONEachRow等。

Kafka Engine同步数据流程

创建消息

@Getter
@Setter
@ToString
public class ClickDTO implements DTO {
    @JSONField(name = "id")
    private Long id;

    @JSONField(name = "product_type")
    private Integer productType;

    @JSONField(name = "channel_type")
    private String channelType;

    @JSONField(name = "agent_name")
    private String agentName;

    @JSONField(name = "advertiser_id")
    private String advertiserId;

    @JSONField(name = "plan_id")
    private String planId;

    @JSONField(name = "adgroup_id")
    private String adgroupId;

    @JSONField(name = "creative_id")
    private String creativeId;

    @JSONField(name = "request_id")
    private String requestId;

    /**
     * 媒体投放系统获取的用户终端的公共IP地址
     */
    @JSONField(name = "ip")
    private String ip;

    /**
     * 用户代理(User Agent),一个特殊字符串头,使得服务器能够识别客户使用的操作系统及版本、CPU类型、浏览器及版本、浏览器渲染引擎、浏览器语言、浏览器插件等。
     */
    @JSONField(name = "ua")
    private String ua;

    /**
     * 客户端发生广告点击事件的时间,以毫秒为单位时间戳
     */
    @JSONField(name = "ts")
    private String ts;

    /**
     * 安卓的设备 ID 的 md5 摘要,32位
     */
    @JSONField(name = "imei")
    private String imei;

    /**
     * IOS 6+的设备id字段,32位
     */
    @JSONField(name = "idfa")
    private String idfa;

    /**
     * 安卓id原值的md5,32位
     */
    @JSONField(name = "android_id")
    private String androidId;

    /**
     * Android Q及更高版本的设备号,32位
     */
    @JSONField(name = "oaid")
    private String oaid;

    /**
     * 操作系统平台 安卓:0;iOS:1;其他:3
     */
    @JSONField(name = "os")
    private Integer os;

    /**
     * 移动设备mac地址,转换成大写字母,去掉“:”,并且取md5摘要后的结果
     */
    @JSONField(name = "mac")
    private String mac;

    @JSONField(name = "model")
    private String model;

    @JSONField(name = "brand")
    private String brand;

    @JSONField(name = "os_version")
    private String osVersion;

    @JSONField(name = "content_Id")
    private Long contentId;

    @JSONField(name = "content_index")
    private Long contentIndex;

    @JSONField(name = "callback")
    private String callback;

    @JSONField(name = "params")
    private String params;

    @JSONField(name = "data")
    private String data;

    @JSONField(name = "create_time")
    @JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;

    @JSONField(name = "update_time")
    @JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
    private Date updateTime;

    @JSONField(name = "ipv6")
    private String ipv6;

    @JSONField(name = "click_id")
    private String clickId;
}

创建消费表

CREATE TABLE dwd.dwd_click_kafka
(
    `id` UInt64,
    `product_type` UInt32,
    `channel_type` String,
    `agent_name` String,
    `advertiser_id` String,
    `plan_id` String,
    `adgroup_id` String,
    `creative_id` String,
    `request_id` String,
    `ip` String,
    `ua` String,
    `ts` String,
    `imei` String,
    `idfa` String,
    `android_id` String,
    `oaid` String,
    `os` UInt32,
    `mac` String,
    `model` String,
    `brand` String,
    `os_version` String,
    `content_Id` UInt64,
    `content_index` UInt64,
    `callback` String,
    `params` String,
    `data` String,
    `create_time` DateTime,
    `update_time` DateTime,
    `ipv6` String,
    `click_id` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '172.0.0.1:9092', kafka_topic_list = 'click_ck_topic', kafka_group_name = 'click_ck_group', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 100, kafka_num_consumers = 4

创建存储表

因为 Kafka 消费表不能直接作为结果表使用。Kafka 消费表只是用来消费Kafka数据,没有真正的存储所有数据,只要查询一次,数据就会清空。因此需要在 ClickHouse 中创建存储表保存数据。

CREATE TABLE dwd.click
(
    `id` UInt64,
    `product_type` UInt32,
    `channel_type` String,
    `agent_name` String,
    `advertiser_id` String,
    `plan_id` String,
    `adgroup_id` String,
    `creative_id` String,
    `request_id` String,
    `ip` String,
    `ua` String,
    `ts` String,
    `imei` String,
    `idfa` String,
    `android_id` String,
    `oaid` String,
    `os` UInt32,
    `mac` String,
    `model` String,
    `brand` String,
    `os_version` String,
    `content_Id` UInt64,
    `content_index` UInt64,
    `callback` String,
    `params` String,
    `data` String,
    `create_time` DateTime,
    `update_time` DateTime,
    `ipv6` String,
    `click_id` String
)
ENGINE = ReplicatedMergeTree
PARTITION BY toYYYYMMDD(create_time)
ORDER BY (product_type, advertiser_id, id)
primary key (id)
COMMENT '投放点击表'

创建数据同步视图【物化视图】

创建 view 把 kafka 消费表消费到的数据导入 ClickHouse 存储表:

CREATE MATERIALIZED VIEW dwd.click_view TO dwd.click
(
    `id` UInt64,
    `product_type` UInt32,
    `channel_type` String,
    `agent_name` String,
    `advertiser_id` String,
    `plan_id` String,
    `adgroup_id` String,
    `creative_id` String,
    `request_id` String,
    `ip` String,
    `ua` String,
    `ts` String,
    `imei` String,
    `idfa` String,
    `android_id` String,
    `oaid` String,
    `os` UInt32,
    `mac` String,
    `model` String,
    `brand` String,
    `os_version` String,
    `content_Id` UInt64,
    `content_index` UInt64,
    `callback` String,
    `params` String,
    `data` String,
    `create_time` DateTime,
    `update_time` DateTime,
    `ipv6` String,
    `click_id` String
) AS
SELECT *
FROM dwd.click_kafka

使用ClickHouse Kafka引擎时可能遇到的常见问题及解决方案

常见问题

  • 数据不一致:当Kafka中的数据与ClickHouse中的数据不一致时,可能是由于消息丢失、重复或处理错误等原因造成的。
  • 性能瓶颈:在处理大量数据时,ClickHouse Kafka引擎可能会遇到性能瓶颈,导致数据处理速度变慢。
  • 配置错误:错误的配置可能导致ClickHouse无法正确连接到Kafka或消费消息。

解决方案

  • 数据一致性:确保Kafka和ClickHouse之间的数据同步机制是可靠的,并且设置适当的数据重试和错误处理策略。
  • 性能优化:根据实际需求调整Kafka和ClickHouse的配置参数,如增加消费者数量、优化查询语句等,以提高处理性能。
  • 配置检查:仔细检查Kafka和ClickHouse的配置文件,确保所有配置项都是正确的,并且符合当前环境的需求。此外,还可以使用监控工具来实时监控Kafka和ClickHouse的性能指标,以便及时发现和解决问题。
点击这里复制本文地址 以上内容由goocz整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!

果子教程网 © All Rights Reserved.  蜀ICP备2024111239号-5