在线免费国产视频,亚洲无码成人免费在线,又黄又爽又无遮挡国产,日韩爆乳av少妇无码,国产福利精品98视频一区二区

Flink 常見(jiàn)問(wèn)題總結

非法參數

如果您看到從 TaskExecutorProcessUtils 或 JobManagerProcessUtils 拋出的IllegalConfigurationException,通常表明
存在無(wú)效的配置值(例如負內存大小、大于 1 的 分數等)或配置沖突。請重新配置內存參數。

Java堆棧異常

如果報 OutOfMemoryError: Java heap space 異常,通常表示 JVM Heap 太小。

可以嘗試通過(guò)增加總內存來(lái)增加 JVM 堆大小。也可以直接為 TaskManager 增加任務(wù)堆內存或為 JobManager 增加 JVM
堆內存。
還可以為 TaskManagers 增加框架堆內存,但只有在確定 Flink 框架本身需要更多內存時(shí)才應該更改此選項。

直接緩沖存儲器異常

如果報 OutOfMemoryError: Direct buffer memory 異常,通常表示 JVM 直接內存限制太小或存在直接內存泄漏。檢查用
戶(hù)代碼或其他外部依賴(lài)項是否使用了JVM 直接內存,以及它是否被正確考慮??梢試L試通過(guò)調整直接堆外內存來(lái)增加其限制。
可以參考如何為 TaskManagers、 JobManagers 和 Flink 設置的 JVM 參數配置堆外內存。

元空間異常

如果報 OutOfMemoryError: Metaspace 異常,通常表示 JVM 元空間限制配置得太小。您可以嘗試加大 JVM 元空間
TaskManagers 或 JobManagers 選項。

網(wǎng)絡(luò )緩沖區數量不足

如果報 IOException: Insufficient number of network buffers 異常,這僅與 TaskManager 相關(guān)。通常表示配置的網(wǎng)絡(luò )
內存大小不夠大。您可以嘗試增加網(wǎng)絡(luò )內存。

超出容器內存異常

如果 Flink 容器嘗試分配超出其請求大?。╕arn 或 Kubernetes)的內存,這通常表明 Flink 沒(méi)有預留足夠的本機內存。
當容器被部署環(huán)境殺死時(shí),可以通過(guò)使用外部監控系 統或從錯誤消息中觀(guān)察到這一點(diǎn)。如果在 JobManager 進(jìn)程中遇到這個(gè)
問(wèn)題,還可以通過(guò)設置排除可能的 JVM Direct Memory 泄漏的選項來(lái)開(kāi)啟 JVM Direct Memory 的限制:

jobmanager.memory.enable-jvm-direct-memory-limit: true

如果想手動(dòng)多分一部分內存給 RocksDB 來(lái)防止超用,預防在云原生的環(huán)境因 OOM 被 K8S kill,可將 JVM OverHead 內存
調大。 之所以不調大 Task Off-Heap,是由于目前 Task Off-Heap 是和 Direct Memeory 混在一起的,即使調大整體,也
并不一定會(huì )分給 RocksDB 來(lái)做 Buffer,所以我們推薦通 過(guò)調整 JVM OverHead 來(lái)解決內存超用的問(wèn)題。

Checkpoint 失敗

Checkpoint 失敗大致分為兩種情況:Checkpoint Decline 和 Checkpoint Expire。

Checkpoint Decline

我們能從 jobmanager.log 中看到類(lèi)似下面的日志:


Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.

我們可以在 jobmanager.log 中查找 execution id,找到被調度到哪個(gè) taskmanager 上,類(lèi)似如下所示:

2022-04-02 14:26:20,972 INFO [jobmanager-future-thread-61]
org.apache.flink.runtime.executiongraph.ExecutionGraph - XXXXXXXXXXX
(100/269) (87b751b1fd90e32af55f02bb2f9a9892) switched from SCHEDULED to
DEPLOYING.
2022-04-02 14:26:20,972 INFO [jobmanager-future-thread-61]
org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying
XXXXXXXXXXX (100/269) (attempt #0) to slot
container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE

上面的日志我們知道該 execution 被調度 到 hostnameABCDE 的
container_e24_1566836790522_8088_04_013155_1 slot 上, 接下來(lái)我們就可以到 container
container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失敗的具體原因了。

另外對于 Checkpoint Decline 的情況,有一種情況在這里單獨抽取出來(lái)進(jìn)行介紹: Checkpoint Cancel。

當前 Flink 中如果較小的 Checkpoint 還沒(méi)有對齊的情況下,收到了更大的Checkpoint,則會(huì )把較小的 Checkpoint 給取消
掉。我們可以看到類(lèi)似下面的日志:

$taskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before
completing current checkpoint 19. Skipping current checkpoint.

這個(gè)日志表示,當前 Checkpoint 19 還在對齊階段,我們收到了 Checkpoint 20 的barrier。然后會(huì )逐級通知到下游的 
task checkpoint 19 被取消了,同時(shí)也會(huì )通知 JM 當前 Checkpoint 被 decline 掉了。

在下游 task 收到被 cancelBarrier 的時(shí)候,會(huì )打印類(lèi)似如下的日志:

DEBUG

$taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment.

或者DEBUG

$taskNameWithSubTaskAndID: Checkpoint 19 canceled, skipping alignment.

或者WARN

$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint 20 before
completing current checkpoint 19. Skipping current checkpoint

上面三種日志都表示當前 task 接收到上游發(fā)送過(guò)來(lái)的 barrierCancel 消息,從而取 消了對應的 Checkpoint。


## Checkpoint Expire

如果 Checkpoint 做的非常慢,超過(guò)了 timeout 還沒(méi)有完成,則整個(gè) Checkpoint 也會(huì )失敗。當一個(gè) Checkpoint
由于超時(shí)而失敗是,會(huì )在 jobmanager.log 中看到如下的 日志:

Checkpoint 1 of job 85d268e6fbc19411185f7e4868a44178 expired before completing

表示 Chekpoint 1 由于超時(shí)而失敗,這個(gè)時(shí)候可以可以看這個(gè)日志后面是否有類(lèi)似下 面的日志:

Received late message for now expired checkpoint attempt 1 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job
85d268e6fbc19411185f7e4868a44178.

找到對應的 taskmanager.log 查看具體信息。 我們按照下面的日志把 TM 端的 snapshot 分為三個(gè)階段:

- 開(kāi)始做 snapshot 前
- 同步階段
- 異步階段,需要開(kāi)啟 DEBUG 才能看到:

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

上面的日志表示 TM 端 barrier 對齊后,準備開(kāi)始做 Checkpoint。

DEBUG
2019-08-06 13:43:02,613 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf,
checkpointDirectory=xxxxxxxx, sharedStateDirectory=xxxxxxxx,
taskOwnedStateDirectory=xxxxxx, metadataFilePath=xxxxxx, reference=(default),
fileStateSizeThreshold=1024}, synchronous part) in thread Thread[Async calls on
Source: xxxxxx_source -> Filter (27/70),5,Flink Task Threads] took 0 ms.

上面的日志表示當前這個(gè) backend 的同步階段完成,共使用了 0 ms。

DEBUG
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@7908affe,
checkpointDirectory=xxxxxx, sharedStateDirectory=xxxxx,
taskOwnedStateDirectory=xxxxx, metadataFilePath=xxxxxx, reference=(default),
fileStateSizeThreshold=1024}, asynchronous part) in thread
Thread[pool-48-thread-14,5,Flink Task Threads] took 369 ms

上面的日志表示異步階段完成,異步階段使用了 369 ms。 
在現有的日志情況下,我們通過(guò)上面三個(gè)日志,定位 snapshot 是開(kāi)始晚,同步階段做的慢,還是異步階段做的慢。然后再
按照情況繼續進(jìn)一步排查問(wèn)題。

# Checkpoint 慢
Checkpoint 慢的情況如下:比如 Checkpoint interval 1 分鐘,超時(shí) 10 分鐘,Checkpoint 經(jīng)常需要做 9 分鐘(我們希
望 1 分鐘左右就能夠做完),而且我們預期 state size 不是非常大。

## Source Trigger Checkpoint 慢

