阿里妹导读
讲述消息系统在现代化演进中软硬一体化,百万队列,分级存储等诸多竞争力特性的诞生和落地效果。探讨业界领先的 Shared-Log 存储计算分离,FFM与协程,RDMA 传输,列式存储等技术,将消息向流的领域延伸。
1970年代末,消息系统用于管理多主机的打印作业,这种削峰解耦的能力逐渐被标准化为“点对点模型”和稍复杂的“发布订阅模型”,实现了数据处理的分布式协同。随着时代的发展,Kafka,Amazon SQS,RocketMQ,Azure Service Bus,Google Pub/Sub,RabbitMQ等众多产品为开发者在不同业务场景下提供了富有竞争力的解决方案,并扩展出顺序,事务,定时消息,精确一次投递等丰富的语义和特性,让消息系统成为分布式系统中的标准组件。消息系统伴随着云原生与微服务理念的成长,见证了Serverless和事件驱动这样高度动态和异步的系统强大的生命力,而消息队列本身的架构和模型也随之改变。希望和大家分享一下消息系统这样的PaaS / MaaS 层基础设施服务,在现代化演进中是如何享受软硬件发展红利的,如何满足Stream Processing和Real-Time Analysis场景下日益增长的数据分析需要和平衡成本与机器效能之间的矛盾。让消息基础架构真正拥抱云原生,结合近年来火热的Shared-Log存储计算分离来降本,配合FFM,协程,用户态TCP和RDMA传输,列式存储等技术进一步将消息向流的领域延伸,成为未来EDA,全面Serverless和轻计算的底座。
一、消息的发送和接收
1.1. 发送,公平还是贪心
从用户或者客户端视角看,消息的发送看似只是通过rpc的方式,将序列化后的数据传输给消息队列服务端并处理响应。在分布式环境中要考虑的细节较多,例如:
1.发送延迟:消息系统服务端一般会跨可用区部署来提供容灾能力,如何就近发送减少延迟和流量费用?
2.多语言支持:生产者相比于在云上规模化部署的消费者而言,来源更加广泛,从小小的单片机,五花八门的前端页面,到复杂的后端服务内部通信,对多语言多协议的支持有着更强烈的诉求。
3.多场景支持:需要适配多种模型,例如同步binlog的顺序消息场景,物联网海量客户端的场景等。
4.发送失败:由于服务端宕机或网络问题发送失败,客户端行为是什么?是选择数据反压还是快速失败。
图:客户端请求直接传输到存储节点Broker和经过Proxy的区别
当我们回望网络速度不够快,“移动数据” 不如 “移动计算” 的时代,往往会提及经典的直连网络架构。典型产品有HDFS,Kafka,Redis和Elasticsearch,客户端和多个服务端节点之间直接建立连接,相比于所有流量先经过Proxy处理的架构,有效降低延迟,提高实时性。事实上客户端不得不对服务端的各种分布式问题进行容错,产生了更复杂的服务发现和负载均衡机制,客户端还需要知道如何 “优雅的” 处理单点故障,这些都加大了客户端版本滚动升级的难度。而通过Proxy模式实现存储计算分离,还可以在Proxy实现请求缓存,共享鉴权和服务发现等元数据。这种职责的划分也能显著简化跨地域组网,异地多活等高阶能力的配置。在有了Proxy之后,多语言客户端接入的问题也被简化,只需要在无状态的Proxy解析多种协议即可。至于多了一跳通信延迟的问题,Proxy在服务端内部和后端存储集群之间可以使用性能更高的私有协议,配合FlatBuffer这样低序列化开销的库以及RDMA通信等技术的发展,不会有显著差异。
图:TCP 拥塞控制 [2]
现代的消息队列朝着更 “聪明的” 错误处理能力不断努力着,并提供用户选择策略的机会。从网络层面看发送失败,本质上这是一种不可达,也可以认为是一种拥塞现象。一般有两种解决方案:TCP-BBR的拥塞控制技术和类似锐速的多倍发包算法。在一些偏日志收集型的应用里,消息队列会更加关注全局吞吐,选择类似TCP-BBR 这样基于溢水原理的算法,用术语说是找到一个最合适的BDP(带宽延迟积),当服务端繁忙时通过反压来降低客户端的发送速度,例如Kafka的攒批机制 (accumulate,参数为linger.ms) 和Nagle算法的思想类似,都是通过将小包攒为大包提升性能,也有在框架层实现滑动窗口和Flink基于信任值的流控这样的方式,来更优雅的处理反压。与之对应的,锐速采用了多倍发包来最大化利用带宽,事实上消息队列也会提供快速失败的策略,由用户决定错误处理行为,是重试更多次还是走兜底链路。即将发送消息的超时时间配置较短,通过快速多次重试来重发消息,这是一种全局非公平的贪心策略。多用于像RocketMQ这样更加注重数据重要性和实时性的消息队列里 (服务端异步写延迟<1ms,同步写或多副本延迟为几毫秒),在存储层繁忙时选择 “快速失败” 而非排队等待,另一个角度也可以说是服务端希望用更高优先级去满足那些在线应用的需求。
1.2. 消费,记录多种模型的状态
消息的消费,本质上是一个服务端配合下的两阶段提交,有一定分布式基础的工程师会这样描述 “服务端有一些逻辑或物理上的队列,客户端使用长轮询对服务端发起请求,有消息则返回,没有消息时请求会在服务端挂起。消息在应用中进行业务逻辑的处理后,向服务端报告消费结果。” 服务端的使命就是更好更快的满足这些嗷嗷待哺的客户端们,记录队列中当前拉取的位点,维护取走的消息句柄和消息量。对于消息队列的存储层来说,需要精心设计数据结构来维护这些内容,并使用Paxos/Raft这样的一致性协议在服务端内传播来保持高可用,它们也被称为服务端的 “状态”。我们先来看一些简单的例子:例如Kafka用内置Topic记录Group的位点提交,复用消息多副本的复制链路保证可靠性,方便位点的监控与回溯。RocketMQ中订阅组数量相比 Kafka高出两三个数量级,因此选择定时对内存位点做Checkpoint。对于一个特定的订阅组和一个队列,仅需要一个位点数字就可以描述消费进度,队列模型在绝大多数场景下工作的简单高效,但也有一些天生的缺陷:
1.消费者按照队列维度负载均衡存在前提与假设:
a.队列数不均等导致负载不均,例如8个队列3个消费者,最佳分配是3,3,2。
b.该模型假设了各个客户端能力均等,实际生产中新旧机型混部,无法充分利用计算能力。
2.队列中有慢任务会阻塞整个队列。例如有位点为34567的5条消息,消费offset = 5 时业务逻辑耗时非常久,并发消费模式下67两条消息消费较快,而观察到的堆积一直为3造成误判。
3.消费者或者服务端宕机,业务对产生几秒的消费重复依然敏感,影响用户体验,例如短信推送场景。
甚至,我们还有更有代表性的场景来命中这些 “缺陷”,例如渲染业务,队列中每一条消息代表一个渲染任务。
1.消费者数量较多,同一个订阅组可能有成百上千台机器同时消费。
2.该场景下单条数据的处理耗时较长,需要几秒至几个小时不等。
3.由于消费者负载高和大量使用竞价实例,导致消费方进程假死和宕机率远高于一般业务。
传统的消息队列一般会采用和Kafka类似的队列模型,就会遇到很经典的 “Work-Stealing” 难题,任务的负载无法均衡的分配到所有消费方,单条消息的阻塞会影响后续消费成功消息位点的提交。此时我们想要的是一个基于不可见时间的投递算法,该算法大致的工作流程如下:
1.客户端设置一个不可见时间,例如5分钟,并向服务端拉取一批消息。
2.服务端返回一批消息,并在后台开始倒计时5分钟,消息上会附加一个字段用来标识,也称为handle。
3.如果客户端5分钟内没有提交消费成功(ack by handle),5分钟后客户端再次可以获取到这批消息。
很快我们就会发现这个模型还是有缺陷的,假如消费者拉取消息1分钟后立刻宕机了,业务不得不忍受4分钟的延迟才能再次处理,哪怕此时其他消费者还是空闲状态。这个时候就可以选择将消息的不可见时间设置为1分钟,在客户端处理业务的同时不停的refresh不可见时间,例如每隔30秒就调用change invisible time,使剩余的不可见时间更新为1分钟,此时无论客户端何时宕机,消息的延迟时间会控制在1分钟之内。在RocketMQ中,这种基于区间和单条消息进行消费的方式被称为 “pop消费”,对应的客户端实现是SimpleConsumer,它的简单性在于客户端不再需要关心复杂的负载均衡和位点管理,也更容易适配多语言。这种管理能力在Pulsar中会更加复杂,被称为WorkQueue模式和Range Ack位点管理。当服务端做的越多,客户端和用户担心的事就越少,带来了极大的灵活性。这种业务模型的演进驱动了消息云存储模型的变迁,当然这一切也是有代价的,例如SimpleConsumer这样的无状态消费模式来说,消息拉取的平均耗时要高于常用的PullConsumer,服务端客户端交互的次数也会更多。
二、服务端能力提升
这些客户端接口和策略的丰富,依托于服务端技术的提升。AIO,零拷贝,DirectIO 这些技术逐渐普及,极大地简化了构建高性能系统的复杂度,设计合理的单机存储引擎通常能够达到每秒处理十万甚至百万级别的写入性能。用户的关注点也从单纯的读写吞吐能力转向了产品的功能特性,毕竟吞吐量可以通过水平扩容来解决,而增加一套单独的组件,维护难度指数上升。作为用户,总是期望可以尽可能的减少外部依赖,不会仅仅因为产品有一个精致的大盘页面而选择一款产品,但可能会由于产品的功能缺失而被迫放弃,例如金融级产品强依赖传输和存储加密能力。不同于开放的社区,对于云厂商来说消息队列中很重要的核心竞争力就是构建 “统一的消息内核”,在其上适配多种产品的接入协议,为所有产品提供一致的底层能力,来最大化功能复用的收益。这种情况下,每适配一个新的产品,所付出的边际成本是递减的,这也导致NoSQL类的数据库,消息队列,缓存组件,甚至日志服务都逐渐走向一个融合的生态。我认为现代消息队列对于存储特性的富化,主要会体现在以下几个方面:海量队列支持,分级存储降成本,随着分布式文件系统的成熟而进行Layered Replication 架构演进产生多模存储的形态,多副本策略的灵活调整,以及更好更快的支持流式任务
2.1. 海量队列与多模统一
不同消息产品的侧重点有差异,个人理解Kafka其实更侧重于全局吞吐量,RocketMQ更偏向于实时性的应用,而RabbitMQ通过消息的模型去承载了一定的业务逻辑,MQTT场景则需要支持海量的设备和Topic。这些差异化特性和竞争力是消息队列两种领域模型的扩展,即基于主题的发布订阅模型和基于队列的点对点模型,而 “统一的消息内核” 需要很好的适配多模的场景。
众所周知,社区版本的Kafka会对每个分区构建独立的LogSegment来存储消息,配合文件的磁盘空间预分配等策略,在海量队列的场景下,存在明显的性能问题。RocketMQ中消息以CommitLog的形式混合存储,为了保证前台写入性能,将所有的数据是混合存储,历史版本中也曾出现过使用IO线程中做零拷贝,结果大量的缺页中断导致线程阻塞的问题。在存储引擎层面不得不进行大量的定制优化,使用单独的冷热分离服务进行权重计算,使用单独的冷读线程池等。但索引仍然是独立的小文件,默认每个消费队列的文件存储30万条消息索引,一个索引占用20个字节,这样每个索引文件的大小是300 * 1000 * 20 / 1024 / 1024 ≈ 5.72M,百万个队列会占用数 TB 的磁盘空间。对于队列数量极多的业务场景下,必然选择将这些索引也混合存储,在文件系统层面合并为大文件。我们发现,类似RocksDB这样支持排序的LSM结构,能够合并小文件批量写入SST,显著改进了大量小文件的碎片化问题。性能测试表明,使用RocksDB存储索引替代使用原生文件版索引的情况下,单机可以支持百万级别的队列数,在单机4万队列数 (含重试队列) 的场景下本地磁盘的索引空间占用从200G降低到30G,相比于使用文件版cq作为索引,cpu开销增加10%左右。
那么类LSM的结构的真正优势在什么地方?众所周知,存储引擎有in-place updates(原地更新)和out-of-place updates(异地更新)两种结构。原地更新中最典型的结构是B树及其变种,而B树是一种平衡多路搜索树,在每个叶子中插入多个条目来降低树的高度,提升查询的性能和稳定性。由于B树的连续性结构(索引结构能够为新插入的索引条目根据键值最终整理排序,与所有其他已经存在的条目放置在一起),从B树读入的数据不太可能在其在缓冲区中的短时间内再次被引用以进行第二次插入,因此在B树中写请求无法像LSM做成组提交。第二,通常存储引擎持久化的数据量远超内存,对于任何可合并操作(比如说按照队列进行顺序访问)的访问率来说,都是朝着冷数据的方向发展的,而在LSM结构中Compaction机制会有一个局部聚簇的批处理效应,即连续取回数据能较好的配合预读,这是一个极其显著的优势。诚然Compaction机制在数据库领域工作的非常好,但在消息队列场景下大Value的特殊性,额外的读写放大开销对消息系统来是非常高的,也会尝试使用fast 论文中类似WiscKey的一些KV分离特性减少读写放大。Compaction机制也可以使用类似TerarkDB和Titan提出的方案来优化,这也增加了消息系统实现 “Topic粒度的 TTL” 和 “位点周期性订正” 的业务特性的复杂度。
2.2. 分级存储转化数据资产
近些年来,如何控制好不断膨胀的基础设施成本,在社区里是一个热议的话题,在商业产品可以转化为价格竞争力上,是吸引用户从VM自建模式转向商业产品的重要卖点之一。过去消息系统基于本地磁盘或者块存储(云盘)构建存储,云SSD磁盘的价格在0.8-1 元/GB/月,而对象存储的价格普遍在0.1-0.12元/GB/月,在成本上有数倍的差距。生产中单机的存储空间使用量为2TB,每个月的存储成本为1600元,使用对象存储存储冷数据,假设热数据占比20%,存储成本为每月200 * 0.8 + 1800 * 0.1 = 340元。为什么不进一步压缩块存储的容量,做到几乎极致的成本呢?事实上,在分级存储的场景下,一味的追求过小本地磁盘容量带来的 “边际效益” 是递减的。
主要有以下原因:
  • 故障冗余,消息队列作为基础设施中重要的一环,稳定性高于一切。虽然对象存储本身可用性较高,如果遇到网络波动等问题时,使用对象存储作为主存储,非常容易产生反压导致热数据无法写入,而热数据读写影响在线生产业务,这对于业务可用性的影响是致命的。
  • 过小的本地磁盘,在价格上没有明显的优势。众所周知,云计算是注重普惠和公平的,使用分层存储后计算成本占比上升,热数据的写入流量不变。如果选用50G左右的块存储,又需要等价200G的ESSD级别的块存储能提供的IOPS读写能力,则其单位成本几乎是低IOPS的普通块存储的数倍。
  • 上传时能够更好的使用本地磁盘 “攒批” 减少对象存储的请求费用。
  • 读取时能够对“温热” 数据提供更低的延迟和节约读取成本,对取回的冷数据也可以放置在本地磁盘缓存。
