流处理

 2024-05-29    0 comment    273 browse

java

介绍

流处理

Java 流处理(Stream Processing)在 Java 8 及之后的版本中是一个核心概念,它允许我们以声明性方式处理数据集合(如列表、集合等)。通过使用流(Stream),我们可以对集合中的元素执行复杂的查询和操作,而无需修改原始数据结构。流处理提供了一种更加简洁、易读和高效的方式来处理集合数据。

Java 中的流是数据序列的抽象表示,它允许我们进行一系列的操作,如过滤(filter)、映射(map)、排序(sorted)、归约(reduce)等,这些操作可以通过链式调用组合在一起,形成一个流管道(pipeline)。流处理的过程是惰性的(lazy),即只有当我们需要结果时(如调用 collect 方法),流上的操作才会被执行。

Java 流处理的关键点和特点:

  1. 声明性:通过流,我们可以使用声明性语法来描述数据转换和操作,而无需编写复杂的循环和条件语句。
  2. 函数式编程:流操作通常使用 Lambda 表达式或方法引用来定义,这符合函数式编程的风格。
  3. 惰性求值:流操作是惰性的,这意味着它们不会立即执行。只有当需要结果时(如调用 collectfindFirst 等终端操作),流上的操作才会被执行。
  4. 内部迭代:与传统的外部迭代(使用循环)不同,流处理使用内部迭代来遍历集合中的元素。这意味着迭代逻辑被封装在流操作中,我们无需关心具体的迭代过程。
  5. 并行处理:流支持并行处理,可以自动将数据划分为多个部分并在多个线程上并行执行操作。这可以显著提高处理大型数据集的性能。

使用流处理而不使用遍历

  1. 简洁性和可读性:流处理提供了一种更为简洁和优雅的方式来操作集合数据,通过链式操作可以清晰地表达数据的处理流程,使得代码更加简洁和易读。
  2. 函数式编程思想:流处理借鉴了函数式编程的思想,鼓励使用函数式接口、Lambda 表达式等来处理数据。这种编程范式使得代码更为简洁、可读,同时也更容易并行化和优化。
  3. 延迟计算:流处理中的中间操作是延迟执行的,只有在遇到终端操作时才会触发实际的计算。这种延迟计算的机制可以提高性能,同时也可以节省内存和资源。
  4. 并行处理:流处理提供了并行流的支持,可以轻松地将流操作并行化,充分利用多核处理器的优势来加速数据处理。这在处理大量数据时尤为重要。
  5. 错误减少:由于流处理提供了一系列的中间操作和终端操作,开发者可以更加集中地处理数据的流动和转换,减少了出错的可能性,提高了代码的可靠性和稳定性。
  6. 适应性:流处理提供了丰富的中间操作和终端操作,可以满足各种不同的需求,例如过滤、映射、排序、聚合等,同时也支持自定义操作,具有较高的适应性和灵活性。

Stream 类

  • Stream 类提供了对集合进行流式操作的功能,它允许在一组数据上进行一系列的中间操作和终端操作。
  • 中间操作可以是过滤、映射、排序等操作,它们不会触发实际的计算,只是在流上构建操作链。
  • 终端操作会触发流的处理,例如 forEach、collect、reduce 等。
  • Stream 的使用可以大大简化集合数据的处理,使代码更加清晰和易读。

基本使用步骤

0.创建集合

1.流对象获取数据

Stream<>  stream=集合对象.stream()

2.操作处理

stream=stream.filter(......)

3.流对象封装回数据

T t=stream.collect(Collectors.to..)

