当java8的第一个早期access版本出现时,最重要的演变似乎是lambdas。这一点现在正在改变,许多开发人员现在似乎认为流是最有价值的Java8特性。这是因为他们相信通过改变程序中的一个单词(用parallelStream替换stream),他们将使这些程序并行工作。许多Java8的传道者已经展示了这方面的惊人例子。这有什么问题吗?不,不是什么。很多事情:

  • 并行运行可能是一个好处,也可能不是。这取决于您使用此功能的目的。
  • Java8 parallel streams 并行流可以使程序运行得更快。不管怎样。或者更慢。
  • 将流看作是以低成本实现并行处理的一种方法,这将妨碍开发人员理解真正发生的事情。流与并行处理没有直接联系。
  • 上述问题大多是基于一个误解:并行处理和并发处理不是一回事。大多数关于Java8“自动并行化”的例子实际上都是并发处理的例子。
  • 把map、filter和其他操作看作“内部迭代”完全是无稽之谈(尽管这不是java8的问题,但是我们使用它的方式有问题)。

那么,什么是streams流?

根据维基百科:

“流是列表的潜在无限模拟,由归纳定义给出:

data Stream a = Cons a (Stream a)

使用流生成和计算需要延迟求值,或者隐式地使用延迟求值的语言,或者通过创建和强制使用渴望的语言的thunk。”

最重要的一点是,Java是Wikipedia所称的“eager”语言,这意味着Java在评估事物时通常是严格的(而不是懒惰的)。例如,如果您在Java中创建了一个列表,那么在创建列表时将计算所有元素。这可能会让您感到惊讶,因为您可能会创建一个空列表并在之后添加元素。这仅仅是因为列表是可变的(并且您正在用对某物的引用替换空引用),或者您正在从附加新元素的旧列表创建一个新列表。

列表是由产生其元素的东西创建的。例如:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

这里生产者是一个数组,数组的所有元素都经过严格的计算。

也可以以递归的方式创建列表,例如以1开头的列表,其中所有元素都等于1加上前一个元素,小于6。在Java<8中,这可以转化为:

List<Integer> list = new ArrayList<Integer>();
for(int i = 0; i < 6; i++) {
  list.add(i);
}

有人可能会说for循环是Java中罕见的惰性计算示例之一,但结果是一个列表,其中所有元素都被计算。

如果我们想将一个函数应用于这个列表的所有元素,会发生什么?我们可以循环进行。例如,如果希望将所有元素增加2,则可以执行以下操作:

for(int i = 0; i < list.size(); i++) {
  list.set(i, list.get(i) * 2);
}

但是,这不允许使用更改元素类型的操作,例如将所有元素增加10%。以下解决方案解决了此问题:

List<Double> list2 = new ArrayList<Double>();
for(int i = 0; i < list.size(); i++) {
  list2.add(list.get(i) * 1.2);
}

此表单允许对每种语法使用Java 5中的:

List<Double> list2 = new ArrayList<>();
for(Integer i : list) {
  list2.add(i * 1.2);
}

或Java 8语法:

List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2));

到目前为止,还不错。但是如果我们想把这个值增加10%,然后再除以3呢?简单的答案是:

List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2));
List<Double> list3 = new ArrayList<>();
list2.forEach(x -> list3.add(x / 3));

这远不是最佳的,因为我们在列表上迭代了两次。更好的解决方案是:

List<Double> list2 = new ArrayList<>();
for(Integer i : list) {
  list2.add(i * 1.2 / 3);
}

暂时把自动装箱/拆箱问题放在一边。在Java 8中,这可以写成:

List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2 / 3));

但是等等。。。这只可能是因为我们看到绑定到列表的使用者的内部结构,所以我们可以手动组合操作。如果我们有:

List<Double> list2 = new ArrayList<>();
list.forEach(consumer1);
List<Double> list3 = new ArrayList<>();
list2.forEach(consumer2);

我们怎么会知道怎么写呢?不可能。在Java8中,使用者接口有一个默认方法,然后。我们很可能会用这种方式来组成消费者:

list.forEach(consumer1.andThen(consumer2));

但这将导致一个错误,因为和被定义为:

default Consumer<T> andThen(Consumer<? super T> after) {
  Objects.requireNonNull(after);
  return (T t) -> { accept(t); after.accept(t); };
}

这意味着我们不能使用and来组合不同类型的消费者。

事实上,我们从一开始就错了。我们需要的是将列表绑定到函数以获得新列表,例如:

Function<Integer, Double> function1 = x -> x * 1.2;
Function<Double, Double> function2 = x -> x / 3;
list.bind(function1).bind(function2);

其中bind方法将在一个特殊的FList类中定义,如:

public class FList<T> {
  final List<T> list;

  public FList(List<T> list) {
    this.list = list;
  }

  public <U> FList<U> bind(Function<T, U> f) {
    List<U> newList = new ArrayList<U>();
    for (T t : list) {
      newList.add(f.apply(t));
    }
    return new FList<U>(newList);
  }
}

我们将在下面的例子中使用它:

new Flist<>(list).bind(function1).bind(function2);

我们遇到的唯一问题是绑定两次需要在列表上迭代两次。这是因为bind是严格计算的。我们需要的是一个懒惰的评估,这样我们就可以只迭代一次。

这里的问题是bind方法不是真正的绑定。它实际上是一种真正的约束和还原的组合。”“reducting”是对列表中的每个元素应用一个操作,从而将此元素与应用于前一个元素的相同操作的结果结合起来。因为从第一个元素开始时没有前一个元素,所以我们从一个初始值开始。例如,applying(x)->r+x,其中r是对上一个元素的操作结果,或对第一个元素应用0,得到列表中所有元素的总和。对每个元素applying()->r+1,从r=0开始给出列表的长度。(这可能不是获得列表长度的更有效的方法,但它完全是功能性的!)

这里,操作是add(element),初始值是一个空列表。这仅仅是因为函数应用程序是经过严格评估的。

Java8流提供给我们的是相同的,但是是延迟计算的,这意味着在将函数绑定到流时,不涉及迭代!

将一个Function<T,U>绑定到一个Stream<T>会得到一个没有迭代的Stream<U>。结果流不会被评估,这并不取决于初始流是用评估或未评估的数据构建的。

在函数语言中,将Function<T,U>绑定到Stream<T>本身就是一个函数。在Java8中,它是一个方法,这意味着它的参数是严格求值的,但这与结果流的求值无关。为了理解正在发生的事情,我们可以想象要绑定的函数存储在某个地方,它们成为新的(未计算的)结果流的数据生成器的一部分。

在Java8中,将FunctionT->U绑定到Stream<T>,从而产生Stream<U>的方法称为map。将FunctionT->Stream<U>绑定到Stream<T>,从而产生Stream<U>的函数称为flatMap

flatten在哪里?

大多数函数式语言还提供了一个将Stream<Stream<U>转换为Stream<U>flatten扁平函数,但Java8流中缺少这个函数。这看起来并不是什么大麻烦,因为定义一个方法来实现这一点非常容易。例如,给定以下函数:

Function<Integer, Stream<Integer>> f = x -> Stream.iterate(1, y -> y + 1).limit(x);

Stream<Integer> stream = Stream.iterate(1, x -> x + 1);
Stream<Integer> stream2 = stream.limit(5).flatMap(f);

System.out.println(stream2.collect(toList()))

生产:

[1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5]

使用map而不是flatMap

Stream<Integer> stream = Stream.iterate(1, x -> x + 1);
Stream<Integer> stream2 = stream.limit(5).map(f);

System.out.println(stream2.collect(toList()))

将产生一个流:

[java.util.stream.SliceOps$1@12133b1, java.util.stream.SliceOps$1@ea2f77,
java.util.stream.SliceOps$1@1c7353a, java.util.stream.SliceOps$1@1a9515, 
java.util.stream.SliceOps$1@f49f1c]

使用函数范式将这个整数流转换为整数流非常简单:只需将标识函数平面映射到它:

System.out.println(stream2.flatMap(x -> x).collect(toList()));

然而奇怪的是,没有向流中添加展平方法,因为知道mapflatMapunitflatten之间的强关系,其中unit是从T到Stream<T>的函数,由以下方法表示:

Stream<T> Stream.of(T... t)

何时评估stream流?