“谁离数据最近,谁就越有可能成功” 是数据驱动决策的核心要点,消息队列作为应用开发的数据通道,受限于本地磁盘的容量和成本约束,往往只能存储几天。分级存储提供了大幅延长消息的生命周期的低成本方案,在转冷时可以灵活动态的使用 FlatBuffer,Parquet等多种数据格式,将消息队列从通道转化为用户的数据资产的存储池,这些技术进一步将消息向流的领域延伸,成为未来EDA和轻计算的底座。
2.3. 基于分布式存储降计算成本
分级存储能够有效解决冷数据存储成本问题,并不能很好的降低整体拥有成本的。消息系统内部对于热数据使用了多副本技术,一方面是为了保证数据的可靠性和服务的高可用,同时副本均可读能够提供了更大的读取带宽,适配一写多读的消息场景。这种架构一些场景下不可避免的会有如下问题:
  • 写性能不够极致,消息队列为了降低客户端复杂度,往往采用Y型写,服务端内部的Raft或者其他日志复制的算法占用了较多带宽,带来写性能和吞吐的下降。每次消息数据更新的时候,需要先更新主副本,通过一致性协议进行复制和 Quorum计算,写入时延至少在四跳网络RT(客户端->主,主->备复制,备响应进度,主返回成功),且会有长尾。
  • 计算成本浪费,备机一般不需要处理大量客户端的读请求,CPU使用率通常为主的一半。当主副本就能完全满足读写请求时,备副本的计算能力就会浪费。这一点也很难通过使用混部,单进程多容器化(例如Flink的Slot机制,RocketMQ的 Broker Container机制)的等技术优化。
  • 扩容缩容慢,当出现热点或者需要紧急扩容的时候,例如社区版Kafka需要进行数据复制,生效慢。
  • 多副本需要管控面组件来进行身份裁决,而许多团队可能会害怕维护像ZK这样的服务,担心他出各种问题。而Kafka引入Kraft,RocketMQ的Dledger同步 CommitLog,JRaft Controller也都具有一定的复杂度,增加了整个系统架构的运维负担。
