实验介绍
本实验通过分析普通查询过程中存在的性能瓶颈点,通过执行计划的分析找到可能的性能优化点并加以实施,最终达到优化的效果,重点关注分布式关联相关查询语句的优化。
实验目的
了解通过合理定义分布列实现分布式关联的性能优化。
实验步骤
步骤1 使用以下语句查询
Explain analyze select
l_orderkey,
o_orderdate,
o_shippriority
from
customer,
orders,
lineitem
where
c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
order by
o_orderdate limit 10;
步骤2 修改分布键
观察查询语句中,需要对 customer、order 和 lineitem 三张表进行关联,其中关联条件有c_custkey = o_custkey 和 l_orderkey = o_orderkey。如果只考虑本查询的优化,在该列数据没有显著倾斜的情况下,优先考虑 customer.c_custkey 和 lineitem.l_orderkey 作为分布键,从而减少 DN 数据的重分布。
customer、lineitem 建表语句,并导入数据文件customer.csv、lineitem.csv。
Drop table if exists customer;CREATE TABLE CUSTOMER (
C_CUSTKEY INTEGER NOT NULL,
C_NAME VARCHAR(25) NOT NULL,
C_ADDRESS VARCHAR(400) NOT NULL,
C_NATIONKEY INTEGER NOT NULL,
C_PHONE CHAR(15) NOT NULL,
C_ACCTBAL DECIMAL(15,2) NOT NULL,
C_MKTSEGMENT CHAR(10) NOT NULL,
C_COMMENT VARCHAR(400) NOT NULL
)
DISTRIBUTE BY HASH(c_custkey);Drop table if exists lineitem;CREATE TABLE LINEITEM(
L_ORDERKEY INTEGER NOT NULL,
L_PARTKEY INTEGER NOT NULL,
L_SUPPKEY INTEGER NOT NULL,
L_LINENUMBER INTEGER NOT NULL,
L_QUANTITY DECIMAL(15,2) NOT NULL,
L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
L_DISCOUNT DECIMAL(15,2) NOT NULL,
L_TAX DECIMAL(15,2) NOT NULL,
L_RETURNFLAG CHAR(1) NOT NULL,
L_LINESTATUS CHAR(1) NOT NULL,
L_SHIPDATE DATE NOT NULL,
L_COMMITDATE DATE NOT NULL,
L_RECEIPTDATE DATE NOT NULL,
L_SHIPINSTRUCT CHAR(25) NOT NULL,
L_SHIPMODE CHAR(10) NOT NULL,
L_COMMENT VARCHAR(44) NOT NULL)
DISTRIBUTE BY HASH(l_orderkey);COPY CUSTOMER FROM '/tmp/customer.csv' DELIMITER ',' QUOTE '"' ESCAPE '"' ENCODING 'GBK' CSV;COPY LINEITEM FROM '/tmp/lineitem_.csv' DELIMITER ',' QUOTE '"' ESCAPE '"' ENCODING 'UTF-8' CSV;
步骤3 尝试使用 o_custkey 作为 orders 的分布键,并导入数据文件orders.csv。
Drop table if exists orders; CREATE TABLE orders (
o_orderkey bigint NOT NULL,
o_custkey bigint NOT NULL,
o_orderstatus character(1) NOT NULL,
o_totalprice numeric(15,2) NOT NULL,
o_orderdate date NOT NULL,
o_orderpriority character(15) NOT NULL,
o_clerk character(15) NOT NULL,
o_shippriority bigint NOT NULL,
o_comment character varying(79) NOT NULL
)WITH (orientation=row, compression=no)
DISTRIBUTE BY HASH(o_custkey);COPY ORDERS FROM '/tmp/orders.csv' DELIMITER ',' QUOTE '"' ESCAPE '"' ENCODING 'UTF-8' CSV;
执行步骤一中的查询语句,设置参数:
set enable_fast_query_shipping = off;
set enable_stream_operator = on;
该两个参数为会话级,只在本次会话期间生效。
jiang=# Explain analyze
jiang-# select
jiang-# l_orderkey,
jiang-# o_orderdate,
jiang-# o_shippriority
jiang-# from
jiang-# customer, orders, lineitem
jiang-# where
;jiang-# c_custkey = o_custkey
jiang-# and l_orderkey = o_orderkey
jiang-# and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
jiang-# order by
jiang-# o_orderdate limit 10;id | operation | A-time | A-rows | E-rows | Peak Memory | A-width | E-width | E-costs
----+-----------------------------------------------------+-------------------+--------+--------+-------------+---------+---------+----------1 | -> Limit | 782.531 | 10 | 10 | 2KB | | 16 | 11271.182 | -> Streaming (type: GATHER) | 782.528 | 10 | 10 | 176KB | | 16 | 11271.353 | -> Limit | [753.865,768.892] | 30 | 18 | [2KB,2KB] | | 16 | 11270.604 | -> Sort | [753.861,768.887] | 30 | 18 | [28KB,29KB] | [32,32] | 16 | 11270.605 | -> Hash Join (6,7) | [752.256,767.242] | 25917 | 17 | [9KB,9KB] | | 16 | 11270.516 | -> Seq Scan on lineitem | [85.445,98.007] | 565702 | 564893 | [41KB,41KB] | | 4 | 10763.067 | -> Hash | [594.646,597.109] | 713105 | 10 | [12MB,13MB] | [40,40] | 20 | 28.738 | -> Streaming(type: REDISTRIBUTE) | [373.185,449.521] | 713105 | 10 | [69KB,69KB] | | 20 | 28.739 | -> Hash Join (10,11) | [362.084,403.154] | 713105 | 10 | [9KB,9KB] | | 20 | 28.4410 | -> Seq Scan on customer | [14.668,18.917] | 147100 | 30 | [34KB,35KB] | | 4 | 14.1411 | -> Hash | [199.532,206.435] | 727305 | 10 | [14MB,15MB] | [48,48] | 28 | 14.1812 | -> Seq Scan on orders | [145.599,150.525] | 727305 | 10 | [33KB,33KB] | | 28 | 14.18
(12 rows)Predicate Information (identified by plan id)
----------------------------------------------------------------------------------5 --Hash Join (6,7)Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)6 --Seq Scan on lineitemFilter: (l_shipdate > '1995-03-15'::date), (LLVM Optimized, Jit Execute)Rows Removed by Filter: 4828749 --Hash Join (10,11)Hash Cond: (customer.c_custkey = orders.o_custkey)12 --Seq Scan on ordersFilter: (o_orderdate < '1995-03-15'::date)Rows Removed by Filter: 772695
(10 rows)Memory Information (identified by plan id)
-----------------------------------------------------------------------Coordinator Query Peak Memory:Query Peak Memory: 2MBDatanode:Max Query Peak Memory: 15MBMin Query Peak Memory: 14MB4 --SortSort Method: top-N heapsort Memory: 26kB ~ 26kBSort Method: top-N heapsort Disk: 1024kB ~ 0kB7 --HashMax Buckets: 32768 Max Batches: 1 Max Memory Usage: 13010kBMin Buckets: 32768 Min Batches: 1 Min Memory Usage: 12984kB11 --HashMax Buckets: 32768 Max Batches: 1 Max Memory Usage: 15310kBMin Buckets: 32768 Min Batches: 1 Min Memory Usage: 14980kB
(14 rows)User Define Profiling
---------------------------------------------------------------------------Segment Id: 3 Track name: Datanode build connection(actual time=[0.203, 0.232], calls=[1, 1])Plan Node id: 2 Track name: coordinator get datanode connection(actual time=[0.029, 0.029], calls=[1, 1])Plan Node id: 2 Track name: Coordinator check and update node definition(actual time=[0.001, 0.001], calls=[1, 1])Plan Node id: 2 Track name: Coordinator serialize plan(actual time=[0.904, 0.904], calls=[1, 1])Plan Node id: 2 Track name: Coordinator send query id with sync(actual time=[0.220, 0.220], calls=[1, 1])Plan Node id: 2 Track name: Coordinator send begin command(actual time=[0.001, 0.001], calls=[1, 1])Plan Node id: 2 Track name: Coordinator start transaction and send query(actual time=[0.025, 0.025], calls=[1, 1])Plan Node id: 3 Track name: Datanode start up stream thread(actual time=[0.032, 0.049], calls=[1, 1])
(16 rows)====== Query Summary =====
--------------------------------------------------------------------------------------------Datanode executor start time [dn_6007_6008_6009, dn_6004_6005_6006]: [7.810 ms,9.919 ms]Datanode executor run time [dn_6007_6008_6009, dn_6004_6005_6006]: [753.898 ms,768.921 ms]Datanode executor end time [dn_6007_6008_6009, dn_6001_6002_6003]: [0.082 ms,0.092 ms]Coordinator executor start time: 6.973 msCoordinator executor run time: 782.544 msCoordinator executor end time: 0.031 msPlanner runtime: 0.904 msPlan size: 6167 byteQuery Id: 72902018968525132Total runtime: 789.565 ms
(10 rows)
此时,orders 和 customer 两表由于关联列都在各自的分布键上,所以可以本地进行关联,然后其结果集再根据和 lineitem 的关联条件作为重分布的分布列进行重分布。
步骤4 尝试使用 o_orderkey 作为 orders 的分布列,并导入数据文件orders.csv。
Drop table if exists orders;CREATE TABLE orders (
o_orderkey bigint NOT NULL,
o_custkey bigint NOT NULL,
o_orderstatus character(1) NOT NULL,
o_totalprice numeric(15,2) NOT NULL,
o_orderdate date NOT NULL,
o_orderpriority character(15) NOT NULL,
o_clerk character(15) NOT NULL,
o_shippriority bigint NOT NULL,
o_comment character varying(79) NOT NULL
)
WITH (orientation=row, compression=no)
DISTRIBUTE BY HASH(o_orderkey);COPY ORDERS FROM '/tmp/orders.csv' DELIMITER ',' QUOTE '"' ESCAPE '"' ENCODING 'UTF-8' CSV;
执行步骤 1 中查询语句:
jiang=# Explain analyze
jiang-# select
jiang-# l_orderkey,
jiang-# o_orderdate,
jiang-# o_shippriority
jiang-# from
jiang-# customer, orders, lineitem
jiang-# where
jiang-# c_custkey = o_custkey
jiang-# and l_orderkey = o_orderkey
jiang-# and o_orderdate < '1995-03-15'::date and l_shipdate > '1995-03-15'::date
jiang-# order by
jiang-# o_orderdate limit 10;id | operation | A-time | A-rows | E-rows | Peak Memory | A-width | E-width | E-costs
----+-----------------------------------------------------+-------------------+--------+--------+---------------+---------+---------+----------1 | -> Limit | 419.741 | 10 | 10 | 2KB | | 16 | 13063.962 | -> Streaming (type: GATHER) | 419.738 | 10 | 10 | 176KB | | 16 | 13064.133 | -> Limit | [415.101,416.042] | 30 | 18 | [2KB,2KB] | | 16 | 13063.384 | -> Sort | [415.097,416.037] | 30 | 18 | [28KB,29KB] | [32,32] | 16 | 13063.385 | -> Hash Join (6,7) | [413.825,414.770] | 25917 | 17 | [9KB,9KB] | | 16 | 13063.296 | -> Seq Scan on customer | [10.589,10.882] | 147100 | 147100 | [34KB,35KB] | | 4 | 1684.337 | -> Hash | [393.875,394.071] | 26500 | 17 | [840KB,840KB] | [44,44] | 24 | 11254.188 | -> Streaming(type: REDISTRIBUTE) | [387.692,387.929] | 26500 | 17 | [70KB,70KB] | | 24 | 11254.189 | -> Hash Join (10,11) | [360.934,376.886] | 26500 | 17 | [10KB,10KB] | | 24 | 11253.6010 | -> Seq Scan on lineitem | [88.832,100.285] | 565702 | 564893 | [41KB,41KB] | | 4 | 10763.0611 | -> Hash | [199.541,202.174] | 727305 | 10 | [15MB,15MB] | [48,48] | 28 | 14.1812 | -> Seq Scan on orders | [145.768,147.778] | 727305 | 10 | [33KB,33KB] | | 28 | 14.18
(12 rows)Predicate Information (identified by plan id)
---------------------------------------------------------------------5 --Hash Join (6,7)Hash Cond: (customer.c_custkey = orders.o_custkey)9 --Hash Join (10,11)Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)10 --Seq Scan on lineitemFilter: (l_shipdate > '1995-03-15'::date), (LLVM Optimized)Rows Removed by Filter: 48287412 --Seq Scan on ordersFilter: (o_orderdate < '1995-03-15'::date)Rows Removed by Filter: 772695
(10 rows)Memory Information (identified by plan id)
-----------------------------------------------------------------------Coordinator Query Peak Memory:Query Peak Memory: 2MBDatanode:Max Query Peak Memory: 1MBMin Query Peak Memory: 1MB4 --SortSort Method: top-N heapsort Memory: 26kB ~ 26kBSort Method: top-N heapsort Disk: 1024kB ~ 0kB7 --HashMax Buckets: 32768 Max Batches: 1 Max Memory Usage: 530kBMin Buckets: 32768 Min Batches: 1 Min Memory Usage: 511kB11 --HashMax Buckets: 32768 Max Batches: 1 Max Memory Usage: 15162kBMin Buckets: 32768 Min Batches: 1 Min Memory Usage: 15137kB
(14 rows)User Define Profiling
---------------------------------------------------------------------------Segment Id: 3 Track name: Datanode build connection(actual time=[0.179, 0.226], calls=[1, 1])Plan Node id: 2 Track name: coordinator get datanode connection(actual time=[0.030, 0.030], calls=[1, 1])Plan Node id: 2 Track name: Coordinator check and update node definition(actual time=[0.001, 0.001], calls=[1, 1])Plan Node id: 2 Track name: Coordinator serialize plan(actual time=[0.881, 0.881], calls=[1, 1])Plan Node id: 2 Track name: Coordinator send query id with sync(actual time=[0.246, 0.246], calls=[1, 1])Plan Node id: 2 Track name: Coordinator send begin command(actual time=[0.001, 0.001], calls=[1, 1])Plan Node id: 2 Track name: Coordinator start transaction and send query(actual time=[0.026, 0.026], calls=[1, 1])Plan Node id: 3 Track name: Datanode start up stream thread(actual time=[0.028, 0.030], calls=[1, 1])
(16 rows)====== Query Summary =====
--------------------------------------------------------------------------------------------Datanode executor start time [dn_6007_6008_6009, dn_6004_6005_6006]: [0.337 ms,0.367 ms]Datanode executor run time [dn_6007_6008_6009, dn_6001_6002_6003]: [415.130 ms,416.059 ms]Datanode executor end time [dn_6001_6002_6003, dn_6004_6005_6006]: [0.069 ms,0.101 ms]Coordinator executor start time: 6.699 msCoordinator executor run time: 419.754 msCoordinator executor end time: 0.031 msPlanner runtime: 0.679 msPlan size: 6325 byteQuery Id: 72902018968532990Total runtime: 426.499 ms
(10 rows)
从 id=8 所在行可以看到,orders 和 customer 表进行关联时由于分布列不同,customer 选择了广播的计划,再和orders 表进行管理。之后orders+customer 的结果集具有orders 相同的分布列,所以可以和lineitem 在本地进行关联。
实验总结
本实验通过调整数据表分布键的方法,对查询进行了调优,选择不同的分布键对查询性能会产生一定的影响,主要遵循以下几个原则:
1)选择没有显著倾斜性的字段作为分布列;
2)尽可能选择查询中的关联列作为表的分布键;
3)存在多个满足上面条件的分布列时,尽可能选择数据量较少的表进行重分布或广播,优先满足大表间的分布键统一。
思考题
选择行存表的分布列的原则有哪些?
答案:
分布列选择有以下建议:
选择的分布列字段和分布算法能够将表数据在均匀分布到各个 DN 节点;
该分布字段在执行 SQL 时经常被用于作为连接字段;
进行数据访问时最大限度地减少跨 DN 节点数据访问。