在数据处理领域,无论是实时数据处理还是离线数据处理,使用 SQL 简化开发过程是未来的整体发展趋势。尽管 SQL 简化了使用的门槛,但是如何将 SQL 等价转换到现有的数据处理引擎中却并非易事,尤其是在流式数据处理框架中。
最近,Flink 发布1.14 版本,该版本保留一个Planner(Blink Planner在Flink 1.9 引入,Flink 1.13 默认使用 Blink Planner,Flink 1.14 去除Blink Planner痕迹,见jira FLINK-22879)。接下来介绍 Flink SQL 的整体执行流程。

背景知识

SQL 的执行流程一般分为四个主要的阶段,Flink 主要依赖于 Calicte 来完成这一流程:
Parse:语法解析,把 SQL 语句转换成为一个抽象语法树(AST),在 Calcite 中用 SqlNode 来表示;
Validate:语法校验,根据元数据信息进行验证,例如查询的表、使用的函数是否存在等,校验之后仍然是 SqlNode 构成的语法树;
Optimize:查询计划优化,这里其实包含了两部分,1)首先将 SqlNode 语法树转换成关系表达式 RelNode 构成的逻辑树,2)然后使用优化器基于规则进行等价变换,例如我们比较熟悉的谓词下推、列裁剪等,经过优化器优化后得到最优的查询计划;
Execute:将逻辑查询计划翻译成物理执行计划,生成对应的可执行代码,提交运行。
Flink SQL 的处理也大体遵循上述处理流程。Calcite 自身的概念较为庞杂,尤其是其内部使用的 HepPlanner 和 VolcanoPlanner 优化器更是非常复杂,但好在 Calcite 的可扩展性很强,优化器的优化规则也可以很容易地进行扩展,因此如果只是了解 Flink SQL 的基本原理和扩展方法,也无需对 Calcite 的代码了解的非常透彻。关于 Calcite 的基本介绍可以参考这个Slide

Flink SQL 接口

通过对 table 模块进行拆分和重构,Flink SQL 抽象出了 Planner 接口和 Executor 接口,可以支持多个不同的SQL 执行器,用户可以自行选择希望使用的 Runner。不同的 Runner 只需要正确地实现这两个接口即可。在 1.9 版本中,Flink 提供了两套 SQL Runner,分别对应之前社区已有的 Flink SQL Runner 和新的 Blink Runner,Blink Runner 目前只是一个预览版本,在后续的迭代中会取代现有的 Flink Runner。

Planner 接口