多副本架构面临了众多难题,不容易实现单调读 (Monotonic Reads),副本之间还可能出现ISR(In-sync Replicas) 不同步的问题。幸运的是,我们有精彩的理论指导。微软于2008年发表的PacificA论文中,就提出了基于日志的系统架构及其实现数据一致性复制的三个方案:
图:来自微软论文 PacificA: Replication in Log-Based Distributed Storage Systems [8]
1.日志复制(Log Replication),类似于Raft中描述的Replicated State Machine(复制状态机),主服务器(Primary)和备服务器(Secondary)之间复制日志,每个节点照相同的顺序执行相同的指令。
2.日志合并(Log Merge),主维护数据结构,备服务器不保持内存中的数据结构,仅接收检查点(checkpoint)和日志复制。当主服务器发生故障时,备服务器可以通过加载和回放日志来恢复状态。
3.分层架构(Layered Replication),让数据的一致性复制直接交给底层HDFS这样的分布式文件系统处理。
图: 根据PolarDB团队优化,分层后主节点和只读节点之间的网络传输量减少了98%。
图片来自Analyze the Technical Essentials of PolarDB at the Architecture Level
消息队列是数据密集型和时延敏感型的存储应用,在分层架构中,写延迟得到了充分的优化,无论是Kafka的sendfile syscal还是RocketMQ的mmap,由于内核机制的一些问题,无法充分利用现代硬件。而分布式文件系统通常选择基于SPDK这样的用户态文件系统,Run-To-Complete线程模型和星型的2-3异步写入,保证数据可靠性的同时,写性能远超云盘(云盘底层也是基于分布式文件系统)和一些本地的旧 SSD。分层架构下,计算和内存资源的使用更加弹性,计算能力支持独立精细的管理,随着用户的负载动态扩缩容,计算节点 “无身份绑定”,扩容速度极快。数据的可靠性和读写性能被 “更专业的团队” 来解决。当然原有的一些技术也会随之改变,对于数据的读取不再能够依赖操作系统的pagecache,这一点业内也有很多漂亮的解决方案,例如PolarDB共享buffer Pool和WrapStream提出的 “分布式mmap” 都能很好的利用多个节点上的内存。现代应用层存储引擎践行 “Log is Streaming” 的理念,将存储的复杂度彻底卸载到更底层的分布式存储。因为任何时候人力资源总是有限的,应当适时进行“减法”处理。每一项技术的引入都会带来额外的复杂度,要避免过度工程化导致的维护成本攀升。以功能特性极为丰富的Flink为例,自运维社区版的复杂度吓退了很多中小型用户,保持存储引擎的低依赖会让产品拥有更广泛的开发者群体,做到可持续发展。
2.4. 自闭环的流式处理能力
多年来像RocketMQ这样的消息队列一直被用于交易、供应链履约等核心链路,流转着大量高价值的业务数据。Kafka等日志型消息产品也累积着大量的用户行为数据,为了由此社区提出了一些解决方案让 “数据资产” 通过计算产生价值,例如KStream 和KsqlDB这样的轻计算解决方案,我也熟悉Spark和Flink这样功能特性更加强大的平台,在我参与贡献了社区基于FLIP 27/191新版Flink-Connector-RocketMQ的改进后。我意识到,现代消息队列正朝着 “消息,事件,流” 一体的平台成长与发展。
Streaming Processing场景之所以复杂,是因为要在性能、正确性、代价上取平衡。
  • 可重复读,即数据的可回放性,不能产生“读摆动”,以便在发生故障时可以从上一次成功处理的状态(如快照)正确恢复。这意味着消息消费者(计算框架中一般称为source)必须能够在发生宕机或网络抖动等问题时,准确地定位并回放读取特定位点的消息。而消息存储系统必须保证下游消费者观察到的数据进度不会超出集群已确认提交的最新状态,即quorum原则下多数派的高水位,来确保消费过程的幂等性。
  • 分区策略与时间水印,一个工程上常见的需求是在数据源保持分区数稳定,可以避免因为分区变化引发的负载不平衡。这个模型其实有很多隐式语义,例如服务端高可用机制完善,各个分区没有热点和数据倾斜的情况,下游消费者所在的节点能力对等,以及消息队列对于周期性的时间水印支持等等。在无需感知分区数量模型下,如Google 的 Pub/Sub等服务,为了做到端到端的 “精确一次” 计算,则需要在计算侧维护大量的handle状态。在DSL机制中,用户需要为accumulations和撤回retractions提供对应的实现,知道怎样修正结果,例如join派生成新的table,那么修正会从当前子拓扑往后传播,这也造成了较高的端到端延迟。
  • 数据流转的IO开销。即便是在现代的百G网络环境下,流处理框架中的广播(broadcast)、聚合(aggregation)、数据重新分区(shuffle)等反复读写消息队列的操作有很长的IO耗时。而Flink Sink中TwoPhaseCommit等保持分布式事务的做法事实上也很复杂,给消息队列带来 “读写放大”。
