在线免费国产视频,亚洲无码成人免费在线,又黄又爽又无遮挡国产,日韩爆乳av少妇无码,国产福利精品98视频一区二区

Flink雙流Join底層原理

底層原理簡(jiǎn)介

pic

  • LState:存儲左邊數據流中的數據。
  • RState:存儲右邊數據流中的數據。
  • 當左邊數據流數據到達的時(shí)候會(huì )保存到LState,并且到RState中進(jìn)行Join。將Join生成的結果數據發(fā)送到下游。
  • 右邊數據流中數據到達的時(shí)候,會(huì )保存到RState當中,并且到LState中進(jìn)行Join,然后將Join之嚄胡的結果數據發(fā)送到下游。

為了保障左右兩邊流中需要Join的數據出現在相同節點(diǎn),Flink SQL會(huì )利用Join中的on的關(guān)聯(lián)條件進(jìn)行分區,把相同關(guān)聯(lián)條件
的數據分發(fā)到同一個(gè)分區里面。

普通雙流Join

現有訂單表A和支付表B進(jìn)行關(guān)聯(lián)得到匯總表C。訂單表和支付表初始數據如下:

表A:訂單表數據

order_id timestamp
1001 2023-02-04 10:00:00
1002 2023-01-04 10:01:02

表B:支付表數據

order_id pay_money
order_id pay_money
1001 80
1002 100

inner join

當A表中每一條數據到達時(shí),都會(huì )和B表中的數據進(jìn)行關(guān)聯(lián):

  • 當能夠關(guān)聯(lián)到數據時(shí),將結果輸出到結果表里面;
  • 當不能關(guān)聯(lián)到數據時(shí),不會(huì )將結果輸出到結果表里面;

所以上述A表和B表的Join結果為:

order_id timestamp pay_money
1002 2023-01-04 10:01:02 100

當表B中1001新數據到達時(shí),新數據如下所示:

order_id pay_money
order_id pay_money
1001 80

此時(shí)結果表的數據為:

order_id timestamp pay_money
1002 2023-01-04 10:01:02 100
1001 2023-02-04 10:00:00 80

注意

Inner Join 不會(huì )產(chǎn)生回撤流。

left join

當A表數據到達時(shí)會(huì )主動(dòng)和B表中數據進(jìn)行關(guān)聯(lián)查詢(xún),沒(méi)有關(guān)聯(lián)到數據。也會(huì )輸出結果,缺失的字段使用null進(jìn)行補全。

B表中的數據1002到達之后且A表中的數據1001和1002已經(jīng)到達,關(guān)聯(lián)之后表C的數據如下:

order_id timestamp pay_money
1001 2023-02-04 10:00:00 null
1002 2023-01-04 10:01:02 100

當B表中數據1001到達之后,也會(huì )主動(dòng)和表A中的數據進(jìn)行關(guān)聯(lián),如果表中的數據已經(jīng)輸出過(guò)結果了且缺失字段為null,此時(shí)
會(huì )產(chǎn)生一個(gè)回撤流,將之前輸出的數據會(huì )撤掉-D,在重新輸出完整的數據+I。

order_id timestamp pay_money /
1001 2023-02-04 10:00:00 null +I
1002 2023-01-04 10:01:02 100 +I
1001 2023-02-04 10:00:00 null -D
1001 2023-02-04 10:00:00 80 +I

注意

left Join會(huì )產(chǎn)生回撤流。

Right Join

當表B中1001到達時(shí),A表中的數據沒(méi)有到達,則還是會(huì )輸出數據,缺失字段使用null代替。當表B中數據1002到達時(shí),A表中的
數據1002已經(jīng)到達此時(shí)可以關(guān)聯(lián)到數據,關(guān)聯(lián)結果如下:

order_id timestamp pay_money
1001 null null
1002 2023-01-04 10:01:02 100

當表A中數據1001到達時(shí),會(huì )主動(dòng)到B表中進(jìn)行關(guān)聯(lián),此時(shí)結果中已經(jīng)輸出過(guò)關(guān)于1001的數據,此時(shí)會(huì )產(chǎn)生一個(gè)回撤流。

order_id timestamp pay_money /
1001 null null +I
1002 2023-01-04 10:01:02 100 +I
1001 null null -D
1001 2023-02-04 10:00:00 80 +I

注意

Right Join會(huì )產(chǎn)生回撤流。

Full Join

當表B中數據1001先到達時(shí),會(huì )主動(dòng)到A表中進(jìn)行關(guān)聯(lián)查詢(xún),關(guān)聯(lián)不到數據,還是會(huì )輸出結果。

當表A中數據到達時(shí),會(huì )主動(dòng)和B表中的數據進(jìn)行關(guān)聯(lián)查詢(xún),此時(shí)B表中只有1001的數據,灌籃不到數據,還是會(huì )輸出結果。

所以此時(shí)關(guān)聯(lián)結果如下:

order_id timestamp pay_money
1001 null null
1002 2023-01-04 10:01:02 null

當表A中的1001到達時(shí),會(huì )和B表進(jìn)行關(guān)聯(lián)查詢(xún),當表B的1002到達時(shí),會(huì )和表A進(jìn)行關(guān)聯(lián)查詢(xún),此時(shí)結果如下:

