日期:2023/07/09 07:05作者:佚名人气:
来源 | zh.ververica.com作者 | 王治江(淘江)该文为补发昨天的文章。zh.ververica.com是新Flink中文站。
一. 概述
本文讲述的shuffle概念范围如下图虚线框所示,从上游算子产出数据到下游算子消费数据的全部流程,基本可以划分成三个子模块:
当job被调度开始运行后,除了算子内部的业务逻辑开销外,整个runtime引擎的运行时开销基本都在shuffle过程,其中涉及了数据序列化、编解码、内存拷贝和网络传输等复杂操作,因此可以说shuffle的整体性能决定了runtime引擎的性能。
Flink对于batch和streaming job的shuffle架构设计是统一的,从性能的角度我们设计实现了统一的网络流控机制,针对序列化和内存拷贝进行了优化。从batch job可用性角度,我们实现了external shuffle service以及重构了插件化的shuffle manager机制,在功能、性能和扩展性方面进行了全方位的提升,下面从三个主要方面分别具体介绍。
二. 新流控机制
Flink原有的网络传输机制是上游随机push,下游被动接收模式:
2.1 反压的产生和影响
实际job运行过程中,经常会看到整个链路上下游的inqueue和outqueue队列全部塞满buffer造成反压,尤其在追数据和负载不均衡的场景下。
反压虽然是很难避免的,但现有的流控机制加剧了反压的影响:
2.2 Credit-based流控机制
通过上面分析可以看出,上下游信息不对称导致上游按照数据产出驱动盲目的向下游推送,当下游没有能力接收新数据时而被迫关闭了数据通道。因此需要一种上层更细粒度的流控机制,能够让复用同一个物理通道的所有逻辑链路互不影响进行数据传输。
我们借助了credit思想让下游随时反馈自己的接收能力,这样上游可以有针对性的选择有能力的下游发送对应的数据,即之前的上游盲目push模式变成了下游基于credit的pull模式。
2.3 实际线上效果
新流控机制在某条链路出现反压的场景下,可以保证共享物理通道的其它链路正常传输数据。我们用双11大屏的一个典型业务验证job整体throughput提升了20%(如下图),对于这种keyby类型的上下游all-to-all模式,性能的提升比例取决于反压后的数据分布情况。对于one-to-one模式的job,我们实验验证在出现反压场景下的性能提升可以达到1倍以上。
新流控机制保证上游发送的数据都是下游能正常接收的,这样数据不再堵塞在网络层,即netty buffer以及socket buffer中不再残留数据,相当于整体上in-flighting buffer比之前少了,这对于checkpoint的barrier对齐是有好处的。另外,基于新机制下每个input channel都有exclusive buffer而不会造成资源死锁,我们可以在下游接收端有倾向性的选择不同channel优先读取,这样可以保证barrier尽快对齐而触发checkpoint流程,如下图所示checkpoint对齐事件比之前明显快了几倍,这对于线上job的稳定性是至关重要的。
此外,基于新流控机制还可以针对很多场景做优化,比如对于非keyby的rebalance模式,上游采用round-robin方式轮询向不同下游产出数据,这种看似rebalance的做法在实际运行过程中往往会带来负载不均衡而触发反压,因为不同record的处理开销不同,以及不同下游task的物理环境load也不同。通过backlog的概念,上游产出数据不再按照简单的round-robin,而是参考不同partition中的backlog大小,backlog越大说明库存压力越大,反映下游的处理能力不足,优先向backlog小的partition中产出数据,这种优化对于很多业务场景下带来的收益非常大。新流控机制已经贡献回社区1.5版本,参考[1]。
三. 序列化和内存拷贝优化
如开篇所列,整个shuffle过程涉及最多的就是数据序列化和内存拷贝,在op业务逻辑很轻的情况下,这部分开销占整体比例是最大的,往往也是整个runtime的瓶颈所在,下面分别介绍这两部分的优化。
3.1 Broadcast序列化优化
Broadcast模式指上游同一份数据传输给下游所有的并发task节点,这种模式使用的场景也比较多,比如hash-join中build source端的数据就是通过broadcast分发的。
Flink为每个sub partition单独创建一个serializer,每个serializer内部维护两个临时ByteBuffer,一个用来存储record序列化后的长度信息,一个用来存储序列化后的数据信息。op产出的record先序列化到两个临时ByteBuffer中,再从local buffer pool中申请flink buffer进行长度和数据信息拷贝,最后插入到sub partition队列中。这种实现主要有两个问题:
一次序列化拷贝
针对上述问题,如上图我们从两个方面进行了优化:
这样上游数据产出的开销降低到了原来的1/n,极大的提升了broadcast的整体性能,这部分工作正在贡献回社区。
3.2 网络内存零拷贝
如前面流控中提到的,整个shuffle流程上下游网络端flink buffer各会经历两次数据拷贝:
Netty自身ByteBuffer pool的管理导致进程direct memory的使用无法准确评估,在socket channel数量特别多的场景下,进程的maxDirectMemory配置不合理很容易出现oom造成failover,因此我们打算让netty直接使用flink buffer,屏蔽掉netty内部的ByteBuffer使用。
经过上述优化,进程的direct memory使用大大降低了,从之前的默认320m配置调整为80m,整体的tps和稳定性都有了提高。
四. Shuffle架构改造
上面介绍的一系列优化对于streaming和batch job都是适用的,尤其对于streaming job目前的shuffle系统优势很明显,但对于batch job的场景还有很多局限性:
针对上述两个问题数据重构,我们对shuffle提出了两方面改造,一是实现了external shuffle service把shuffle服务和运行op的container进程解耦,二是定义了插件化的shuffle manager interface,在保留flink现有实现的基础上,扩展了新的文件存储格式。
4.1 External Shuffle Service
External shuffle service可以运行在flink框架外的任何container容器中,比如yarn模式下的NodeManager进程中,这样每台机器部署一个shuffle service统一服务于这台服务器上所有job的数据传输,对本地磁盘的读取可以更合理高效的全局控制。
我们从flink内置的internal shuffle service中提取了网络层的相关组件,主要包括result partition manager和transport layer,封装到external shuffle service中,上面提到的流控机制以及网络内存拷贝等优化同样收益于external shuffle service。
基于external shuffle service运行的batch job,上游结束后container资源可以立刻回收,资源利用率更加合理,external shuffle service根据磁盘类型和负载,合理控制读取充分发挥硬件性能。
4.2 插件化Shuffle Manager
为了解决flink batch job单一文件存储格式的局限性,我们定义了shuffle manager interface支持可扩展的上下游shuffle读写模式。job拓扑支持在边上设置不同的shuffle manager实现,来定义每条边的上下游之间如何shuffle数据。shuffle manager有三个功能接口:
基于上述interface,我们在上游新实现了一种sort-merge输出格式,即所有sub partition数据会先写到一个文件中,最终再merge成有限个文件,通过index文件索引来识别读取不同sub partition的数据。这种模式在某些场景下的表现会优于flink原有的单partition文件形式,也作为线上默认使用的模式。整体的重构工作也正在贡献回社区。
五. 展望
未来Flink shuffle工作在流上会追求更高的极致性能数据重构,如何用更少的资源跑出最好的效果,在批上充分利用现有流上积累的优势,更好的充分利用和发挥硬件的性能以及架构的统一。