计算框架和消息队列存储的分开维护,我们获得了将计算逻辑与数据存储解耦的灵活性,同时也增加了系统的维护难度和复杂性。未来,消息队列的存储层在其核心功能之上,也会集成一些轻量级的计算能力,如数据转换、滚动聚合、以及支持窗口等概念和操作。目前消息队列仅支持了一些简单的Tag过滤,SQL92过滤的动作,如果存储层引入schema的概念,能够根据具体需要取回对应部分的数据,就就能够进一步减少读写操作。
三、行业前沿探索
除了用户群体庞大的RocketMQ和Kafka等消息产品,我们也关注到细分市场中的挑战者,他们瞄准了行业痛点或者深挖性能,创造差异化的竞争力,典型的有WrapStream和Redpanda。
3.1. 彻底去除本地磁盘
Kafka节点宕机恢复时会涉及到数据的复制,流程复杂且耗时非常久。原因在于 Kafka强依赖本地磁盘,即使社区提出并实现了分级存储等KIP,热数据需要在多个本地磁盘上至少需要保留12-24小时,成本高。WarpStream选择与Kafka做协议兼容,完全去除对EBS的依赖,将Kafka直接构建在了对象存储S3上。架构上分为类似Proxy角色的Agent和支持百万TPS的Meta元数据管理服务。其核心处理流程如下:
1.发送消息,Agent将不同Topic的数据混合攒批写入对象存储,成功后让Meta服务定序。
a.由于需要攒批,节省调用费和对象存储本身的延迟较高的原因,导致写请求延迟很高,达到400ms。
b.对于多可用区场景,写负载均衡和切流,是通过hack客户端ID实现,不破坏原生Kafka协议。
c.对于幂等消息的支持,是通过offset由Meta服务裁决,并忽略没有成功返回的请求。这种设计在盘古的seal and new chunk调用,各种分级存储append模型下的 fast rolling设计是一致的。
2.接收消息,官方称为分布式mmap,多个Agent构成了一致性哈希到环,对相同分区的读被聚集到同一个Agent上以提高缓存命中率。同时后台执行Compation提高回放速度,解决TTL问题。这种设计完全无状态,组件水平扩容难度低,实现多租和大集群的实现相对简单。缺陷则是操作会过多依赖Meta服务。
另外官网也提到了一些低延迟改进的技术,例如对于Express One Zone这样单可用区版本的对象存储:
  • 减小上传buffer和超时时间换性能。
  • 支持写多个bucket来实现类似2-3写的技术,利用quorum原则来快速ack发送。
