送福利啦
关注鸿蒙技术社区,回复【鸿蒙】送价值399元的鸿蒙开发板套件(数量有限,先到先得),还可以免费下载鸿蒙入门资料
👇扫码立刻关注
👇

专注开源技术,共建鸿蒙生态
最近,我一直在研究 Pulsar 及其与 Kafka 的比较。通过快速搜索,你会看到这两个最著名的开源消息传递系统之间正在进行的"战争"。
图片来自 Pexels
作为 Kafka 的用户,我着实对 Kafka 的某些问题感到困惑,但 Pulsar 却让人眼前一亮、令我非常兴奋。所以最后,我设法花了一些时间了解背景资料,并且做了很多研究。

在本文中,我将重点介绍 Pulsar 的优势,并说明 Pulsar 胜于 Kafka 的理由。让我们开始!

Kafka 基础知识
Kafka 是消息传递系统之王。它由 LinkedIn 于 2011 年创建,并在 Confluent 的支持下得到了广泛的传播。

Confluent 已向开源社区发布了许多新功能和附加组件,例如用于模式演化的 Schema Registry,用于从其他数据源轻松流式传输的 Kafka Connect 等。

数据库到 Kafka,Kafka Streams 进行分布式流处理,最近使用 KSQL 对 Kafka topic 执行类似 SQL 的查询等等。

Kafka 快速,易于安装,非常受欢迎,可用于广泛的范围或用例。从开发人员的角度来看,尽管 Apache Kafka 一直很友好,但在操作运维方面却是一团糟。

因此,让我们回顾一下 Kafka 的一些痛点:
Kafka 演示[2]
Kakfa的诸多痛点如下:
  • 扩展 Kafka 十分棘手,这是由于 broker 与存储数据的耦合架构结构所致。剥离一个 broker 意味着它必须复制 topic 分区和副本,这非常耗时。

  • 没有与租户完全隔离的本地多租户。

  • 存储会变得非常昂贵,尽管可以长时间存储数据,但是由于成本问题却很少用到它。

  • 万一副本不同步,有可能丢失消息。

  • 必须提前计划和计算 broker、topic、分区和副本的数量(确保计划的未来使用量增长),以避免扩展问题,这非常困难。

  • 如果仅需要消息传递系统,则使用偏移量可能会很复杂。
  • 集群重新平衡会影响相连的生产者和消费者的性能。

  • MirrorMaker[3] Geo 复制机制存在问题。像 Uber 这样的公司已经创建了自己的解决方案来克服这些问题。
如您所见,大多数问题与操作运维方面有关。尽管安装起来相对容易,但 Kafka 难以管理和调优。而且,它也缺乏应有的灵活和弹性。

Pulsar 基础知识
Pulsar 由 Yahoo!在 2013 年创建,并于 2016 年捐赠给 Apache 基金会。Pulsar 现在是 Apache 软件基金会的顶级项目。

Yahoo!、Verizon、Twitter 等公司已在生产中使用它来处理成千上万消息。它具有运行成本低、灵活等特性。Pulsar 旨在解决 Kafka 的大部分难题,使其更易于扩展。

Pulsar 非常灵活:
它既可以应用于像 Kafka 这样的分布式日志应用场景,也可以应用于像 RabbitMQ 这样的纯消息传递系统场景。

它支持多种类型的订阅、多种交付保证、保留策略以及处理模式演变的方法,以及其他诸多特性。
Pulsar 架构图[4]

