Flink資源調優(yōu)
1. 內存設置
1.1 TaskManager 內存模型
TaskManager的內存模型如下圖所示(1.10之后版本內存模型):

Flink使用了堆上內存和堆外內存。
- Flink 框架內存使用了堆外內存和堆外內存,不計入slot資源。
- Task執行的內存使用了堆上內存和堆外內存。
- 網(wǎng)絡(luò )緩沖內存:網(wǎng)絡(luò )數據交換所使用的內存大小,如網(wǎng)絡(luò )數據交換緩沖區。
框架堆外內存、Task堆外內存、網(wǎng)絡(luò )緩沖內存都在堆外的直接內存里面。
- 管理內存:Flink堆外內存的管理,用于管理排序,hash表,緩沖中間結果以及RocksDb 狀態(tài)后端的本地內存。
- JVM特有內存:JVM本身占用的內存,包括元數據和執行開(kāi)銷(xiāo)。
Flink 使用內存 = 框架堆內和堆外內存 + Task堆內和堆外內存 + 網(wǎng)絡(luò )緩沖內存 + 管理內存。
進(jìn)程內存 - Flink 內存 + JVM特有內存
1.1.1 JVM特有內存詳解
JVM特定內存: JVM本身使用的內存,包含JVM的metaspace和over-head
- JVM的metaspace:JVM 元空間。
taskmanager.memory.jvm-meta-space.size,默認為256mb。 - JVM over-head執行開(kāi)銷(xiāo):JVM 執行時(shí)自身所需要的內容,包括線(xiàn)程堆棧、IO、編譯內存等所使用的內存。
taskmanager.memory.jvm-overhead.fraction, 默認0.1
taskmanager.memory.jvm-overhead.min,默認192mb
taskmanager.memory.jvm-overhead.max,默認1gb
總進(jìn)程內存*fraction,如果小于配置的min或者大于配置的max大小,則使用min/max
1.1.2 框架內存
Flink框架,即TaskManager本身占用的內存,不計入Slot的資源中。
堆內:taskmanager.memory.framework.heap.size ,默認128mb。
堆外:taskmanager.memory.framework.off-heap.size,默認128mb。
1.1.3 TaskManager內存
Task執行用戶(hù)代碼所使用的內存。
堆內:taskmanager.memory,task,heap.size,默認none,由Flink內存扣除掉其他部分內存得到。
堆外:taskmanager.memory,task.off-heap.size,默認為0,表示不適用堆外內存。
1.1.4 網(wǎng)絡(luò )內存
網(wǎng)絡(luò )數據交換所使用的堆外內存大小,如網(wǎng)絡(luò )數據交換緩沖區。
堆外:taskmanager.memory.network.fraction,默認0.1。
taskmanager.memory.network.min,默認為64mb。
taskmanager.memory.network.max,默認為1gb。
Flink內存*fraction,如果小于配置的min或者大于配置的max大小,則使用min/max
1.1.5 管理內存
用于RocksDB 狀態(tài)后端的本地內存和批的排序、hash、緩沖中間結果。
堆外:
taskmanager.memory.managed.fraction,默認0.4。
taskmanager.memory.managed.size ,默認為none。
如果size沒(méi)指定,則等于Flink內存 * fraction 。
查看TaskManager內存圖,如下所示,如果內存長(cháng)時(shí)間占用比例過(guò)高就需要調整Flink作業(yè)內存了。

- 如果未使用RocksDB作為狀態(tài)后端,則可以將管理內存調整為0.
- 單個(gè)TaskManager內存大小為2-8G之間。
2. 并行度設置
并行度的設置和具體的作業(yè)強關(guān)聯(lián)。
2.1 并行度設置
- 2.1.1 flink-conf.yml設置
在我們提交一個(gè)Job的時(shí)候如果沒(méi)有考慮并行度的話(huà),那么Flink會(huì )使用默認配置文件中的并行度。配置如下:
parallelism.default: 5
- 2.1.2 env級別
env的級別就是Environment 級別。也就是通過(guò)ExecutionEnvironment 來(lái)設置整體的Job并行度。
val env = Stream...
env.setParallelism(5);
- 2.1.3 客戶(hù)端級別
如果在執行Job時(shí)候,發(fā)現代碼中沒(méi)有設置并行度而又不修改配置文件的話(huà),可以通過(guò)Client來(lái)設置Job的并行度。
./bin/flink run -p 5 ../wordCount-java*.jar
-p 即設置WordCount的Job并行度為5。
- 2.1.4 算子級別
我們在編寫(xiě)Flink項目時(shí),可能對于不同的Operator設置不同的并行度,例如為了實(shí)現讀取Kafka的最高效
讀取需要參考Kafka的partition的數量對并行度進(jìn)行設置,在Sink時(shí)需要對于Sink的介質(zhì)設置不同的并行
度。這樣就會(huì )存在一個(gè)Job需要有多個(gè)并行度。這樣就需要用到算子級別的并行度設置:
val env = Stream...
val text = ...
text.keyBy(XXX)
.flatMap(XXX).setParallelism(5) //計算時(shí)設置為5
.addSink(XXXXX).setParallelism(1) //寫(xiě)入數據庫時(shí)候設置為1
從優(yōu)先級上來(lái)看: 算子級別 > env級別 > Client級別 > 系統默認級別
并行度的高級別會(huì )覆蓋低級別的配置。例如在算子中設置的策略會(huì )覆蓋配置文件中的parallelism。
在實(shí)際的使用中,我們需要設置合理的并行度來(lái)保證數據的高效處理,在一般情況下例如source,Sink等
可能會(huì )需要不同的并行度來(lái)保證數據的快速讀取與寫(xiě)入負載等。
評論