我们构造一个只包含一个data字段的用户表,用户表数据如下:

查询的需求是将data字段flatten成为name和age两个字段的表,期望得到:

我们以ITCase方式完成如上查询需求,完整代码如下:
- @Test
- def testLateralTVF(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStateBackend(getStateBackend)
- StreamITCase.clear
-
- val userData = new mutable.MutableList[(String)]
- userData.+=(("Sunny#8"))
- userData.+=(("Kevin#36"))
- userData.+=(("Panpan#36"))
-
- val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)"
-
- val users = env.fromCollection(userData).toTable(tEnv, 'data)
-
- val tvf = new SplitTVF()
- tEnv.registerTable("userTab", users)
- tEnv.registerFunction("splitTVF", tvf)
-
- val result = tEnv.SQLQuery(SQLQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
- StreamITCase.testResults.foreach(println(_))
- }
运行结果:

上面的核心语句是:
- val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)"
如果大家想运行上面的示例,请查阅《Apache Flink 漫谈系列 - SQL概览》中 源码方式 搭建测试环境。
六、小结
本篇重点向大家介绍了一种新的JOIN类型 - JOIN LATERAL。并向大家介绍了SQL Server中对LATERAL的支持方式,详细分析了JOIN LATERAL和INNER JOIN的区别与联系,最后切入到Apache Flink中,以UDTF示例说明了Apache Flink中对JOIN LATERAL的支持,后续篇章会介绍Apache Flink中另一种使用LATERAL的场景,就是Temporal JION,Temporal JION也是一种新的JOIN类型,我们下一篇再见!
关于点赞和评论
本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家!
作者:孙金城,花名 金竹,目前就职于阿里巴巴,自2015年以来一直投入于基于Apache Flink的阿里巴巴计算平台Blink的设计研发工作。
【本文为51CTO专栏作者“金竹”原创稿件,转载请联系原作者】
【编辑推荐】
- Apache Flink 漫谈系列 - Fault Tolerance
- Apache Flink 漫谈系列 - 流表对偶(duality)性
- Apache Flink 漫谈系列 - 持续查询(Continuous Queries)
- Apache Flink 漫谈系列 - SQL概览
- Apache Flink 漫谈系列 - JOIN 算子
【责任编辑:赵宁宁 TEL:(010)68476606】
点赞 0 (编辑:晋中站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|