博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink得Aggregate和minBy、maxBy的区别
阅读量:3963 次
发布时间:2019-05-24

本文共 1544 字,大约阅读时间需要 5 分钟。

Flink得Aggregate和minBy、maxBy的区别

直接代码结果分析说明

public class AggregateDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource dataSource = env.readTextFile(“data/apache.log”);
MapOperator<String, Tuple2<String, Integer>> map = dataSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value.split(" ")[0], 1);
}
});

GroupReduceOperator
, Tuple2
> reduceOperator = map.groupBy(0).reduceGroup(new GroupReduceFunction
, Tuple2
>() { @Override public void reduce(Iterable
> values, Collector
> out) throws Exception { String key = null; int count = 0; for (Tuple2
value : values) { key = value.f0; count += value.f1; } out.collect(Tuple2.of(key, count)); } }); reduceOperator.print(); System.out.println("--------------------"); reduceOperator.aggregate(Aggregations.MAX,1).print(); reduceOperator.aggregate(Aggregations.MIN,1).print(); reduceOperator.aggregate(Aggregations.SUM,1).print();}

}

这个代码运行结果如下:

.csdnimg.cn/20210121143204699.png)

根据结果可以看出,经过Aggregate.MAX和Aggregations.MIN和Aggregations.SUM处理的结果key和value是不对应的,比如原有数据value最大的应该是(10.0.0.1,7),但是Aggregate.MAX得出的数据是(83.149.9.216,7)。

然后再看maxBy和minBy的程序和结果:

reduceOperator.minBy(1).print();
reduceOperator.maxBy(1).print();
结果如图
在这里插入图片描述
key和value是对应的;

所以两种都是求最值,但是内部计算逻辑不一样:

Min在计算的过程中,会记录最小值,对于其它的列,会取最后一次出现的,然后和最小值组合形成结果返回;
minBy在计算的过程中,当遇到最小值后,将第一次出现的最小值所在的整个元素返回。

转载地址:http://fjgzi.baihongyu.com/

你可能感兴趣的文章
ksh 动态命令
查看>>
ksh 多进程
查看>>
ksh 命令分隔符
查看>>
Linux 精萃
查看>>
sed 精萃
查看>>
awk 精萃
查看>>
awk 简介
查看>>
awk 调用方式
查看>>
awk 注释
查看>>
awk 命令分隔符
查看>>
awk 变量
查看>>
awk 数组
查看>>
如何写出高效的SQL
查看>>
awk 运算符
查看>>
awk 控制结构
查看>>
awk 格式化输出
查看>>
awk 正则表达式
查看>>
awk 函数
查看>>
awk 向命令传递参数
查看>>
awk I/O
查看>>