3.2. 用原生语言重构
Redpanda 选择充分利用现代硬件特性,以原生语言实现低延迟和低云成本为卖点,特别是对于发送长尾的改善。Redpanda的节点依赖了改进后的Raft(including an optimistic approach to Raft and parallel commits),冷数据依赖对象存储。Kafka的挑战者很多,大部分厂商会选择以LogSegment作为切面,以减少Kafka计算层协议演进带来的兼容性问题。而Redpanda则选择自底向上的重构,让单个固定线程来执行对于单个分区的所有操作,包括网络轮询,异步IO,获取事件和调度计算任务等,这种线程模型称为thread-per-core,也称为run-to-complete。Actor模型是一种强大的并发模型,缩小了临界区的范围,取代了Reactor下通过互斥锁实现的多线程,预期所有操作可以在500 us以内处理完成,而使用C++开发带来的确定性延迟能够有效的降低基于JVM应用的长尾,带来可预测的P99延迟。
第二条路线是对于热数据转冷后的维护,是由每个分区的Leader负责上传,复用Raft链路复制元数据。
  • 上传的组件scheduler_service和archival_metadata_stm维护了一个类PID Controller的公平调度器算法,通过计算需要上传到对象存储的数据总量,并动态更新优先级。对于积压量较大的分区,则优先级会更高。如果上传积压较小,则上传过程的优先级会较低以减少背景流量对前台读写的干扰。
  • 数据取回时remote_partition和cache_service负责从对象存储下载数据并缓存,根据消费这请求的分区和位点计算出底层对应的混合日志段的相对偏移(hydrated log segment),然后预取并缓存以减少对对象存储的调用次数,减小平均RT,也支持一些就近读取策略减少跨可用区的流量以降低费用。
  • 一些难点和提高性能的设计,例如Seastar (Using Boost.Beast)开发http客户端提高对对象存储访问的性能。数据上传和缓存的管理需要考虑公平性,每个分区内部的状态管理也很复杂。
