通过本文可以解决以下3个问题。
- 了解flink内存和配置项相关概念。
- 清楚UI中TM和JM各内存组件实际内存值的计算规则。
- 根据实际情况对内存进行调整。
1. Flink进程内存
TM和JM二者均为JVM进程(JVM通常分成堆内和堆外两部分)。TM和JM的内存定义为进程总内存(total process memory
),从功能上分为flink应用程序使用的内存(堆内、堆外)和JVM运行进程使用的内存(堆外)两大部分。
- flink应用程序使用内存(
total flink momory
),下图虚线内部分 - JVM运行进程使用内存(JVM Metaspace和JVM Overhead),下图虚线外部分
设置JM或TM内存最简单的方式是设置total process memory
或total flink momory
之一。其他内存组件将根据默认值和额外配置自动调整。不建议同时显式设置total process memory
和total flink momory
,可能会造成内存配置冲突导致部署失败。配置其他内存组成部分时也需要注意可能产生的冲突配置。
JM和TM的total process memory
和total flink momory
的配置项分别为jobmanager.memory.process.size
、jobmanager.memory.flink.size
和 taskmanager.memory.process.size
、taskmanager.memory.flink.size
。该4个参数均无默认值。
设置JM或TM内存的另一种方法是配置total flink momory
中以下内存组件的大小。
- JM:
jobmanager.memory.heap.size
(上图蓝色部分)。 - TM:
taskmanager.memory.task.heap.size
(上图蓝色部分) 和taskmanager.memory.managed.size
(上图绿色部分)。
上述3个参数无默认值,当手动配置这些参数后,建议既不要配置total process memory
,也不要配置total flink momory
,否则很容易导致内存配置冲突。
在内存组件中以两部分内存按比例分配,同时受最大、最小值限制。
- JVM Overhead(JM、TM) 按比例分配
total process memory
的一部分 - Network memory(TM) 按比例分配
total flink momory
的一部分
示例如下
total Process memory = 1000MB
JVM Overhead min = 64MB
JVM Overhead max = 128MB
JVM Overhead fraction = 0.1
上述配置下,JVM Overhead的内存大小为1000*0.1=100MB,在64~128之间。
如果将min和max设置成相同的值,则会将内存固定为该大小。如果按比例计算出的内存小于最小值,则实际的内存大小将为最小值的大小。如果设置了total process memory
和其他内存组件的大小,可能会忽略比例配置的情况,这时JVM Overhead则为total process memory
的剩余部分,但结果仍然受最大最小值的限制,否则配置将失败。
1.1. JVM参数
Flink将在启动进程时根据配置或派生的内存组件值明确添加如下与内存相关的JVM参数
-Xmx and -Xms
TM:Framework + Task Heap Memory。
JM:JVM Heap Memory-XX:MaxDirectMemorySize
始终仅为TM添加,JM时只有当设置了jobmanager.memory.enable-jvm-direct-memory-limit
参数时该JVM参数才会添加到JM中
TM:Framework + Task Off-heap + Network Memory
JM:Off-heap Memory
注意:用户代码使用的native non-direct内存也可算作堆外内存的一部分-XX:MaxMetaspaceSize
TM:JVM Metaspace
JM:JVM Metaspace
2. TaskManager内存
2.1. 内存构成
- 应用程序使用内存,图中虚线部分
- JVM heap,虚线内蓝色部分
- managed memory,虚线内绿色部分
- other direct(or native) memory,虚线内黄色部分
- frameworkd off-heap内存,flink框架使用的堆外内存
- task off-heap内存,task使用的堆外内存
- network memory,网络内存
- JVM运行进程内存,图中JVM specific memory部分(JVM Metaspace和JVM Overhead)
内存模型明细
可以将native non-direct memory(堆外非直接内存)使用量算作框架堆外或任务堆外内存的一部分,但是这样会导致直接内存限制更高。
JVM内存通常情况下分成heap memory 和 off-heap memory,即堆内存和堆外内存。堆内存是JVM管理的主要内存区域,用于存储对象和类实例。堆外内存通常关注与直接内存和手动管理的内存区域。
native memeory、direct memory、non-direct memeory(本地内存、直接内存和非直接内存)?
本地内存指的是JVM外部所有的内存,包括堆外内存等。
直接内存由java.nio包中的ByteBuffer
类通过allocateDirect()
方法分配的内存(JVM堆外内存),直接分配在操作系统的内存中。大小受限于操作系统的可用内存。
非直接内存由ByteBuffer
类的allocate()
方法分配的内存。这部分内存通常是在堆内的,由JVM管理。直接内存可以直接与操作系统的本地I/O操作交互,避免了在Java堆和本地操作系统内存之间的数据复制,因此数据处理起来非常高效,但是由于在堆外,不受JVM垃圾回收机制的直接管理,因此使用起来需要小心。
非直接内存在堆内,优缺点同直接内存相反。
2.2. TM中所有内存组成部分参数
- 框架堆内存(Framework Heap Memory)
taskmanager.memory.framework.heap.size
默认值128MB。
分配给 Flink 框架的 JVM 堆内存(进阶配置)。不建议手动配置 - 任务堆内存(Task Heap Memory)
taskmanager.memory.task.heap.size
无默认值。
分配给 Flink 应用程序运行算子和用户代码的 JVM 堆内存。 - 管理内存(Managed memory)
taskmanager.memory.managed.size
无默认值。
taskmanager.memory.managed.fraction
默认值0.4(total flink momory
占比)
由 Flink 管理的本地内存(堆外),用于排序、哈希表、缓存中间结果及 RocksDB State Backend。 - 框架堆外内存(Framework Off-heap Memory)
taskmanager.memory.framework.off-heap.size
默认值128MB。不建议手动配置
用于 Flink 框架的堆外(直接或本地)内存((进阶配置)。 - 任务堆外内存(Task Off-heap Memory)
taskmanager.memory.task.off-heap.size
默认值0bytes。
分配给 Flink 应用的算子的堆外(直接或本地)内存。 - 网络内存(Network Memory)
taskmanager.memory.network.min
默认值64MB。
taskmanager.memory.network.max
默认值无穷大。
taskmanager.memory.network.fraction
默认值0.1(total flink momory
占比)
用于任务之间数据传输的本地内存(例如网络传输缓冲)。这块内存被用于分配网络缓冲。 - JVM 空间(JVM Metaspace)
taskmanager.memory.jvm-metaspace.size
默认值256MB。
Flink JVM 进程的 Metaspace。 - JVM 开销(JVM Overhead)
taskmanager.memory.jvm-overhead.min
默认值192MB。
taskmanager.memory.jvm-overhead.max
默认值1GB。
taskmanager.memory.jvm-overhead.fraction
默认值0.1(total process memory
占比)
用于JVM 其他开销的本地内存,例如栈、代码缓存、垃圾回收空间等。
当在IDE中本地启动flink应用程序时,则只有taskmanager.memory.task.heap.size
、taskmanager.memory.task.off-heap.size
、taskmanager.memory.managed.size
、taskmanager.memory.network.min
、taskmanager.memory.network.max
参数会起作用。
2.3. 内存配置
配置内存最简单的方式是配置total process memory
或total flink momory
,前文已提到。
除此之外设置内存的另一种方式是设置Task Heap Memory和Managed Memory。当手动设置这两部分内存后,建议即不要配置total process memory
,也不要配置total flink momory
,否则很容易导致内存配置冲突。
Managed memory顾名思义即由flink自己管理的内存(off-heap)。以下内容使用Manager memory
- 流处理作业中用于RocksDB方式的State Backend。
- 流处理和批处理作业中用于排序、哈希表及缓存中间结果。
- 流处理和批处理作业中用于在python进程中执行UDF。
可以通过taskmanager.memory.managed.size
或taskmanager.memory.managed.fraction
参数来设置managed memory内存的大小,前者直接设定内存大小的绝对值,后者通过total Flink memory
内存的百分比来计算大小。如果二者同时设定占比的方式将会被覆盖。如果二者都未设置将默认使用占比的形式。
flink将框架堆外内存和任务堆外内存包含在JVM直接内存限制中。虽然本机非直接内存使用量可以算作框架堆外内存或任务堆外内存的一部分,但在这种情况下会导致更高的 JVM 直接内存限制。网路内存也是JVM直接内存的一部分,但它由flink管理,并永远保证不会超过其配置的大小。因此在这种情况下调整网络内存大小不会有帮助。
举个例子。
官方版本中,采用容器化部署方式,假设容器申请4GB内存,JVM Metaspace和JVM Overhead默认配置下,total flink momory
的可用大小为
taskmanager.memory.process.size=4GB
JVM Metaspace=256MB
JVM Overhead=4GB*0.1=409MB
taskmanager.memory.flink.size=taskmanager.memory.process.size-JVM Metaspace-JVM Overhead=3431MB。
3. JobManager内存
JobManager的内存与TaskManager内存相似,但是结构更简单。
3.1. 内存组成
- JVM heap
jobmanager.memory.heap.size
无默认值。
堆内存,用于flink框架和作业提交期间(如某些批处理源)或检查点完成回调中执行的用户代码 - off-heap
jobmanager.memory.off-heap.size
默认值128mb。
JM的堆外内存,包括堆外内促你的直接内存和本地内存 - JVM Metaspace
jobmanager.memory.jvm-metaspace.size
默认值256mb
flink JVM进程的元空间 - JVM Overhead
jobmanager.memory.jvm-overhead.min
默认值192mb
jobmanager.memory.jvm-overhead.max
默认值1gb
jobmanager.memory.jvm-overhead.fraction
默认值0.1(total process memory
占比)
用于其他JVM开销的本地内存,如stack、代码缓存、垃圾收集器空间等
3.2. 内存配置
配置内存最简单的方式是配置total process memory
或total flink momory
,前文已提到。
除此之外设置内存的另一种方式是配置JVM Heap Memory。当手动配置改内存后,建议既不要配置total process memory
,也不要配置total flink momory
,否则很容易导致内存配置冲突。
当在IDE中本地启动应用程序时,不需要配置内存选项,并且JM的内存选项都将不生效。
4. 内存调优指南
- standalone部署模式
建议通过配置total flink momory
的方式为flink自身配置内存值。
如果因JVM metaspace 导致问题,也可以调整该配置项。
由于JVM Overhead并不由flink或部署环境控制因此total process memory
是无关紧要的。这种情况下只有执行机器的物理资源才重要。
- 容器化部署
建议配置total process memory
,它声明应为flink进程分配多少内存,并与请求的容器大小相对应。
如果配置了total flink momory
,flink将隐式的添加JVM内存组件来计算total process memory
的值,并请求具有相同内存大小的容器。
如果flink或用户代码分配了超过容器大小的非托管堆外内存,则作业可能会失败。因为部署环境可能会杀死有问题的容器。
- state backend
仅和TM相关
使用HashMap state backend时将managed memory大小设置为0,从而使用户代码可以分配到最大的堆内存。
使用rocksDB state backend时,在默认情况下rocksDB设置为native内存分配限制为managed memory内存大小。因此为状态保留足够大的managed memory内存非常重要。
- 批作业
仅和TM相关
flink批作业运算符利用managed memory内存来提高运行效率。这样,某些操作可以直接在原始数据上执行,无需反序列化为Java对象。
对批作业flink将在不超过managed memory限制的前提下,尝试分配和使用尽可能多的managed memory。当managed memory内存不足时,将会溢出到磁盘。