加入收藏 | 设为首页 | 会员中心 | 我要投稿 晋中站长网 (https://www.0354zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Flink 漫谈系列(15) - DataStream Connectors之Kafka

发布时间:2019-01-29 18:04:21 所属栏目:教程 来源:孙金城
导读:一、聊什么 为了满足本系列读者的需求,我先介绍一下Kafka在Apache Flink中的使用。所以本篇以一个简单的示例,向大家介绍在Apache Flink中如何使用Kafka。 二、Kafka 简介 Apache Kafka是一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,Li

我们计算一个大小为1秒的Tumble窗口,计算窗口内最大的值。完整的程序如下:

  1. import org.apache.flink.api.common.typeinfo.BasicTypeInfo; 
  2. import org.apache.flink.api.common.typeinfo.TypeInformation; 
  3. import org.apache.flink.api.java.tuple.Tuple2; 
  4. import org.apache.flink.api.java.typeutils.TupleTypeInfo; 
  5. import org.apache.flink.api.java.utils.ParameterTool; 
  6. import org.apache.flink.streaming.api.TimeCharacteristic; 
  7. import org.apache.flink.streaming.api.datastream.DataStream; 
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
  9. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; 
  10. import org.apache.flink.streaming.api.windowing.time.Time; 
  11. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; 
  12. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; 
  13. import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; 
  14.  
  15. import java.util.Properties; 
  16.  
  17. public class KafkaWithEventTimeExample { 
  18.     public static void main(String[] args) throws Exception { 
  19.         // 用户参数获取 
  20.         final ParameterTool parameterTool = ParameterTool.fromArgs(args); 
  21.         // Stream 环境 
  22.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  23.         // 设置 Event-time 
  24.         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
  25.  
  26.         // Source的topic 
  27.         String sourceTopic = "flink-topic"; 
  28.         // Sink的topic 
  29.         String sinkTopic = "flink-topic-output"; 
  30.         // broker 地址 
  31.         String broker = "localhost:9092"; 
  32.  
  33.         // 属性参数 - 实际投产可以在命令行传入 
  34.         Properties p = parameterTool.getProperties(); 
  35.         p.putAll(parameterTool.getProperties()); 
  36.         p.put("bootstrap.servers", broker); 
  37.  
  38.         env.getConfig().setGlobalJobParameters(parameterTool); 
  39.         // 创建消费者 
  40.         FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>( 
  41.                 sourceTopic, 
  42.                 new KafkaWithTsMsgSchema(), 
  43.                 p); 
  44.  
  45.         // 读取Kafka消息 
  46.         TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>( 
  47.                 BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); 
  48.  
  49.         DataStream<Tuple2<String, Long>> input = env 
  50.                 .addSource(consumer).returns(typeInformation) 
  51.                 // 提取时间戳,并生产Watermark 
  52.                 .assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks()); 
  53.  
  54.         // 数据处理 
  55.         DataStream<Tuple2<String, Long>> result = input 
  56.                 .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))) 
  57.                 .max(0); 
  58.  
  59.         // 创建生产者 
  60.         FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>( 
  61.                 sinkTopic, 
  62.                 new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()), 
  63.                 p, 
  64.                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); 
  65.  
  66.         // 将数据写入Kafka指定Topic中 
  67.         result.addSink(producer); 
  68.  
  69.         // 执行job 
  70.         env.execute("Kafka With Event-time Example"); 
  71.     }} 

(编辑:晋中站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读