对于Native语言的优劣势,我们一直期望服务端能够 “Write Once,Run EveryWhere”,实际上由于基础设施想要充分利用好现代硬件的能力,不得不嵌入一些JNI或做一些指令集优化来提升热点函数的性能。Redpanda特别提到对于Arm的支持,由于项目完全采用C++开发,对于依赖库需要打开一些开关进行动态编译,这个操作并没有想象中跨平台那么复杂。最终的结论是Arm比x86能够带来20%左右的成本下降,和我们让RocketMQ尝试从x86移植到Arm的收益是接近的。
3.3. 新技术引入与尝试
现代消息队列除了扩展场景以外,也在致力于尝试新兴技术和软硬一体化,以进一步提高性能降低成本。例如:
  • 通信层:计算层Proxy和存储节点使用TCP进行通信,而TCP通信存在一定的延迟,同时在高密容器模式部署时也存在带宽限制。TCP协议是为广域网而设计的,不能够很好的支持数据中心的场景,我们尝试将这条链路改为RDMA通信。RDMA技术可以让应用程序直接访问远程主机内存,在不涉及到网络软件栈的情况下。数据能够被直接发送到缓冲区或者能够直接从缓冲区里接收,而不需要被复制到网络层。可以直接在用户态执行数据传输,不需要在内核态与用户态之间做上下文切换,访问远程主机内存而不消耗远程主机中的任何CPU。并且将网络协议栈的很多处理操作通过硬件offload,最终降低了网络数据传输端到端的时延,保证了消息实现持久化存储、吞吐量大且实时性高。从实际测试结果来看,能减少8%左右的CPU。当然,对于没有用cpu set模式进行部署的Java应用来说,会导致更多的请求长尾。
  • 计算层:消息队列自身也在尝试引入JDK17的协程技术来改进大量异步操作带来的代码可维护性的问题。很多传统优化项依然没有达到极致的状态,例如大家所熟知的基于引用计数减少buffer的拷贝,分析一些热点并定向JNI优化,或通过Native语言重构。例如在x86和Arm的不同架构下,SpinLock性能存在较大差异,讲这些小功能优化为平台相关动态链接库并引入,能够以较低的代价获得较大的性能提升。又比如对于一些重复的操作可以用JDK21中的FFM和SIMD技术进行优化,显著的降低CPU开销。
  • 存储层:在消息队列的场景中,相同Topic下消息 Payload 存在着非常大的数据相关性,典型的压缩比可以达到10:1。随着消息向流的领域延伸,我们也尝试在消息的存储格式中引入FlatBuffer和Parquet这样的内存友好,反序列化开销低的存储格式来提升查询性能。