Pulsar 的特性如下:

  • 内置多租户,
    不同的团队可以使用相同的集群并将其隔离,解决了许多管理难题。它支持隔离、身份验证、授权和配额。

  • 多层体系结构:Pulsar 将所有 topic 数据存储在由 Apache BookKeeper 支持的专业数据层中。
    存储和消息传递的分离解决了扩展、重新平衡和维护集群的许多问题。它还提高了可靠性,几乎不可能丢失数据。
    另外,在读取数据时可以直连 BookKeeper,且不影响实时摄取。例如,可以使用 Presto 对 topic 执行 SQL 查询,类似于 KSQL,但不会影响实时数据处理。

  • 虚拟 topic:
    由于采用 n 层体系结构,因此对 topic 的数量没有限制,topic 及其存储是分离的。用户还可以创建非持久性 topic。

  • N 层存储:
    Kafka 的一个问题是,存储费用可能变高。因此,它很少用于存储"冷"数据,并且消息经常被删除,Apache Pulsar 可以借助分层存储自动将旧数据卸载到 Amazon S3 或其他数据存储系统,并且仍然向客户端展示透明视图;Pulsar 客户端可以从时间开始节点读取,就像所有消息都存在于日志中一样。

  • Pulsar Function:
    易于部署、轻量级计算过程、对开发人员友好的 API,无需运行自己的流处理引擎(如 Kafka)。

  • 安全性:
    它具有内置的代理、多租户安全性、可插拔的身份验证等特性。

  • 快速重新平衡:
    分区被分为易于重新平衡的分片。

  • 服务器端重复数据删除和无效字段:
    无需在客户端中执行此操作,也可以在压缩期间删除重复数据。

  • 内置 Schema registry(架构注册表):
    支持多种策略,易于操作。

  • 地理复制和内置 Discovery:
    易于将集群复制到多个区域。

  • 集成的负载均衡器
    和 Prometheus 指标。

  • 多重集成:
    Kafka、RabbitMQ 等。

  • 支持多种编程语言,
    例如 GoLang、Java、Scala、Node、Python…...

  • 分片和数据分区在服务器端透明进行,客户端不需要了解分片与分区数据。
Pulsar 特性列表[5]
Pulsar 入门
Pulsar 入门非常容易,使用前提是安装 JDK。

下载 Pulsar 并解压缩(备注:目前 Apache Pulsar 最新版本为 2.7.0):

$ wget 
https:
/
/archive.apache.org/dist/pulsar/pulsar
-
2.6
.
1
/apache-pulsar-
2.6
.
1
-bin.tar.gz

下载连接器(可选):

$ wget 
https:
/
/archive.apache.org/dist/pulsar/pulsar
-
2.6
.
1
/connectors/{connector}-
2.6
.
1
.nar

下载 nar 文件后,将文件复制到 Pulsar 目录中的 Connectors 目录。

启动 Pulsar!

$ bin/pulsar standalone

Pulsar 提供了一个称为 Pulsar-Client 的 CLI 工具,我们可以使用它与集群进行交互。

生产消息:

$ bin/pulsar-client produce 
my
-topic --messages 
"hello-pulsar"
消费消息:

$ bin/pulsar-client consume 
my
-topic -
s"first-subscription"
Akka 流示例
举一个客户端示例,我们在 Akka 上使用 Pulsar4s。

首先,我们需要创建一个 Source 来消费数据流,所需要的只是一个函数,该函数将按需创建消费者并查找消息 ID:

val topic = Topic(
"persistent://standalone/mytopic"
)

val consumerFn = 
() =>
 client.consumer(ConsumerConfig(topic, subscription))

然后,我们传递 ConsumerFn 函数来创建源:

import
 com.sksamuel.pulsar4s.akka.streams._

val
 pulsarSource = source(consumerFn, Some(MessageId.earliest))

Akka 源的物化值是 Control 的一个实例,该对象提供了一种"关闭"方法,可用于停止消费消息。现在,我们可以像往常一样使用 Akka Streams 处理数据。

要创建一个接收器:
val topic = Topic(
"persistent://standalone/mytopic"
)

val producerFn = 
() =>
 client.producer(ProducerConfig(topic))

import
 com.sksamuel.pulsar4s.akka.streams._

val pulsarSink = sink(producerFn)

