ClickHouse学习笔记六从一次报警开始调优

ClickHouse学习笔记六从一次报警开始调优

解决方案goocz2025-03-26 10:55:0834A+A-

前言

我们正开发一套广告系统,通过收集用户点击广告的行为,做数据分析,以便更加智能给用户推荐感兴趣的广告。广告点击是一个海量数据,我们采用消息队列对数据异步处理,最后存储在ClickHouse中。最近我们的ClickHouse 频繁报警提示,这里总结下优化方法。

■【腾讯云可观测平台告警】

您好!您账号(账号ID: 1000*****,昵称: **@**.com)的腾讯云可观测平台告警已触发

告警内容: CDWCH-CK告警|cpu峰值使用率 > 85%

当前数据: 92.659% (cpu峰值使用率)

告警对象: 集群ID:cdwch-11111|集群名称:***#127.0.0.1

项目|地域: - | 北京

告警策略: clickhouse节点预警

触发时间: 2024-04-23 13:27:00 (UTC+08:00)

您可以登录腾讯云可观测平台控制台查看告警详情,或在腾讯云助手小程序查看告警详情

优化ClickHouse建议

1、在规划分区时,应该合理规划分区个数,并尽可能利用分区。一张表分区数不建议超过 1000 个,可以在查询时有效帮助进行数据过滤,使用得当可以提升数倍查询性能,通常按天分区是比较普遍的做法。分区也不建议过多,因为 ClickHouse 不同分区的数据不会合并,容易导致 part 过多,从而导致查询和重启变得很慢。

2、攒批写入:ClickHouse 必须攒批写入,至少 1000 条/批,建议 5k - 10w 一批写入ClickHouse,每一次写入都会在底层生成 1 个或者多个 part 存储目录,后台任务自动合并小 part 到一个大 part ,如果写入频次过高会出现 part 过多,merge 速度跟不上导致写入失败报错: Too many parts(301). Merges are processing significantly slower than inserts。

下面会从这两方面优化ClickHouse , 一个是修改 Click表的分区方式,另一个是优化Kafka消费。

Partition 优化

click 点击表:按月分区修改为 按天分区

CREATE TABLE launch.click_new
(
    `id` UInt64,
    `app_type` UInt32,
    `product_type` UInt32,
    `channel_type` String,
    `agent_name` String,
    `advertiser_id` String,
    `aid` String,
    `aid_name` String,
    `cid` String,
    `cid_name` String,
    `campaign_id` String,
    `campaign_name` String,
    `ctype` String,
    `csite` String,
    `convert_id` String,
    `request_id` String,
    `sl` String,
    `imei` String,
    `idfa` String,
    `idfa_md5` String,
    `android_id` String,
    `oaid` String,
    `oaid_md5` String,
    `os` String,
    `mac` String,
    `mac1` String,
    `ip` String,
    `ipv6` String,
    `ua` String,
    `ts` String,
    `callback_param` String,
    `callback_url` String,
    `model` String,
    `caid` String,
    `caid_md5` String,
    `book_id` UInt64,
    `chapter_id` UInt64,
    `status` UInt32,
    `brand` String,
    `os_version` String,
    `material_id` String,
    `version` UInt32,
    `data` String,
    `expire` DateTime,
    `create_time` DateTime COMMENT '创建时间',
    `channel_id` String
)
ENGINE = ReplicatedReplacingMergeTree()
PARTITION BY toYYYYMMDD(create_time)
PRIMARY KEY id
ORDER BY id

system.merges

system.merges 表包含有关 MergeTree 系列表中当前正在处理的 merge 和 part 突变的信息。

SELECT
table,
count(*)
FROM clusterAllReplicas('default_cluster', system.merges)
GROUP BY table

@KafkaListener注解详解

topics:

描述: 指定监听的 Kafka 主题,可以是一个字符串数组。这是最基本的参数,它定义了监听器将从哪个或哪些主题接收消息。

 @KafkaListener(topics = "click_topic")