order_id timestamp pay_money /
1001 null null +I
1002 2023-01-04 10:01:02 null +I
1001 null null -D
1001 2023-02-04 10:00:00 80 +I
1002 2023-01-04 10:01:02 null -D
1002 2023-01-04 10:01:02 100 +I

注意

Full Join 會(huì )產(chǎn)生回撤流。

Interval Join

Interval JOIN 相對于UnBounded的雙流JOIN來(lái)說(shuō)是Bounded JOIN。就是每條流的每一條數據會(huì )與另一條流上的不同時(shí)間區域
的數據進(jìn)行JOIN。

語(yǔ)法

SELECT ... FROM t1 JOIN t2  ON t1.key = t2.key AND TIMEBOUND_EXPRESSION

TIMEBOUND_EXPRESSION 有兩種寫(xiě)法,如下:

  • L.time between LowerBound(R.time) and UpperBound(R.time)
  • R.time between LowerBound(L.time) and UpperBound(L.time)
  • 帶有時(shí)間屬性(L.time/R.time)的比較表達式。

Interval JOIN 的語(yǔ)義就是每條數據對應一個(gè)時(shí)間區間的數據區間,比如有一個(gè)訂單表Orders(orderId, productName,
orderTime)和付款表Payment(orderId, payType, payTime)。假設我們要統計在下單一小時(shí)內付款的訂單信息。SQL查詢(xún)如下:

SELECT 
  o.orderId,
  o.productName,
  p.payType,
  o.orderTime,
  cast(payTime as timestamp) as payTime
FROM
  Orders AS o JOIN Payment AS p ON 
  o.orderId = p.orderId AND 
  p.payTime BETWEEN orderTime AND 
  orderTime + INTERVAL '1' HOUR

Orders訂單數據

orderId productName orderTime
001 iphone 2018-12-26 04:53:22.0
002 mac 2018-12-26 04:53:23.0
003 book 2018-12-26 04:53:24.0
004 cup 2018-12-26 04:53:38.0

Payment付款數據

orderId payType payTime
001 alipay 2018-12-26 05:51:41.0
002 card 2018-12-26 05:53:22.0
003 card 2018-12-26 05:53:30.0
004 alipay 2018-12-26 05:53:31.0

符合語(yǔ)義的預期結果是 訂單id為003的信息不出現在結果表中,因為下單時(shí)間 2018-12-26 04:53:24.0, 付款時(shí)間是
2018-12-26 05:53:30.0超過(guò)了1小時(shí)付款。

那么預期的結果信息如下:

orderId productName payType orderTime payTime
001 iphone alipay 2018-12-26 04:53:22.0 2018-12-26 05:51:41.0
002 mac card 2018-12-26 04:53:23.0 2018-12-26 05:53:22.0
004 cup alipay 2018-12-26 04:53:38.0 2018-12-26 05:53:31.0

這樣Id為003的訂單是無(wú)效訂單,可以更新庫存繼續售賣(mài)。

接下來(lái)我們以圖示的方式直觀(guān)說(shuō)明Interval JOIN的語(yǔ)義,我們對上面的示例需求稍微變化一下: 訂單可以預付款(不管是
否合理,我們只是為了說(shuō)明語(yǔ)義)也就是訂單 前后 1小時(shí)的付款都是有效的。SQL語(yǔ)句如下:

SELECT
  ...
FROM
  Orders AS o JOIN Payment AS p ON
  o.orderId = p.orderId AND
  p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
  orderTime + INTERVAL '1' HOUR

總結

  • Flink的流關(guān)聯(lián)當前只能支持兩條流的關(guān)聯(lián)
  • Flink同時(shí)支持基于EventTime和ProcessingTime的流流join。
  • Interval join 已經(jīng)支持inner ,left outer, right outer , full outer 等類(lèi)型的join,由此來(lái)看官網(wǎng)對interval join
    類(lèi)型支持的說(shuō)明不夠準確。
  • 當前版本Interval join的兩條流的消息清理是基于兩條流共有的combinedWatermark(較小的流的watermark)。
  • 流的watermark不會(huì )用于將消息直接過(guò)濾掉,即時(shí)消息在本流中的watermark表示中已經(jīng)遲到,但會(huì )直接將遲到的消息根據
    相應的join類(lèi)型或輸出或丟棄。

維表Join

維表(Dimension Table)是來(lái)自數倉建模的概念。在數倉模型中,事實(shí)表(Fact Table)是指存儲有事實(shí)記錄的表,如系統
日志、銷(xiāo)售記錄等,而維表是與事實(shí)表相對應的一種表,它保存了事實(shí)表中指定屬性的相關(guān)詳細信息,可以跟事實(shí)表做關(guān)
聯(lián);相當于將事實(shí)表上經(jīng)常重復出現的屬性抽取、規范出來(lái)用一張表進(jìn)行管理。

在實(shí)際生產(chǎn)中,我們經(jīng)常會(huì )有這樣的需求,以原始數據流作為基礎,關(guān)聯(lián)大量的外部表來(lái)補充一些屬性。這種查詢(xún)操作就是
典型的維表 JOIN。

