X

java8实战(函数式编程)笔记

函数式接口就是只定义一个抽象方法的接口

函数式接口的抽象方法的签名基本上就是Lambda表达式的签名。我们将这种抽象方法叫作函数描述符。

@FunctionalInterface这个标注用于表示该接口会设计成一个函数式接口。如果你用 @FunctionalInterface定义了一个接口,而它却不是函数式接口的话,编译器将返回一个提示原因的错误。

三大函数式接口

Predicate ->boolean test(T t);

Consumer->void accept(T t);

Function->R apply(T t);

针对专门的输入参数类型的函数式接口的名称都要加上对应的原始类型前缀,比如DoublePredicate、 IntConsumer、 LongBinaryOperator、 IntFunction等。 Function接口还有针对输出参数类型的变种: ToIntFunction<T>、 IntToDoubleFunction等。

任何函数式接口都不允许抛出受检异常( checked exception)。如果你需要Lambda表达式来抛出异常, 有两种办法: 定义一个自己的函数式接口,并声明受检异常,或者把Lambda包在一个try/catch块中。

Lambda的类型是从使用Lambda的上下文推断出来的。上下文(比如,接受它传递的方法的参数,或接受它的值的局部变量)中Lambda表达式需要的类型称为目标类型。

java编译器会从上下文(目标类型)推断出用什么函数式接口来配合Lambda表达式,这意味着它也可以推断出适合Lambda的签名,因为函数描述符可以通过目标类型来得到。这样做的好处在于,编译器可以了解Lambda表达式的参数类型,这样就可以在Lambda语法中省去标注参数类型。

Lambda可以没有限制地捕获(也就是在其主体中引用)实例变量和静态变量。但局部变量必须显式声明为final,或事实上是final。换句话说, Lambda表达式只能捕获指派给它们的局部变量一次。

为什么局部变量有这些限制。第一,实例变量和局部变量背后的实现有一个关键不同。实例变量都存储在堆中,而局部变量则保存在栈上。如果Lambda可以直接访问局部变量,而且Lambda是在一个线程中使用的,则使用Lambda的线程,可能会在分配该变量的线程将这个变量收回之后,去访问该变量。因此, Java在访问自由局部变量时,实际上是在访问它的副本,而不是访问原始变量。如果局部变量仅仅赋值一次那就没有什么区别了——因此就有了这个限制。

第二,这一限制不鼓励使用改变外部变量的典型命令式编程模式

如何构建方法引用

方法引用主要有三类。

(1) 指向静态方法的方法引用(例如Integer的parseInt方法, 写作Integer::parseInt)。

(2) 指 向 任 意 类 型 实 例 方 法 的 方 法 引 用 ( 例 如 String 的 length 方 法 , 写 作

String::length)。

(3) 指向现有对象的实例方法的方法引用(假设你有一个局部变量expensiveTransaction

用于存放Transaction类型的对象,它支持实例方法getValue,那么你就可以写expensiveTransaction::getValue)。

逆序:

inventory.sort(comparing(Apple::getWeight).reversed())

复合函数

andThen()

compose()

这两个方法的执行顺序刚好是相反的,例如f.compose(g)会先执行g函数,再将g函数的结果作为f函数的入参执行,而f.andThen(g)会先执行f函数,将f函数的结果作为g函数的入参,在执行g函数。

流简介

filter——接受Lambda,从流中排除某些元素。

map——接受一个Lambda,将元素转换成其他形式或提取信息。

limit——截断流,使其元素不超过给定数量。

collect——将流转换为其他形式。

流只能遍历一次。遍历完之后,这个流已经被消费掉

流是在概念上固定的数据结构(你不能添加或删除元素),其元素则是按需计算的。流就

像是一个延迟创建的集合:只有在消费者要求的时候才会计算值。

流支持一个叫作distinct的方法,它会返回一个元素各异的流;流支持limit(n)方法,该方法会返回一个不超过给定长度的流;流还支持skip(n)方法,返回一个扔掉了前n个元素的流。如果流中元素不足n个,则返回一个空流;流支持map方法,它会接受一个函数作为参数。这个函数会被应用到每个元素上,并将其映射成一个新的元素。

使用flatMap方法的效果是,各个数组并不是分别映射成一个流,而是映射成流的内容。所有使用map(Arrays::stream)时生成的单个流都被合并起来,即扁平化为一个流。flatmap方法让你把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流。

给定两个数字列表,如何返回所有的数对呢?例如,给定列表[1, 2, 3]和列表[3, 4],应

该返回[(1, 3), (1, 4), (2, 3), (2, 4), (3, 3), (3, 4)]。