groupId:

描述: 指定 Kafka 消费者组的 ID。每个消费者都有自己所属的组。一个组中可以有多个消费者。

@KafkaListener(topics = "click_topic", groupId = "click_group")

id:

描述: 每个Listener实例的重要标识。默认是一个自动生成的唯一 ID。如果不指定groupId,那么id将直接作为groupId。在同一应用中,如果有多个监听器,可以使用不同的id来标识不同的监听器容器。

@KafkaListener(id = "myListener", topics = "click_topic", groupId = "click_group")

concurrency:

描述: 指定并发消费者的数量,即监听器容器的线程数。控制监听器的并发性,每个线程会创建一个消费者实例。较大的并发性可以提高消息处理的吞吐量。

@KafkaListener(topics = "click_topic", groupId = "click_group", concurrency = "3")

containerFactory:

描述: 指定用于创建监听器容器的工厂类。可以用于配置监听器容器的属性。通过设置 containerFactory,可以更灵活地配置监听器容器的一些属性,例如消息转换器、错误处理器等。

@KafkaListener(topics = "click_topic", groupId = "click_group", concurrency = "3", containerFactory = "myContainerFactory")

autoStartup:

描述: 指定是否在启动时自动启动监听器容器。默认是 true。如果设置为false,则需要手动调用容器的start() 方法来启动监听器。

@KafkaListener(topics = "click_topic", groupId = "click_group", concurrency = "3", autoStartup = "false")

clientIdPrefix:

描述: 指定 Kafka 消费者的客户端 ID 前缀。可以通过设置clientIdPrefix来自定义消费者的客户端 ID。

 @KafkaListener(topics = "click_topic", clientIdPrefix = "my-client")

containerGroup:

描述: 指定监听器容器所属的组。如果有多个应用使用相同的消费者组,可以通过设置 containerGroup来区分它们。

@KafkaListener(topics = "click_topic", containerGroup = "my-group")

errorHandler:

描述: 指定错误处理器,用于处理监听器方法抛出的异常。定义一个错误处理器,可以在发生异常时进行自定义处理。

 @KafkaListener(topics = "click_topic", errorHandler = "myErrorHandler")

properties:

描述: 指定其他的消费者配置属性,以键值对的形式提供。这种方式允许你通过注解的方式灵活地设置特定的消费者属性,而不必在全局配置文件中进行设置。请确保设置的属性是合法的 Kafka 消费者属性,并符合你的应用需求。

@KafkaListener(topics = "click_topic", groupId = "click_group", concurrency = "3",
        properties = {"fetch.max.wait.ms=30000", "fetch.min.bytes=202400", "max.poll.records=500"})

fetch.min.bytes

该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka 中拉取的最小数据量,默认值为1(B)。Kafka在收到Consumer的拉取请求时,如果返回给Consumer的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。

fetch.max.wait.ms

这个参数也和 fetch.min.bytes参数有关,如果Kafka仅仅参考fetch.min.bytes参数的要求,那么有可能会一直阻塞等待而无法发送响应给Consumer,显然这是不合理的。fetch.max.wait.ms参数用于指定Kafka的等待时间,默认值为500 (ms)。如果Kafka中没有足够多的消息而满足不了fetch.min.bytes参数的要求,那么最终会等待500ms。这个参数的设定和Consumer 与Kafka之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。

max.poll.records

这个参数用来配置Consumer 在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。

这些参数可以根据实际需求进行组合和配置,以满足特定场景的要求。例如,通过调整 concurrency 可以控制监听器的并发性,通过设置 autoStartup 可以控制监听器容器是否在应用启动时自动启动。其他参数也可以根据需要进行调整。

点击这里复制本文地址 以上内容由goocz整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!

果子教程网 © All Rights Reserved.  蜀ICP备2024111239号-5