一、前言
随着大数据技术的飞速发展,海量数据存储和计算的解决方案层出不穷,生产环境和大数据环境的交互日益密切。数据仓库作为海量数据落地和扭转的重要载体,承担着数据从生产环境到大数据环境、经由大数据环境计算处理回馈生产应用或支持决策的重要角色。
数据仓库的主题覆盖度、性能、易用性、可扩展性及数据质量都是衡量数据仓库解决方案好坏的重要指标。携程机票部门数据仓库也在不断摸索向着这些目标砥砺前行。
二、携程机票数据仓库技术栈
携程机票部门的数据仓库建设主要基于公司公共部门的大数据基础环境及数据调度平台,辅以部分自运维的开源存储引擎和基于开源组件二次开发的数据同步工具和运维工具。
2.1 数仓技术演进历史
机票部门的数据仓库源于 2008 年,当时生产环境数据落地主要使用 SQLServer,数据仓库处理的目标数据体量不大,因此选择的 SQLServer、Informaticas、Kettle 这样的数据仓库方案,数据模型设计及报表定制使用 SAP 的商用平台 BO。
随着机票业务系统的日益复杂,特别是生产环境引入消息中间件 Kafka 存储日志数据后,这套方案不可扩展性的缺点日趋明显,SQLServer 的存储和计算能力很大程度上限制了数仓数据的主题覆盖度及性能。
在 2014 年,公司公共部门 hadoop 集群部署上线,并且引入了 zeus 调度平台及 DataX 同步工具,各个 BU 的数据仓库开始逐步转为基于 Hive 建设。
随着生产业务对实时监控、流量回放的需求增强,2016 年机票部门部署了 ElasticSearch,用以实时落地从 Kafka 同步的各个主流程服务日志,并通过统一的交易标识 (transactionID) 串联用户的一次完整的搜索、下单等行为,用于生产排障和流量回放。基于 Hive 的搜索性能一直被广泛诟病,特别是针对 adhoc 查询,机票部门在 2016 年调研并部署了 Facebook 开源的基于内存和 Pipeline 的查询引擎 Presto,在没有享受到 local 数据获取的前提下,查询性能较原生的 Hive 引擎或者 Spark 引擎都有很大的提升。
在 2018 年,为了支持数仓数据的可视化运营平台,我们先后引入了 ClickHouse 和 CrateDB 作为后台的存储和查询引擎,特别是引入 CrateDB 以后,亿级体量的表四个维度的聚合耗时 P90 下降到了 4 秒。
实时数据处理技术也经过了 Esper,Storm,Spark Streaming 和 Flink 的迭代,并慢慢收敛到 Flink。总体的技术演进历史如图 1 所示。
图 2 携程机票数仓技术栈
2.3 实时 VS 离线
当前机票部门的数据仓库建设主要基于离线数据,一方面跟 OTA 销售产品不属于快消品相关,实时当前并不是刚需;另一方面实时处理场景下需要对计算资源、存储资源稳定性有更高的要求,保持数据一致性的代价很大。结合两方面,如果业务对实时需求不高就铺开做实时数仓,ROI 很难达标。
当然,随着携程业务体量的增长,数据使用方对数据实时性要求日益增高,我们团队在 2020 年也会探索实时数据仓库的实施方案,并在一两个重要的数据主题域上先行试点。
三、数据仓库建设时涉及的共性问题
从团队职能上来讲,数据仓库团队需要负责从生产环境同步数据,在内部完成各层级的扭转计算,参与所有数仓流程及报表的运维,并基于数仓公共数据层和应用数据层数据开发相关应用。
3.1 数据同步
为了保持数仓数据主题覆盖足够全面,我们部门几乎将所有生产表和 Kafka topics 都同步到了 Hive。以下会对同步最常见的两种场景 DB->Hive 和 Kafka->Hive 相关的实践做介绍。
3.1.1 DB 同步到 Hive
特别对生产表到 Hive 的同步,人工配置脚本的方式显然不能处理数以万计的表,因此需要一个自动化的同步方案。自动同步方案需要不仅仅要解决自动创建表脚本、创建对应的同步脚本问题,还需要在当表结构发生变更的时候,能够自动地感知表结构的变化,并且修改表结构和对应的同步脚本。
DB 到 Hive 同步需要依赖两个数据源,1)Schema 表的元数据信息,简单地包括各个字段信息、字段类型及主键定义;2)统计数据,它主要描述的是这个表在数据产生后有没有 UPDATE 和 DELETE,这个决定着后续表的分区方式。
对业务型数据,一条数据生成后可能会有 Update,因为在数仓里绝大部分场景需要用到数据的最新状态,所以我们会用一个分区存放所有历史数据的最新状态,这类表我们称之为历史切片表。对日志型数据,生产上数据产生后就不会有任何修改,我们会选择使用增量分区,每个分区会放当天的增量数据。对基础数据,整个表的数据增加、更新的频率都非常低,在 ods 层我们会每天全量同步一份到最新数据分区,并且会建立一个无分区的下游维表,将数据状态为有效的数据放到这张下游无分区维表中方便流程使用。
有了上述这两个数据源以后,我们会根据 DBA Schema 服务返回的元数据信息生成 Hive 表的脚本,并调度执行生成新的 Hive 表,再依据统计数据决定表的分区方式,进而生成对应新建表的同步脚本。当表创建或者表结构发生变更的时候,通过 Schema 服务两天输出的比对,我们会发现表结构的变更并映射到对应 Hive 表结构变更,同时可以改变对应的同步脚本。还有一种思路是可以通过 DB 发布系统的日志,获知每天 DB 创建、表创建以及表结构变化的增量。
图 4 转化为 json 字符串 RDD 代码示例
如果选择推断的模式,实现的时候可以使用 sampling 的方式,类似 spark jsonRDD 第二个参数,比如说 0.001,Hamal 可以直接指定采样数据条数,从 Kafka topic 中拉取出来,通过 jsonRDD 推断出 StructType,并映射成 Hive 建表语句。对于建好的表,通过表的字段匹配获取数据,最终写入 Hive 表,最后会提交消费记录到一张 Hive 的 ConsumerRecord 表里面。这样其实基于这个表,我们既可以获取 Kafka topic 和 Hive 表的血缘,也可以方便地监控每次同步的数据量。
图 6 数仓分层设计
3.3 数据解析
数据在同步至数据 ods 层后,产品经常会提的一个需求是将 ods 层某个含报文字段的表按照字段设计展开,如果要支持此类需求,数据开发就需要了解生产上这个表各个字段含义及报文字段的契约定义,而这些对应表的写入开发非常熟悉。因此,为了提高整体的工作效率,我们开发了一套数据解析框架,对业务开发封装了大数据组件的 API 调用及相关参数调整,让业务开发更高效地完成熟悉的单条数据解析开发。
图 8 数据质量相关特征
这是我们简单的一个日志输出,第一张是 Spark 的执行日志,下面一张是 MapReduce 的执行日志。
图 9 MR 和 Spark 引擎执行日志示例
有了数据质量特征提取的逻辑,实时流程异常发现可以如下实施:我们可以将质量特征数据计算分成两块,一块是实时的针对单个流程日志的解析出相关特征,一块是离线的基于历史特征数据的统计。我们从消息队列中消费实时获取执行完成的流程 id 和 actionid,通过运维团队提供的详情日志查询接口获取完整日志,通过特征解析逻辑,解析出实时的流程质量相关特征,匹配历史数据,应用规则。当满足异常规则,可以通过元数据信息中的血缘判断影响的范围,推送告警信息。
图 11 价格监控系统
六、小结
一套完整的数据仓库实施方案应该包括但不局限于上面介绍的数据同步方案、数据存储方案、数据规范、元数据建设、数据质量体系、运维工具等,每个实施团队应该根据面临的实际情况选择针对每个点的具体技术方案。
携程机票数据仓库团队也正朝着建设全面、规范、易用、高效、精准的数仓路上探索前行,当前在数据同步、数仓数据扭转以及出仓应用方面的实践方案还在随着需求的变化而迭代。接下来,我们团队会着重在数据仓库规范彻底落地以及实时数仓实施这些方向上努力。
本文转载自公众号携程技术(ID:ctriptech)。