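To define a custom Apache Flink stream source for the Table API, implement StreamTableSource. A StreamTableSource obtains its DataStream through the addSource method of StreamExecutionEnvironment, so we also need a custom SourceFunction. The source must also support watermarks. The SourceFunction emits them, and the table source implements the DefinedRowtimeAttributes interface to declare which field is the rowtime attribute and how watermarks are handled.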
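(1) Defining the SourceFunction
The source function accepts a collection of records that carry their event time, encoded as Either values. Left holds a (timestamp, record) pair and Right holds a watermark timestamp: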
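import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.watermark.Watermark

// Left((eventTime, value)) is emitted as a record with that event time; Right(ts) as a watermark.
class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
  extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left((eventTime, value)) => ctx.collectWithTimestamp(value, eventTime)
      case Right(watermark)         => ctx.emitWatermark(new Watermark(watermark))
    }
  }
  // The data is finite and emitted in a single pass, so there is nothing to interrupt.
  override def cancel(): Unit = {}
}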
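You can check the Left/Right dispatch in run() without starting a Flink job. The following is a minimal plain-Scala sketch that assumes nothing from Flink. EmittedRecord, EmittedWatermark and SourceReplay are hypothetical stand-ins for what collectWithTimestamp and emitWatermark would receive; they are not Flink APIs. Watermarks are meant to be non-decreasing, so the sketch also checks that they never move backwards.

```scala
// Plain-Scala model of MySourceFunction.run; no Flink dependency.
// EmittedRecord, EmittedWatermark and SourceReplay are hypothetical names.
sealed trait Emitted[+T]
final case class EmittedRecord[T](timestamp: Long, value: T) extends Emitted[T]
final case class EmittedWatermark(timestamp: Long) extends Emitted[Nothing]

object SourceReplay {
  // Same dispatch as run(): Left => record with its event time, Right => watermark.
  def replay[T](input: Seq[Either[(Long, T), Long]]): Seq[Emitted[T]] =
    input.map {
      case Left((ts, value)) => EmittedRecord(ts, value)
      case Right(wm)         => EmittedWatermark(wm)
    }

  // True if the watermarks in the emitted sequence never decrease.
  def watermarksMonotonic[T](events: Seq[Emitted[T]]): Boolean = {
    val marks = events.collect { case EmittedWatermark(ts) => ts }
    marks.zip(marks.drop(1)).forall { case (a, b) => a <= b }
  }
}
```

If you feed it the data sequence from MyTableSource, it yields records and watermarks in the same interleaved order in which the source would emit them.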
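(2) Defining the StreamTableSource
Our custom source carries the test data together with the corresponding watermarks. ExistingField("accessTime") takes the rowtime from the Long accessTime field, and PreserveWatermarks keeps the watermarks emitted by MySourceFunction: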
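import java.lang.{Long => JLong}
import java.util
import java.util.Collections

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks
import org.apache.flink.types.Row

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  val fieldNames = Array("accessTime", "region", "userId")
  // Table schema: accessTime is exposed as the rowtime attribute (TIMESTAMP).
  val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  // Physical row type: accessTime is produced as epoch milliseconds (Long).
  val rowType = new RowTypeInfo(
    Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
    fieldNames)

  // Page-access table data: rows with timestamps, each followed by a watermark
  val data: Seq[Either[(Long, Row), Long]] = Seq(
    Left((1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010"))),
    Right(1510365660000L),
    Left((1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001"))),
    Right(1510365660000L),
    Left((1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032"))),
    Right(1510366200000L),
    Left((1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100"))),
    Right(1510366260000L),
    Left((1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011"))),
    Right(1510373400000L)
  )

  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "accessTime",
      new ExistingField("accessTime"),
      PreserveWatermarks.INSTANCE))
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // Declare the Row type explicitly; it cannot be extracted from the generic MySourceFunction[T].
    execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
  }

  override def getReturnType: TypeInformation[Row] = rowType

  override def getTableSchema: TableSchema = schema

}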
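PreserveWatermarks passes the source's watermarks through unchanged, so the order of data alone determines how event time advances. The following is a minimal plain-Scala sketch for reviewing such test data. It renders the epoch-millisecond accessTime values as wall-clock time and lists records that arrive strictly behind the latest watermark. AccessTimes and behindWatermark are hypothetical helpers. The Asia/Shanghai time zone is an assumption based on the regions in the sample rows.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object AccessTimes {
  // Assumption: the sample data is read in China Standard Time (UTC+8).
  private val zone = ZoneId.of("Asia/Shanghai")
  private val fmt  = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Render an epoch-millisecond timestamp (the physical type of accessTime) as wall-clock time.
  def show(epochMillis: Long): String =
    Instant.ofEpochMilli(epochMillis).atZone(zone).format(fmt)

  // Timestamps of records that arrive strictly before the latest watermark seen so far.
  def behindWatermark[T](data: Seq[Either[(Long, T), Long]]): Seq[Long] = {
    var watermark = Long.MinValue
    val late = Seq.newBuilder[Long]
    data.foreach {
      case Left((ts, _)) => if (ts < watermark) late += ts
      case Right(wm)     => watermark = math.max(watermark, wm)
    }
    late.result()
  }
}
```

For the data above, behindWatermark finds nothing, and the first row renders as 2017-11-11 10:01:00. Flink's documentation describes Watermark(t) as meaning that no further elements with a timestamp <= t should arrive. A record whose timestamp equals the preceding watermark, such as the second row, therefore sits on the boundary, and whether it counts as late depends on the operator that consumes it.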
2. Sink definition