Spark在供应链核算中的应用总结
一 业务背景
二 离线 SQL 模式存在的问题
1.对账问题定位困难,核算小二主要通过下载分录及对应的业务单据汇总数据进行对账,如果某一分录和业务数据有出入,只能逐一业务要素分析,由于缺乏通过分录精确追溯到关联业务单据的下钻能力,问题定位耗时较长,造成这一问题的主要原因在于通过离线SQL实现的原始加工逻辑无法精确的建立业务单据和原始凭证的关联关系。
2.日常运维困难,随着业务的不断发展,业务接入离线任务在不断的膨胀,最终成为一个横跨4个项目空间,150+离线任务、100+离线表的工程,任一节点的错误都会造成月结数据出错。
3.行业实施效率较低,每次新接入行业都需要开发小二新建一套离线表+离线任务,相应的也造成运维问题的持续恶化。
三 为什么选择Spark
1 核心诉求
1.核算规则(业务接入/记账/抛账)可配、可视,不存在黑盒的加工逻辑,加工流程对核算小二全透明(提升实施+对账效率)
2.建立整个核算链路单据维度的关联关系(业务单据<->原始凭证<->记账凭证<->抛账凭证),具备双向的单据追溯能力(提升对账效率)
2 Spark VS MapReduce
3 编程模式优势: RDD + DataFrame 的编程模式
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schemais encoded in a string
val schemaString = "name age"
// Generate the schema based on the stringofschema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convertrecordsof the RDD (people) toRows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schemato the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporaryviewusing the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporaryview created using DataFrames
val results = spark.sql("SELECT name FROM people")
四 spark基础介绍
1 基础概念
Rdd(Resilient distributed dataset):不可变的弹性分布式数据集(不可变性似于docker中的只读镜像层),只能通过其他的transformation算子创建新的RDD。 Operations:算子,spark包括两类算子,transformation(转换算子,通过对前置rdd的处理生成新的rdd)/action(触发spark job的拆分及执行,负责将rdd输出)。
Task:执行器执行的任务单元,一般基于当前rdd的分区数量拆分。
Job:包含多个task的集合,基于Action算子拆分。
Stage:基于当前rdd处理逻辑的宽窄依赖拆分,spark中非常重要的概念,stage的切换会涉及到IO。
Narrow/Wide dependencies:参考下图,区分的重要依据在于父节点是否会被多个子节点使用。
2 Spark on MaxCompute(ODPS)
Spark 有很多的后端的 Runtime,例如其商业化公司的Databricks Runtime, 弹内我们使用的是 AliSpark,是集团的适配 MaxComputer,同时在离线交互是使用了 Cupid-SDK 的 Client模式,这个模式不是独立集群的模式,类Serveless 模式,整体的成本上比独立集群要低,当然资源保障上没有独立集群好。
集团client模式将spark session作为服务提供,可以方便地与在线系统交互,包括任务的提交、关闭、实例的关闭等;
在使用集团提供的spark能力时,比较麻烦的在于如何方便的查看日志,从我们的实践看主要有以下2个路径;
申请odps对应项目空间的logview权限,可以直接在https://logview.alibaba-inc.com/中基于sparkInstanceId定位到具体的日志;
借助odps client+提交spark任务时返回的实例ID获取log地址,代码参考如下:
//instanceIdd对应odps client中的lookupName
Account account = new AliyunAccount(sparkSessionConfig.getAccessId(), sparkSessionConfig.getAccessKey());
Odps odps = new Odps(account);
odps.setEndpoint(sparkSessionConfig.getEndPoint());
odps.setDefaultProject(sparkSessionConfig.getNamespace());
//日志地址目前设定有效期为7*24小时
try {
return odps.logview().generateLogView(odps.instances().get(sparkInstanceId), 7 * 24L);
} catch (OdpsException e) {
LOGGER.error("生成logView地址失败,config:{},instanceId:{},e:{}", sparkSessionConfig, sparkInstanceId, e);
}
五 技术方案
1 整体方案
spark任务管理:负责spark任务相关生命周期的管理,承接核算任务和spark session之间的交互;
spark session管理:负责spark实例的创建、销毁、job提交等,另外针对不同类型的session,支持自定义所需资源,包括实例worker数量、分区大小等,主要与spark on odps交互;
核算任务管理:负责业务接入、记账、抛账等核算任务的生命周期管理;
spark job版本管理:spark任务所需jar包会不断的迭代,针对不同的核算场景可以定制所需的job版本;
2 数据处理流程
六 运维及调优
1 数据量评估
spark.hadoop.odps.input.split.size:用于设置spark读取odps离线表的分区大小,默认为256M,在实践过程中需要结合当前分区的大小进行调整,比如当前分区大小为1GB,那么默认情况下会拆分为4个分区 ;
spark读写lindorm(类hbase)的分区数主要受到region数量的影响,在供应链核算系统的实践中,由于初始region数量较少,导致分区数量很少,spark执行效率很差,,针对此问题我们实践了两种处理策略 ;
1.进行重分区(repartition算子):针对数据倾斜进行重新分区,但是会拆分stage,触发shuffle,增加额外的IO成本。
2.lindorm进行预分区,比如预分区为128个region,但此种实现方案需要结合rowkey的设计一起使用,会影响到scan的效率。
2 代码逻辑相关job/stage/task评估
慎用效率角度的算子,比如groupBy 尽量减少stage数量
3 计算存储资源评估
spark.executor.instances executor:设置当前实例的worker数量 ;
spark.executor.cores:核数,每个Executor中的可同时运行的task数目 ;
spark.executor.memory:executor内存 ;
4 其他参数
5 Spark UI
6 交互式开发测试
Web-based notebook that enables data-driven, interactive data analytics and collaborative documents with SQL, Scala, Python, R and more.
7 效果
七 总结
参考文档
Elasticsearch实战进阶营
关键词
任务
问题
问题
过程中
实例
最新评论
推荐文章
作者最新文章
你可能感兴趣的文章
Copyright Disclaimer: The copyright of contents (including texts, images, videos and audios) posted above belong to the User who shared or the third-party website which the User shared from. If you found your copyright have been infringed, please send a DMCA takedown notice to [email protected]. For more detail of the source, please click on the button "Read Original Post" below. For other communications, please send to [email protected].
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。