大数据技术之 Flink 优化(资源配置调优)
本博客总结为B站尚硅谷大数据Flink2.0调优,Flink性能优化视频的笔记总结。尚硅谷
https://so.csdn.net/so/search?q=%E5%B0%9A%E7%A1%85%E8%B0%B7&spm=1001.2101.3001.7020大数据Flink2.0调优,Flink性能优化
https://www.bilibili.com/video/BV1Q5411f76P
资源配置调优
Flink 性能调优的第一步,就是为任务分配合适的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。
提交方式主要是 yarn-per-job,资源的分配在使用脚本提交 Flink 任务时进行指定。
➢
标准的 Flink 任务提交脚本(Generic CLI 模式)
从 1.11 开始,增加了通用客户端模式,参数使用-D
指定
bin/flink run
-t yarn-per-job
-d
-p 5 指定并行度
-Dyarn.application.queue=test 指定 yarn 队列
-Djobmanager.memory.process.size=1024mb 指定 JM 的总进程大小
-Dtaskmanager.memory.process.size=1024mb 指定每个 TM 的总进程大小
-Dtaskmanager.numberOfTaskSlots=2 指定每个 TM 的 slot 数
-c com.atguigu.flink.tuning.UvDemo
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
参数列表:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/deployment/config.html
内存设置
TaskManager 内存模型

1、内存模型详解
JVM 特定内存:
JVM 本身使用的内存,包含 JVM 的 metaspace 和 over-head
1)JVM metaspace:JVM 元空间
taskmanager.memory.jvm-metaspace.size,默认 256mb
2)JVM over-head 执行开销:JVM 执行时自身所需要的内容,包括线程堆栈、IO、编译
缓存等所使用的内存。
taskmanager.memory.jvm-overhead.fraction,默认 0.1taskmanager.memory.jvm-overhead.min,默认 192mbtaskmanager.memory.jvm-overhead.max,默认 1gb
总进程内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max
大小
框架内存:
Flink 框架,即 TaskManager 本身所占用的内存,
不计入 Slot 的资源中。
堆内:taskmanager.memory.framework.heap.size,默认 128MB堆外:taskmanager.memory.framework.off-heap.size,默认 128MB
Task 内存:
Task 执行用户代码时所使用的内存
堆内:taskmanager.memory.task.heap.size,默认 none,由 Flink 内存扣除掉其他部分的内存得到。堆外:taskmanager.memory.task.off-heap.size,默认 0,表示不使用堆外内存
网络内存:
网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区
堆外:taskmanager.memory.network.fraction,默认 0.1taskmanager.memory.network.min,默认 64mbtaskmanager.memory.network.max,默认 1gb
Flink 内存*fraction,如果小于配置的 min(或大于配置的 max)大小,则使用 min/max
大小
托管内存:
用于 RocksDB State Backend 的本地内存和批的排序、哈希表、缓存中间结果。
堆外:taskmanager.memory.managed.fraction,默认 0.4taskmanager.memory.managed.size,默认 none
如果 size 没指定,则等于 Flink 内存*fraction
案例分析
基于Yarn模式,一般参数指定的是总进程内存,taskmanager.memory.process.size,
比如指定为 4G,每一块内存得到大小如下:
(1)计算 Flink 内存
JVM 元空间 256m
JVM 执行开销: 4g*0.1=409.6m,在[192m,1g]之间,最终结果 409.6m
Flink 内存=4g-256m-409.6m=3430.4m
(2)网络内存=3430.4m*0.1=343.04m,在[64m,1g]之间,最终结果 343.04m
(3)托管内存=3430.4m*0.4=1372.16m
(4)框架内存,堆内和堆外都是 128m
(5)Task 堆内内存=3430.4m-128m-128m-343.04m-1372.16m=1459.2m
所以进程内存给多大,每一部分内存需不需要调整,可以看内存的使用率来调整。
2 生产资源配置示例
bin/flink run
-t yarn-per-job
-d
-p 5 指定并行度
-Dyarn.application.queue=test 指定 yarn 队列
-Djobmanager.memory.process.size=2048mb JM2~4G 足够
-Dtaskmanager.memory.process.size=4096mb 单个 TM2~8G 足够
-Dtaskmanager.numberOfTaskSlots=2 与容器核数 1core:1slot 或 2core:1slot
-c com.atguigu.flink.tuning.UvDemo
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
Flink 是实时流处理,关键在于资源情况能不能抗住高峰时期每秒的数据量,通常用 QPS/TPS 来描述数据情况。
合理利用 cpu 资源
Yarn 的容量调度器默认情况下是使用“DefaultResourceCalculator”分配策略,只根据内存调度资源,所以在 Yarn 的资源管理页面上看到每个容器的 vcore 个数还是 1。
可以修改策略为 DominantResourceCalculator,该资源计算器在计算资源的时候会综合考虑 cpu 和内存的情况。在 capacity-scheduler.xml 中修改属性:
yarn.scheduler.capacity.resource-calculator
org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
使用 DefaultResourceCalculator 策略
bin/flink run
-t yarn-per-job
-d
-p 5
-Drest.flamegraph.enabled=true
-Dyarn.application.queue=test
-Djobmanager.memory.process.size=1024mb
-Dtaskmanager.memory.process.size=4096mb
-Dtaskmanager.numberOfTaskSlots=2
-c com.atguigu.flink.tuning.UvDemo
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
可以看到一个容器只有一个 vcore:

使用 DominantResourceCalculator 策略
修改后 yarn 配置后,分发配置并重启 yarn,再次提交 flink 作业:
bin/flink run
-t yarn-per-job
-d
-p 5
-Drest.flamegraph.enabled=true
-Dyarn.application.queue=test
-Djobmanager.memory.process.size=1024mb
-Dtaskmanager.memory.process.size=4096mb
-Dtaskmanager.numberOfTaskSlots=2
-c com.atguigu.flink.tuning.UvDemo
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar
看到容器的 vcore 数变了:

JobManager1 个,占用 1 个容器,vcore=1
TaskManager3 个,占用 3 个容器,每个容器 vcore=2,总 vcore=2*3=6,因为
默认 单个容器的 vcore 数=单 TM 的 slot 数
使 用 DominantResourceCalculator 策 略 并 指 定 容 器 vcore 数
指定 yarn 容器的 vcore 数,提交:
bin/flink run
-t yarn-per-job
-d
-p 5
-Drest.flamegraph.enabled=true
-Dyarn.application.queue=test
-Dyarn.containers.vcores=3
-Djobmanager.memory.process.size=1024mb
-Dtaskmanager.memory.process.size=4096mb
-Dtaskmanager.numberOfTaskSlots=2
-c com.atguigu.flink.tuning.UvDemo
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar

JobManager1 个,占用 1 个容器,vcore=1
TaskManager3 个,占用 3 个容器,每个容器
vcore =3
,总 vcore=3*3=9
并行度设置
全局并行度计算
开发完成后,先进行
压测
。任务并行度给 10 以下,测试单个并行度的处理上限。然后
总 QPS/单并行度的处理能力 = 并行度
开发完 Flink 作业,压测的方式很简单,先在 kafka 中积压数据,之后开启 Flink 任务, 出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。
不能只从 QPS 去得出并行度,因为有些字段少、逻辑简单的任务,单并行度一秒处理几万条数据。而有些数据字段多,处理逻辑复杂,单并行度一秒只能处理 1000 条数据。
最好根据高峰期的 QPS 压测,
并行度*1.2 倍
,富余一些资源。
Source 端并行度的配置
数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数。
如果已经等于 Kafka 的分区数,消费速度仍跟不上数据生产速度,考虑下 Kafka 要扩大分区,同时调大并行度等于分区数。
Flink 的一个并行度可以处理一至多个分区的数据,如果并行度多于 Kafka 的分区数,那么就会造成有的并行度空闲,浪费资源。
Transform 端并行度的配置
➢
Keyby 之前的算子
一般不会做太重的操作,都是比如 map、filter、flatmap 等处理较快的算子,并行度 可以和 source 保持一致。
➢
Keyby 之后的算子
如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512; 小并发任务的并行度不一定需要设置成 2 的整数次幂;大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;
Sink 端并行度的配置
Sink 端是数据流向下游的地方,可以根据
Sink 端的数据量
及
下游的服务抗压能力
进行评估。如果 Sink 端是 Kafka,可以设为 Kafka 对应 Topic 的分区数。
Sink 端的数据量小,比较常见的就是监控告警的场景,并行度可以设置的小一些。
Source 端的数据量是最小的,拿到 Source 端流过来的数据后做了细粒度的拆分,数据量不断的增加,到 Sink 端的数据量就非常大。那么在 Sink 到下游的存储中间件的时候就需要提高并行度。
另外 Sink 端要与下游的服务进行交互,并行度还得根据下游的服务抗压能力来设置,如果在 Flink Sink 这端的数据量过大的话,且 Sink 处并行度也设置的很大,但下游的服务完全撑不住这么大的并发写入,可能会造成下游服务直接被写挂,所以最终还是要在 Sink
处的并行度做一定的权衡。








