    PostgreSQL Duplicate Data Cleansing and Optimization Tutorial


    Tags

    PostgreSQL , duplicate data cleansing , with recursive , recursion , stream computing , pipelinedb , window query , file_fdw , insert on conflict , LLVM , concurrent index creation


    Background

    Cleaning up duplicate data is a fairly common business requirement. Some databases do not support unique constraints, or the schema design never anticipated the need for a unique constraint on certain columns, so after the application has been live for a while, duplicate rows accumulate.

    At that point, the need to clean up the duplicates arrives.

    What cleansing techniques are available, and how do we make them fast?

    One small use case brings out ten database techniques. Let me walk you through them.

    Duplicate data cleansing methods

    Suppose a table has a few columns that should have been unique but now contain duplicate values. Given a rule, keep one row from each group of duplicates and delete the rest.

    Example

    postgres=# create table tbl_dup (
      id serial8,
      sid int,
      crt_time timestamp,
      mdf_time timestamp,
      c1 text default md5(random()::text),
      c2 text default md5(random()::text),
      c3 text default md5(random()::text),
      c4 text default md5(random()::text),
      c5 text default md5(random()::text),
      c6 text default md5(random()::text),
      c7 text default md5(random()::text),
      c8 text default md5(random()::text)
    );

    Remove duplicate (sid, crt_time) combinations, keeping the row with the largest mdf_time within each group of duplicates.

    Generate 1,000,000 rows of test data with roughly a 1-in-10 duplication rate. To keep each pair of duplicates out of the same data block, every duplicate is generated 500 rows away from its twin.

    Just generating the test data is already pretty slick, isn't it? One SQL statement produces the whole batch.

    insert into tbl_dup (sid, crt_time, mdf_time)
    select
      case when mod(id,11)=0 then id+500 else id end,
      case when mod(id,11)=0 then now()+(''||id+500||' s')::interval else now()+(''||id||' s')::interval end,
      clock_timestamp()
    from generate_series(1,1000000) t(id);

    Verify that the ctids of duplicate records do not fall in the same data block.

    The verification method is neat too: it uses a window query.

    postgres=# select * from
      (select ctid, sid, crt_time, mdf_time, count(*) over(partition by sid, crt_time) as cnt from tbl_dup) t
    where t.cnt >= 2;
       ctid   |  sid |          crt_time          |          mdf_time          | cnt
    ----------+------+----------------------------+----------------------------+-----
     (0,11)   |  511 | 2016-12-29 17:42:13.935348 | 2016-12-29 17:33:43.092625 |   2
     (20,11)  |  511 | 2016-12-29 17:42:13.935348 | 2016-12-29 17:33:43.102726 |   2
     (20,22)  |  522 | 2016-12-29 17:42:24.935348 | 2016-12-29 17:33:43.102927 |   2
     (0,22)   |  522 | 2016-12-29 17:42:24.935348 | 2016-12-29 17:33:43.09283  |   2
     (21,8)   |  533 | 2016-12-29 17:42:35.935348 | 2016-12-29 17:33:43.103155 |   2
     (1,8)    |  533 | 2016-12-29 17:42:35.935348 | 2016-12-29 17:33:43.093191 |   2
     (21,19)  |  544 | 2016-12-29 17:42:46.935348 | 2016-12-29 17:33:43.103375 |   2
     (1,19)   |  544 | 2016-12-29 17:42:46.935348 | 2016-12-29 17:33:43.093413 |   2
    ....

    The number of rows involved in duplicates is roughly this:

    postgres=# select count(*) from
      (select * from
        (select ctid, sid, crt_time, mdf_time, count(*) over(partition by sid, crt_time) as cnt from tbl_dup) t
       where t.cnt = 2) t;
     count
    --------
     181726
    (1 row)
    Time: 1690.709 ms

    If you think that is already fast, let me quietly tell you the test machine's CPU model:

    Intel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz

    Now, on to the deduplication.

    Method 1: insertion

    Insert the deduplicated result into a new table. This takes 5.8 seconds.

    create table tbl_uniq (like tbl_dup including all);

    insert into tbl_uniq (id, sid, crt_time, mdf_time, c1, c2, c3, c4, c5, c6, c7, c8)
    select id, sid, crt_time, mdf_time, c1, c2, c3, c4, c5, c6, c7, c8
    from
      (select row_number() over(partition by sid, crt_time order by mdf_time desc) as rn, * from tbl_dup) t
    where t.rn = 1;
    INSERT 0 909137
    Time: 5854.349 ms

    Looking for optimization opportunities: the explicit sort can be eliminated.

    postgres=# explain (analyze,verbose,timing,costs,buffers)
    insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)
    select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8
    from (select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t
    where t.rn=1;
                                                                  QUERY PLAN
    ----------------------------------------------------------------------------------------------------------------------------------------
     Insert on public.tbl_uniq  (cost=423098.84..458098.84 rows=5000 width=292) (actual time=5994.723..5994.723 rows=0 loops=1)
       Buffers: shared hit=1021856 read=36376 dirtied=36375, temp read=37391 written=37391
       ->  Subquery Scan on t  (cost=423098.84..458098.84 rows=5000 width=292) (actual time=1715.278..3620.269 rows=909137 loops=1)
             Output: t.id, t.sid, t.crt_time, t.mdf_time, t.c1, t.c2, t.c3, t.c4, t.c5, t.c6, t.c7, t.c8
             Filter: (t.rn = 1)
             Rows Removed by Filter: 90863
             Buffers: shared hit=40000, temp read=37391 written=37391
             ->  WindowAgg  (cost=423098.84..445598.84 rows=1000000 width=300) (actual time=1715.276..3345.392 rows=1000000 loops=1)
                   Output: row_number() OVER (?), tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
                   Buffers: shared hit=40000, temp read=37391 written=37391
                   ->  Sort  (cost=423098.84..425598.84 rows=1000000 width=292) (actual time=1715.263..2174.426 rows=1000000 loops=1)
                         Output: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.id, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
                         Sort Key: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time DESC
                         Sort Method: external sort  Disk: 299128kB
                         Buffers: shared hit=40000, temp read=37391 written=37391
                         ->  Seq Scan on public.tbl_dup  (cost=0.00..50000.00 rows=1000000 width=292) (actual time=0.012..398.007 rows=1000000 loops=1)
                               Output: tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.id, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
                               Buffers: shared hit=40000
     Planning time: 0.174 ms
     Execution time: 6120.921 ms
    (20 rows)

    Optimization 1

    Add an index to eliminate the sort. After this optimization it takes only 3.9 seconds.

    For online workloads, PostgreSQL can build the index CONCURRENTLY, which does not block DML.

    postgres=# create index CONCURRENTLY idx_tbl_dup on tbl_dup (sid, crt_time, mdf_time desc);
    CREATE INDEX
    Time: 765.426 ms

    postgres=# truncate tbl_uniq;
    TRUNCATE TABLE
    Time: 208.808 ms

    postgres=# insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)
    select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8
    from (select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t
    where t.rn=1;
    INSERT 0 909137
    Time: 3978.425 ms

    postgres=# explain (analyze,verbose,timing,costs,buffers)
    insert into tbl_uniq (id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8)
    select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8
    from (select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tbl_dup) t
    where t.rn=1;
                                                                  QUERY PLAN
    ----------------------------------------------------------------------------------------------------------------------------------------
     Insert on public.tbl_uniq  (cost=0.42..159846.13 rows=5000 width=292) (actual time=4791.360..4791.360 rows=0 loops=1)
       Buffers: shared hit=1199971 read=41303 dirtied=36374
       ->  Subquery Scan on t  (cost=0.42..159846.13 rows=5000 width=292) (actual time=0.061..2177.768 rows=909137 loops=1)
             Output: t.id, t.sid, t.crt_time, t.mdf_time, t.c1, t.c2, t.c3, t.c4, t.c5, t.c6, t.c7, t.c8
             Filter: (t.rn = 1)
             Rows Removed by Filter: 90863
             Buffers: shared hit=218112 read=4929
             ->  WindowAgg  (cost=0.42..147346.13 rows=1000000 width=300) (actual time=0.060..1901.174 rows=1000000 loops=1)
                   Output: row_number() OVER (?), tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
                   Buffers: shared hit=218112 read=4929
                   ->  Index Scan using idx_tbl_dup on public.tbl_dup  (cost=0.42..127346.13 rows=1000000 width=292) (actual time=0.051..601.249 rows=1000000 loops=1)
                         Output: tbl_dup.id, tbl_dup.sid, tbl_dup.crt_time, tbl_dup.mdf_time, tbl_dup.c1, tbl_dup.c2, tbl_dup.c3, tbl_dup.c4, tbl_dup.c5, tbl_dup.c6, tbl_dup.c7, tbl_dup.c8
                         Buffers: shared hit=218112 read=4929
     Planning time: 0.304 ms
     Execution time: 4834.392 ms
    (15 rows)
    Time: 4835.484 ms

    Optimization 2

    Recursive query, recursive skip scan

    Several cases have seen performance improve by a few hundred times with this method:

    《时序数据合并场景加速分析和实现 - 复合索引,窗口分组查询加速,变态递归加速》

    《distinct xx和count(distinct xx)的变态递归优化方法 - 索引收敛(skip scan)扫描》

    《用PostgreSQL找回618秒逝去的青春 - 递归收敛优化》

    When the duplication rate is very high, this method works extremely well.

    with recursive skip as (
      (
        select tbl_dup as tbl_dup from tbl_dup
        where (sid, crt_time, mdf_time) in
          (select sid, crt_time, mdf_time from tbl_dup order by sid, crt_time, mdf_time desc limit 1)
      )
      union all
      (
        select (
          select tbl_dup from tbl_dup
          where (sid, crt_time, mdf_time) in
            (select sid, crt_time, mdf_time from tbl_dup t
             where t.sid > (s.tbl_dup).sid
                or (t.sid = (s.tbl_dup).sid and t.crt_time > (s.tbl_dup).crt_time)
               and t.sid is not null
             order by t.sid, t.crt_time, t.mdf_time desc
             limit 1)
        )
        from skip s
        where (s.tbl_dup).sid is not null  -- this predicate is required, otherwise the recursion loops forever
      )
    )
    select (t.tbl_dup).sid, (t.tbl_dup).crt_time from skip t where t.* is not null;

    With a unique key (UK) available, use it like this:

    with recursive skip as (
      (
        select tbl_dup as tbl_dup from tbl_dup
        where (id) in
          (select id from tbl_dup order by sid, crt_time, mdf_time desc limit 1)
      )
      union all
      (
        select (
          select tbl_dup from tbl_dup
          where id in
            (select id from tbl_dup t
             where t.sid > (s.tbl_dup).sid
                or (t.sid = (s.tbl_dup).sid and t.crt_time > (s.tbl_dup).crt_time)
               and t.id is not null
             order by t.sid, t.crt_time, t.mdf_time desc
             limit 1)
        )
        from skip s
        where (s.tbl_dup).id is not null  -- this predicate is required, otherwise the recursion loops forever
      )
    )
    select (t.tbl_dup).sid, (t.tbl_dup).crt_time from skip t where t.* is not null;

    Method 2: deletion

    If the data is being imported for processing, you could add a row_number column at load time and build a partial index with WHERE row_number <> 1 (a sketch of that variant follows the verification below).

    Here, deleting the redundant rows directly gets the job done in about 2 seconds.

    postgres=# delete from tbl_dup
    where (sid, crt_time, mdf_time) in
      (select sid, crt_time, mdf_time from
        (select sid, crt_time, mdf_time, row_number() over(partition by sid, crt_time order by mdf_time desc) as rn from tbl_dup) t
       where t.rn <> 1);
    DELETE 90863
    Time: 2079.588 ms

    postgres=# explain delete from tbl_dup
    where (sid, crt_time, mdf_time) in
      (select sid, crt_time, mdf_time from
        (select sid, crt_time, mdf_time, row_number() over(partition by sid, crt_time order by mdf_time desc) as rn from tbl_dup) t
       where t.rn <> 1);
                                                                QUERY PLAN
    ------------------------------------------------------------------------------------------------------------------------------------
     Delete on tbl_dup  (cost=187947.63..283491.75 rows=995000 width=50)
       ->  Hash Semi Join  (cost=187947.63..283491.75 rows=995000 width=50)
             Hash Cond: ((tbl_dup.sid = t.sid) AND (tbl_dup.crt_time = t.crt_time) AND (tbl_dup.mdf_time = t.mdf_time))
             ->  Seq Scan on tbl_dup  (cost=0.00..50000.00 rows=1000000 width=26)
             ->  Hash  (cost=159846.13..159846.13 rows=995000 width=64)
                   ->  Subquery Scan on t  (cost=0.42..159846.13 rows=995000 width=64)
                         Filter: (t.rn <> 1)
                         ->  WindowAgg  (cost=0.42..147346.13 rows=1000000 width=28)
                               ->  Index Only Scan using idx_tbl_dup on tbl_dup tbl_dup_1  (cost=0.42..127346.13 rows=1000000 width=20)
    (9 rows)

    Verification

    postgres=# select count(*), count(distinct (sid, crt_time)) from tbl_dup;
     count  | count
    --------+--------
     909137 | 909137
    (1 row)
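    As an aside, the partial-index variant mentioned at the start of this method is sketched below. The rn column and the backfill step are assumptions for illustration; the timed run above deletes via the window function directly.

    -- Sketch (assumed): materialize the row number, then index only the rows destined for deletion.
    alter table tbl_dup add column rn int;

    update tbl_dup t
    set rn = s.rn
    from (select ctid, row_number() over(partition by sid, crt_time order by mdf_time desc) as rn
          from tbl_dup) s
    where t.ctid = s.ctid;

    -- Partial index covering only the duplicates to be removed.
    create index idx_tbl_dup_rn on tbl_dup (rn) where rn <> 1;

    -- The delete then touches only the indexed rows.
    delete from tbl_dup where rn <> 1;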

    The one-pass approach

    Suppose the duplicate data comes from a text file: deduplicate it, load it into the database, then export it back to text.

    Sounds a bit like using the database as a text-processing tool, doesn't it?

    No matter; the goal is simply to be fast.

    How do we do it in one pass?

    First a file foreign table, then a COPY pipeline: one pass, start to finish.

    https://www.postgresql.org/docs/9.6/static/file-fdw.html

    postgres=# create extension file_fdw;
    CREATE EXTENSION

    postgres=# copy tbl_dup to '/home/digoal/tbl_dup.csv';
    COPY 1000000

    postgres=# create server file foreign data wrapper file_fdw;
    CREATE SERVER

    postgres=# CREATE FOREIGN TABLE ft_tbl_dup (
      id serial8,
      sid int,
      crt_time timestamp,
      mdf_time timestamp,
      c1 text default md5(random()::text),
      c2 text default md5(random()::text),
      c3 text default md5(random()::text),
      c4 text default md5(random()::text),
      c5 text default md5(random()::text),
      c6 text default md5(random()::text),
      c7 text default md5(random()::text),
      c8 text default md5(random()::text)
    ) server file options (filename '/home/digoal/tbl_dup.csv');

    postgres=# copy (
      select id, sid, crt_time, mdf_time, c1, c2, c3, c4, c5, c6, c7, c8
      from (select row_number() over(partition by sid, crt_time order by mdf_time desc) as rn, * from ft_tbl_dup) t
      where t.rn = 1
    ) to '/home/digoal/tbl_uniq.csv';
    COPY 909137
    Time: 10973.289 ms

    Parallel processing

    Split the input into multiple files and process them in parallel; the elapsed time drops to around 800 milliseconds.

    split -l 50000 tbl_dup.csv load_test_

    for i in `ls load_test_??`
    do
    psql <<EOF &
    drop foreign table "ft_$i";
    CREATE FOREIGN TABLE "ft_$i" (
      id serial8,
      sid int,
      crt_time timestamp,
      mdf_time timestamp,
      c1 text default md5(random()::text),
      c2 text default md5(random()::text),
      c3 text default md5(random()::text),
      c4 text default md5(random()::text),
      c5 text default md5(random()::text),
      c6 text default md5(random()::text),
      c7 text default md5(random()::text),
      c8 text default md5(random()::text)
    ) server file options (filename '/home/digoal/$i');
    \timing
    copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from (select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from "ft_$i") t where t.rn=1) to '/home/digoal/uniq_csv.$i';
    EOF
    done

    The whole pass now completes in under one second; pushing the parallelism further can bring the total elapsed time down to around 200 milliseconds.

    COPY 45500
    Time: 764.978 ms
    COPY 45500
    Time: 683.255 ms
    COPY 45500
    Time: 775.625 ms
    COPY 45500
    Time: 733.227 ms
    COPY 45500
    Time: 750.978 ms
    COPY 45500
    Time: 766.984 ms
    COPY 45500
    Time: 796.796 ms
    COPY 45500
    Time: 797.016 ms
    COPY 45500
    Time: 881.682 ms
    COPY 45500
    Time: 794.691 ms
    COPY 45500
    Time: 812.932 ms
    COPY 45500
    Time: 921.792 ms
    COPY 45500
    Time: 890.095 ms
    COPY 45500
    Time: 845.815 ms
    COPY 45500
    Time: 867.456 ms
    COPY 45500
    Time: 874.979 ms
    COPY 45500
    Time: 882.578 ms
    COPY 45500
    Time: 880.131 ms
    COPY 45500
    Time: 901.515 ms
    COPY 45500
    Time: 904.857 ms

    Note that this is not the end: a final merge sort is still needed to deduplicate globally. (Omitted here.)
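    Although that final merge is omitted above, under the assumption that the per-chunk outputs are attached as foreign tables again (the names ft_uniq_aa, ft_uniq_ab are hypothetical), the global pass could be the same window-function filter over their UNION ALL:

    -- Sketch of the global deduplication over the per-chunk results.
    copy (
      select id, sid, crt_time, mdf_time, c1, c2, c3, c4, c5, c6, c7, c8
      from (
        select row_number() over(partition by sid, crt_time order by mdf_time desc) as rn, *
        from (
          select * from ft_uniq_aa
          union all
          select * from ft_uniq_ab
          -- ... one branch per chunk
        ) u
      ) t
      where t.rn = 1
    ) to '/home/digoal/tbl_uniq_global.csv';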

    One-pass approach 2

    Load in parallel into a single table, process it, then export. The intermediate result does not need to survive a crash, so use an UNLOGGED TABLE.

    CREATE unlogged TABLE tmp (
      id serial8,
      sid int,
      crt_time timestamp,
      mdf_time timestamp,
      c1 text default md5(random()::text),
      c2 text default md5(random()::text),
      c3 text default md5(random()::text),
      c4 text default md5(random()::text),
      c5 text default md5(random()::text),
      c6 text default md5(random()::text),
      c7 text default md5(random()::text),
      c8 text default md5(random()::text)
    ) with (autovacuum_enabled=off, toast.autovacuum_enabled=off);

    create index idx_tmp_1 on tmp (sid, crt_time, mdf_time desc);

    split -l 20000 tbl_dup.csv load_test_

    date +%F%T.%N

    # truncate once, before the parallel loaders start
    # (truncating inside each parallel session would wipe out data already loaded by the others)
    psql -c "truncate tmp"

    for i in `ls load_test_??`
    do
    psql <<EOF &
    copy tmp from '/home/digoal/$i';
    EOF
    done

    # busy-wait until all loader sessions have exited
    for ((i=1;i>0;i=1))
    do
      sleep 0.0001
      cnt=`ps -ewf | grep -v grep | grep -c psql`
      if [ $cnt -eq 0 ]; then
        break
      fi
    done

    psql <<EOF
    copy (select id,sid,crt_time,mdf_time,c1,c2,c3,c4,c5,c6,c7,c8 from (select row_number() over(partition by sid,crt_time order by mdf_time desc) as rn, * from tmp) t where t.rn=1) to '/dev/shm/tbl_uniq.csv';
    EOF

    date +%F%T.%N

    2016-12-3000:59:42.309126109
    2016-12-3000:59:47.589134168

    5.28 seconds.

    Duplicate data cleansing optimizations: the technical points

    Many techniques were used above; here is a summary.

    1. Window queries

    Used mainly to pick out the duplicate rows and tag them.

    Use the columns to be deduplicated as the window (PARTITION BY), the rule column as the sort key, build a matching composite index, and you are ready to go.
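    A minimal sketch of the pattern, using this article's example table:

    -- Tag every row within a (sid, crt_time) group, newest mdf_time first;
    -- rows with rn > 1 are the duplicates to discard.
    select *
    from (select row_number() over(partition by sid, crt_time order by mdf_time desc) as rn, *
          from tbl_dup) t
    where t.rn > 1;

    -- The composite index that matches the window's PARTITION BY and ORDER BY:
    create index on tbl_dup (sid, crt_time, mdf_time desc);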

    2. Foreign tables

    If your data comes from text files, you can use the one-pass approach: treat the database as a text-processing platform, access the files directly through PostgreSQL's file_fdw foreign tables, and deduplicate in SQL.

    3. Parallel computing

    If your data comes from text files, you can split them into many small files, attach each as a foreign table, and deduplicate in parallel. Note that after the per-chunk pass, a merge sort is still needed to deduplicate across chunks.

    Separately, PostgreSQL 9.6 already supports using multiple CPU cores for a single query, which can scale performance nearly linearly. (For deduplication, the merge step still needs to be handled.)
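    For instance, in PostgreSQL 9.6 a scan-heavy step can be parallelized by raising max_parallel_workers_per_gather (a sketch; whether the planner actually picks a parallel plan depends on costs):

    -- Allow up to 4 parallel workers per Gather node for this session.
    set max_parallel_workers_per_gather = 4;

    -- Parallel-safe aggregates over a sequential scan can now use multiple cores.
    select count(*) from tbl_dup;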

    4. Recursive query, recursive skip scan

    A recursive query can optimize scenarios with a very high duplication rate. It has been used in several cases with dramatic results, from tens to hundreds of times faster:

    《时序数据合并场景加速分析和实现 - 复合索引,窗口分组查询加速,变态递归加速》

    《distinct xx和count(distinct xx)的变态递归优化方法 - 索引收敛(skip scan)扫描》

    《用PostgreSQL找回618秒逝去的青春 - 递归收敛优化》

    5. insert on conflict

    A feature introduced in PostgreSQL 9.5: deduplicate during the import itself and produce the final result directly.

    CREATE unlogged TABLE tmp_uniq (
      id serial8,
      sid int,
      crt_time timestamp,
      mdf_time timestamp,
      c1 text default md5(random()::text),
      c2 text default md5(random()::text),
      c3 text default md5(random()::text),
      c4 text default md5(random()::text),
      c5 text default md5(random()::text),
      c6 text default md5(random()::text),
      c7 text default md5(random()::text),
      c8 text default md5(random()::text),
      unique (sid, crt_time)
    ) with (autovacuum_enabled=off, toast.autovacuum_enabled=off);

    Parallel loading. (Currently a single query cannot UPDATE the same row more than once; that is what the error below is about.)

    ERROR:  21000: ON CONFLICT DO UPDATE command cannot affect row a second time
    HINT:  Ensure that no rows proposed for insertion within the same command have duplicate constrained values.
    LOCATION:  ExecOnConflictUpdate, nodeModifyTable.c:1133

    split -l 20000 tbl_dup.csv load_test_

    for i in `ls load_test_??`
    do
    psql <<EOF &
    drop foreign table "ft_$i";
    CREATE FOREIGN TABLE "ft_$i" (
      id serial8,
      sid int,
      crt_time timestamp,
      mdf_time timestamp,
      c1 text default md5(random()::text),
      c2 text default md5(random()::text),
      c3 text default md5(random()::text),
      c4 text default md5(random()::text),
      c5 text default md5(random()::text),
      c6 text default md5(random()::text),
      c7 text default md5(random()::text),
      c8 text default md5(random()::text)
    ) server file options (filename '/home/digoal/$i');
    \timing
    -- DO UPDATE requires an explicit conflict target; (sid, crt_time) matches the unique constraint above
    insert into tmp_uniq select * from "ft_$i"
    on conflict (sid, crt_time) do update set
      id=excluded.id, sid=excluded.sid, crt_time=excluded.crt_time, mdf_time=excluded.mdf_time,
      c1=excluded.c1, c2=excluded.c2, c3=excluded.c3, c4=excluded.c4,
      c5=excluded.c5, c6=excluded.c6, c7=excluded.c7, c8=excluded.c8
    where tmp_uniq.mdf_time < excluded.mdf_time;
    EOF
    done

    6. LLVM

    When processing large numbers of rows, it reduces context-switching overhead.

    Performance can roughly double.

    《分析加速引擎黑科技 - LLVM、列存、多核并行、算子复用 大联姻 - 一起来开启PostgreSQL的百宝箱》
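    For reference, in stock PostgreSQL this idea later landed as LLVM-based JIT compilation in version 11; a sketch of that (not what the article above benchmarks):

    -- PostgreSQL 11+: JIT-compile expression evaluation and tuple deforming.
    set jit = on;
    set jit_above_cost = 0;  -- force JIT even on cheap plans, for demonstration only

    explain (analyze) select count(*) from tbl_dup where sid > 0;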

    7. Stream computing

    Deduplicating as a stream, while the data is being imported. Pretty cool, right?

    create stream ss_uniq (
      id int8,
      sid int,
      crt_time timestamp,
      mdf_time timestamp,
      c1 text default md5(random()::text),
      c2 text default md5(random()::text),
      c3 text default md5(random()::text),
      c4 text default md5(random()::text),
      c5 text default md5(random()::text),
      c6 text default md5(random()::text),
      c7 text default md5(random()::text),
      c8 text default md5(random()::text)
    );

    CREATE CONTINUOUS VIEW cv_uniq as
    select row_number() over(partition by sid, crt_time order by mdf_time desc) as rn,
           id, sid, crt_time, mdf_time, c1, c2, c3, c4, c5, c6, c7, c8
    from ss_uniq;

    《流计算风云再起 - PostgreSQL携PipelineDB力挺IoT》

    8. Concurrent index creation

    When creating an index, build it CONCURRENTLY so that DML operations are not blocked.

    Raising maintenance_work_mem speeds up the index build.
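    For example (the 2GB figure is an assumption; size it to your host's memory):

    -- More sort memory for the build, and no DML blocking.
    set maintenance_work_mem = '2GB';
    create index concurrently idx_tbl_dup_new on tbl_dup (sid, crt_time, mdf_time desc);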

    9. Parallel import of file chunks

    To speed up the import, split the file into chunks and load them in parallel.

    In the future, chunked parallel import could be built into external access interfaces such as file_fdw.

    10. Bulk load, no logging

    If the database is used purely for computation, that is, the intermediate results processed in the database do not need to be retained, you can import with a bulk-load approach or use an unlogged table.

    This speeds up the import, and autovacuum can also be switched off during the load.
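    A minimal sketch combining the two (the staging table name is hypothetical):

    -- Unlogged staging table: no WAL, contents lost on crash, fine for throwaway work;
    -- autovacuum disabled because the table is transient anyway.
    create unlogged table stage (like tbl_dup)
    with (autovacuum_enabled = off, toast.autovacuum_enabled = off);

    copy stage from '/home/digoal/tbl_dup.csv';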

    Summary

    1. If the data is already in the database, deleting the duplicates in place on the original table takes about 2 seconds.

    2. If the data must be imported from text and the deduplicated result exported back to text, the whole pipeline takes about 5.28 seconds.

    References

    《分析加速引擎黑科技 - LLVM、列存、多核并行、算子复用 大联姻 - 一起来开启PostgreSQL的百宝箱》

    《流计算风云再起 - PostgreSQL携PipelineDB力挺IoT》

    《时序数据合并场景加速分析和实现 - 复合索引,窗口分组查询加速,变态递归加速》

    《distinct xx和count(distinct xx)的变态递归优化方法 - 索引收敛(skip scan)扫描》

    《用PostgreSQL找回618秒逝去的青春 - 递归收敛优化》