@InternalpublicinterfacePlanner{ Parser getParser();List<Transformation<?>> translate(List<ModifyOperation> modifyOperations);

Executor 接口

@InternalpublicinterfaceExecutor{Pipeline createPipeline( List<Transformation<?>> transformations, ReadableConfig tableConfiguration, @Nullable String defaultJobName);JobExecutionResult execute(Pipeline pipeline)throws Exception;

Flink SQL 代码流程

先看一个完整的Flink SQL 代码示例,有个整体的印象。
// 创建执行环境 EnvironmentSettings settings = EnvironmentSettings .newInstance() .inStreamingMode() .build(); TableEnvironment bsTableEnv = TableEnvironment.create(settings);//注册source和sink bsTableEnv.executeSql(sourceDDL); bsTableEnv.executeSql(sinkDDL);        bsTableEnv.executeSql(sql);

SQL 解析

SQL 的解析在 ParserImpl.parse() 中实现:1)首先使用 Calcite 的解析出抽象语法树 SqlNode,2)然后结合元数据对 SQL 语句进行验证,3)将 SqlNode 转换为 RelNode,4)并最终封装为 Flink 内部对查询操作的抽象 QueryOperation
@InternalpublicclassTableEnvironmentImplimplementsTableEnvironmentInternal{@Overridepublic Parser getParser() {return getPlanner().getParser(); }abstractclassPlannerBase( executor: Executor, config: TableConfig,val moduleManager: ModuleManager,val functionCatalog: FunctionCatalog,val catalogManager: CatalogManager, isStreamingMode: Boolean) extends Planner {override def getParser: Parser = {if (parser == null || getTableConfig.getSqlDialect != currentDialect) { parser = createNewParser currentDialect = getTableConfig.getSqlDialect } parser }/** Implementation of {@link Parser} that uses Calcite. */publicclassParserImplimplementsParser{@Overridepublic List<Operation> parse(String statement) { CalciteParser parser = calciteParserSupplier.get(); FlinkPlannerImpl planner = validatorSupplier.get();// 先用ExtendedParser解析Calcite不支持的SQL语句 Optional<Operation> command = EXTENDED_PARSER.parse(statement);if (command.isPresent()) {return Collections.singletonList(command.get()); }// parse the sql query// 1)这一步解析得到 SqlNode SqlNode parsed = parser.parse(statement);// 使用 SqlToOperationConverter 将 SqlNode 转化为 Operation Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed) .orElseThrow(() -> new TableException("Unsupported query: " + statement));return Collections.singletonList(operation);    }
publicclassSqlToOperationConverter{publicstatic Optional<Operation> convert( FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {// 2) 结合元数据验证 Sql 的合法性final SqlNode validated = flinkPlanner.validate(sqlNode);// 将 SqlNode 转化为 Operation SqlToOperationConverter converter =new SqlToOperationConverter(flinkPlanner, catalogManager);if (validated instanceof SqlCreateCatalog) {return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated)); } elseif (validated instanceof SqlDropCatalog) {return Optional.of(converter.convertDropCatalog((SqlDropCatalog) validated)); ...// 查询语句转化为 Operation } elseif (validated.getKind().belongsTo(SqlKind.QUERY)) {return Optional.of(converter.convertSqlQuery(validated)); } else {return Optional.empty(); }/** Fallback method for sql query. */private Operation convertSqlQuery(SqlNode node){return toQueryOperation(flinkPlanner, node); }private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated){// 3) 使用 SqlToRelConverter 将 SqlNode 转换成 RelNode RelRoot relational = planner.rel(validated);// 4) 将 RelNode 封装成 PlannerQueryOperationreturnnew PlannerQueryOperation(relational.project()); }
Flink 主要借助于 Calcite 完成 SQl 的解析和优化,而后续的优化部分其实都是直接基于 RelNode 来完成的,那么这里为什么又多出了一个 QueryOperation 呢?Flink SQL 支持纯 SQL 语句和Table API 编程,对 Table API 的支持主要借助 Table, Operation 和 Expression 等接口,这块后续专开一篇文章来写。

SQL 转换及优化

SQL 语句解析成 Operation 后,为了得到 Flink 运行时的具体操作算子,需要进一步将 ModifyOperation 转换为 Transformation。
注意,Planner.translate(List<ModifyOperation> modifyOperations) 方法接收的参数是 ModifyOperation,ModifyOperation 对应的是一个 DML 的操作。比如,在将查询结果插入到一张结果表或者转换为 DataStream 时,就会得到 ModifyOperation。
转换的流程主要分为四个部分,即 1)将 Operation 转换为 RelNode,2)优化 RelNode,3)转换成 ExecNodeGraph,4)转换为底层的 Transformation 算子。
abstractclassPlannerBase( executor: Executor, config: TableConfig,val moduleManager: ModuleManager,val functionCatalog: FunctionCatalog,val catalogManager: CatalogManager, isStreamingMode: Boolean) extends Planner {override def translate( modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = { validateAndOverrideConfiguration()if (modifyOperations.isEmpty) {return List.empty[Transformation[_]] }// 1)将 Operation 转换为 RelNodeval relNodes = modifyOperations.map(translateToRel)// 2)优化 RelNodeval optimizedRelNodes = optimize(relNodes)// 3)转换成 ExecNodeGraph,其中ExecNodeGraph 是 ExecNode的一种拓扑表示val execGraph = translateToExecNodeGraph(optimizedRelNodes)// 4)转换为底层的 Transformation 算子val transformations = translateToPlan(execGraph) cleanupInternalConfigurations() transformations }
首先是将 Operation 转换为 RelNode,这个转换操作借助 QueryOperationConverter 完成。Operation 其实类似于 SQL 语法树,也构成一个树形结构,并实现了访问者模式,支持使用 QueryOperationVisitor 遍历整棵树,QueryOperationConverter 实现了 QueryOperationVisitor 接口。对于 PlannerQueryOperation,其内部封装的就是已经构建好的 RelNode,直接取出即可;对于其它类型的 Operation,则按需转换为对应的 RelNode。
abstractclassPlannerBase( executor: Executor, config: TableConfig,val moduleManager: ModuleManager,val functionCatalog: FunctionCatalog,val catalogManager: CatalogManager, isStreamingMode: Boolean) extends Planner {@VisibleForTestingprivate[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {val dataTypeFactory = catalogManager.getDataTypeFactory modifyOperation match { ...// 这里主要看 QueryOperation,其它暂时略去便于理解 case collectModifyOperation: CollectModifyOperation =>val input = getRelBuilder.queryOperation(modifyOperation.getChild).build() DynamicSinkUtils.convertCollectToRel( getRelBuilder, input, collectModifyOperation, getTableConfig.getConfiguration, getClassLoader ) ... case _ =>throw new TableException(s"Unsupported ModifyOperation: $modifyOperation") } }
得到 RelNode 后, Calcite 使用 CommonSubGraphBasedOptimizer 优化器(针对流式、批次的优化分别实现了子类StreamCommonSubGraphBasedOptimizer、BatchCommonSubGraphBasedOptimizer)进行 RelNode 的优化流程。优化器将拥有共同子树的 RelNode 看作一个 DAG 结构,并将 DAG 划分成 RelNodeBlock,然后在RelNodeBlock 的基础上进行优化操作。这和正常的 Calcite 处理流程还是保持一致的。
abstract class PlannerBase( executor: Executor, config: TableConfig, val moduleManager: ModuleManager, val functionCatalog: FunctionCatalog, val catalogManager: CatalogManager, isStreamingMode: Boolean) extends Planner { @VisibleForTesting private[flink] def optimize(relNodes: Seq[RelNode]): Seq[RelNode] = { val optimizedRelNodes = getOptimizer.optimize(relNodes)require(optimizedRelNodes.size == relNodes.size) optimizedRelNodes }
abstractclassCommonSubGraphBasedOptimizerextendsOptimizer{ override def optimize(roots: Seq[RelNode]): Seq[RelNode] = {//以RelNodeBlock为单位进行优化,在子类(StreamCommonSubGraphBasedOptimizer,BatchCommonSubGraphBasedOptimizer)中实现 val sinkBlocks = doOptimize(roots)//获得优化后的逻辑计划 val optimizedPlan = sinkBlocks.map { block => val plan = block.getOptimizedPlanrequire(plan != null) plan }//将 RelNodeBlock 使用的中间表展开 expandIntermediateTableScan(optimizedPlan) }
接着讲下具体的优化现实。Caclite 用一套基于规则的框架优化逻辑计划,用户可以通过添加规则进行扩展,Flink 也是基于自定义规则来实现整个优化过程。优化器主要涉及三个方法doOptimize 、optimizeBlock、optimizeTree。
下面FlinkBatchProgram.buildProgram是调用Batch链式优化的入口。
classBatchCommonSubGraphBasedOptimizer(planner: BatchPlanner) extends CommonSubGraphBasedOptimizer {overrideprotected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {// build RelNodeBlock planval rootBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, planner.getTableConfig)// optimize recursively RelNodeBlock rootBlocks.foreach(optimizeBlock) rootBlocks }private def optimizeBlock(block: RelNodeBlock): Unit = { block.children.foreach { child =>if (child.getNewOutputNode.isEmpty) { optimizeBlock(child) } }val originTree = block.getPlan// optimizeTreeval optimizedTree = optimizeTree(originTree) optimizedTree match { case _: LegacySink | _: Sink => // ignore case _ =>val name = createUniqueIntermediateRelTableNameval intermediateRelTable = new IntermediateRelTable(Collections.singletonList(name), optimizedTree)val newTableScan = wrapIntermediateRelTableToTableScan(intermediateRelTable, name) block.setNewOutputNode(newTableScan) block.setOutputTableName(name) } block.setOptimizedPlan(optimizedTree) }private def optimizeTree(relNode: RelNode): RelNode = {val config = planner.getTableConfig// 分FlinkStreamProgram 和 FlinkBatchProgram设置优化规则// FlinkBatchProgram.buildProgram 是调用链式优化的入口。val programs = TableConfigUtils.getCalciteConfig(config).getBatchProgram .getOrElse(FlinkBatchProgram.buildProgram(config.getConfiguration)) Preconditions.checkNotNull(programs)val context = relNode.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]) programs.optimize(relNode, new BatchOptimizeContext {override def isBatchMode: Boolean = trueoverride def getTableConfig: TableConfig = configoverride def getFunctionCatalog: FunctionCatalog = planner.functionCatalogoverride def getCatalogManager: CatalogManager = planner.catalogManageroverride def getModuleManager: ModuleManager = planner.moduleManageroverride def getSqlExprToRexConverterFactory: SqlExprToRexConverterFactory = context.getSqlExprToRexConverterFactoryoverride def getFlinkRelBuilder: FlinkRelBuilder = planner.getRelBuilderoverride def needFinalTimeIndicatorConversion: Boolean = true })  }
Flink 构造了一个链式的优化程序FlinkChainedProgram,实际创建了一个FlinkChainedProgram序列,可以按顺序使用多套规则集合完成 RelNode 的优化过程。通过addFirst、addLast 、 addBefore等方法调整规则顺序。
object FlinkBatchProgram {def buildProgram(config: Configuration): FlinkChainedProgram[BatchOptimizeContext] = { val chainedProgram = new FlinkChainedProgram[BatchOptimizeContext]() ...// 优化物理逻辑计划 chainedProgram.addLast( PHYSICAL, FlinkVolcanoProgramBuilder.newBuilder .add(FlinkBatchRuleSets.PHYSICAL_OPT_RULES) .setRequiredOutputTraits(Array(FlinkConventions.BATCH_PHYSICAL)) .build()) // 物理逻辑计划重写 chainedProgram.addLast( PHYSICAL_REWRITE, FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(FlinkBatchRuleSets.PHYSICAL_REWRITE) .build()) chainedProgram }}
在FlinkBatchProgram 中定义了一系列扩展规则,用于构造逻辑计划的优化器。Flink 扩展了 RelNode,抽象出一个FlinkRelNode,增加了 FlinkLogicalRel 和 FlinkPhysicalRel 这两类 RelNode,对应的 Convention 分别为 FlinkConventions.LOGICAL 和 FlinkConventions.BATCH_PHYSICAL (或FlinkConventions.STREAM_PHYSICAL)。在优化器的处理过程中,RelNode 会从 Calcite 内部定义的节点转换为 FlinkLogicalRel 节点(FlinkConventions.LOGICAL),并最终被转换为 FlinkPhysicalRel 节点FlinkConventions.BATCH_PHYSICAL (或FlinkConventions.STREAM_PHYSICAL)。这两类转换规则(逻辑优化、物理逻辑优化)分别对应 FlinkBatchRuleSets.LOGICAL_OPT_RULES 和 FlinkBatchRuleSets.PHYSICAL_OPT_RULES。在 FlinkStreamProgram 中的代码逻辑类似。
经过优化器处理后,得到的逻辑树中的所有节点都是FlinkPhysicalRel ,以待生成物理执行计划了。
abstractclassPlannerBase( executor: Executor, config: TableConfig,val moduleManager: ModuleManager,val functionCatalog: FunctionCatalog,val catalogManager: CatalogManager, isStreamingMode: Boolean) extends Planner {@VisibleForTestingprivate[flink] def translateToExecNodeGraph(optimizedRelNodes: Seq[RelNode]): ExecNodeGraph = {val nonPhysicalRel = optimizedRelNodes.filterNot(_.isInstanceOf[FlinkPhysicalRel])if (nonPhysicalRel.nonEmpty) {throw new TableException("The expected optimized plan is FlinkPhysicalRel plan, " + s"actual plan is ${nonPhysicalRel.head.getClass.getSimpleName} plan.") } require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))// Rewrite same rel object to different rel objects// in order to get the correct dag (dag reuse is based on object not digest)val shuttle = new SameRelObjectShuttle()val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))// reuse subplanval reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, config)// 将 FlinkPhysicalRel DAG 转化城 ExecNodeGraphval generator = new ExecNodeGraphGenerator()val execGraph = generator.generate(reusedPlan.map(_.asInstanceOf[FlinkPhysicalRel]))// process the graphval context = new ProcessorContext(this)val processors = getExecNodeGraphProcessors processors.foldLeft(execGraph)((graph, processor) => processor.process(graph, context)) } }
首先要将 FlinkPhysicalRel 构成的 DAG 转换成 ExecNodeGraph ,因为可能存在共用子树的情况,这里还会尝试共用相同的子逻辑计划。由于通常 FlinkPhysicalRel 的具体实现类通常也实现了 ExecNode 接口,所以这一步转换较为简单。
classBatchPlanner(executor: Executor,config: TableConfig,moduleManager: ModuleManager,functionCatalog: FunctionCatalog,catalogManager: CatalogManager)extendsPlannerBase(executor, config, moduleManager, functionCatalog, catalogManager,isStreamingMode = false) {overrideprotected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {validateAndOverrideConfiguration()valplanner = createDummyPlanner()valtransformations = execGraph.getRootNodes.map {//生成 Transformationcasenode: BatchExecNode[_] => node.translateToPlan(planner)case_ =>thrownew TableException("Cannot generate BoundedStream due to an invalid logical plan. " +"Thisis a bug and should not happen. Please file an issue.")}cleanupInternalConfigurations()transformations}}
在得到由ExecNodeGraph 后,就可以尝试生成物理执行计划了,也就是将 ExecNodeGraph转换为 Flink 内部的 Transformation 算子。不同的 ExecNodeGraph按照各自的需求生成不同的 Transformation(上面代码展示的是BatchPlanner),基于这些 Transformation 构建 Flink 的 DAG。

SQL 执行

Executor抽象了SQL执行过程,DefaultExecutor是其默认实现。在得到 Transformation 后,利用 Transformation 生成 Pipeline (也就是StreamGraph),然后就可以提交 Flink 任务执行了。

Flink SQL 的执行入口在TableEnvironmentImpl,它调用了DefaultExecutor的实现方法。
在Flink 1.14 中,Flink SQL 统一使用 StreamExecutionEnvironment 。
@InternalpublicinterfaceExecutor{// 将Transformation转换成PipelinePipeline createPipeline( List<Transformation<?>> transformations, ReadableConfig tableConfiguration, @Nullable String defaultJobName);// 执行 PipelineJobExecutionResult execute(Pipeline pipeline)throws Exception;// 异步执行 PipelineJobClient executeAsync(Pipeline pipeline)throws Exception;}
@InternalpublicclassTableEnvironmentImplimplementsTableEnvironmentInternal{@Overridepublic TableResultInternal executeInternal(Operation operation){if (operation instanceof ModifyOperation) {return executeInternal(Collections.singletonList((ModifyOperation) operation)); } elseif (operation instanceof CreateTableOperation) { ...// 查询语句的执行 } elseif (operation instanceof QueryOperation) {return executeQueryOperation((QueryOperation) operation); } elseif (operation instanceof CreateTableASOperation) { executeInternal(((CreateTableASOperation) operation).getCreateTableOperation());return executeInternal(((CreateTableASOperation) operation).getInsertOperation()); } else {thrownew TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG); } }private TableResultInternal executeQueryOperation(QueryOperation operation){final UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of("Unregistered_Collect_Sink_" + CollectModifyOperation.getUniqueId());final ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); CollectModifyOperation sinkOperation =new CollectModifyOperation(objectIdentifier, operation); List<Transformation<?>> transformations = translate(Collections.singletonList(sinkOperation));final String defaultJobName = "collect";// 用transformation 生成执行Pipeline Pipeline pipeline = execEnv.createPipeline( transformations, tableConfig.getConfiguration(), defaultJobName);try {// 异步执行 pipeline JobClient jobClient = execEnv.executeAsync(pipeline); ResultProvider resultProvider = sinkOperation.getSelectResultProvider(); resultProvider.setJobClient(jobClient);return TableResultImpl.builder() .jobClient(jobClient) .resultKind(ResultKind.SUCCESS_WITH_CONTENT) .schema(operation.getResolvedSchema()) .resultProvider(resultProvider) .setPrintStyle( PrintStyle.tableauWithTypeInferredColumnWidths(// sinkOperation.getConsumedDataType() handles legacy types DataTypeUtils.expandCompositeTypeToSchema( sinkOperation.getConsumedDataType()), resultProvider.getRowDataStringConverter(), PrintStyle.DEFAULT_MAX_COLUMN_WIDTH,false, isStreamingMode)) .build(); } catch (Exception e) {thrownew TableException("Failed to execute sql", e); }    }
接商务合作:大数据项目(包括数据中台的,技术选型、培训、开发、疑难问题救火)。
继续阅读
阅读原文