WebFlux 前置知识(三)

[TOC]

Stream 流式 API 也是 JDK8 开始引入的 API,用过的小伙伴都说爽,没用过的小伙伴则感到一脸懵逼。现在 JDK 已经到 16 了,可能很多人还不熟悉 Stream 流操作,这次趁着讲 WebFlux,松哥也来和大家一起回顾一下 Stream 流操作。

老实说,松哥在日常工作中,有时候刚好碰上了也会用 Stream 流,用着确实很爽,代码简洁,而且看起来高大上,确实是一个不错的东西。不过组里有很多人其实并不太熟悉 Stream 操作,所以也就没有去强行推广,但是作为一个合格的程序员,还是很有必要去学习一下 Stream 流操作的。

1.Stream 流简介

说到 Stream,大家可能很容易想到 Java IO 中的各种流操作,名字看起来很像,但这其实是两个完全不同的东西。

Java8 中的 Stream 不存储数据,它通过函数式编程模式来对集合进行链状流式操作。

2.基本玩法

先来个简单的例子感受下 Stream 的操作。

例如我有一个数组,如下:

1
int[] arr = {1, 2, 3, 4, 5, 6};

现在想给这个数组求平均数,如下:

1
double asDouble = IntStream.of(arr).average().getAsDouble();

一行代码搞定,首先调用 IntStream.of 方法获取流对象,然后调用 average 方法计算平均数,再调用 getAsDouble 方法获取结果。全程都是方法调用,不用自己写求平均数的逻辑。

再比如求和,如下:

1
int sum = IntStream.of(arr).sum();

也是直接调用 sum 方法即可。

而且我们还可以在流中对数据进行二次加工,例如给数组中的每个元素先求平方再求和:

1
2
double[] arr = {1, 2, 3, 4, 5, 6};
double sum = DoubleStream.of(arr).map(i -> Math.pow(i, 2)).sum();

先在 Map 中对每一个元素求平方,然后再求和。

这里就能涉及到两个概念:

  • 中间操作
  • 终止操作

所谓中间操作,就是中途求平方的操作,所谓终止操作就是最终计算出结果的操作,只要方法的返回值不是一个 Stream,那就是终止操作,否则就是中间操作。

3.Stream 的创建

获取一个流的方式多种多样,最常见的就是从集合或者数组中获取一个流,当然我们也可以直接自己创建一个流出来。

一起来看下。

集合

List 集合或者 Set 集合都可以直接搞一个流出来,方式如下:

1
2
3
4
List<String> list = new ArrayList<>();
Stream<String> s1 = list.stream();
Set<String> set = new HashSet<>();
Stream<String> s2 = set.stream();

集合中直接调用 stream 方法就可以获取到流。

数组

通过数组获取流的方式也很简单,如下:

1
IntStream stream = Arrays.stream(new int[]{11, 22, 33, 44, 55, 66});

数字 Stream

也可以直接利用 IntStream、LongStream 等对象创建一个数字 Stream,如下:

1
2
3
IntStream s1 = IntStream.of(1, 2, 3);
DoubleStream s2 = DoubleStream.of(1, 2, 3);
LongStream s3 = LongStream.of(1L, 2L, 3L);

自己创建

1
2
3
Random random = new Random();
Supplier<Integer> supplier = () -> random.nextInt(100);
Stream<Integer> stream = Stream.generate(supplier).limit(5);

调用 Stream.generate 方法可以自己创建一个流,自己创建的时候需要提供一个 Supplier,通过调用 Supplier 中的 get 方法自动获取到元素。

无论哪种创建方式,大家需要明白的是,Stream 并不会保存数据,它只会对数据进行加工。

4.Stream 的中间操作

中间操作可以分为两大类:

  • map 或者 filter 会从输入流中获取每一个元素,并且在输出流中得到一个结果,这些操作没有内部状态,称为无状态操作。
  • reduce、sum、max 这些操作都需要内部状态来累计计算结果,所以称为有状态操作。

分别来看下:

无状态操作

  • map/mapToXxx
  • flatMap/flatMapToXxx
  • filter
  • peek

有状态操作

  • distinct
  • sorted
  • limit/skip

map

Stream.map() 是 Stream 中最常用的一个转换方法,可以把一个 Stream 对象转为另外一个 Stream 对象。map 方法所接收的参数就是一个 Function 对象,松哥在前面文章中和大家介绍过 Function 对象了,就是有输入有输出(参见WebFlux 前置知识(一)),了解了 map 的参数,那么 map 的功能就很明白了,就是对数据进行二次加工。

举个栗子,例如把一个字符串数组转为数字:

1
2
3
String[] arr = {"1", "2", "3"};
Stream<String> s1 = Arrays.stream(arr);
Stream<Integer> s2 = s1.map(i -> Integer.valueOf(i));

再比如一个数字流,给所有的元素乘 2,如下:

1
IntStream.of(1, 2, 3).map(i -> 2 * i).forEach(System.out::println);

最后的 forEach 就是将元素打印出来。

JDK 中也提供了一些现成的格式转换,如下图:

这样可以直接将元素转为 Double、Long、Obj 等类型,例如下面这样:

1
2
3
String[] arr = {"1", "2", "3"};
Stream<String> s1 = Arrays.stream(arr);
s1.mapToLong(i -> Long.parseLong(i)).forEach(System.out::println);

flatMap

flatMap 可以把 Stream 中的每个元素都映射为一个 Stream,然后再把这多个 Stream 合并为一个 Stream。

例如如下代码,返回的 Stream 中的元素是数组:

1
2
Stream<Integer[]> s = Stream.of(new Integer[]{1, 2, 3}, new Integer[]{4, 5, 6}, new Integer[]{7, 8, 9});
s.forEach(System.out::println);

通过 flatMap 我们可以将 Stream 中的元素变为 Integer:

1
2
Stream<Integer> s = Stream.of(new Integer[]{1, 2, 3}, new Integer[]{4, 5, 6}, new Integer[]{7, 8, 9}).flatMap( i -> Arrays.stream(i));
s.forEach(System.out::println);

filter

filter 操作会对一个 Stream 中的所有元素一一进行判断,不满足条件的就被过滤掉了,剩下的满足条件的元素就构成了一个新的 Stream。

例如要找到数组中所有大于 3 的元素,如下:

1
IntStream.of(2, 3, 4, 5, 6, 7).filter(i -> i > 3).forEach(System.out::println);

filter 方法接收的参数是 Predicate 接口函数,关于 Predicate 接口函数,大家可以参考WebFlux 前置知识(一))一文。

peek

peek 的入参是 Consumer,没有返回值,因此当我们要对元素内部进行处理时,使用 peek 是比较合适的,这个时候可以不用 map(map 的入参是 Function,它是有返回值的)。peek 方法本身会继续返回流,可以对数据继续进行处理。

举个简单的数据转换的例子吧(最终返回的数据并不会被转换):

1
IntStream.of(2, 3, 4, 5, 6, 7).filter(i -> i > 3).peek(String::valueOf).forEach(i-> System.out.println(i));

peek 方法的感觉就像数据中途被消费了一次。

distinct

这个是去重。由于去重操作需要获取到其他元素的值(比较之后才知道是否重复),所以这个是有状态操作。如下:

1
IntStream.of(2, 3, 4, 3, 7, 6, 2, 5, 6, 7).distinct().forEach(System.out::println);

sorted

sorted 是排序,因为也需要知道其他元素的值,然后才能去重,所以这个也是有状态操作,如下:

1
IntStream.of(2, 3, 4, 3, 7, 6, 2, 5, 6, 7).distinct().sorted().forEach(System.out::println);

limit/skip

limit 和 skip 配合操作有点像数据库中的分页,skip 表示跳过 n 个元素,limit 表示取出 n 个元素。例如下面这个例子:

1
Arrays.asList('A', 'B', 'C', 'D', 'E', 'F').stream().skip(2).limit(3).forEach(System.out::println);

这个会跳过 A 和 B,最终打印出 C D E。这也是一种有状态操作。

5.Stream 终止操作

终止操作就是最终计算出结果的操作,只要方法的返回值不是一个 Stream,那就是终止操作,否则就是中间操作。

终止操作又分为两类:

  • 短路操作:不用处理全部元素就可以返回结果。
  • 非短路操作:必须处理所有元素才能得到最终结果。

各自都包含哪些操作,我们分别来看下:

非短路操作

  • forEach/forEachOrdered
  • collect/toArray
  • reduce
  • min/max/count

短路操作

  • findFirst/findAny
  • allMatch/anyMatch/noneMatch

forEach/forEachOrdered

forEach 和 forEachOrdered 都是接收一个 Consumer 类型的参数,完成对参数的消费,不同的是,在并行流中,forEachOrdered 会保证执行顺序。

例如如下一段代码:

1
2
3
int[] arr = {1, 2, 3, 4, 5, 6, 7, 8, 9};
Arrays.stream(arr).parallel().forEach(System.out::println);
Arrays.stream(arr).parallel().forEachOrdered(System.out::println);

前者打印出来的顺序不一定是 123456789,后者一定是。

collect/toArray

这两个都是收集器,可以将执行结果转为一个 List 集合或者一个数组:

1
2
List<Integer> list = Stream.of(1, 2, 3, 4).filter(p -> p > 2).collect(Collectors.toList());
System.out.println(list);

reduce

reduce 是 Stream 的一个聚合方法,它可以把一个 Stream 的所有元素按照聚合函数聚合成一个结果。reduce 方法传入的对象是BinaryOperator 接口,它定义了一个apply 方法,负责把上次累加的结果和本次的元素进行运算,并返回累加的结果。

举个简单的例子,数组求和,当然可以直接调用 sum 计算,我们这里也可以调用 reduce 来实现,如下:

1
2
Optional<Integer> optional = Stream.of(1, 2, 3, 4).reduce((i, j) -> i + j);
System.out.println(optional.orElse(-1));

reduce 的参数是 BinaryOperator,这个接收两个参数,第一个参数是之前计算的结果,第二个参数是本次参与计算的元素,两者累加求和。

