新浪新闻客户端

Apache Spark在小米的生产实践

Apache Spark在小米的生产实践
2024年04月09日 13:00 新浪网 作者 DataFunTalk

  导读Apache Spark 是被广泛使用的大数据离线计算引擎。小米基于Spark3.1 建设了新一代一站式数据开发平台的批处理能力,新平台在作业迁移、性能优化、稳定性优化中都遇到了一些问题。本文将分享其中的典型问题以及小米 Spark 团队的解决方案。

  本次分享围绕以下五点展开:

  1. Multiple Catalog 落地与应用

  2. Hive SQL 迁移 Spark SQL

  3.离线场景下 Spark 的稳定性与性能优化

  4. 未来规划

  5. 问答环节

  分享嘉宾|蔡灿 小米 软件研发工程师

  编辑整理|许通

  内容校对|李瑶

  出品社区|DataFun

  01

  Multiple Catalog 落地与应用

  首先介绍一下 Multiple Catalog 的相关背景。

  1. 相关背景

  2021 年之前,在小米集团存在多个数据开发平台。当时这些平台只支持处理Hive 数据源。其他数据源,如:MySQL、Kudu 等,需要注册为 Hive 外表,作为 Hive 表才能使用。当时的数据管理比较分散,数据引擎也比较落后。

  为此,小米计算团队开始着手打造新一代的数据工场,作为集团统一的数据开发门户。

  当时 Spark3 已经发布一段时间,并且社区测评结果显示 Spark3 相比 Spark2.4 性能提升近 2 倍。因此,我们引入了 Spark3.1 作为数据处理引擎。

  在旧平台中,Spark 只支持 Hive 数据源。其他数据源需要注册为 Hive 外表。元数据存储在 Hive 的 Metastore 中,存在一些问题:

  •   源表名称和 Hive 表名称可能不一致,造成数据血缘异常

  •   元数据二次存储,可能造成源表的 schema 变更在 Hive 表中无法实时感知

  •   每一张表都需要手动注册,工作量巨大

  2. Spark3 的元数据管理

  新一代的数据工场中基于 Metacat 对数据源进行统一管理。Metacat 不对元数据进行二次存储,而是实时获取。因此,Metacat 解决了旧平台表名不一致、Schema 不一致等痛点问题。同时,我们基于 Spark3.1 的 CatalogPlugin 实现了不同数据源的 Catalog,解决了旧平台上只能查询 hive 单一数据源的问题。

  目前,在数据工场中,Multiple Catalog 已经得到广泛应用。在 ETL 作业与数据查询中,可以在一条 SQL 中指定多种不同的数据源,进行联邦分析和查询。示例中,同时涵盖了 Doris、Hive、Iceberg 多种数据源。

  02

  Hive SQL 迁移 Spark SQL

  1. 相关背景

  在新一代数据工场的推广过程中,我们不断推动用户迁移旧平台上的作业至新平台,同时推动用户将迁移后的作业版本升级到 Spark3.1。

  初期,迁移工作主要是人工操作。在将近一年的推广中,数据工场平台上 Spark3 SQL 占比达到了 51%,但是 Spark2 SQL 和 Hive SQL 仍有很高占比。

  由于 Spark 效率更高,并且数据湖离线场景是结合 Spark3 落地的,为了更好地推广数据湖的使用,我们开始了 Hive SQL 和 Spark2 SQL 的批量升级工作。

  2. 语法检测与数据一致性校验

  整个迁移、升级工作主要分为四步:语法检测、数据一致性校验、批量自动化升级SQL 引擎版本、监控已升级作业运行状态与效率。

  在语法检测中我们使用了 Kyuubi 的 Plan Only 模式。它指定 SQL 只执行 parse与 analyze 两个阶段。Plan Only 模式下输出的 LogicalPlan 已经可以检测出不同引擎在语法上存在的差异。

  通过语法校验后,我们会进行数据的一致性校验。通过 SQL 双跑对比不同引擎的计算结果是否一致。如上图,直接读目标表,其存储的数据是基于 Hive SQ 或 Spark2 SQL 写入的,通过 Spark3.1 读取并计算其行数与每个列的哈希值,再通过 Spark3.1 执行提取的 Select 子查询,并计算其行数与每个列的哈希值,直接比较二者的行数以及对应列的哈希值是否一致,无需将双跑结果落盘后再做对比。

  3. 批量自动化升级

  对于语法兼容且数据一致的 SQL,经过对检测后的数据进行分析,发现 SQL 中还有 DDL 和 DML 类型。此外,数仓的 SQL 作业有 SLA 要求。因此,我们根据 SQL 升级的难易程度、作业优先级、作业耗时以及集群规模等因素对升级过程制定了细粒度的升级策略:先升级 DDL,再升级 DML;先升级低优任务,再升级高优任务;先升级耗时短的,再升级耗时长的;先升级小集群,再升级大集群。核心目的是为了降低升级过程的风险。

  批量升级后,切换 SQL 引擎为 Spark3,自动创建 Spark3 版本 SQL 作业,并将新版本作业启用调度。

  第二天凌晨开始调度时,我们提供了定时检测工具对已升级作业进行执行状态与效率检测。如果发现不符合预期,比如:状态失败、性能倒退,记录异常作业详情,并将作业回滚到上一个版本。

  我们会介入分析不符合预期的原因,并解决问题,再更改已经解决了问题的 SQL 状态为待升级,重新安排升级。

  4. 升级过程中的典型问题

  接下来介绍几个升级过程中遇到的典型问题。

  在语法检测中,基于 Kyuubi PlanOnly 模式检测 SQL 语法兼容性,部分 SQL 需要真正执行,如:

  •   Use db

  •   create temporary view ...

  •   create temporary funtion ...

  否则引用了临时视图、临时函数的 SQL 将无法正常执行。

  在迁移过程中,发现部分作业产生的小文件比较多。产生小文件是因为同一个 Task处理了多个不同分区的数据。在覆盖写的场景中,通常包含来自多个分区的数据,在同一个 Task 中需要将不同分区的数据写入对应的分区目录,因此会创建不同的分区文件。当同一个分区的数据分发到不同的 Task 时,文件数就会膨胀。

  在 insert 操作之前,插入一个 repartition 操作,将属于同一个分区的数据集中到一个 Task 去处理,并基于 Spark AQE 合并小分区的功能去减少 shuffle partition 数量,从而减少最终生成的文件数量。

  在解决小文件问题中,我们发现为了减少小文件数量而增大了分区数据量大小的阈值,导致 insert 操作之前的其他 stage 并行度降低,SQL 执行效率降低很多。

  为了解决这个问题,我们引入了配置隔离机制,将 final stage 与其他 stage 的配置进行隔离,为 final stage 配置独立的分区合并阈值,不改变其他 stage 的并行度,不影响其他 stage 的执行效率。

  批量升级操作历时一个季度,我们将数据工场的 Spark3 SQL 占比从 51% 提升到 90%。根据升级过程中的监控结果对比,作业平均效率提升了 32%。

  03

  离线场景下 Spark 的稳定性与性能优化

  在 SQL 升级到 Spark3 的过程中,我们同时也在进行 Spark3+ 数据湖查询优化工作。

  1. 性能优化

  我们主要的优化方式是 Data skipping。

  Iceberg 文件存储格式使用 parquet,结合 Spark 的谓词下推、列裁剪等功能,可以在 Scan 阶段提前进行分区裁剪、文件裁剪、RowGroup 裁剪以及列裁剪。这样可以有效减少 Scan 阶段所读取的数据量,进而减少磁盘 IO、网络 IO 等消耗,从而提高查询效率。

  在应用过程中,我们也遇到了一些问题,比如:分区无法下推。OR 查询条件中同时存在分区字段与函数表达式,会导致分区无法下推,进而引起全表扫描。我们内部实现了提取这种场景下 SQL 的分区字段并把它下推到数据源做分区裁剪,可以显著提高效率。

  当前 Data skipping 到 RowGroup 级别,在 RowGroup 中即使只有一条数据满足筛选条件,整个 RowGroup 的数据也都会被读取出来,但是其中可能还有很多数据并不满足 where 条件,那么需要在计算层再应用一次全量的过滤条件才能得到正确的结果。对于 where 条件中存在复杂计算的,我们内部实现了自动优化谓词的先后顺序,将可以进行下推的筛选条件前置,将需要复杂计算的谓词放到最后。通过这样的优化,可以避免不满足下推条件的 Row 再去做其他复杂谓词的计算。

  我们的用户实测,在有大量数据被过滤的情况下,调整谓词顺序后性能直接提升 93%。

  前面的优化中,Data skipping 仅到 RowGroup 级别,而 Parquet 的最小粒度是 Page,在 Data skipping 中最小粒度可以达到 Page 级别。

  通过 Page 级别的 min max 索引,记录 Page 的上下界信息。在 Scan 阶段可以根据指定条件过滤满足条件的 Page,进一步减少要读取的数据。这种情况要求下推字段是有序的。数据有序,通过 Page min max 信息才能尽可能减少读取不必要的 Page,从而进一步减少要读取的数据量。

  内部 Benchmark 结果显示,在下推字段有序情况下性能最高提升 67%,对于无序数据性能基本无明显变化。

  此外,我们对 Join 操作进行了一系列优化,包括:Broadcast Join、HashJoin 等。

  在几种 Join 中,BroadcastHashJoin 一般是效率比较高的,因为将小表 Broadcast 可以避免 Shuffle,而拼接过程基于内存查找也比较高效。

  最初内部 Spark 仅支持根据数据源预估大小决定是否进行 Broadcast。但在使用过程中我们发现数据源预估大小存在预估不准的问题,导致将很大的表 Broadcast,进而引起 Driver OOM。此外,刚开始设置的 Broadcast 阈值较小,导致Broadcast Join 生效范围小。

  为此,我们禁用了基于预估大小的 Broadcast 方式,转为基于 AQE Shuffle 数据量指标进行 Broadcast。这种情况下避免了数据源预估不准确带来的 Driver OOM 问题。不过这样也引入新问题。原来可以直接进行 Broadcast 操作,现在需要多做一次 shuffle。

  针对这个问题,我们建立了 Join 指标,收集了 Join 左右表的预估大小和真实大小。对于预估不准确的表,禁止直接走 BroadcastHashJoin,将预估数据大小设置为 Long 最大值。并对这些表开启 AQE,根据 shuffle 统计结果判断是否执行 Broadcast。此外,我们调大了 Boradcast 的阈值,扩大了 Broadcast 的生效范围。

  经过这些优化,我们规避了基于数据源数据量预估大小做 Broadcast 带来的 Driver OOM 风险,同时也避免部分 SQL Broadcast 前做额外的 Shuffle。全量上线后,我们约有 20% 的 SQL 性能提升 38%。

  我们基于 Join 指标数据进行进一步分析,发现存在一些表的大小比 Broadcast 阈值稍大。但这些 SQL 默认执行 SortMergeJoin,需要执行 Shuffle,需要排序。我们将这类 SQL 转为 ShuffledHashJoin 执行,这样消除了 Sort 操作带来的排序消耗。

  内部测试结果显示,重计算场景 Q95 性能倒退会特别严重,其他 Query 平均提升 14%。

  对 Q95 进行详细分析,主要是存在连续进行 SortMergeJoin 操作,而第一次 Join 后数据量膨胀数百倍。连续的SortMergeJoin,由于数据有序连续两个 SortMergeJoin 之间不需要额外 Sort 操作。但是转为 ShuffledHashJoin 后,由于执行结果是无序的,当执行到 SortMergeJoin 需要先插入 Sort 操作节点。由于第一个 Join 操作数据膨胀数百倍,Sort 操作节点耗时较多,导致性能严重倒退。

  针对这个问题,我们建立了 SortMergeJoin 多维度指标。根据这个模型追踪线上符合 shuffledHashJoin 典型问题的 SQL。预估线上可能受到影响的 SQL 有 20 多个。针对这些 SQL 手动关闭 shuffledHashJoin 开关,再在集群维度默认打开 ShuffledHashJoin 开关。

  2. 稳定性优化

  除了性能之外,我们也遇到了一些稳定性问题。

  Spark3 写 Iceberg 表时,存在小文件问题。在线上环境,这个情况会引发 executor 发生 OOM。根据分析显示,主要原因是 Task 创建了过多 Writer。

  本质原因是单个 Task 同时处理过多分区数据。这种情况需要将不同分区数据写入对应分区目录,需要创建过多 Writer,进而引发 executor OOM。

  我们借鉴之前 Spark 写 Hive 小文件的处理方案,通过添加 repartition 节点将相同分区数据集中处理。

  但是,处理 Iceberg 小文件与处理 Hive 小文件也有不同点。Iceberg 是隐式分区,在集中数据时需要使用分区函数的计算结果作为表达式,而非字段名。

  在处理 Iceberg 小文件时也会遇到并行度减少的问题。这个问题也是通过配置隔离来解决的。

  在 SQL 鉴权方面,我们也遇到了 driver 不稳定的问题。

  Spark3 基于 Ranger 进行统一权限管理。在新平台初期,迁移过来的作业量比较少,整体权限记录比较少,我们使用 Range Plugin 将权限记录同步到 Driver 本地进行鉴权。

  随着迁移量增加以及新业务增长,权限记录达到 10 万量级,本地鉴权导致 Driver JVM 吞吐量下降了 68%,且 driver 随时可能发生 OOM。

  因此,我们优化了服务架构,将本地鉴权改为 Range 中心化鉴权服务提供鉴权,避免把权限记录拉到 Driver 内存,缓解了 Driver 内存紧张的问题。

  优化将 Driver JVM 吞吐量从 68% 提升到 95%,降低了 Driver 发生 OOM 的风险。

  04

  未来规划

  接下来,简要介绍未来规划。

  我们目前在规划落地向量化引擎。内部基于 Spark 3.3 + gluten 0.5 + velox,测试 TPC-DS 重计算场景性能提升 69%,重 IO 场景提升 44%,TPC-DS 全部 case 综合提升 43%。

  此外,我们将考虑在数据工场中进一步提升用户体验,如支持自动优化作业不合理配置等。

  05

  问答环节

  Q1:Spark 联邦查询统一使用 Spark SQL 语法吗?

  A1:联邦查询在执行 SQL 时,会智能选择 SQL 引擎,在不同应用场景会使用不同的引擎。在数据量比较小的时候,Presto 会比较快。在大数据量时,Spark 会更稳定。在语法检验上,统一用 Spark SQL 语法。

  Q2:小米的基础架构是 Iceberg 和 Hudi 同时,还是只投入一个?

  A2:只投入了 Iceberg,Hudi 没有投入。因为当时 Iceberg 兼容性比较好,支持 Flink、Spark,但当时 Hudi 只支持 Spark。我们内部对 Flink 有实时场景需求。所以选择投入 Iceberg。当然现在 Hudi 对兼容性支持也越来越好。

  Q3:Hive ETL 迁移 Spark 有遇到哪些兼容性问题?

  A3:数据校验时,会有浮点型数据在 Spark 和 Hive 不一样。我们对此做了优化,把行为调成一致。升级之前,有语法兼容性校验和数据一致性校验。经过数据一致性对比会提前暴露这类问题。

  Q4:Iceberg 中使用的 repartition 函数是自行开发的吗?

  A4:Iceberg 社区支持的。

  Q5:Repartition 加上 bucket 函数的作用不大吧?

  A5:Iceberg 是隐式分区,提供了函数对分区字段值进行额外操作得到一个值。将这个值映射到指定的分区中。Bucket 函数的作用可以简单理解为:对一个值进行求模,根据这个值将数据放到指定的桶中。不调用 Bucket 函数,没办法很好地让数据集中到对应的桶中。

  Q6:Gluten 的测试数据有具体建议的配置吗?

  A6:Gluten 的配置是用社区版进行测试。

  以上就是本次分享的内容,谢谢大家。

  分享嘉宾

  INTRODUCTION

  蔡灿

  小米

  软件研发工程师

  现就职于小米,主要负责 Spark 稳定性、性能优化等相关工作

特别声明:以上文章内容仅代表作者本人观点,不代表新浪网观点或立场。如有关于作品内容、版权或其它问题请于作品发表后的30日内与新浪网联系。
来自于:北京
权利保护声明页/Notice to Right Holders

举报邮箱:jubao@vip.sina.com

Copyright © 1996-2024 SINA Corporation

All Rights Reserved 新浪公司 版权所有