参考文献
1.John K. Ousterhout, et al. "Cloud Programming Simplified: A Berkeley View on Serverless Computing." arXiv preprint arXiv:1902.03383v1 (2019). https://arxiv.org/abs/1902.03383v1
2.TCP Slow-Start and Congestion Avoidance, https://commons.wikimedia.org/w/index.php?title=File:TCP_Slow-Start_and_Congestion_Avoidance.svg
3.Asterios Katsifodimos, et al. "Consistency and Completeness: Rethinking Distributed Stream Processing in Apache Kafka." In Proceedings of the ACM SIGMOD International Conference on Management of Data (SIGMOD '21). Association for Computing Machinery, 2021.
4.Flink, Apache. "Stateful computations over data streams." Accessed: Apr 23 (2021): 2021.
5.State management in Apache Flink®: consistent stateful distributed stream processing. https://dl.acm.org/doi/10.14778/3137765.3137777
6.Michael Armbrust, et al. "Delta Lake: High-performance ACID Table Storage over Cloud Object Stores." Databricks White Paper, Aug. 2020. https://www.databricks.com/wp-content/uploads/2020/08/p975-armbrust.pdf
7.Wei Cao, Zhenjun Liu, Peng Wang, Sen Chen, Caifeng Zhu, Song Zheng, Yuhui Wang, and Guoqing Ma. 2018. PolarFS: an ultra-low latency and failure resilient distributed file system for shared storage cloud database. Proc. VLDB Endow. 11, 12 (August 2018), 1849–1862. https://doi.org/10.14778/3229863.3229872
8.PacificA: Replication in Log-Based Distributed Storage Systems, https://www.microsoft.com/en-us/research/publication/pacifica-replication-in-log-based-distributed-storage-systems/
9.Heidi Howard and Richard Mortier. 2020. Paxos vs Raft: have we reached consensus on distributed consensus? In Proceedings of the 7th Workshop on Principles and Practice of Consistency for Distributed Data (PaPoC '20). Association for Computing Machinery, New York, NY, USA, Article 8, 1–9. https://doi.org/10.1145/3380787.3393681
10.Abhishek Verma, etc. 2015. Large-scale cluster management at Google with Borg. In Proceedings of the Tenth European Conference on Computer Systems (EuroSys '15). Association for Computing Machinery, New York, NY, USA, Article 18, 1–17. https://doi.org/10.1145/2741948.2741964
11.Peter A. Alsberg and John D. Day. 1976. A principle for resilient sharing of distributed resources. In Proceedings of the 2nd international conference on Software engineering (ICSE '76). IEEE Computer Society Press, Washington, DC, USA, 562–570. https://dl.acm.org/doi/10.5555/800253.807732
12.Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. 2003. The Google file system. SIGOPS Oper. Syst. Rev. 37, 5 (December 2003), 29–43. https://doi.org/10.1145/1165389.945450
13.K. Mani Chandy and Leslie Lamport. 1985. Distributed snapshots: determining global states of distributed systems. ACM Trans. Comput. Syst. 3, 1 (Feb. 1985), 63–75. https://doi.org/10.1145/214451.214456
继续阅读
阅读原文