Flink有哪些高级特性是初学者容易忽视的?

2026-04-29 20:432阅读0评论SEO资讯
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1065个文字,预计阅读时间需要5分钟。

Flink有哪些高级特性是初学者容易忽视的?

window window是指针对DataStream的一种操作,可以将无界的原始数据切割成有界的数据块的手柄。它可以基于时间驱动的[time window]或数据驱动的[count window],以及元素个数。类型:分为滚动窗口:滚动窗口。

Flink有哪些高级特性是初学者容易忽视的?

window

window是针对DataStream,一种可以把无界的数据切割为有界数据块的手段,可以是时间驱动的或者数据驱动的,元素个数。

类型:分为 tumbling window:滚动窗口、sliding window:滑动窗口

time window

通过socket接收数据,统计窗口内的单词数量。不使用keyBy时,使用timeWindowAll来代替timeWindow

public class WindowStudent { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001); // 使用滚动窗口,每10秒计算一次前10秒的窗口数据 // 对每个时间窗口内的数据进行单词统计 streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2<String,Integer>(str,1)); } } }).keyBy(0).timeWindow(Time.seconds(10)).sum(1).print(); // 使用滑动窗口 每隔5秒统计前10秒窗口内的数据 streamSource.flatMap(new FlatMapFunction<String, Tuple2>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2(str,1)); } } }) .keyBy(0) // 第一个参数:窗口大小 第二个参数:滑动间隔 .timeWindow(Time.seconds(10),Time.seconds(5)) .sum(1).print(); env.execute(); } }


CountWindow

CountWindow 是针对元素个数来进行分隔。

注意:当前面有keyBy对元素进行分组时,当分组中的元素达到窗口大小才会进行计算,而不是总元素大小

不使用keyBy时,使用CountWindowAll来代替CountWindow

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001); // 使用滚动窗口,每隔6个元素计算一次 // 对每个窗口内的数据进行单词统计 streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2<String,Integer>(str,1)); } } }).keyBy(0) // 表示每隔6个元素计算一次 前面有keyBy,会对元素进行分组,当分组中的元素有6个才会进行计算 .countWindow(6) .sum(1).print(); // 使用滑动窗口 每隔2个元素计算一次前5个元素 streamSource.flatMap(new FlatMapFunction<String, Tuple2>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2(str,1)); } } }) .keyBy(0) // 第一个参数:窗口大小 第二个参数:滑动间隔 .countWindow(5,2) .sum(1).print(); env.execute(); }

自定义window

timeWindow和countWindow都是基于window()实现的,timeWindowAll则是windowAll。

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001); // 使用滚动窗口,每隔10个元素计算一次 // 对每个窗口内的数据进行单词统计 streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2<String,Integer>(str,1)); } } }).keyBy(0) // 表示每隔10个元素计算一次 前面有keyBy,会对元素进行分组,当分组中的元素有6个才会进行计算 .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .sum(1).print(); // 使用滑动窗口 每隔2秒统计前5秒的数据 streamSource.flatMap(new FlatMapFunction<String, Tuple2>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2(str,1)); } } }) .keyBy(0) // 第一个参数:窗口大小 第二个参数:滑动间隔 .window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))) .sum(1).print(); env.execute(); }

增量聚合与全量聚合

增量聚合:窗口中每进入一条数据就进行一次计算,代表函数:reduce,sum,min,max等

全量聚合:等属于窗口的数据到齐,才开始进行计算,可以对窗口内的数据进行排序等需求 ,代表函数:apply,process。process比apply包含更多的上下文信息。

本文共计1065个文字,预计阅读时间需要5分钟。

Flink有哪些高级特性是初学者容易忽视的?

window window是指针对DataStream的一种操作,可以将无界的原始数据切割成有界的数据块的手柄。它可以基于时间驱动的[time window]或数据驱动的[count window],以及元素个数。类型:分为滚动窗口:滚动窗口。

Flink有哪些高级特性是初学者容易忽视的?

window

window是针对DataStream,一种可以把无界的数据切割为有界数据块的手段,可以是时间驱动的或者数据驱动的,元素个数。

类型:分为 tumbling window:滚动窗口、sliding window:滑动窗口

time window

通过socket接收数据,统计窗口内的单词数量。不使用keyBy时,使用timeWindowAll来代替timeWindow

public class WindowStudent { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001); // 使用滚动窗口,每10秒计算一次前10秒的窗口数据 // 对每个时间窗口内的数据进行单词统计 streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2<String,Integer>(str,1)); } } }).keyBy(0).timeWindow(Time.seconds(10)).sum(1).print(); // 使用滑动窗口 每隔5秒统计前10秒窗口内的数据 streamSource.flatMap(new FlatMapFunction<String, Tuple2>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2(str,1)); } } }) .keyBy(0) // 第一个参数:窗口大小 第二个参数:滑动间隔 .timeWindow(Time.seconds(10),Time.seconds(5)) .sum(1).print(); env.execute(); } }


CountWindow

CountWindow 是针对元素个数来进行分隔。

注意:当前面有keyBy对元素进行分组时,当分组中的元素达到窗口大小才会进行计算,而不是总元素大小

不使用keyBy时,使用CountWindowAll来代替CountWindow

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001); // 使用滚动窗口,每隔6个元素计算一次 // 对每个窗口内的数据进行单词统计 streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2<String,Integer>(str,1)); } } }).keyBy(0) // 表示每隔6个元素计算一次 前面有keyBy,会对元素进行分组,当分组中的元素有6个才会进行计算 .countWindow(6) .sum(1).print(); // 使用滑动窗口 每隔2个元素计算一次前5个元素 streamSource.flatMap(new FlatMapFunction<String, Tuple2>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2(str,1)); } } }) .keyBy(0) // 第一个参数:窗口大小 第二个参数:滑动间隔 .countWindow(5,2) .sum(1).print(); env.execute(); }

自定义window

timeWindow和countWindow都是基于window()实现的,timeWindowAll则是windowAll。

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStreamSource<String> streamSource = env.socketTextStream("10.50.8.125", 9001); // 使用滚动窗口,每隔10个元素计算一次 // 对每个窗口内的数据进行单词统计 streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2<String,Integer>(str,1)); } } }).keyBy(0) // 表示每隔10个元素计算一次 前面有keyBy,会对元素进行分组,当分组中的元素有6个才会进行计算 .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .sum(1).print(); // 使用滑动窗口 每隔2秒统计前5秒的数据 streamSource.flatMap(new FlatMapFunction<String, Tuple2>() { // value 接收到的每行数据 @Override public void flatMap(String value, Collector<Tuple2> out) throws Exception { String[] splitValue = value.split(" "); for (String str : splitValue) { out.collect(new Tuple2(str,1)); } } }) .keyBy(0) // 第一个参数:窗口大小 第二个参数:滑动间隔 .window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2))) .sum(1).print(); env.execute(); }

增量聚合与全量聚合

增量聚合:窗口中每进入一条数据就进行一次计算,代表函数:reduce,sum,min,max等

全量聚合:等属于窗口的数据到齐,才开始进行计算,可以对窗口内的数据进行排序等需求 ,代表函数:apply,process。process比apply包含更多的上下文信息。