当java8的第一个早期access版本出现时,最重要的演变似乎是lambdas
。这一点现在正在改变,许多开发人员现在似乎认为流是最有价值的Java8特性。这是因为他们相信通过改变程序中的一个单词(用parallelStream替换stream),他们将使这些程序并行工作。许多Java8的传道者已经展示了这方面的惊人例子。这有什么问题吗?不,不是什么。很多事情:
- 并行运行可能是一个好处,也可能不是。这取决于您使用此功能的目的。
- Java8 parallel streams 并行流可以使程序运行得更快。不管怎样。或者更慢。
- 将流看作是以低成本实现并行处理的一种方法,这将妨碍开发人员理解真正发生的事情。流与并行处理没有直接联系。
- 上述问题大多是基于一个误解:并行处理和并发处理不是一回事。大多数关于Java8“自动并行化”的例子实际上都是并发处理的例子。
- 把map、filter和其他操作看作“内部迭代”完全是无稽之谈(尽管这不是java8的问题,但是我们使用它的方式有问题)。
那么,什么是streams流?
根据维基百科:
“流是列表的潜在无限模拟,由归纳定义给出:
data Stream a = Cons a (Stream a)
使用流生成和计算需要延迟求值,或者隐式地使用延迟求值的语言,或者通过创建和强制使用渴望的语言的thunk。”
最重要的一点是,Java是Wikipedia所称的“eager
”语言,这意味着Java在评估事物时通常是严格的(而不是懒惰的)。例如,如果您在Java中创建了一个列表,那么在创建列表时将计算所有元素。这可能会让您感到惊讶,因为您可能会创建一个空列表并在之后添加元素。这仅仅是因为列表是可变的(并且您正在用对某物的引用替换空引用),或者您正在从附加新元素的旧列表创建一个新列表。
列表是由产生其元素的东西创建的。例如:
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
这里生产者是一个数组,数组的所有元素都经过严格的计算。
也可以以递归的方式创建列表,例如以1开头的列表,其中所有元素都等于1
加上前一个元素,小于6
。在Java<8中,这可以转化为:
List<Integer> list = new ArrayList<Integer>();
for(int i = 0; i < 6; i++) {
list.add(i);
}
有人可能会说for
循环是Java中罕见的惰性计算示例之一,但结果是一个列表,其中所有元素都被计算。
如果我们想将一个函数应用于这个列表的所有元素,会发生什么?我们可以循环进行。例如,如果希望将所有元素增加2
,则可以执行以下操作:
for(int i = 0; i < list.size(); i++) {
list.set(i, list.get(i) * 2);
}
但是,这不允许使用更改元素类型的操作,例如将所有元素增加10%。以下解决方案解决了此问题:
List<Double> list2 = new ArrayList<Double>();
for(int i = 0; i < list.size(); i++) {
list2.add(list.get(i) * 1.2);
}
此表单允许对每种语法使用Java 5中的:
List<Double> list2 = new ArrayList<>();
for(Integer i : list) {
list2.add(i * 1.2);
}
或Java 8语法:
List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2));
到目前为止,还不错。但是如果我们想把这个值增加10%,然后再除以3呢?简单的答案是:
List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2));
List<Double> list3 = new ArrayList<>();
list2.forEach(x -> list3.add(x / 3));
这远不是最佳的,因为我们在列表上迭代了两次。更好的解决方案是:
List<Double> list2 = new ArrayList<>();
for(Integer i : list) {
list2.add(i * 1.2 / 3);
}
暂时把自动装箱/拆箱问题放在一边。在Java 8中,这可以写成:
List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2 / 3));
但是等等。。。这只可能是因为我们看到绑定到列表的使用者的内部结构,所以我们可以手动组合操作。如果我们有:
List<Double> list2 = new ArrayList<>();
list.forEach(consumer1);
List<Double> list3 = new ArrayList<>();
list2.forEach(consumer2);
我们怎么会知道怎么写呢?不可能。在Java8中,使用者接口有一个默认方法,然后。我们很可能会用这种方式来组成消费者:
list.forEach(consumer1.andThen(consumer2));
但这将导致一个错误,因为和被定义为:
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
这意味着我们不能使用and
来组合不同类型的消费者。
事实上,我们从一开始就错了。我们需要的是将列表绑定到函数以获得新列表,例如:
Function<Integer, Double> function1 = x -> x * 1.2;
Function<Double, Double> function2 = x -> x / 3;
list.bind(function1).bind(function2);
其中bind
方法将在一个特殊的FList
类中定义,如:
public class FList<T> {
final List<T> list;
public FList(List<T> list) {
this.list = list;
}
public <U> FList<U> bind(Function<T, U> f) {
List<U> newList = new ArrayList<U>();
for (T t : list) {
newList.add(f.apply(t));
}
return new FList<U>(newList);
}
}
我们将在下面的例子中使用它:
new Flist<>(list).bind(function1).bind(function2);
我们遇到的唯一问题是绑定两次需要在列表上迭代两次。这是因为bind
是严格计算的。我们需要的是一个懒惰的评估,这样我们就可以只迭代一次。
这里的问题是bind
方法不是真正的绑定。它实际上是一种真正的约束和还原的组合。”“reducting
”是对列表中的每个元素应用一个操作,从而将此元素与应用于前一个元素的相同操作的结果结合起来。因为从第一个元素开始时没有前一个元素,所以我们从一个初始值开始。例如,applying(x)->r+x
,其中r是对上一个元素的操作结果,或对第一个元素应用0,得到列表中所有元素的总和。对每个元素applying()->r+1
,从r=0开始给出列表的长度。(这可能不是获得列表长度的更有效的方法,但它完全是功能性的!)
这里,操作是add(element)
,初始值是一个空列表。这仅仅是因为函数应用程序是经过严格评估的。
Java8流提供给我们的是相同的,但是是延迟计算的,这意味着在将函数绑定到流时,不涉及迭代!
将一个Function<T,U>
绑定到一个Stream<T>
会得到一个没有迭代的Stream<U>
。结果流不会被评估,这并不取决于初始流是用评估或未评估的数据构建的。
在函数语言中,将Function<T,U>
绑定到Stream<T>
本身就是一个函数。在Java8中,它是一个方法,这意味着它的参数是严格求值的,但这与结果流的求值无关。为了理解正在发生的事情,我们可以想象要绑定的函数存储在某个地方,它们成为新的(未计算的)结果流的数据生成器的一部分。
在Java8中,将FunctionT->U
绑定到Stream<T>
,从而产生Stream<U>
的方法称为map
。将FunctionT->Stream<U>
绑定到Stream<T>
,从而产生Stream<U>
的函数称为flatMap
。
flatten在哪里?
大多数函数式语言还提供了一个将Stream<Stream<U>
转换为Stream<U>
的flatten
扁平函数,但Java8流中缺少这个函数。这看起来并不是什么大麻烦,因为定义一个方法来实现这一点非常容易。例如,给定以下函数:
Function<Integer, Stream<Integer>> f = x -> Stream.iterate(1, y -> y + 1).limit(x);
Stream<Integer> stream = Stream.iterate(1, x -> x + 1);
Stream<Integer> stream2 = stream.limit(5).flatMap(f);
System.out.println(stream2.collect(toList()))
生产:
[1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5]
使用map
而不是flatMap
:
Stream<Integer> stream = Stream.iterate(1, x -> x + 1);
Stream<Integer> stream2 = stream.limit(5).map(f);
System.out.println(stream2.collect(toList()))
将产生一个流:
[java.util.stream.SliceOps$1@12133b1, java.util.stream.SliceOps$1@ea2f77,
java.util.stream.SliceOps$1@1c7353a, java.util.stream.SliceOps$1@1a9515,
java.util.stream.SliceOps$1@f49f1c]
使用函数范式将这个整数流转换为整数流非常简单:只需将标识函数平面映射到它:
System.out.println(stream2.flatMap(x -> x).collect(toList()));
然而奇怪的是,没有向流中添加展平方法,因为知道map
、flatMap
、unit
和flatten
之间的强关系,其中unit
是从T到Stream<T>
的函数,由以下方法表示:
Stream<T> Stream.of(T... t)
何时评估stream流?
当我们对流应用一些称为终端操作的特定操作时,流被评估。这只能做一次。一旦终端操作应用于流,它就不再可用。终端操作包括:
forEach
forEachOrdered
toArray
reduce
collect
min
max
count
anyMatch
allMatch
noneMatch
findFirst
findAny
iterator
spliterator
有些方法是短路。例如,一旦找到第一个元素,findFirst
就会返回。
非终端操作称为中间操作,可以是有状态的(如果元素的计算依赖于前一个元素的计算)或无状态的。中间操作包括:
filter
map
mapTo... (Int, Long or Double)
flatMap
flatMapTo... (Int, Long or Double)
distinct
sorted
peek
limit
skip
sequential
parallel
unordered
onClose
可以对一个流应用多个中间操作,但是只能使用一个终端操作。
那么并行处理呢?
自动并行通常不会给出预期的结果,至少有两个原因:
1. 速度的提高在很大程度上取决于任务类型和并行化策略。总的来说,最好的策略取决于任务的类型。
2. 速度的提高在很大程度上取决于环境。在某些环境中,通过并行化很容易降低速度。
无论什么样的任务要并行化,并行流应用的策略都是相同的,除非您自己设计这个策略,这将消除并行流的许多兴趣。并行化需要:
- 执行子任务的线程池,
- 将初始任务划分为子任务,
- 将子任务分配给线程,
- 整理结果。
不输入细节,所有这些都意味着一些开销。它将显示惊人的结果时:
- 有些任务意味着长时间阻塞,例如访问远程服务,或者
- 没有多少线程同时运行,特别是没有其他并行流。
如果所有子任务都意味着高强度的计算,那么潜在的增益将受到可用处理器数量的限制。默认情况下,Java8将使用与计算机上的处理器相同数量的线程,因此,对于密集型任务,结果高度依赖于其他线程可能同时执行的操作。当然,如果每个子任务基本上都在等待,那么收益可能看起来是巨大的。
最坏的情况是应用程序与其他应用程序一起在服务器或容器中运行,并且子任务并不意味着等待。在这种情况下(例如在J2EE服务器上运行),并行流通常比串行流慢。想象一下,一台服务器每秒处理数百个请求。同时评估多个流的可能性很大,因此这项工作已经并行化了。在业务级别上新的并行化层很可能会使事情变得更慢。
最糟糕的:业务应用程序很有可能看到开发环境的速度提高和生产的减少。这是最糟糕的情况。
Streams流有什么好处
Stream流是一个有用的工具,因为它们允许延迟计算。这在几个方面非常重要:
- 它们允许使用绑定的函数式编程风格。
- 它们允许通过删除迭代来获得更好的性能。迭代随着求值而发生。使用流,我们可以绑定几十个函数而无需迭代。
- 它们允许简单的任务并行化,包括长时间的等待。
- Stream流可能是无限的(因为它们是懒惰的)。函数可以毫无问题地绑定到无限流。经过评估,一定有办法使它们有限。这通常是通过短路操作完成的。
什么Stream流不好
在处理密集型计算任务时,应高度谨慎地使用流。特别是,默认情况下,所有流将使用相同的ForkJoinPool
,配置为使用与运行程序的计算机中的内核数量相同的线程。
如果对一个并行流的求值导致一个运行时间很长的任务,那么可以将其拆分为尽可能多的长时间运行的子任务,这些子任务将分发到池中的每个线程。从那里,没有其他并行流可以被处理,因为所有线程都将被占用。因此,对于计算密集型流评估,应该始终使用特定的ForkJoinPool
,以免阻塞其他流。
为此,可以从流中创建一个Callable
并将其提交到池中:
List<SomeClass> list = // A list of objects
Stream<SomeClass> stream = list.parallelStream().map(this::veryLongProcessing);
Callable<List<Integer>> task = () -> stream.collect(toList());
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
List<SomeClass> newList = forkJoinPool.submit(task).get()
这样,其他并行流(使用它们自己的ForkJoinPool
)就不会被这个流阻塞。换句话说,我们需要一个ForkJoinPool
池来避免这个问题。
如果程序要在容器中运行,那么在使用并行流时必须非常小心。在这种情况下不要使用默认池,除非您确定容器可以处理它。在javaee容器中,不要使用并行流。