【源码解析】Flink SQL 执行框架脉络梳理
在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发过程是未来的整体发展趋势。尽管 SQL 简化了使用的门槛,但是如何将 SQL 等价转换到现有的数据处理引擎中却并非易事,尤其是在流式数据处理框架中。
最近,Flink 发布1.14 版本,该版本保留一个Planner(Blink Planner在Flink 1.9 引入,Flink 1.13 默认使用 Blink Planner,Flink 1.14 去除Blink Planner痕迹,见jira FLINK-22879)。接下来介绍 Flink SQL 的整体执行流程。
背景知识
SQL 的执行流程一般分为四个主要的阶段,Flink 主要依赖于 Calicte 来完成这一流程:
Parse:语法解析,把 SQL 语句转换成为一个抽象语法树(AST),在 Calcite 中用
SqlNode
来表示;Validate:语法校验,根据元数据信息进行验证,例如查询的表、使用的函数是否存在等,校验之后仍然是
SqlNode
构成的语法树;Optimize:查询计划优化,这里其实包含了两部分,1)首先将
SqlNode
语法树转换成关系表达式 RelNode
构成的逻辑树,2)然后使用优化器基于规则进行等价变换,例如我们比较熟悉的谓词下推、列裁剪等,经过优化器优化后得到最优的查询计划;Execute:将逻辑查询计划翻译成物理执行计划,生成对应的可执行代码,提交运行。
Flink SQL 的处理也大体遵循上述处理流程。Calcite 自身的概念较为庞杂,尤其是其内部使用的 HepPlanner 和 VolcanoPlanner 优化器更是非常复杂,但好在 Calcite 的可扩展性很强,优化器的优化规则也可以很容易地进行扩展,因此如果只是了解 Flink SQL 的基本原理和扩展方法,也无需对 Calcite 的代码了解的非常透彻。关于 Calcite 的基本介绍可以参考这个Slide。
Flink SQL 接口
通过对 table 模块进行拆分和重构,Flink SQL 抽象出了
Planner
接口和 Executor
接口,可以支持多个不同的SQL 执行器,用户可以自行选择希望使用的 Runner。不同的 Runner 只需要正确地实现这两个接口即可。在 1.9 版本中,Flink 提供了两套 SQL Runner,分别对应之前社区已有的 Flink SQL Runner 和新的 Blink Runner,Blink Runner 目前只是一个预览版本,在后续的迭代中会取代现有的 Flink Runner。Planner
接口
@Internal
publicinterfacePlanner{
Parser getParser();
List<Transformation >> translate(List<ModifyOperation> modifyOperations);
Executor
接口
publicinterfaceExecutor{
Pipeline createPipeline(
List<Transformation<?>> transformations,
ReadableConfig tableConfiguration,
@Nullable String defaultJobName);
JobExecutionResult execute(Pipeline pipeline)throws Exception;
Flink SQL 代码流程
先看一个完整的Flink SQL 代码示例,有个整体的印象。
// 创建执行环境
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
TableEnvironment bsTableEnv = TableEnvironment.create(settings);
//注册source和sink
bsTableEnv.executeSql(sourceDDL);
bsTableEnv.executeSql(sinkDDL);
bsTableEnv.executeSql(sql);
SQL 解析
SQL 的解析在
ParserImpl.parse()
中实现:1)首先使用 Calcite 的解析出抽象语法树 SqlNode
,2)然后结合元数据对 SQL 语句进行验证,3)将 SqlNode
转换为 RelNode
,4)并最终封装为 Flink 内部对查询操作的抽象 QueryOperation
。
publicclassTableEnvironmentImplimplementsTableEnvironmentInternal{
public Parser getParser() {
return getPlanner().getParser();
}
abstractclassPlannerBase(
executor: Executor,
config: TableConfig,
val moduleManager: ModuleManager,
val functionCatalog: FunctionCatalog,
val catalogManager: CatalogManager,
isStreamingMode: Boolean)
extends Planner {
override def getParser: Parser = {
if (parser == null || getTableConfig.getSqlDialect != currentDialect) {
parser = createNewParser
currentDialect = getTableConfig.getSqlDialect
}
parser
}
/** Implementation of {@link Parser} that uses Calcite. */
publicclassParserImplimplementsParser{
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// 先用ExtendedParser解析Calcite不支持的SQL语句
Optional<Operation> command = EXTENDED_PARSER.parse(statement);
if (command.isPresent()) {
return Collections.singletonList(command.get());
}
// parse the sql query
// 1)这一步解析得到 SqlNode
SqlNode parsed = parser.parse(statement);
// 使用 SqlToOperationConverter 将 SqlNode 转化为 Operation
Operation operation =
SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
publicclassSqlToOperationConverter{
publicstatic Optional<Operation> convert(
FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {
// 2) 结合元数据验证 Sql 的合法性
final SqlNode validated = flinkPlanner.validate(sqlNode);
// 将 SqlNode 转化为 Operation
SqlToOperationConverter converter =
new SqlToOperationConverter(flinkPlanner, catalogManager);
if (validated instanceof SqlCreateCatalog) {
return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated));
} elseif (validated instanceof SqlDropCatalog) {
return Optional.of(converter.convertDropCatalog((SqlDropCatalog) validated));
...
// 查询语句转化为 Operation
} elseif (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
return Optional.empty();
}
/** Fallback method for sql query. */
private Operation convertSqlQuery(SqlNode node){
return toQueryOperation(flinkPlanner, node);
}
private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated){
// 3) 使用 SqlToRelConverter 将 SqlNode 转换成 RelNode
RelRoot relational = planner.rel(validated);
// 4) 将 RelNode 封装成 PlannerQueryOperation
returnnew PlannerQueryOperation(relational.project());
}
Flink 主要借助于 Calcite 完成 SQl 的解析和优化,而后续的优化部分其实都是直接基于 RelNode 来完成的,那么这里为什么又多出了一个 QueryOperation 呢?Flink SQL 支持纯 SQL 语句和Table API 编程,对 Table API 的支持主要借助 Table, Operation 和 Expression 等接口,这块后续专开一篇文章来写。
SQL 转换及优化
SQL 语句解析成 Operation 后,为了得到 Flink 运行时的具体操作算子,需要进一步将 ModifyOperation 转换为 Transformation。
注意,Planner.translate(List<ModifyOperation> modifyOperations) 方法接收的参数是 ModifyOperation,ModifyOperation 对应的是一个 DML 的操作。比如,在将查询结果插入到一张结果表或者转换为 DataStream 时,就会得到 ModifyOperation。
转换的流程主要分为四个部分,即 1)将 Operation 转换为 RelNode,2)优化 RelNode,3)转换成 ExecNodeGraph,4)转换为底层的 Transformation 算子。
abstractclassPlannerBase(
executor: Executor,
config: TableConfig,
val moduleManager: ModuleManager,
val functionCatalog: FunctionCatalog,
val catalogManager: CatalogManager,
isStreamingMode: Boolean)
extends Planner {
override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
validateAndOverrideConfiguration()
if (modifyOperations.isEmpty) {
return List.empty[Transformation[_]]
}
// 1)将 Operation 转换为 RelNode
val relNodes = modifyOperations.map(translateToRel)
// 2)优化 RelNode
val optimizedRelNodes = optimize(relNodes)
// 3)转换成 ExecNodeGraph,其中ExecNodeGraph 是 ExecNode的一种拓扑表示
val execGraph = translateToExecNodeGraph(optimizedRelNodes)
// 4)转换为底层的 Transformation 算子
val transformations = translateToPlan(execGraph)
cleanupInternalConfigurations()
transformations
}
首先是将 Operation 转换为 RelNode,这个转换操作借助 QueryOperationConverter 完成。Operation 其实类似于 SQL 语法树,也构成一个树形结构,并实现了访问者模式,支持使用 QueryOperationVisitor 遍历整棵树,QueryOperationConverter 实现了 QueryOperationVisitor 接口。对于 PlannerQueryOperation,其内部封装的就是已经构建好的 RelNode,直接取出即可;对于其它类型的 Operation,则按需转换为对应的 RelNode。
abstractclassPlannerBase(
executor: Executor,
config: TableConfig,
val moduleManager: ModuleManager,
val functionCatalog: FunctionCatalog,
val catalogManager: CatalogManager,
isStreamingMode: Boolean)
extends Planner {
private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
val dataTypeFactory = catalogManager.getDataTypeFactory
modifyOperation match {
...
// 这里主要看 QueryOperation,其它暂时略去便于理解
case collectModifyOperation: CollectModifyOperation =>
val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
DynamicSinkUtils.convertCollectToRel(
getRelBuilder,
input,
collectModifyOperation,
getTableConfig.getConfiguration,
getClassLoader
)
...
case _ =>
throw new TableException(s"Unsupported ModifyOperation: $modifyOperation")
}
}
得到 RelNode 后, Calcite 使用 CommonSubGraphBasedOptimizer 优化器(针对流式、批次的优化分别实现了子类StreamCommonSubGraphBasedOptimizer、BatchCommonSubGraphBasedOptimizer)进行 RelNode 的优化流程。优化器将拥有共同子树的 RelNode 看作一个 DAG 结构,并将 DAG 划分成 RelNodeBlock,然后在RelNodeBlock 的基础上进行优化操作。这和正常的 Calcite 处理流程还是保持一致的。
abstract class PlannerBase(
executor: Executor,
config: TableConfig,
val moduleManager: ModuleManager,
val functionCatalog: FunctionCatalog,
val catalogManager: CatalogManager,
isStreamingMode: Boolean)
extends Planner {
@VisibleForTesting
private[flink] def optimize(relNodes: Seq[RelNode]): Seq[RelNode] = {
val optimizedRelNodes = getOptimizer.optimize(relNodes)
require(optimizedRelNodes.size == relNodes.size)
optimizedRelNodes
}
abstractclassCommonSubGraphBasedOptimizerextendsOptimizer{
override def optimize(roots: Seq[RelNode]): Seq[RelNode] = {
//以RelNodeBlock为单位进行优化,在子类(StreamCommonSubGraphBasedOptimizer,BatchCommonSubGraphBasedOptimizer)中实现
val sinkBlocks = doOptimize(roots)
//获得优化后的逻辑计划
val optimizedPlan = sinkBlocks.map { block =>
val plan = block.getOptimizedPlan
require(plan != null)
plan
}
//将 RelNodeBlock 使用的中间表展开
expandIntermediateTableScan(optimizedPlan)
}
接着讲下具体的优化现实。Caclite 用一套基于规则的框架优化逻辑计划,用户可以通过添加规则进行扩展,Flink 也是基于自定义规则来实现整个优化过程。优化器主要涉及三个方法doOptimize 、optimizeBlock、optimizeTree。
下面FlinkBatchProgram.buildProgram是调用Batch链式优化的入口。
classBatchCommonSubGraphBasedOptimizer(planner: BatchPlanner)
extends CommonSubGraphBasedOptimizer {
overrideprotected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
// build RelNodeBlock plan
val rootBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, planner.getTableConfig)
// optimize recursively RelNodeBlock
rootBlocks.foreach(optimizeBlock)
rootBlocks
}
private def optimizeBlock(block: RelNodeBlock): Unit = {
block.children.foreach { child =>
if (child.getNewOutputNode.isEmpty) {
optimizeBlock(child)
}
}
val originTree = block.getPlan
// optimizeTree
val optimizedTree = optimizeTree(originTree)
optimizedTree match {
case _: LegacySink | _: Sink => // ignore
case _ =>
val name = createUniqueIntermediateRelTableName
val intermediateRelTable = new IntermediateRelTable(Collections.singletonList(name),
optimizedTree)
val newTableScan = wrapIntermediateRelTableToTableScan(intermediateRelTable, name)
block.setNewOutputNode(newTableScan)
block.setOutputTableName(name)
}
block.setOptimizedPlan(optimizedTree)
}
private def optimizeTree(relNode: RelNode): RelNode = {
val config = planner.getTableConfig
// 分FlinkStreamProgram 和 FlinkBatchProgram设置优化规则
// FlinkBatchProgram.buildProgram 是调用链式优化的入口。
val programs = TableConfigUtils.getCalciteConfig(config).getBatchProgram
.getOrElse(FlinkBatchProgram.buildProgram(config.getConfiguration))
Preconditions.checkNotNull(programs)
val context = relNode.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext])
programs.optimize(relNode, new BatchOptimizeContext {
override def isBatchMode: Boolean = true
override def getTableConfig: TableConfig = config
override def getFunctionCatalog: FunctionCatalog = planner.functionCatalog
override def getCatalogManager: CatalogManager = planner.catalogManager
override def getModuleManager: ModuleManager = planner.moduleManager
override def getSqlExprToRexConverterFactory: SqlExprToRexConverterFactory =
context.getSqlExprToRexConverterFactory
override def getFlinkRelBuilder: FlinkRelBuilder = planner.getRelBuilder
override def needFinalTimeIndicatorConversion: Boolean = true
})
}
Flink 构造了一个链式的优化程序FlinkChainedProgram,实际创建了一个FlinkChainedProgram序列,可以按顺序使用多套规则集合完成 RelNode 的优化过程。通过addFirst、addLast 、 addBefore等方法调整规则顺序。
object FlinkBatchProgram {
def buildProgram(config: Configuration): FlinkChainedProgram[BatchOptimizeContext] = {
val chainedProgram = new FlinkChainedProgram[BatchOptimizeContext]()
...
// 优化物理逻辑计划
chainedProgram.addLast(
PHYSICAL,
FlinkVolcanoProgramBuilder.newBuilder
.add(FlinkBatchRuleSets.PHYSICAL_OPT_RULES)
.setRequiredOutputTraits(Array(FlinkConventions.BATCH_PHYSICAL))
.build())
// 物理逻辑计划重写
chainedProgram.addLast(
PHYSICAL_REWRITE,
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkBatchRuleSets.PHYSICAL_REWRITE)
.build())
chainedProgram
}
}
在FlinkBatchProgram 中定义了一系列扩展规则,用于构造逻辑计划的优化器。Flink 扩展了 RelNode,抽象出一个FlinkRelNode,增加了 FlinkLogicalRel 和 FlinkPhysicalRel 这两类 RelNode,对应的 Convention 分别为 FlinkConventions.LOGICAL 和 FlinkConventions.BATCH_PHYSICAL (或FlinkConventions.STREAM_PHYSICAL)。在优化器的处理过程中,RelNode 会从 Calcite 内部定义的节点转换为 FlinkLogicalRel 节点(FlinkConventions.LOGICAL),并最终被转换为 FlinkPhysicalRel 节点FlinkConventions.BATCH_PHYSICAL (或FlinkConventions.STREAM_PHYSICAL)。这两类转换规则(逻辑优化、物理逻辑优化)分别对应 FlinkBatchRuleSets.LOGICAL_OPT_RULES 和 FlinkBatchRuleSets.PHYSICAL_OPT_RULES。在 FlinkStreamProgram 中的代码逻辑类似。
经过优化器处理后,得到的逻辑树中的所有节点都是FlinkPhysicalRel ,以待生成物理执行计划了。
abstractclassPlannerBase(
executor: Executor,
config: TableConfig,
val moduleManager: ModuleManager,
val functionCatalog: FunctionCatalog,
val catalogManager: CatalogManager,
isStreamingMode: Boolean)
extends Planner {
private[flink] def translateToExecNodeGraph(optimizedRelNodes: Seq[RelNode]): ExecNodeGraph = {
val nonPhysicalRel = optimizedRelNodes.filterNot(_.isInstanceOf[FlinkPhysicalRel])
if (nonPhysicalRel.nonEmpty) {
throw new TableException("The expected optimized plan is FlinkPhysicalRel plan, " +
s"actual plan is ${nonPhysicalRel.head.getClass.getSimpleName} plan.")
}
require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))
// Rewrite same rel object to different rel objects
// in order to get the correct dag (dag reuse is based on object not digest)
val shuttle = new SameRelObjectShuttle()
val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))
// reuse subplan
val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, config)
// 将 FlinkPhysicalRel DAG 转化城 ExecNodeGraph
val generator = new ExecNodeGraphGenerator()
val execGraph = generator.generate(reusedPlan.map(_.asInstanceOf[FlinkPhysicalRel]))
// process the graph
val context = new ProcessorContext(this)
val processors = getExecNodeGraphProcessors
processors.foldLeft(execGraph)((graph, processor) => processor.process(graph, context))
}
}
首先要将 FlinkPhysicalRel 构成的 DAG 转换成 ExecNodeGraph ,因为可能存在共用子树的情况,这里还会尝试共用相同的子逻辑计划。由于通常 FlinkPhysicalRel 的具体实现类通常也实现了 ExecNode 接口,所以这一步转换较为简单。
classBatchPlanner(
executor: Executor,
config: TableConfig,
moduleManager: ModuleManager,
functionCatalog: FunctionCatalog,
catalogManager: CatalogManager)
extendsPlannerBase(executor, config, moduleManager, functionCatalog, catalogManager,
isStreamingMode = false) {
overrideprotected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
validateAndOverrideConfiguration()
valplanner = createDummyPlanner()
valtransformations = execGraph.getRootNodes.map {
生成 Transformation
casenode: BatchExecNode[_] => node.translateToPlan(planner)
case_ =>
thrownew TableException("Cannot generate BoundedStream due to an invalid logical plan. " +
is a bug and should not happen. Please file an issue.")
}
cleanupInternalConfigurations()
transformations
}
}
在得到由
ExecNodeGraph
后,就可以尝试生成物理执行计划了,也就是将 ExecNodeGraph
转换为 Flink 内部的 Transformation
算子。不同的 ExecNodeGraph
按照各自的需求生成不同的 Transformation
(上面代码展示的是BatchPlanner),基于这些 Transformation
构建 Flink 的 DAG。SQL 执行
Executor抽象了SQL执行过程,DefaultExecutor是其默认实现。在得到 Transformation 后,利用 Transformation 生成 Pipeline (也就是StreamGraph),然后就可以提交 Flink 任务执行了。
Flink SQL 的执行入口在TableEnvironmentImpl,它调用了DefaultExecutor的实现方法。
在Flink 1.14 中,Flink SQL 统一使用 StreamExecutionEnvironment 。
publicinterfaceExecutor{
// 将Transformation转换成Pipeline
Pipeline createPipeline(
List<Transformation<?>> transformations,
ReadableConfig tableConfiguration,
@Nullable String defaultJobName);
// 执行 Pipeline
JobExecutionResult execute(Pipeline pipeline)throws Exception;
// 异步执行 Pipeline
JobClient executeAsync(Pipeline pipeline)throws Exception;
}
publicclassTableEnvironmentImplimplementsTableEnvironmentInternal{
public TableResultInternal executeInternal(Operation operation){
if (operation instanceof ModifyOperation) {
return executeInternal(Collections.singletonList((ModifyOperation) operation));
} elseif (operation instanceof CreateTableOperation) {
...
// 查询语句的执行
} elseif (operation instanceof QueryOperation) {
return executeQueryOperation((QueryOperation) operation);
} elseif (operation instanceof CreateTableASOperation) {
executeInternal(((CreateTableASOperation) operation).getCreateTableOperation());
return executeInternal(((CreateTableASOperation) operation).getInsertOperation());
} else {
thrownew TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}
}
private TableResultInternal executeQueryOperation(QueryOperation operation){
final UnresolvedIdentifier unresolvedIdentifier =
UnresolvedIdentifier.of(
"Unregistered_Collect_Sink_" + CollectModifyOperation.getUniqueId());
final ObjectIdentifier objectIdentifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
CollectModifyOperation sinkOperation =
new CollectModifyOperation(objectIdentifier, operation);
List<Transformation<?>> transformations =
translate(Collections.singletonList(sinkOperation));
final String defaultJobName = "collect";
// 用transformation 生成执行Pipeline
Pipeline pipeline =
execEnv.createPipeline(
transformations, tableConfig.getConfiguration(), defaultJobName);
try {
// 异步执行 pipeline
JobClient jobClient = execEnv.executeAsync(pipeline);
ResultProvider resultProvider = sinkOperation.getSelectResultProvider();
resultProvider.setJobClient(jobClient);
return TableResultImpl.builder()
.jobClient(jobClient)
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.schema(operation.getResolvedSchema())
.resultProvider(resultProvider)
.setPrintStyle(
PrintStyle.tableauWithTypeInferredColumnWidths(
// sinkOperation.getConsumedDataType() handles legacy types
DataTypeUtils.expandCompositeTypeToSchema(
sinkOperation.getConsumedDataType()),
resultProvider.getRowDataStringConverter(),
PrintStyle.DEFAULT_MAX_COLUMN_WIDTH,
false,
isStreamingMode))
.build();
} catch (Exception e) {
thrownew TableException("Failed to execute sql", e);
}
}
侠天,专注于大数据、机器学习和数学相关的内容,分享相关技术文章。若发现以上文章有任何不妥,请联系我。
接商务合作:大数据项目(包括数据中台的,技术选型、培训、开发、疑难问题救火)。
阅读原文 关键词
方法
接口
逻辑
框架
源码
最新评论
推荐文章
作者最新文章
你可能感兴趣的文章
Copyright Disclaimer: The copyright of contents (including texts, images, videos and audios) posted above belong to the User who shared or the third-party website which the User shared from. If you found your copyright have been infringed, please send a DMCA takedown notice to [email protected]. For more detail of the source, please click on the button "Read Original Post" below. For other communications, please send to [email protected].
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。