Spring Cloud Flow与Apache Spark集成
点击左上角,关注:“锅外的大佬”
专注分享国外最新技术内容帮助每位开发者更优秀地成长
1.简介
SpringCloudDataFlow
是用于构建数据集成和实时数据处理管道的工具包。
在这种情况下,管道(Pipelines)是使用 SpringCloudStream
或 SpringCloudTask
框架构建的 SpringBoot
应用程序。在本教程中,我们将展示如何将
SpringCloudDataFlow
与 ApacheSpark
一起使用。2.本地数据流服务
首先,我们需要运行数据流服务器(Data Flow Server)才能部署我们的作业(
jobs
)。要在本地运行数据流服务器,需要使用 spring-cloud-starter-dataflow-server-local
依赖创建一个新项目:<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
<version>1.7.4.RELEASE</version>
</dependency>
之后,使用
@EnableDataFlowServer
来注解服务中的主类(main class):@EnableDataFlowServer
@SpringBootApplication
publicclassSpringDataFlowServerApplication{
publicstaticvoid main(String[] args){
SpringApplication.run(
SpringDataFlowServerApplication.class, args);
}
}
运行此应用程序后,本地数据流服务运行在端口
9393
。3.新建工程
我们将
SparkJob
作为本地单体应用程序创建,这样我们就不需要任何集群来运行它。3.1依赖
首先,添加
Spark
依赖<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.4.0</version>
</dependency>
3.2 创建job
对
job
来说,就是为了求 pi
的近似值:publicclassPiApproximation{
publicstaticvoid main(String[] args){
SparkConf conf =newSparkConf().setAppName("BaeldungPIApproximation");
JavaSparkContext context =newJavaSparkContext(conf);
int slices = args.length >=1?Integer.valueOf(args[0]):2;
int n =(100000L* slices)>Integer.MAX_VALUE ?Integer.MAX_VALUE :100000* slices;
List<Integer> xs =IntStream.rangeClosed(0, n)
.mapToObj(element ->Integer.valueOf(element))
.collect(Collectors.toList());
JavaRDD<Integer> dataSet = context.parallelize(xs, slices);
JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer ->{
double x =Math.random()*2-1;
double y =Math.random()*2-1;
return(x * x + y * y )<1?1:0;
});
int count = pointsInsideTheCircle.reduce((integer, integer2)-> integer + integer2);
System.out.println("The pi was estimated as:"+ count / n);
context.stop();
}
}
4. Data Flow Shell
DataFlowShell
是一个* 允许我们与服务器交互的应用程序*。 Shell
使用 DSL
命令来描述数据流。要使用
DataFlowShell
,我们要创建一个运行它的项目。 首先,需要 spring-cloud-dataflow-shell
依赖:<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-shell</artifactId>
<version>1.7.4.RELEASE</version>
</dependency>
添加依赖项后,我们可以创建主类来运行
DataFlowShell
:@EnableDataFlowShell
@SpringBootApplication
publicclassSpringDataFlowShellApplication{
publicstaticvoid main(String[] args){
SpringApplication.run(SpringDataFlowShellApplication.class, args);
}
}
5.部署项目
为了部署我们的项目,可在三个版本(
cluster
, yarn
和 client
)中使用 ApacheSpark
所谓 任务运行器(task runner)
—— 我们将使用 client
版本。 任务运行器(task runner)
是真正运行 Sparkjob
的实例。为此,我们首先需要使用 DataFlowShell
注册 task
:app register--type task --name spark-client --uri
maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT
task
允许我们指定多个不同的参数,其中一些参数是可选的,但是一些参数是正确部署 Sparkjob
所必需的:- spark.app-class,已提交
job
的主类 - spark.app-jar,包含
job
的fat-jar
路径 - spark.app-name,
job
的名称 - spark.app-args,将传递给
job
的参数 我们可以使用注册的任务spark-client
提交我们的工作,记住提供所需的参数:
task create spark1 --definition "spark-client \
--spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
--spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"
请注意,
spark.app-jar
是我们 job
中 fat-jar
的路径。成功创建任务后,我们可以使用以下命令继续运行它:- task launch spark1
这将调用
task
的执行。6.总结
在本教程中,我们展示了如何使用
SpringCloudDataFlow
框架来处理 ApacheSpark
数据。 有关 SpringCloudDataFlow
框架的更多信息,请参阅文档。所有代码示例都可以在
GitHub
上找到。原文链接:https://www.baeldung.com/spring-cloud-data-flow-spark作者:baeldung译者:Leesen
推荐阅读:快速掌握FileChannel
点击在看,和我一起帮助更多开发者!
阅读原文 最新评论
推荐文章
作者最新文章
你可能感兴趣的文章
Copyright Disclaimer: The copyright of contents (including texts, images, videos and audios) posted above belong to the User who shared or the third-party website which the User shared from. If you found your copyright have been infringed, please send a DMCA takedown notice to [email protected]. For more detail of the source, please click on the button "Read Original Post" below. For other communications, please send to [email protected].
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。
版权声明:以上内容为用户推荐收藏至CareerEngine平台,其内容(含文字、图片、视频、音频等)及知识版权均属用户或用户转发自的第三方网站,如涉嫌侵权,请通知[email protected]进行信息删除。如需查看信息来源,请点击“查看原文”。如需洽谈其它事宜,请联系[email protected]。