初始操作(转为stream)

  1. 集合(Collections):Java 中的集合类都可以通过 stream() 方法转换为 Stream 对象,例如 List、Set、Map 等。

     List<String> list = Arrays.asList("a", "b", "c");
    Stream<String> stream = list.stream();
    
  2. 数组(Arrays):可以通过 Arrays.stream() 方法将数组转换为 Stream 对象。

    int[] array = {1, 2, 3, 4, 5};
    IntStream stream = Arrays.stream(array);
    
  3. IO 流(IO Streams):Java 中的 IO 流也可以被转换为 Stream 对象,例如 BufferedReader 的 lines() 方法返回的就是一个 Stream 对象。

    BufferedReader reader = new BufferedReader(new FileReader("file.txt"));
    Stream<String> lines = reader.lines();
    
  4. 数值范围(Numeric Ranges):可以使用 IntStream.range()LongStream.range() 等方法创建一个数值范围的 Stream 对象。

     IntStream range = IntStream.range(1, 10); // 1 到 9
    
  5. 其他数据源(Other Sources):Java 8 中还引入了一些其他方式创建 Stream 对象的方法,如 Stream.of()Stream.iterate() 等。

    Stream<String> stream = Stream.of("a", "b", "c");
    Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10); // 从0开始,每次加1,限制长度为10
    

中间操作(过滤)

方法

  • filter(Predicate<? super T> predicate): 保留满足给定条件的元素。
  • map(Function<? super T, ? extends R> mapper): 对流中的每个元素应用函数。
  • flatMap(Function<? super T, ? extends Stream<? extends R>> mapper): 将流中的每个元素映射为一个流,然后将所有流连接成一个流。
  • distinct(): 返回一个元素唯一的流(根据元素的 hashCodeequals 方法)。
  • sorted(): 返回一个排序后的流(自然顺序)。
  • sorted(Comparator<? super T> comparator): 返回一个根据指定比较器排序的流。
  • peek(Consumer<? super T> action): 对流中的每个元素执行给定的操作,然后返回流本身。这主要用于调试目的。
  • limit(long maxSize): 截断流,使其包含不超过给定数量的元素。
  • skip(long n): 跳过流中的前 n 个元素。

案例

map(Function mapper):将流中的每个元素都映射成另一个元素。
stream.map(x -> x * 2)

filter(Predicate predicate):根据指定条件过滤流中的元素。
stream.filter(x -> x > 0)

sorted() / sorted(Comparator comparator):对流中的元素进行排序,默认是自然顺序,也可以指定自定义排序规则。
stream.sorted()
stream.sorted((a, b) -> b.compareTo(a)) // 逆序排序

distinct():去除流中的重复元素。
stream.distinct()

limit(long maxSize):限制流中元素的数量。
stream.limit(10)

skip(long n):跳过流中的前n个元素。
stream.skip(5)

flatMap(Function mapper):将流中的每个元素映射成一个流,然后将所有流连接成一个流。
stream.flatMap(List::stream)

终端操作(返回结果)

方法

  1. forEach(Consumer action):对流中的每个元素执行指定操作。

    stream.forEach(System.out::println)
    
  2. collect(Collector collector):将流中的元素收集到一个集合中。

    List<Integer> list = stream.collect(Collectors.toList())
    
  3. count():返回流中元素的数量。

    long count = stream.count()
    
  4. anyMatch(Predicate predicate) / allMatch(Predicate predicate) / noneMatch(Predicate predicate):检查流中的元素是否满足指定条件。

    boolean anyMatch = stream.anyMatch(x -> x > 10)
    boolean allMatch = stream.allMatch(x -> x > 0)
    boolean noneMatch = stream.noneMatch(x -> x < 0)
    
  5. findFirst() / findAny():返回流中的第一个元素或者任意一个元素。

    Optional<Integer> first = stream.findFirst()
    Optional<Integer> any = stream.findAny()
    
  6. reduce(BinaryOperator accumulator) / reduce(T identity, BinaryOperator accumulator):将流中的元素归约成一个值。

    Optional<Integer> sum = stream.reduce(Integer::sum)
    Integer sum = stream.reduce(0, Integer::sum)
    
  • forEach(Consumer<? super T> action): 对流中的每个元素执行一个操作。
  • forEachOrdered(Consumer<? super T> action): 顺序地(在并行流上)对流中的每个元素执行一个操作。
  • toArray(): 返回一个包含流中所有元素的数组。
  • reduce(T identity, BinaryOperator<T> accumulator): 对流中的元素执行归约操作,如求和、最大值等。
  • reduce(U identity, Function<? super T, ? extends U> mapper, BinaryOperator<U> reducer): 执行映射归约操作,先将流中的元素映射到一个新的类型,然后归约。
  • collect(Collector<? super T, A, R> collector): 使用Collector对流中的元素进行归约操作,如收集到列表中、集合中等。
  • min(Comparator<? super T> comparator): 返回流中的最小元素(根据提供的比较器)。
  • max(Comparator<? super T> comparator): 返回流中的最大元素(根据提供的比较器)。
  • count(): 返回流中的元素数量。
  • anyMatch(Predicate<? super T> predicate): 检查流中是否存在任何满足给定条件的元素。
  • allMatch(Predicate<? super T> predicate): 检查流中的所有元素是否都满足给定条件。
  • noneMatch(Predicate<? super T> predicate): 检查流中是否没有任何元素满足给定条件。
  • findFirst(): 返回流中的第一个元素(如果存在),否则返回一个空的Optional
  • findAny(): 返回流中的任何元素(对于并行流可能返回任意元素),否则返回一个空的Optional

