1. ClickHouse的数据一致性问题

  在生产环境中,数据一致性的重要性,不论如何强调都不过分。而 ClickHouse 在进行数据变更时,都会产生一个临时分区,而不会更改原始数据文件,对数据文件的修改操作会要等到数据合并时才进行。所以 ClickHouse 只能保证数据的最终一致性,而不能保证强一致性。很可能数据变更后,程序通过 ClickHouse 查到之前的错误数据。因此使用 ClickHouse ,要尽量避免数据的增删改这类数据变更操作。但是实际使用时,又不可避免的要使用数据变更操作。这时就需要有一套策略来全面处理数据一致性问题。
  首先,对于分布式表,最好的办法是尽量避免使用。如果非要使用分布式表,一定要打开internal_replication。每个分片一定要配置多副本机制,使用副本机制来保证副本之间的数据一致性。
  一般来说,分布式表会带来非常多的问题。往分布式表中导入数据时,数据是异步写入到不同的分片当中的,这样数据写入过程中就不可避免的有先有后。在最后一个分片的数据写入完成之前,不可避免的就会产生数据一致性的问题。
  另外,对于分布式表,如果在数据写入时,这个分片的服务宕机了,那么插入的数据就有可能会丢失。ClickHouse 的做法是将这个数据分片转移到 broken 子目录中,并不再使用这个数据分片。也就是说,这时,ClickHouse 这一次的数据写入操作 ius 丢失了。造成的结果就是有可能就是一次 update 操作要更新 1000 条数据,但是最终却只更新了 900 条。
  然后,对于本地的数据库,也一定要注意多副本造成的数据一致性问题。ClickHouse 中,即使是提供了去重功能的 ReplacingMergeTree,它只能保证在数据合并时会去重,只能保证数据的最终一致性,而不能保证强一致性(具体可参考官网说明:https://clickhouse.com/docs/zh/engines/table-engines/mergetree-family/replacingmergetree/)。


  我们在使用 ReplacingMergeTree、SummingMergeTree 这类表引擎的时候,会出现短暂数据不一致的情况。
  在某些对一致性非常敏感的场景,通常有以下几种解决方案。

2. 准备测试表和数据

  1. 创建表:
    1
    2
    3
    4
    5
    6
    7
    CREATE TABLE test_a(
    user_id UInt64,
    score String,
    deleted UInt8 DEFAULT 0,
    create_time DateTime DEFAULT toDateTime(0)
    ) ENGINE= ReplacingMergeTree(create_time)
    ORDER BY user_id;
    user_id是数据去重更新的标识;
    create_time是版本号字段,每组数据中 create_time 最大的一行表示最新的数据;
    deleted是自定的一个标记位,比如 0 代表未删除,1 代表删除数据。
  2. 写入 1000 万测试数据:
    1
    2
    3
    4
    5
    INSERT INTO TABLE test_a(user_id,score)
    WITH(
    SELECT ['A','B','C','D','E','F','G']
    )AS dict
    SELECT number AS user_id, dict[number%7+1] FROM numbers(10000000);
  3. 修改前 50 万行数据,修改内容包括 name 字段和 create_time 版本号字段:
    1
    2
    3
    4
    5
    6
    INSERT INTO TABLE test_a(user_id,score,create_time)
    WITH(
    SELECT ['AA','BB','CC','DD','EE','FF','GG']
    )AS dict
    SELECT number AS user_id, dict[number%7+1], now() AS create_time FROM
    numbers(500000);
  4. 统计总数:
    1
    2
    SELECT COUNT() FROM test_a;
    10500000
    还未触发分区合并,所以还未去重。

3. 手动OPTIMIZE

  对于 MergeTree 系列引擎,要注意他的合并操作不是定时的,是后台定时任务去自动进行 merge 合并操作。这个任务执行时间是无法设置或者掌控的。一般 merge 时间是在写入操作完成后的 10~15 分钟。但是如果某个分区一直不写入新的数据, 那也有可能这个分区一直不会 merge。这个时候,也只能通过 OPTIMIZE 语句手动进行更新。
  在写入数据后,立刻执行 OPTIMIZE 强制触发新写入分区的合并动作:

1
2
3
OPTIMIZE TABLE test_a FINAL; 

语法:OPTIMIZE TABLE [db.]name [ON CLUSTER cluster] [PARTITION partition | PARTITION ID 'partition_id'] [FINAL] [DEDUPLICATE [BY expression]]

  但是 OPTIMIZE 语句强制合并数据 CPU 重操作,数据量大时,会非常消耗 CPU 资源,影响到线上的查询功能。因此,建议在晚上系统负载比较小的时候执行。另外,merge 合并操作是没有锁的概念的,合并过程中依然可以正常写入。
  实际生产中,在某些对数据一致性要求比较高的场景,可以自行采用乐观锁来屏蔽数据一致性的问题。例如,在创建一张表时,增加两个字段 sign 和 version。sign 表示这条数据是否删除,version 数据表示这条数据的更新版本,像这样:

1
2
3
4
5
6
create table A
(
xxx,
_sign UInt8,
_version UInt32
)

  当进行数据更新时,不再进行更新操作,改为插入一条新的数据,同时 version 版本号加 1。这样查询时,只要过滤 verion 版本号最大的一条数据就可以查询到最新的数据。
  对于删除操作,同样改为新插入一条数据,version 版本号加 1 的同时,把 sign 设置为 -1,表示已删除。查询时,同样是找到版本号最大的这条数据,通过判断 sign 是不是等于 -1,就能判断出这条数据是否被删除了。
  但是这种方案需要注意过期数据要另行定期删除。

4. 通过Group by去重

  可以根据自己实际重复的字段进行去重,然后对每个重复的组里选自己想要的数据。

1
2
3
4
5
6
7
8
9
select 
user_id,
argMax(score, create_time) AS score,
argMax(deleted, create_time) AS deleted,
max(create_time) AS ctime
from test_a
group by user_id
# 分组完成后查询未被删除的数据
HAVING deleted = 0;
  • argMax(field1,field2):按照 field2 的最大值取 field1 的值。

  当我们更新数据时,会写入一行新的数据,例如上面语句中,通过查询最大的 create_time 得到修改后的 score 字段值。
  这种固定的查询语句我们可以提前封装为一个视图,以后只查视图就好了:

1
2
3
4
5
6
7
8
9
CREATE VIEW view_test_a AS
SELECT
user_id,
argMax(score, create_time) AS score,
argMax(deleted, create_time) AS deleted,
max(create_time) AS ctime
FROM test_a
GROUP BY user_id
HAVING deleted = 0;

  插入重复数据,再次查询:

1
2
3
4
5
6
7
8
9
# 再次插入一条数据
INSERT INTO TABLE test_a(user_id,score,create_time)
VALUES(0,'AAAA',now())

# 再次查询
SELECT
*
FROM view_test_a
WHERE user_id = 0

  删除数据测试:

1
2
3
4
5
6
7
8
# 再次插入一条标记为删除的数据
INSERT INTO TABLE test_a(user_id,score,deleted,create_time) VALUES(0,'AAAA',1,now());

# 再次查询,刚才那条数据看不到了
SELECT
*
FROM view_test_a
WHERE user_id = 0;

  这行数据并没有被真正的删除,而是被过滤掉了,在一些合适的场景下,可以结合表级别的 TTL 最终将物理数据删除。

5. 通过FINAL查询

  在查询语句后增加 FINAL 修饰符,这样在查询的过程中将会执行 Merge 的特殊逻辑(例如数据去重,预聚合等)。
  但是这种方法在早期版本基本没有人使用,因为在增加 FINAL 之后,我们的查询将会变成一个单线程的执行过程,查询速度非常慢。
  在 v20.5.2.7-stable 版本中,FINAL 查询支持多线程执行,并且可以通过 max_final_threads 参数控制单个查询的线程数。但是目前读取 part 部分的动作依然是串行的。
  FINAL 查询最终的性能和很多因素相关,列字段的大小、分区的数量等等都会影响到最终的查询时间,所以还要结合实际场景取舍。
  参考链接:https://github.com/ClickHouse/ClickHouse/pull/10463
  普通语句查询:

1
2
3
4
select
*
from visits_v1
WHERE StartDate = '2014-03-17' limit 100 settings max_threads = 2;

  查看执行计划:

1
2
3
4
5
6
7
8
9
10
11
explain pipeline select
*
from visits_v1 WHERE StartDate = '2014-03-17' limit 100 settings max_threads = 2;

(Expression)
ExpressionTransform × 2
(SettingQuotaAndLimits)
(Limit)
Limit 22
(ReadFromMergeTree)
MergeTreeThread × 2 01

  明显将由 2 个线程并行读取 part 查询。
  FINAL 查询:

1
2
3
select
*
from visits_v1 final WHERE StartDate = '2014-03-17' limit 100 settings max_final_threads = 2;

  查询速度没有普通的查询快,但是相比之前已经有了一些提升,查看 FINAL 查询的执行计划:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
explain pipeline select
*
from visits_v1 final WHERE StartDate = '2014-03-17' limit 100 settings max_final_threads = 2;

(Expression)
ExpressionTransform × 2
(SettingQuotaAndLimits)
(Limit)
Limit 22
(ReadFromMergeTree)
ExpressionTransform × 2
CollapsingSortedTransform × 2
Copy 12
AddingSelector
ExpressionTransform
MergeTree 01

  从 CollapsingSortedTransform 这一步开始已经是多线程执行,但是读取 part 部分的动作还是串行。

参考文献

  【1】https://clickhouse.com/docs/zh/
  【2】https://www.bilibili.com/video/BV1Yh411z7os?from=search&seid=4579023877699743987&spm_id_from=333.337.0.0
  【3】https://clickhouse.com/docs/zh/