完整示例摘自 Pulsar4s[6]:
object
 Example {

import
 com.sksamuel.pulsar4s.{ConsumerConfig, MessageId, ProducerConfig, PulsarClient, Subscription, Topic}

import
 org.apache.pulsar.client.api.Schema

  implicit 
val
 system: ActorSystem = ActorSystem()

  implicit 
val
 materializer: ActorMaterializer = ActorMaterializer()

  implicit 
val
 schema: Schema[Array[
Byte
]] = Schema.BYTES

val
 client = PulsarClient(
"pulsar://localhost:6650"
)

val
 intopic = Topic(
"persistent://sample/standalone/ns1/in"
)

val
 outtopic = Topic(
"persistent://sample/standalone/ns1/out"
)

val
 consumerFn = () => client.consumer(ConsumerConfig(topics = Seq(intopic), subscriptionName = Subscription(
"mysub"
)))

val
 producerFn = () => client.producer(ProducerConfig(outtopic))

val
 control = source(consumerFn, Some(MessageId.earliest))

    .map { consumerMessage => ProducerMessage(consumerMessage.
data
) }

    .to(sink(producerFn)).run()

  Thread.sleep(
10000
)

  control.stop()

}

Pulsar Function 示例
Pulsar Function 处理来自一个或多个 topic 的消息,对其进行转换并将结果输出到另一个 topic:
Pulsar Function[7]
可以在两个接口之间进行选择以编写函数:
  • 语言原生接口:
    不需要特定的 Pulsar 库或特殊的依赖项;无法访问上下文,仅支持 Java 和 Python。

  • Pulsar Function SDK:可用于 Java/Python/ Go,并提供更多功能,比如访问上下文对象。
只需编写一个简单的函数即可使用语言原生接口转换消息:

defprocess(input):
return"{}!"
.format(input)

用 Python 编写的这个简单函数只是向所有传入的字符串添加一个感叹号,并将结果字符串发布到 topic。

使用 SDK 需要导入依赖项,例如在 Go 中,我们可以编写:

package
 main

import
 (

"context"
"fmt"
"github.com/apache/pulsar/pulsar-function-go/pf"
)

funcHandleRequest(ctx context.Context, in []byte)error
 {

fmt.Println(
string
(in) + 
"!"
)

returnnil
}

funcmain()
 {

pf.Start(HandleRequest)

}

如果要发布无服务器功能并将其部署到集群,可以使用 Pulsar-Admin CL;如果使用 Python,我们可以编写:

$ bin/pulsar-admin functions 
create
 \

--py ~/router.py \
--classname router.RoutingFunction \
--tenant public \
--namespace default \
--name route-fruit-veg \
--inputs persistent://public/default/basket-items
Pulsar 
Function
 的一个重要功能是用户可以在发布该函数时设置交付保证:

bin
/pulsar-
admin
 functions 
create
 \

--name my-effectively-once-function \
--processing-guarantees EFFECTIVELY_ONCE
有以下选择:
Pulsar 的优势
与 Kafka 相比,让我们回顾下 Pulsar 的主要优势:
  • 更多功能:
    Pulsar Function、多租户、Schema registry、n 层存储、多种消费模式和持久性模式等。

  • 更大的灵活性:
    3 种订阅类型(独占,共享和故障转移),用户可以在一个订阅上管理多个 topic。

  • 持久性选项:
    非持久(快速)、持久、压缩(每个消息仅最后一个键),用户可以选择交付保证。Pulsar 具有服务器端重复数据删除和无效字样多保留政策和 TTL 的特性。

  • 无需提前定义扩展需求。
  • 支持队列与流两种消息消费模型,
    所以 Pulsar 既可以代替 RabbitMQ 也可以代替 Kafka。

  • 存储与 broker 分离,
    因此扩展性更好,重新平衡更快、更可靠。

  • 易于操作运维:
    架构解耦和 n 层存储。

  • 与 Presto 的 SQL 集成,
    可直接查询存储而不会影响 broker。

  • 借助 n 层自动存储选项,
    可以更低成本地存储。

  • 更快:
    基准测试[8]在各种情况下都表现出更好的性能。Pulsar 具有较低的延迟和更好的扩展功能。

  • Pulsar Function 支持无服务器计算,
    无需部署管理。

  • 集成 Schema registry。
  • 集成的负载平衡器
    和 Prometheus 指标。

  • 地理复制效果更好,更易于设置。
    Pulsar 内置 Discover-ability。

  • 创建 topic 数量没有限制。
  • 与 Kafka 兼容,
    易于集成。