List<Integer> numbers1 = Arrays.asList(1, 2, 3);
List<Integer> numbers2 = Arrays.asList(3, 4);
List<int[]> pairs = numbers1.stream().
flatMap(i -> numbers2.stream().map(j -> new int[]{i, j}))
.collect(toList());

anyMatch方法可以回答“流中是否有一个元素能匹配给定的谓词”。

allMatch方法的工作原理和anyMatch类似,但它会看看流中的元素是否都能匹配给定的谓词

和allMatch相对的是noneMatch。

reduce接受两个参数:一个初始值,一个BinaryOperator<T>来将两个元素结合起来产生一个新值,这里我们用的是lambda (a, b) -> a + b。

reduce还有一个重载的变体,它不接受初始值,但是会返回一个Optional对象:

Optional<Integer> sum = numbers.stream().reduce((a, b) -> (a + b));

如果流中没有任何元素的情况。 reduce操作无法返回其和,因为它没有初始值。这就是为什么结果被包裹在一个Optional对象里,以表明结果可能不存在。

Optional里面几种可以迫使你显式地检查值是否存在或处理值不存在的情形的方法也不错。

isPresent()将在Optional包含值的时候返回true, 否则返回false。

ifPresent(Consumer<T> block)会在值存在的时候执行给定的代码块。

T get()会在值存在时返回值,否则抛出一个NoSuchElement异常。

T orElse(T other)会在值存在时返回值,否则返回一个默认值。

将流转换为特化版本的常用方法是mapToInt、 mapToDouble和mapToLong。这些方法和前面说的map方法的工作方式一样,只是它们返回的是一个特化流,而不是Stream<T>。

Stream<Integer> stream = intStream.boxed();将数值流转换为stream

区分没有元素的流和最大值真的是0的流使用OptionalInt、 OptionalDouble和OptionalLong。

IntStream.rangeClosed(1, 100)包含100

IntStream.range(1, 100)不包含100

值、数组、文件都可以生成流

斐波纳契数列是著名的经典编程练习。下面这个数列就是斐波纳契数列的一部分: 0, 1, 1,

2, 3, 5, 8, 13, 21, 34, 55…数列中开始的两个数字是0和1,后续的每个数字都是前两个数字之和。

斐波纳契元组序列与此类似,是数列中数字和其后续数字组成的元组构成的序列: (0, 1),

(1, 1), (1, 2), (2, 3), (3, 5), (5, 8), (8, 13), (13, 21) …

用iterate方法生成斐波纳契元组序列中的前20个元素。

Stream.iterate(new int[]{0, 1},
t -> new int[]{t[1], t[0]+t[1]}).limit(20)

收集器

对流调用collect方法将对流中的元素触发一个归约操作

归约和汇总

在需要将流项目重组成集合时,一般会使用收集器,但凡要把流中所有的项目合并成一个结果时就可以用。这个结果可以是任何类型。

long howManyDishes = menu.stream().collect(Collectors.counting());

这还可以写得更为直接:

long howManyDishes = menu.stream().count();

获取流中最大值:

Optional<Dish> mostCalorieDish =menu.stream().collect(maxBy(dishCaloriesComparator));

汇总:

int totalCalories = menu.stream().collect(summingInt(Dish::getCalories));

Collectors.summingLong和Collectors.summingDouble方法的作用完全一样,可以用于求和字段为long或double的情况。汇总不仅仅是求和;还有Collectors.averagingInt,连同对应的averagingLong和averagingDouble可以计算数值的平均数。

IntSummaryStatistics工厂方法返回的收集器。例如,通过一次summarizing操作你可以就数出菜单中元素的个数,并得到总和、平均值、最大值和最小值。

oining工厂方法返回的收集器会把对流中每一个对象应用toString方法得到的所有字符串连接成一个字符串。

String shortMenu = menu.stream().map(Dish::getName).collect(joining(“,”));

分组

Map<Dish.Type, List<Dish>> dishesByType =menu.stream().collect(groupingBy(Dish::getType));

二级分组

menu.stream().collect(groupingBy(Dish::getType,

groupingBy(dish -> {

if (dish.getCalories() <= 400) return CaloricLevel.DIET;

else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;

else return CaloricLevel.FAT;

} )));

第二个参数不一定是groupBy()

menu.stream().collect(groupingBy(Dish::getType, counting()));

其结果是下面的Map:

{MEAT=3, FISH=2, OTHER=4}

分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。

这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组true一组, false一组

menu.stream().collect(partitioningBy(Dish::isVegetarian));

Collector接口:

public interface Collector<T, A, R> { a mutable result container. * * @return a function which folds a value into a mutable result container */ BiConsumer<A, T> accumulator(); /** * A function that accepts two partial results and merges them. The * combiner function may fold state from one argument into the other and * return that, or may return a new result container. * * @return a function which combines two partial results into a combined * result */ BinaryOperator<A> combiner(); /** * Perform the final transformation from the intermediate accumulation type * {@code A} to the final result type {@code R}. * * <p>If the characteristic {@code IDENTITY_TRANSFORM} is * set, this function may be presumed to be an identity transform with an * unchecked cast from {@code A} to {@code R}. * * @return a function which transforms the intermediate result to the final * result */ Function<A, R> finisher(); /** * Returns a {@code Set} of {@code Collector.Characteristics} indicating * the characteristics of this Collector. This set should be immutable. * * @return an immutable set of collector characteristics */ Set<Characteristics> characteristics();

1. 建立新的结果容器:supplier方法必须返回一个结果为空的Supplier,也就是一个无参数函数,在调用时它会创建一个空的累加器实例,供数据收集过程使用。例如

public Supplier<List<T>> supplier() {

return () -> new ArrayList<T>();

}

2. 将元素添加到结果容器: accumulator方法会返回执行归约操作的函数。当遍历到流中第n个元素时,这个函数执行时会有两个参数:保存归约结果的累加器, 还有第n个元素本身。

该函数将返回void,因为累加器是原位更新,即函数的执行改变了它的内部状态以体现遍历的元素的效果。例如

public BiConsumer<List<T>, T> accumulator() {

return (list, item) -> list.add(item);

}

3. 对结果容器应用最终转换: finisher方法在遍历完流后, 必须返回在累积过程的最后要调用的一个函数,以便将累加器对象转换为整个集合操作的最终结果。

public Function<List<T>, List<T>> finisher() {

return Function.identity();

}

4. 合并两个结果容器: combiner方法会返回一个供归约操作使用的函数,它定义了对流的各个子部分进行并行处理时,各个子部分归约所得的累加器要如何合并。对于toList而言,这个方法的实现非常简单,只要把从流的第二个部分收集到的项目列表加到遍历第一部分时得到的列表后面就行了:

public BinaryOperator<List<T>> combiner() {

return (list1, list2) -> {

list1.addAll(list2);

return list1; }

}

5. characteristics方法会返回一个不可变的Characteristics集合,它定义了收集器的行为,尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示。

Characteristics是一个包含三个项目的枚举。

 UNORDERED——归约结果不受流中项目的遍历和累积顺序的影响。

 CONCURRENT——accumulator函数可以从多个线程同时调用,且该收集器可以并行归约流。如果收集器没有标为UNORDERED,那它仅在用于无序数据源时才可以并行归约。

 IDENTITY_FINISH——这表明完成器方法返回的函数是一个恒等函数,可以跳过。这种情况下,累加器对象将会直接用作归约过程的最终结果。这也意味着,将累加器A不加检查地转换为结果R是安全的

并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核。

Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);

并行流内部使用了默认的ForkJoinPool,它默认的线 程 数 量 就 是 你 的 处 理 器 数 量 , 这 个 值 是 由 Runtime.getRuntime().availableProcessors()得到的。但 是 你 可 以 通 过 系 统 属 性 java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小,如下所示:

System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”,”12″);

这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则建议不要修改它。

并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。

要避免共享可变状态,确保并行Stream得到正确的结果。

留意装箱。自动装箱和拆箱操作会大大降低性能。 Java 8中有原始类型流IntStream、LongStream、 DoubleStream来避免这种操作,但凡有可能都应该用这些流。

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。

对一个任务调用join方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。

 不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。

 对子任务调用fork方法可以把它排进ForkJoinPool。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用compute低。这样做你可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。

 和并行流一样,你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计算快。一个任务可以分解成多个独立的子任务,才能让性能在并行化时有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方法是把输入/输出放在一个子任务里,计算放在另一个里,这样计算就可以和输入/输出同时进行。此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编译器优化。这就是为什么在测量性能之前跑几遍程序很重要。

分支/合并框架工程用一种称为工作窃取( work stealing)的技术来解决任务分配不均的问题。在实际应用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队尾“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。

在Lambda中,this关键字代表的是包含类

基础类型的Optional对象,以及为什么应该避免使用它们:Optional 也 提 供 了类 似的 基 础类

型——OptionalInt、 OptionalLong以及OptionalDouble。因为基础类型的Optional不支持map、flatMap以及filter方法,而这些却是Optional类最有用的方法。

如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。

❑ 反之,如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,依据等待/计算,设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。