Apache Iceberg 是一种开源数据 Lakehouse 表格式,提供强大的功能和开放的生态系统,如:Time travel,ACID 事务,partition evolution,schema evolution 等功能。
本文将讨论火山引擎EMR团队针对 Iceberg 组件的优化思路,通过引入索引来提高查询性能。
采用 Iceberg 构建数据湖仓火山引擎 E-MapReduce(简称 EMR)是火山引擎数智平台(VeDI)旗下的云原生开源大数据平台产品, 提供了企业级的 Hadoop、Spark、Flink、Hive、Presto、Kafka、StarRocks、Doris、Hudi、Iceberg 等大数据生态组件,100% 开源兼容,可以帮助企业快速构建企业级大数据平台,降低运维门槛。秉承业界领先的 EMR Stateless 理念,火山引擎 EMR 可以实现集群级别的弹性伸缩,即无业务需求时释放集群,有业务需求时再拉起集群,配合智能化的冷热数据分层存储能力,助力企业在大数据基建领域进一步降本提效。
(相关资料图)
基于火山引擎 EMR 产品,可以构建数据湖仓、近实时数仓、实时数仓等场景。例如,使用 Iceberg 构建数据湖仓,从 ODS 到 DWD 等不同的分层进行建模,将数据 HFDS 或 TOS(火山引擎对象存储产品)上,然后采用 Trino 或者 Spark 去做分析。
如何加速查询性能,使其尽可能接近专门的分布式数仓(如 ClickHouse 等),是需要思考和探究的问题。
索引是业界常用的提高查询性能的手段之一,针对 Iceberg 我们也采用了增加索引的方式。对常用的列字段构建 Index,在进行 table scan 时利用 Index 只返回匹配的数据,降低匹配数据量,从而大大提高查询性能。
Iceberg 介绍介绍 Iceberg Index 功能之前,我们先简单介绍下 Iceberg 的架构。Iceberg 具有分层的元数据架构,如下如所示。
Spark、Presto、Flink 等多种引擎读取 Iceberg 的数据,就是利用分层的元数据找到 data file 列表。例如,Spark 引擎解析 SQL 语句,然后调用 Iceberg 的接口,获取 data file 并进行 task 切分。
在 Manifest file 中记录了 data file 中字段的最大值和最小值。
"data_file": { "content": 0, "file_path": "hdfs://emr-cluster/warehouse/hive/db.db/sample/data/ts_day=2020-12-31/category=diamond/00000-0-220aa9a6-4530-499f-9450-da946d667624-00001.parquet", "file_format": "PARQUET", ...... "lower_bounds": { "array": [{ "key": 1, "value": "\u0006\u0000\u0000\u0000" }, { "key": 2, "value": "diamond" }, { "key": 3, "value": "\u0000\u0004ÜÅ·\u0005\u0000" }] }, "upper_bounds": { "array": [{ "key": 1, "value": "\u0007\u0000\u0000\u0000" }, { "key": 2, "value": "diamond" }, { "key": 3, "value": "\u0000¨odÆ·\u0005\u0000" }] }, ......}
利用这些信息,可以进行 data file 级别的初步过滤,把不符合条件的 data file 过滤掉,进而减少一部分数据的读取。
实现索引的必要性既然 Iceberg 已经提供 data file 级别的过滤。为什么我们还需要引入索引呢?以下面例子进行介绍,左边两个表格分别是 data file 文件里面的内容,右边表格是 data file 对应的 manifest file。
针对SELECT * FROM table WHERE age > 50
,利用 min-max 统计信息,很容易发现 data file 1 中没有满足条件的数据,因此 data file 1 就不会参与计算。
但是针对多维分析,如name = "LiLy" AND age > 30
,利用name
和age
的min-max的统计信息分别对条件name = "LiLy"
和age > 30
进行判断,得到 data file 1 和 data file 2 都满足条件。然而,仔细分析 data file 1 和 data file 2 的数据,并不存在符合条件的数据,因此 min-max 过滤效果不太理想。所以通过引入合适的索引功能,可以提高 data skipping 的概率,提高查询性能。
1. 首先探究索引类型
索引类型有多种,如 BloomFilter、Ribbon Filter、Dictionary Index、BitMap 等。为了满足多维分析场景,我们选择了[Range-Encoded BitMap]https://www.featurebase.com/blog/range-encoded-bitmaps ( Base-2, Bit-sliced Index),可适用于高基数场景,满足=、<、>、IN、BETWEEN 等操作的多维分析。
例如,对上面的 name 和 age 两列分别计算索引信息。由于 name 属于字符串类型,需要先进行字典编码再进行计算索引信息。采用 Range-Encoded 技术,根据数据的二进制相关信息以及对应的 pos 信息生成索引数据。利用索引数据分析得到,同时满足name = "LiLy"
和age > 30
的数据不在同一行,恰好可利用 Range-Encoded 的交并运算将数据进行过滤掉,因此 data file 1 不用参与计算。
也就是说,BitMap 的交并运算可以更好地在复杂过滤条件的情况下过滤掉更多的数据文件。
2. 接下来探究索引的粒度。
Iceberg 提供的 min-max,也是一种文件级别的索引。文件级别的索引就是根据 filter 条件过滤掉不符合条件的 data file。文件级别的索引可适用于多种文件类型,但这种粒度比较粗,只要 data file 中有一条数据符合条件,该 data file 中的数据就会全部读取出来参与计算,从而影响 SQL 的查询性能。
对于 Parquet、ORC 的文件格式,提供有 file chunk 的概念(row group or stripe),我们完全可以按照 row group / stripe 粒度,对数据进行过滤。(为了方便描述,我们将 row group 和 stripe 统称 split。)
如:SQL语句:SELECT * FROM table WHERE col_1> v1 AND col_2 = v2
,其中对 col_1 字段和 col_2 字段已构建 Index 信息。现在利用索引对 SQL 语句作用。
SQL 语句解析后,将符合条件的 data file 列表进行切分后,得到很多 split 的列表。利用索引,分析 split 中数据是否满足条件,如果不满足则跳过。如上图 data file 列表切分后,得到数万级别数量的 split 列表。将索引数据作用在 split1,发现 split1 中没有同时col_1> v1 AND col_2 = v2
满足条件的数据,该 split1 中的数据就不会参与计算。最后处理后,只得到了少量的 split 列表,数据过滤度达到 10% 以上,查询性能有明显提升。
因此,采用 row group / stripe 级别的细粒度索引,可以过滤大部分数据。
细粒度索引实现逻辑Iceberg 元数据中 manifest file 中除了提供 min-max 等统计信息,还提供有 split 相关信息:"split_offsets":{"array":[4,...]}
,极大方便我们实现 row group / stripe 级别的细粒度索引。
Iceberg 中提供构建索引的 API,引擎端调用该 API 即可实现索引构建功能。对于 Spark 3.3 及以上版本,已经提供有索引的 SQL 语句,在 Iceberg 的 Spark 模块实现 Spark 提供的索引接口即可。
构建索引我们采用异步构建索引,不影响主线任务。也提供了增量构建索引功能,只对 append 数据进行构建索引。调用 TableScan 读取数据,按照 data file 的 split offset 切分数据,进行构建索引,并保存索引数据和对应的元数据信息。为了避免出现小文件存在,我们会进行索引数据合并。
索引文件存储索引文件格式采用[puffin]https://iceberg.apache.org/puffin-spec/格式,这是一种二进制格式。Magic Blob₁ Blob₂ ... Blobₙ Footer
在 Footer 中保存每个 blob 的元数据信息。索引构建成功后,会生成类似于下面内容的文件。
索引带来的收益Range-Encoded BitMap 适用于多维分析场景,且 Ranger 范围较小时,效果非常明显。下面我们基于 Spark 引擎性能测试。
构造 1TB 的 SSB 测试数据,分别在构建 Index 前后,对以下用例进行测试。Q1: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 19665277Q2: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 19665277 AND lo_revenue = 2141624Q3: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 19665277 AND lo_revenue >=10304000Q4: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice = 21877827 AND lo_revenue >= 83800 AND lo_revenue <= 103800Q5: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice > 21877827 AND lo_revenue >= 83800 AND lo_revenue <= 93800Q6: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice >= 93565 AND lo_ordtotalprice < 93909Q7: SELECT count(*) FROM lineorder WHERE lo_ordtotalprice >= 93565 AND lo_ordtotalprice < 91003562 AND lo_revenue >=904300 AND lo_revenue <= 9904300
左图展示了 7 条 SQL 语句分别在没有 Index 和采用 Index 情况下的执行时间。右图展示采用 Index 后,7 条 SQL 语句读数据的 split 数量。很明显读数据的 split 数量越少,Index 效果越好。最糟糕的情况,所有的 split 都参数计算,这时和没有构建索引的效果类似。
采用 SSB 基准测试由于 SSB 提供的测试场景,和 Range-Encoded 有利的场景,不太匹配,所以 Index 的效果并没有明显的效果。但也不会比不采用 Index 的效果差。如下面左图,分别是构建索引前后,SQL 语句的执行时间,构建索引的优势并没有体现出来。右图中,可以看到所有的 split 都参与了计算。
总结根据上面的介绍,这里总结下 Iceberg 中索引实现的一些特征:
细粒度索引级别:提供 RowGroup/Stripe 级别的索引,可以更加精确的定位数据的查询范围,减少不必要数据输入,从而提高查询性能;索引作用于执行端:查询任务被分配多个执行端,每个执行端只判断该节点上的 RowGroup/Stripe 数据是否符合即可;适配多种引擎:索引构建后,可用于多种引擎;提供异步构建 Index,从而不影响主业务的进行;适用于高基数 & 低基数场景,且占有存储空间小。满足范围查询、等值查询等场景。且范围越小,收益效果越明显。