案例

import java.util.Arrays;  
import java.util.List;  
import java.util.stream.Collectors;  
  
public class StreamTerminalOperations {  
    public static void main(String[] args) {  
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);  
  
        // 使用中间操作 filter 和 map,以及终端操作 forEach  
        numbers.stream()  
                .filter(n -> n % 2 != 0) // 过滤出奇数  
                .map(n -> n * n) // 将每个奇数平方  
                .forEach(System.out::println); // 打印每个平方数  
  
        // 使用终端操作 reduce 计算流中所有元素的和  
        int sum = numbers.stream()  
                .mapToInt(Integer::intValue) // 将流转换为int流以便进行数学运算  
                .sum();  
        System.out.println("Sum: " + sum);  
  
        // 使用终端操作 collect 收集结果到列表中  
        List<String> strings = numbers.stream()  
                .map(String::valueOf) // 将每个整数转换为字符串  
                .collect(Collectors.toList());  
        System.out.println(strings);  
    }  
}

应用场景

归集(将流转换为集合)

  • List, Set, ArrayList:使用 collect 方法配合适当的收集器(如 Collectors.toList(), Collectors.toSet())可以将流转换为集合。

聚合(获取流中特殊值)

  • maxBy, minBy:使用 Collectors.maxBy()Collectors.minBy() 可以找到流中的最大值或最小值,需要传入一个比较器。
  • summingInt, summingDouble, summingLong:这些收集器用于计算流中元素的和。
  • averagingInt, averagingDouble, averagingLong:计算平均值。
  • counting:计算流中元素的数量。

分组(根据属性分组)

  • groupingBy:根据某个属性对流中的元素进行分组。可以配合其他收集器进行多级分组。

分区(只有两个分区,true/false)

  • partitioningBy:根据某个条件对流中的元素进行分区,通常分为 truefalse 两个组。

拼接(把元素按一定条件拼接)

  • joining:使用 Collectors.joining() 可以将流中的元素拼接成一个字符串。

数据过滤(筛选数据)

  • filter:根据条件过滤流中的元素。
  • distinct:去除流中的重复元素。
  • limit:限制流中元素的数量。
  • skip:跳过流中的前N个元素。

数据映射(获取新对象)

  • map:对流中的每个元素应用一个函数,产生一个新的流。

数据查找(查找符合条件信息)

  • allMatch:检查流中的所有元素是否都满足某个条件。
  • anyMatch:检查流中是否至少有一个元素满足某个条件。
  • noneMatch:检查流中是否没有元素满足某个条件。
  • findFirst:返回流中第一个满足条件的元素(返回一个 Optional)。

数据收集(过滤+映射)

  • 使用 collect 方法配合各种收集器(如 Collectors.counting(), Collectors.maxBy(), Collectors.groupingBy() 等)可以对流中的元素进行统计、分组等操作。