再比如给一段话中间加上.,方式如下:

1
2
Optional<String> s = Stream.of("wwwjavaboyorg".split("")).reduce((i, j) -> i + "." + j);
System.out.println(s.orElse(""));

最终执行结果如下:

1
w.w.w.j.a.v.a.b.o.y.o.r.g

min/max/count

这个就比较简单了,就是求最大值最小值,统计总个数,如下表示统计总个数:

1
2
3
Stream<Integer> s = Stream.of(1, 2, 3, 4);
long count = s.count();
System.out.println("count = " + count);

如下表示统计最小值:

1
2
3
Stream<Integer> s = Stream.of(1, 2, 3, 4);
Optional<Integer> min = s.min(Comparator.comparingInt(i -> i));
System.out.println("min.get() = " + min.get());

findFirst/findAny

这两个就是返回流中的第一个、任意一个元素,findAny 要在并行流中测试才有效果,举个栗子:

1
2
3
4
5
6
7
8
9
for (int i = 0; i < 10; i++) {
Optional<Integer> first = Stream.of(1, 2, 3, 4).parallel().findFirst();
System.out.println("first.get() = " + first.get());
}
System.out.println("=============");
for (int i = 0; i < 10; i++) {
Optional<Integer> first = Stream.of(1, 2, 3, 4).parallel().findAny();
System.out.println("first.get() = " + first.get());
}

allMatch/anyMatch/noneMatch

allMatch、anyMatch、noneMatch 用来判断所有元素、任意元素或者没有元素满足给定的条件。这三个方法的参数都是一个 Predicate 接口函数。

1
2
boolean b = Stream.of(1, 2, 3, 4).allMatch(i -> i > 5);
System.out.println("b = " + b);

6.并行流

通常情况下,对 Stream 的元素进行处理是单线程的,即一个一个元素进行处理。有时候我们希望可以并行处理 Stream 元素,因为在元素数量非常大的情况,并行处理可以大大加快处理速度。

把一个普通 Stream 转换为可以并行处理的 Stream 非常简单,只需要用 parallel 方法进行转换:

1
2
3
new Random().ints().limit(50).parallel().forEach(i->{
System.out.println(Thread.currentThread().getName() + "--->" + i);
});

这样数据在后台就是并行打印的。

7.收集器

收集器可以将计算结果重新整理收集到一个集合中,这个集合可以是一个 List/Set 获取其他,并且还可以在收集的过程中对数据进行处理。

例如我有一个 users 集合,里边保存了用户数据,用户有 username、age 以及 gender 三个属性,如下代码分别表示:

  • 提取出用户对象中的 age 属性组成新的集合并返回。
  • 提取出用户对象中的 username 属性组成新的集合并返回。
  • 提取出用户对象中的 gender 属性组成新的集合并返回(这里是一个 Set 集合,所以会自动去重)。
1
2
3
4
5
6
List<Integer> ages = users.stream().map(User::getAge).collect(Collectors.toList());
System.out.println("ages = " + ages);
List<String> usernames = users.stream().map(User::getUsername).collect(Collectors.toList());
System.out.println("usernames = " + usernames);
Set<String> genders = users.stream().map(User::getGender).collect(Collectors.toSet());
System.out.println("genders = " + genders);

Collectors.toList() 最终返回的是 ArrayList,Collectors.toSet() 最终返回的是 HashSet。

如果我们想返回一个 Vector 或者 TreeSet,也是可以的,如下:

1
2
3
4
5
6
List<Integer> ages = users.stream().map(User::getAge).collect(Collectors.toList());
System.out.println("ages = " + ages);
List<String> usernames = users.stream().map(User::getUsername).collect(Collectors.toCollection(Vector::new));
System.out.println("usernames = " + usernames);
TreeSet<String> genders = users.stream().map(User::getGender).collect(Collectors.toCollection(TreeSet::new));
System.out.println("genders = " + genders);

也可以获取某一个字段的统计信息:

1
2
IntSummaryStatistics ageStatistics = users.stream().collect(Collectors.summarizingInt(User::getAge));
System.out.println("ageStatistics = " + ageStatistics);

这个统计信息中包含:总和、最小值、平均值以及最大值等:

1
ageStatistics = IntSummaryStatistics{count=20, sum=1222, min=9, average=61.100000, max=96}

还可以对数据进行分块,将男女不同性别统计出来:

1
2
Map<Boolean, List<User>> map = users.stream().collect(Collectors.partitioningBy(u -> u.getGender().equals("男")));
System.out.println("map = " + map);

也可以按照性别对数据进行分组,如下:

1
2
Map<String, List<User>> map2 = users.stream().collect(Collectors.groupingBy(User::getGender));
System.out.println("map2 = " + map2);

分组后,Map 中的 key 就是性别;分块后,Map 中的 key 就是 true/false。

再比如统计男女的人数:

1
2
Map<String, Long> map2 = users.stream().collect(Collectors.groupingBy(User::getGender,Collectors.counting()));
System.out.println("map2 = " + map2);

好啦,今天我们就先聊这么多~