這個(gè)一般發(fā)生較少,但是也有可能,因為 source 做 snapshot 并往下游發(fā)送 barrier的時(shí)候,需要搶鎖(Flink1.10 開(kāi)始,
用 mailBox 的方式替代當前搶鎖的方式,詳情參考

[[FLINK-12477] Change threading-model in StreamTask to a mailbox-based approach - ASF JIRA](https://issues.apache.org/jira/browse/FLINK-12477)

如果一直搶不到鎖的話(huà),則可能 導致 Checkpoint 一直得不到機會(huì )進(jìn)行。如果在 Source 所在的 taskmanager.log 中找不
到開(kāi)始做 Checkpoint 的 log,則可以考慮是否屬于這種情況,可以通過(guò) jstack 進(jìn)行進(jìn) 一步確認鎖的持有情況。


## 使用增量 Checkpoint 

現在 Flink 中 Checkpoint 有兩種模式,全量 Checkpoint 和 增量 Checkpoint,其中全量Checkpoint會(huì )把當前的 state 
全部備份一次到持久化存儲 ,而增量Checkpoint,則只備份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每
次上傳的內容會(huì )相對更好,在速度上會(huì )有更大的優(yōu)勢?,F在 Flink 中僅在 RocksDBStateBackend 中支持增量 Checkpoint,
如果你已經(jīng)使用 RocksDBStateBackend,可以通過(guò)開(kāi)啟增量 Checkpoint 來(lái)加速。


## 作業(yè)存在反壓或者數據傾斜

task 僅在接受到所有的 barrier 之后才會(huì )進(jìn)行 snapshot,如果作業(yè)存在反壓,或者有數據傾斜,則會(huì )導致全部的 channel 
或者某些 channel 的 barrier 發(fā)送慢,從而整體影響 Checkpoint 的時(shí)間。

## Barrier 對齊慢

從前面我們知道 Checkpoint 在 task 端分為 barrier 對齊(收齊所有上游發(fā)送過(guò)來(lái) 的 barrier),然后開(kāi)始同步階段,
再做異步階段。如果 barrier 一直對不齊的話(huà),就不會(huì ) 開(kāi)始做 snapshot。

barrier 對齊之后會(huì )有如下日志打?。?
```log 
DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)

如果 taskmanager.log 中沒(méi)有這個(gè)日志,則表示 barrier 一直沒(méi)有對齊,接下來(lái)我們需要了解哪些上游的 barrier 沒(méi)有發(fā)
送下來(lái),如果你使用 At Least Once 的話(huà),可以觀(guān)察下面的日志:

DEBUG
Received barrier for checkpoint 96508 from channel 5

表示該 task 收到了 channel 5 來(lái)的 barrier,然后看對應 Checkpoint,再查看還剩哪些上游的 barrier 沒(méi)有接受到。

主線(xiàn)程太忙,導致沒(méi)機會(huì )做 snapshot

在 task 端,所有的處理都是單線(xiàn)程的,數據處理和 barrier 處理都由主線(xiàn)程處理,如果主線(xiàn)程在處理太慢(比如使用
RocksDBBackend,state 操作慢導致整體處理慢),導致 barrier 處理的慢,也會(huì )影響整體 Checkpoint 的進(jìn)度,可以通過(guò)火焰圖分析。

同步階段做的慢

同步階段一般不會(huì )太慢,但是如果我們通過(guò)日志發(fā)現同步階段比較慢的話(huà),對于非RocksDBBackend 我們可以考慮查看是否開(kāi)
啟了異步 snapshot,如果開(kāi)啟了異步snapshot 還是慢,需要看整個(gè)JVM 在干嘛 , 也可以使用火焰圖分析 。對于
RocksDBBackend 來(lái)說(shuō),我們可以用 iostate 查看磁盤(pán)的壓力如何,另外可以查看 tm 端RocksDB 的 log
的日志如何,查看其中 SNAPSHOT 的時(shí)間總共開(kāi)銷(xiāo)多少。

RocksDB 開(kāi)始 snapshot 的日志如下:

2019/09/10-14:22:55.734684 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:83] 
Started the snapshot process -- creating snapshot in directory 
/tmp/flink-io-87c360ce-0b98-48f4-9629-2cf0528d5d53/XXXXXXXXXXX/chk-92729

snapshot 結束的日志如下:

2019/09/10-14:22:56.001275 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:145] 
Snapshot DONE. All is good

異步階段做的慢

對于異步階段來(lái)說(shuō),tm端主要將state備份到持久化存儲上,對于非RocksDBBackend 來(lái)說(shuō),主要瓶頸來(lái)自于網(wǎng)絡(luò ),這個(gè)階段
可以考慮觀(guān)察網(wǎng)絡(luò )的 metric,或者對應機器上能夠觀(guān)察到網(wǎng)絡(luò )流量的情況(比如 iftop)。

對于 RocksDB 來(lái)說(shuō),則需要從本地讀取文件,寫(xiě)入到遠程的持久化存儲上,所以不僅需要考慮網(wǎng)絡(luò )的瓶頸,還需要考慮本地
磁盤(pán)的性能。另外對于 RocksDBBackend
來(lái)說(shuō),如果覺(jué)得網(wǎng)絡(luò )流量不是瓶頸,但是上傳比較慢的話(huà),還可以嘗試考慮開(kāi)啟多線(xiàn)程上傳功能。
(Flink 1.13 開(kāi)始,state.backend.rocksdb.checkpoint.transfer.thread.num 默認值是 4)。

Kafka 動(dòng)態(tài)發(fā)現分區

當 FlinkKafkaConsumer 初始化時(shí),每個(gè) subtask 會(huì )訂閱一批 partition,但是當 Flink 任務(wù)運行過(guò)程中,如果被
訂閱的 topic 創(chuàng )建了新的 partition,FlinkKafkaConsumer 如何實(shí)現動(dòng)態(tài)發(fā)現新創(chuàng )建的 partition 并消費呢?
在使用 FlinkKafkaConsumer 時(shí),可以開(kāi)啟 partition 的動(dòng)態(tài)發(fā)現。通過(guò) Properties 指定參數開(kāi)啟(單位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS
該參數表示間隔多久檢測一次是否有新創(chuàng )建的 partition。默認值是 Long 的最小值,表示不開(kāi)啟,大于 0 表示開(kāi)啟。開(kāi)啟
時(shí)會(huì )啟動(dòng)一個(gè)線(xiàn)程根據傳入的 interval 定期獲取 Kafka最新的元數據,新 partition 對應的那一個(gè) subtask 會(huì )自動(dòng)發(fā)現
并從 earliest 位置開(kāi)始消費,新創(chuàng )建的 partition 對其他 subtask 并不會(huì )產(chǎn)生影響。

代碼如下所示:

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, 30 * 1000 + "");

Watermark 不更新

如果數據源中的某一個(gè)分區/分片在一段時(shí)間內未發(fā)送事件數據,則意味著(zhù) WatermarkGenerator 也不會(huì )獲得任何新數據去生
成 watermark。我們稱(chēng)這類(lèi)數據源為 空閑輸入或空閑源。在這種情況下,當某些其他分區仍然發(fā)送事件數據的時(shí)候就會(huì )出現
問(wèn)題。比如 Kafka 的 Topic 中,由于某些原因,造成個(gè)別 Partition 一直沒(méi)有新的數據。由于下游 算子 watermark 的計
算方式是取所有不同的上游并行數據源 watermark 的最小值,則 其 watermark 將不會(huì )發(fā)生變化,導致窗口、定時(shí)器等不會(huì )
被觸發(fā)。為了解決這個(gè)問(wèn)題,你可以使用 WatermarkStrategy 來(lái)檢測空閑輸入并將其標記為 空閑狀態(tài)。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>("flinktest",new SimpleStringSchema(),properties);
kafkaSourceFunction.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(
  Duration.ofMinutes(2)).withIdleness(Duration.ofMinutes(5)));
env.addSource(kafkaSourceFunction)

依賴(lài)沖突

ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError

一般都是因為用戶(hù)依賴(lài)第三方包的版本與 Flink 框架依賴(lài)的版本有沖突導致。根據報錯信息中的類(lèi)名,定位到?jīng)_突
的 jar 包,idea可以借助 maven helper插件查找沖突的有哪些。 打包插件建議使用 maven-shade-plugin。

超出文件描述符限制

java.io.IOException: Too many open files

首先檢查 Linux 系統 ulimit -n 的文件描述符限制,再注意檢查程序內是否有資源(如各種連接池的連接)未及時(shí)釋放。
值得注意的是,低版本 Flink 使用 RocksDB 狀態(tài)后端也有可能會(huì )拋出這個(gè)異常,此時(shí)需修改flink-conf.yaml 中的
state.backend.rocksdb.files.open 參數,如果不限制,可以改為-1(1.13 默認就是-1)。

臟數據導致數據轉發(fā)失敗

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Couldnot forward element to next operator

該異常幾乎都是由于程序業(yè)務(wù)邏輯有誤,或者數據流里存在未處理好的臟數據導致的,繼續向下追溯異常棧一般就可以看到
具體的出錯原因,比較常見(jiàn)的如 POJO 內有空字段,或者抽取事件時(shí)間的時(shí)間戳為 null 等。

通訊超時(shí)

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]

Akka 超時(shí)導致,一般有兩種原因:

  • 一是集群負載比較大或者網(wǎng)絡(luò )比較擁塞,
  • 二是業(yè)務(wù)邏輯同步調用耗時(shí)的外部服務(wù)。如果負載或網(wǎng)絡(luò )問(wèn)題無(wú)法徹底緩解,需考慮調大 akka.ask.timeout 參數的值
    (默認只有 10 秒);另外,調用外部服務(wù)時(shí)盡量異步操作(Async I/O)。



標 題:《Flink 常見(jiàn)問(wèn)題總結
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

    評論
    1 評論
    rak
    2023-07-10 08:55 回復?

    學(xué)習了

avatar

取消
在线免费国产视频,亚洲无码成人免费在线,又黄又爽又无遮挡国产,日韩爆乳av少妇无码,国产福利精品98视频一区二区