使用維表的好處

  • 縮小了事實(shí)表的大小。
  • 便于維度的管理和維護,增加、刪除和修改維度的屬性,不必對事實(shí)表的大量記錄進(jìn)行改動(dòng)。
  • 維度表可以為多個(gè)事實(shí)表重用,以減少重復工作。

維表JOIN使用

由于維表是一張不斷變化的表(靜態(tài)表視為動(dòng)態(tài)表的一種特例),因此在維表 JOIN 時(shí),需指明這條記錄關(guān)聯(lián)維表快照的對
應時(shí)刻。Flink SQL 的維表 JOIN 語(yǔ)法引入了 Temporal Table 的標準語(yǔ)法,用于聲明流數據關(guān)聯(lián)的是維表哪個(gè)時(shí)刻的快照。

需要注意是,目前原生 Flink SQL 的維表 JOIN 僅支持事實(shí)表對當前時(shí)刻維表快照的關(guān)聯(lián)(處理時(shí)間語(yǔ)義),而不支持事實(shí)
表 rowtime 所對應的維表快照的關(guān)聯(lián)(事件時(shí)間語(yǔ)義)。

語(yǔ)法說(shuō)明

Flink SQL 中使用語(yǔ)法 for SYSTEM_TIME as of PROC_TIME()來(lái)標識維表JOIN。僅支持 INNER JOINLEFT JOIN。

SELECT 
  column-namesFROM 
  table1 [AS <alias1>][LEFT] 
JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>] 
ON table1.column-name1 = table2.key-name1

注意:
table1.proctime表示 table1proctime字段。

使用示例

下面用一個(gè)簡(jiǎn)單的示例來(lái)展示維表 JOIN 語(yǔ)法。假設我們有一個(gè) Orders 訂單數據流,希望根據用戶(hù) ID 補全訂單中的用戶(hù)
信息,因此需要跟 Customer 維度表進(jìn)行關(guān)聯(lián)。

CREATE TABLE Orders (  
  id   INT,  
  price   DOUBLE,  
  quantity   INT,  
  proc_time AS PROCTIME(),  
  PRIMARY KEY(id) NOT ENFORCED
) WITH ( 
  'connector' = 'datagen',  
  'fields.id.kind' = 'sequence',  
  'rows-per-second' = '10'
);
CREATE TABLE Customers (
  id   INT,
  name   STRING,
  country   STRING,
  zip   STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);
CREATE TABLE OrderDetails (
  id   INT,
  total_price   DOUBLE,
  country   STRING,
  zip   STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/orderdb',
  'table-name' = 'orderdetails'
);
-- enrich each order with customer information
INSERT INTO OrderDetails 
SELECT 
  o.id, 
  o.price,
  o.quantity, 
  c.country, 
  c.zipFROM 
Orders AS o 
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c  
ON o.id = c.id;

Flink SQL 執行流程

Apache Calcite 是一款開(kāi)源的 SQL 解析工具,被廣泛使用于各個(gè)大數據項目中,主要用于解析 SQL 語(yǔ)句。SQL 的執行流程
一般分為四個(gè)主要階段:

  • Parse:語(yǔ)法解析,把 SQL 語(yǔ)句轉換成抽象語(yǔ)法樹(shù)(AST),在 Calcite 中用 SqlNode 來(lái)表示;
  • Validate:語(yǔ)法校驗,根據元數據信息進(jìn)行驗證,例如查詢(xún)的表、使用的函數是否存在等,校驗之后仍然是 SqlNode 構
    成的語(yǔ)法樹(shù);
  • Optimize:查詢(xún)計劃優(yōu)化,包含兩個(gè)階段,1)將 SqlNode 語(yǔ)法樹(shù)轉換成關(guān)系表達式 RelNode 構成的邏輯樹(shù),2)使用優(yōu)
    化器基于規則進(jìn)行等價(jià)變換,例如謂詞下推、列裁剪等,經(jīng)過(guò)優(yōu)化器優(yōu)化后得到最優(yōu)的查詢(xún)計劃;
  • Execute:將邏輯查詢(xún)計劃翻譯成物理執行計劃,生成對應的可執行代碼,提交運行。



標 題:《Flink雙流Join底層原理
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

    評論
    1 評論
    2023-02-16 15:39 回復?

    這位博主,您好。在瀏覽完您的博客文章后,感覺(jué)您的博客內容質(zhì)量非常的好,也達到了加入騰訊云自媒體分享計劃的要求。
    現誠摯地向您發(fā)出邀請,邀請您加入騰訊自媒體分享計劃:https://cloud.tencent.com/developer/support-plan?invite_code=347bs58ysckks 。待審核成功入駐后,會(huì )在社區后臺為您發(fā)放相關(guān)得騰訊云無(wú)門(mén)檻代金券以及一些實(shí)物獎勵。
    具體審核細則,請進(jìn)入頁(yè)面查看。

avatar

取消
在线免费国产视频,亚洲无码成人免费在线,又黄又爽又无遮挡国产,日韩爆乳av少妇无码,国产福利精品98视频一区二区