网站首页 > 知识剖析 正文
导读:对于流查询,Regular Join 的语法是最灵活的,它允许任何类型的更新(插入、更新、删除)输入表。
Regular Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):
- Inner Join(Inner Equal Join):当两条流 Join 到才会输出 +[L, R]
- Left Join(Outer Equal Join):左流数据到达之后 Join 到 R 流数据则输出 +[L, R],没 Join 到输出 +[L, null])。如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null],然后输出 +[L, R]。
- Right Join(Outer Equal Join):与 Left Join 逻辑相反
- Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R],没 Join 到输出 +[null, R];对左流来说:Join 到输出 +[L, R],没 Join 到输出 +[L, null])。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R],输出 +[L, R],右流数据到达为例:回撤 -[L, null],输出 +[L, R])。
Regular Inner Join
Flink SQL
CREATE TABLE matchResult (
guid STRING
) WITH (
'connector' = 'kafka',
'topic' = 'match_result_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'read_record_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
INNER JOIN readRecord ON matchResult.guid = readRecord.guid;
输出结果解析
-- L 流数据达到,由于没有 Join 到 R 流数据而且是 inner join 便不输出结果
+I[111, book1] -- R 流数据达到, Join 到 L 流数据,便输出 +I[111, book1]
-- R 流数据达到,由于没有 Join 到 L 流数据而且是 inner join 便不输出结果
+I[222, book2] -- L 流数据达到, Join 到 R 流数据便输出结果
Regular Left Join(Right join 则相反)
Flink SQL
CREATE TABLE matchResult (
guid STRING
) WITH (
'connector' = 'kafka',
'topic' = 'match_result_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'read_record_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
LEFT JOIN readRecord ON matchResult.guid = readRecord.guid;
输出结果解析
+I[111, null] -- L 流数据达到,没有 Join 到 R 流数据,便输出 +[L, null]
-D[111, null] -- R 流的数据到达,发现 L 流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null]
+I[111, book1] -- 再输出 +[L, R]
-- 这里模拟一条 R 流 guid = 222 的数据到达,由于是 left join 且没有 join 到 L 流,因此不做输出
+I[222, book2] -- 当 L 流 guid = 222 的数据达到 join R 流 后输出结果 +[L, R]
Regular Full Join
Flink SQL
CREATE TABLE matchResult (
guid STRING
) WITH (
'connector' = 'kafka',
'topic' = 'match_result_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'read_record_log_test',
'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
FULL JOIN readRecord ON matchResult.guid = readRecord.guid;
输出结果解析
+I[111, null] -- L 流数据达到,没有 Join 到 R 流数据,便输出 +I[L, null]
+I[null, book2] -- R 流数据达到,没有 Join 到 R 流数据,便输出 +I[null, R]
-D[null, book2] -- L 流新数据到达,发现之前 R 流之前输出过没有 Join 到的数据,则发起回撤流,先输出 -D[null, R]
+I[222, book2] -- 再输出 +I[L, R]
-D[111, null] -- 反之同理
+I[111, book1]
TTL 概念
在 Regular Join 时 Flink 会将两条没有时间窗口限制的流的所有数据存储在 State 中,由于流是无穷无尽持续流入的,随着时间的不断推进,内存中积累的状态会越来越多。
针对这个问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念。通过为每个状态设置 Timer,如果这个状态中途被访问过,则重新设置 Timer;否则(如果状态一直未被访问,长期处于 Idle 状态)则在 Timer 到期时做状态清理。这样,就可以确保每个状态都能得到及时的清理,可以通过 table.exec.state.ttl 参数进行控制(注意:这同时也会对结果的准确性有所影响,因此需要合理的权衡)。
最后
感谢您的阅读,如果喜欢本文欢迎关注和转发,转载需注明出处,本头条号将持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。
参考文章
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/concepts/overview/#idle-state-retention-time State 概念https://cloud.tencent.com/developer/article/1452844?from=10680 Flink State TTL
https://cloud.tencent.com/developer/article/1452854 Idle State Retention Time 特性
https://mp.weixin.qq.com/s/zR2ukRjiw-IqUDX894NyGw 大数据羊说
猜你喜欢
- 2025-05-11 产品切换数据库问题处理总结(切换数据库的命令是什么)
- 2025-05-11 真正让你明白Hive参数调优系列1:控制map个数与性能调优参数
- 2025-05-11 拉链表(拉链表取数)
- 2025-05-11 常见的数据库类型有哪些 & SQL介绍
- 2025-05-11 每日SQL自学知识点(第八天)—多表查询详解
- 2025-05-11 大厂必问:MySQL 三表 JOIN 操作的解析与性能优化,效率又如何?
- 2025-05-11 数据库(数据库有哪些)
- 2025-05-11 干货!SQL性能优化,书写高质量SQL语句
- 2025-05-11 Linq 下的扩展方法太少了,MoreLinq 来啦
- 2025-05-11 mysql学习3:select基础---单表查询
- 05-11产品切换数据库问题处理总结(切换数据库的命令是什么)
- 05-11真正让你明白Hive参数调优系列1:控制map个数与性能调优参数
- 05-11拉链表(拉链表取数)
- 05-11常见的数据库类型有哪些 & SQL介绍
- 05-11每日SQL自学知识点(第八天)—多表查询详解
- 05-11大厂必问:MySQL 三表 JOIN 操作的解析与性能优化,效率又如何?
- 05-11数据库(数据库有哪些)
- 05-11干货!SQL性能优化,书写高质量SQL语句
- 最近发表
- 标签列表
-
- xml (46)
- css animation (57)
- array_slice (60)
- htmlspecialchars (54)
- position: absolute (54)
- datediff函数 (47)
- array_pop (49)
- jsmap (52)
- toggleclass (43)
- console.time (63)
- .sql (41)
- ahref (40)
- js json.parse (59)
- html复选框 (60)
- css 透明 (44)
- css 颜色 (47)
- php replace (41)
- css nth-child (48)
- min-height (40)
- xml schema (44)
- css 最后一个元素 (46)
- location.origin (44)
- table border (49)
- html tr (40)
- video controls (49)