注意事项

  • 在使用 maxByminBy 时,确保比较器(Comparator)的创建是合适的,因为这两个操作返回的是 Optional<T>,其中 T 是流中元素的类型。
  • 在使用 groupingBy 进行多级分组时,第二个参数(downstream collector)可以是一个 groupingBy 收集器,以实现更复杂的分组结构。
  • 在使用 joining 时,要注意元素中可能包含的特殊字符(如分隔符),它们可能会干扰拼接结果。
  • 在进行过滤和映射操作时,要确保传递给 filtermap 的 lambda 表达式是正确和高效的。
  • 在使用流时,要注意流的中间操作(如 filter, map)是惰性的,只有在终端操作(如 collect, forEach)被调用时才会执行。

Optional 类(处理stream流空值)

介绍

  • Optional 类是 Java 8 引入的一种用来表示可能为 null 的值的容器类型。
  • 它可以避免空指针异常,通过提供一系列方法来处理可能为 null 的情况,例如 orElseorElseGetorElseThrow 等。
  • Optional 类的设计鼓励开发者显式地处理空值情况,使得代码更加健壮和可读。

使用

1. 创建 Optional 对象

  • 使用 Optional.of(T value) 创建一个包含非空值的 Optional
Optional<String> optional = Optional.of("Hello, World!");
  • 使用 Optional.empty() 创建一个不包含值的 Optional
Optional<String> emptyOptional = Optional.empty();
  • 使用 Optional.ofNullable(T value) 创建一个可能包含空值的 Optional
Optional<String> optionalNullable = Optional.ofNullable(null); // 不包含值  
Optional<String> optionalNotNullable = Optional.ofNullable("Hello, World!"); // 包含值