当我们对流应用一些称为终端操作的特定操作时,流被评估。这只能做一次。一旦终端操作应用于流,它就不再可用。终端操作包括:

forEach

forEachOrdered

toArray

reduce

collect

min

max

count

anyMatch

allMatch

noneMatch

findFirst

findAny

iterator

spliterator

有些方法是短路。例如,一旦找到第一个元素,findFirst就会返回。

非终端操作称为中间操作,可以是有状态的(如果元素的计算依赖于前一个元素的计算)或无状态的。中间操作包括:

filter

map

mapTo... (Int, Long or Double)

flatMap

flatMapTo... (Int, Long or Double)

distinct

sorted

peek

limit

skip

sequential

parallel

unordered

onClose

可以对一个流应用多个中间操作,但是只能使用一个终端操作。

那么并行处理呢?

自动并行通常不会给出预期的结果,至少有两个原因:

1. 速度的提高在很大程度上取决于任务类型和并行化策略。总的来说,最好的策略取决于任务的类型。

2. 速度的提高在很大程度上取决于环境。在某些环境中,通过并行化很容易降低速度。

无论什么样的任务要并行化,并行流应用的策略都是相同的,除非您自己设计这个策略,这将消除并行流的许多兴趣。并行化需要:

  • 执行子任务的线程池,
  • 将初始任务划分为子任务,
  • 将子任务分配给线程,
  • 整理结果。

不输入细节,所有这些都意味着一些开销。它将显示惊人的结果时:

  • 有些任务意味着长时间阻塞,例如访问远程服务,或者
  • 没有多少线程同时运行,特别是没有其他并行流。

如果所有子任务都意味着高强度的计算,那么潜在的增益将受到可用处理器数量的限制。默认情况下,Java8将使用与计算机上的处理器相同数量的线程,因此,对于密集型任务,结果高度依赖于其他线程可能同时执行的操作。当然,如果每个子任务基本上都在等待,那么收益可能看起来是巨大的。

最坏的情况是应用程序与其他应用程序一起在服务器或容器中运行,并且子任务并不意味着等待。在这种情况下(例如在J2EE服务器上运行),并行流通常比串行流慢。想象一下,一台服务器每秒处理数百个请求。同时评估多个流的可能性很大,因此这项工作已经并行化了。在业务级别上新的并行化层很可能会使事情变得更慢。

最糟糕的:业务应用程序很有可能看到开发环境的速度提高和生产的减少。这是最糟糕的情况。

Streams流有什么好处

Stream流是一个有用的工具,因为它们允许延迟计算。这在几个方面非常重要:

  • 它们允许使用绑定的函数式编程风格。
  • 它们允许通过删除迭代来获得更好的性能。迭代随着求值而发生。使用流,我们可以绑定几十个函数而无需迭代。
  • 它们允许简单的任务并行化,包括长时间的等待。
  • Stream流可能是无限的(因为它们是懒惰的)。函数可以毫无问题地绑定到无限流。经过评估,一定有办法使它们有限。这通常是通过短路操作完成的。

什么Stream流不好

在处理密集型计算任务时,应高度谨慎地使用流。特别是,默认情况下,所有流将使用相同的ForkJoinPool,配置为使用与运行程序的计算机中的内核数量相同的线程。

如果对一个并行流的求值导致一个运行时间很长的任务,那么可以将其拆分为尽可能多的长时间运行的子任务,这些子任务将分发到池中的每个线程。从那里,没有其他并行流可以被处理,因为所有线程都将被占用。因此,对于计算密集型流评估,应该始终使用特定的ForkJoinPool,以免阻塞其他流。

为此,可以从流中创建一个Callable并将其提交到池中:

List<SomeClass> list = // A list of objects
Stream<SomeClass> stream = list.parallelStream().map(this::veryLongProcessing);
Callable<List<Integer>> task = () -> stream.collect(toList());
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
List<SomeClass> newList = forkJoinPool.submit(task).get()

这样,其他并行流(使用它们自己的ForkJoinPool)就不会被这个流阻塞。换句话说,我们需要一个ForkJoinPool池来避免这个问题。

如果程序要在容器中运行,那么在使用并行流时必须非常小心。在这种情况下不要使用默认池,除非您确定容器可以处理它。在javaee容器中,不要使用并行流。