Pulsar 的劣势
Pulsar 并不完美,Pulsar 也存在一些问题:
  • 相对缺乏支持、文档和案例。

  • n 层体系结构导致需要更多组件:BookKeeper。
  • 插件和客户端相对 Kafka 较少。

  • 云中的支持较少,Confluent 具有托管云产品。
不过,上面的情况都在得到快速改善,目前 Pulsar 也逐渐被越来越多的公司和组织使用。

Apache Pulsar 商业支持公司 StreamNative 也推出了 StreamNative Cloud,Apache Pulsar 正在快速成长,我们都可以看到令人欣喜的变化。

Confluent 曾发布博客对比 Pulsar 和 Kafka ,但请注意,这些问题可能有偏见。

Pulsar 使用场景
Pulsar 可用于广泛的场景:
  • 发布/订阅队列消息传递。

  • 分布式日志。

  • 事件溯源,用于永久性事件存储。

  • 微服务。

  • SQL 分析。

  • Serverless 功能。

什么时候应该考虑 Pulsar?

  • 同时需要像 RabbitMQ 这样的队列和 Kafka 这样的流处理程序。

  • 需要易用的地理复制。

  • 实现多租户,并确保每个团队的访问权限。

  • 需要长时间保留消息,并且不想将其卸载到另一个存储中。

  • 需要高性能,基准测试表明 Pulsar 提供了更低的延迟和更高的吞吐量。
如果在云端,请注意考虑基于云的解决方案。云提供商拥有涵盖某些场景的不同服务。

例如,对于队列消息,云提供商提供了许多服务,比如 Google pub / sub;对于分布式日志,有 Confluent 云或 AWS Kinesis;StreamNative 也提供了基于 Pulsar 的云端服务。

云提供商还提供了非常好的安全性。Pulsar 的优势在于可以在一个平台上提供许多功能。

一些团队可能将其用作微服务的消息传递系统,而另一些团队则将其用作数据处理的分布式日志。

结论
我是 Kafka 的忠实粉丝,我对 Pulsar 如此感兴趣的原因是:竞争驱动创新。

Kafka 是一种成熟,富有弹性且经过考验的产品,在世界范围内获得了巨大成功,无法想象大多数公司没有它会怎样。

但是我确实看到 Kafka 成为其自身成功的受害者,由于需要支持许多大型公司导致巨大的增长减慢了功能开发的速度、移除 ZooKeeper 依赖项等重要功能花费的时间太长,这为诸如 Pulsar 等工具蓬勃发展创造了空间。

Pulsar 虽然年轻却势头很猛,在将 Pulsar 纳入组织之前,需进行分析、基准测试、研究并进行 POC。

从小处着手,在将 Kafka 迁移到 Pulsar 之前进行概念验证,并在决定进行完全迁移之前评估影响。

引用链接:

  • [1] 《Pulsar Advantages Over Kafka》:
    https://itnext.io/pulsar-advantages-over-kafka-7e0c2affe2d6
  • [2] Kafka 演示:
    https://talks.rmoff.net/pZC6Za/slides
  • [3] MirrorMaker:
    https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
  • [4] Pulsar 架构图:
    https://pulsar.apache.org/docs/en/concepts-architecture-overview/
  • [5] Pulsar 特性列表:
  • https://pulsar.apache.org/
  • [6] Pulsar4s:
    https://github.com/sksamuel/pulsar4s/blob/master/pulsar4s-akka-streams/src/test/scala/com/sksamuel/pulsar4s/akka/streams/Example.scala
  • [7] Pulsar Function:
    https://pulsar.apache.org/docs/en/functions-overview/

  • [8] 基准测试:
    https://medium.com/swlh/performance-comparison-between-apache-pulsar-and-kafka-latency-79fb0367f407
作者:闻数起舞
编辑:陶家龙
出处:转载自 Java 高级架构,原中文版本由闻数起舞翻译自 Lewis Fairweather 的文章《Pulsar Advantages Over Kafka》[1],文章转载时有改动。
精彩文章推荐:
继续阅读
阅读原文