2. 检索 Optional 中的值

  • 使用 get() 方法获取值(如果值不存在,会抛出 NoSuchElementException
String value = optional.get(); // 如果 optional 包含值,则返回该值
  • 使用 isPresent() 方法检查值是否存在
boolean isPresent = optional.isPresent(); // 如果 optional 包含值,则返回 true
  • 使用 ifPresent(Consumer<? super T> consumer) 方法处理值(如果值存在)
optional.ifPresent(System.out::println); // 如果 optional 包含值,则打印该值
  • 使用 orElse(T other) 方法返回值或默认值
String defaultValue = optional.orElse("Default Value"); // 如果 optional 不包含值,则返回 "Default Value"
  • 使用 orElseGet(Supplier<? extends T> other) 方法返回值或调用函数获取的默认值
String lazyDefaultValue = optional.orElseGet(() -> "Lazy Default Value"); // 如果 optional 不包含值,则调用函数获取默认值
  • 使用 orElseThrow(Supplier<? extends X> exceptionSupplier) 方法返回值或抛出异常
try {  
    String value = optional.orElseThrow(() -> new RuntimeException("Value not found"));  
} catch (RuntimeException e) {  
    // 处理异常  
}

3. 转换 Optional 中的值

  • 使用 map(Function<? super T, ? extends U> mapper) 方法转换值

Optional<Integer> optionalLength = optional.map(String::length); // 如果 optional 包含字符串,则返回其长度
  • 使用 flatMap(Function<? super T, Optional<U>> mapper) 方法转换值为另一个 Optional
Optional<Optional<String>> optionalOfOptional = Optional.of(optional);  
Optional<String> flattenedOptional = optionalOfOptional.flatMap(Function.identity()); // 扁平化 Optional

4. 过滤 Optional 中的值

  • 使用 filter(Predicate<? super T> predicate) 方法过滤值
Optional<String> filteredOptional = optional.filter(s -> s.startsWith("Hello")); // 如果 optional 包含以 "Hello" 开头的字符串,则返回该 optional,否则返回空的 optional

处理stream

示例 1:从Stream中找到第一个元素(可能为空)

List<String> list = Arrays.asList("a", "b", "c");  
  
// 使用Stream的findFirst方法,它返回一个Optional<T>  
Optional<String> firstElement = list.stream().findFirst();  
  
// 使用ifPresent检查元素是否存在并处理  
firstElement.ifPresent(System.out::println);  
  
// 或者使用orElse提供一个默认值  
String defaultElement = firstElement.orElse("No element found");  
System.out.println(defaultElement);

示例 2:从Stream中映射并过滤,然后收集到Optional

有时你可能想从一个 Stream 中找到满足某个条件的唯一元素。如果这样的元素存在,你希望得到一个包含它的 Optional;如果不存在,你希望得到一个空的 Optional。这可以通过 StreamfilterfindFirstreduce 方法与 Optional 一起实现。

List<Person> people = ... // 假设这里有一个Person类的列表  
  
// 查找年龄为30岁的第一个人  
Optional<Person> personOptional = people.stream()  
    .filter(p -> p.getAge() == 30)  
    .findFirst();  
  
// 处理Optional  
personOptional.ifPresent(p -> System.out.println(p.getName()));

示例 3:将Stream的结果转换为单个值(可能是null)并使用Optional封装

在某些情况下,你可能有一个 Stream,你知道它只会产生一个元素或没有元素,并且你希望将这个元素封装在 Optional 中。虽然这通常不是 Stream 的典型用法(因为 Stream 是为了处理多个元素而设计的),但你可以通过 reduce 方法来实现这一点。

List<String> uniqueList = Arrays.asList("uniqueValue"); // 假设列表只包含一个元素  
  
// 使用reduce和Optional来封装可能的单个值  
Optional<String> singleValue = uniqueList.stream()  
    .reduce((a, b) -> {  
        throw new IllegalStateException("Stream should contain only one element");  
    })  
    .map(Optional::ofNullable) // 实际上,这个map调用在这里是多余的,因为我们知道reduce不会调用  
    .orElse(Optional.empty()); // 但为了演示目的,我们假设reduce可能不返回任何值  
  
// 但更常见的做法是直接使用findFirst或类似的方法  
Optional<String> singleValueFromFindFirst = uniqueList.stream().findFirst();

Collectors 类(收集stream)

介绍

  • Collectors 类提供了一系列静态工厂方法,用于将流中的元素收集到不同类型的结果容器中,例如 List、Set、Map 等。
  • 它提供了丰富的收集器,可以满足各种不同的需求,例如分组、分区、聚合等。
  • Collectors 类的使用可以使得流式操作更加灵活和高效,使得代码更加简洁和可维护。
  • 它包含了一系列静态方法,用于对流(Stream)进行归约操作(如收集到集合、汇总等)。Collectors 类中的方法可以被用在 Streamcollect() 方法中,将流中的元素收集到各种容器中,或者进行汇总操作。

使用

1. 收集到 List

List<String> list = Arrays.asList("a", "b", "c", "d");  
List<String> collectedList = list.stream().collect(Collectors.toList());

2. 收集到 Set

List<String> list = Arrays.asList("a", "b", "a", "c");  
Set<String> collectedSet = list.stream().collect(Collectors.toSet()); // 自动去重

3. 收集到 Map(基于某个属性分组)

List<Person> people = ... // 假设这里有一个Person类的列表  
Map<String, List<Person>> byDepartment = people.stream()  
    .collect(Collectors.groupingBy(Person::getDepartment)); // 假设Person类有一个getDepartment()方法

4. 收集到 Map(同时包含键和值的转换)

List<Person> people = ... // 假设这里有一个Person类的列表  
Map<String, Integer> nameToAgeMap = people.stream()  
    .collect(Collectors.toMap(Person::getName, Person::getAge)); // 假设Person类有getName()和getAge()方法

5. 汇总操作(求和、平均值等)

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);  
  
// 求和  
int sum = numbers.stream().mapToInt(Integer::intValue).sum();  
  
// 平均值  
double average = numbers.stream().mapToDouble(Integer::doubleValue).average().orElse(0);  
  
// 使用Collectors汇总  
IntSummaryStatistics stats = numbers.stream()  
    .mapToInt(Integer::intValue)  
    .collect(Collectors.summarizingInt(Integer::intValue));  
System.out.println("Sum: " + stats.getSum());  
System.out.println("Average: " + stats.getAverage());

6. 连接字符串(使用分隔符)

List<String> words = Arrays.asList("Hello", "world", "!");  
String sentence = words.stream().collect(Collectors.joining(" ")); // 输出 "Hello world !"

7. 自定义收集器

可以通过实现 Collector 接口或使用 Collectors.collectingAndThen()Collectors.teeing() 等方法来创建自定义收集器。这通常在需要更复杂的收集逻辑时有用。