跳过正文
  1. 博客/
  2. 后端/
  3. 框架/

Stream源码(2):从问题出发看源码

·4 分钟· ·
后端 框架 Java Stream
目录

之前看一些开源项目源码的时候,发现一个问题,假如你贪全,一口气把整个代码看完,由于现在程序架构
比较复杂,很多功能被分成很多个组件来完成,有的时候你会被程序跳来跳去给弄晕,假如你但看一个小功能,你又
不知道为啥要用这个

所以这次准备尝试从问题入手,首先给自己提一些
问题,然后在从源代码中寻找答案,
在寻找答案的过程中会遇到更多问题,就这样打破砂锅问到底,最终没有问题了,这个时候你就差不多看懂了

0x00 问题
#

  1. distinct 操作过程中是否会将新加入的元素和历史元素一一比较?

为啥会有这个问题呢,因为在看源码

    Returns a stream consisting of the distinct elements (according to Object.equals(Object)) of this stream.
  
    For ordered streams, the selection of distinct elements is stable (for duplicated elements, the element appearing first in the encounter order is preserved.) For unordered streams, no stability guarantees are made.
  

这句话意思是会依靠`Object.equals(Object)` 来去重,我们知道`distinct`和`filter`都是中间操作
难道distinct会将每个元素和历史元素做一个Object.equals调用吗

假如这样做的话,那么这个操作就是O(n^2)的时间复杂度了,显然不太靠谱,我们查看distinct源码发现

最终distinct生成了一个StatefulOp ,而且这个类存在一个reduce函数,其中声明了一个

    TerminalOp<T, LinkedHashSet<T>> reduceOp
  
                    = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
  
                                                             LinkedHashSet::addAll);
  

我们可以看到这个变量是一个终止操作,其中使用LinkedHashSet来进行聚合,所以看到这我们就猜测
Stream没有这么傻,它声明了一个LinkedHashSet来存贮历史元素,这样只需要将加进来的元素进行
哈希计算,然后跟哈希碰撞的调用一下Object.equals(Object)函数就好了

我们看到这篇博文的实验也证明我们的猜测了`

接下来我们又有一个疑惑Stream内部是如何实现的呢?

0x01 猜测
#

我们首先从最小的代码来看,我们首先来看一个无状态的stream函数

    Stream.of(1L, 2L, 3L, 4L).forEach(System.out::println)
  

我们使用for循环用来实现也非常简单

           for (long l : new long[]{1L, 2L, 3L, 4L}) {
  
            System.out.println(l);
  
          }
  

接下来我们来思考,如何实现一个有状态的Stream

     Stream.of(1L, 2L, 3L, 4L).reduce(0L, Long::sum);
  

我们如何用for循环来实现呢,很简单,定义一个变量

    long begin = 0;
  
    for (long l : new long[]{1L, 2L, 3L, 4L}) {
  
        begin += l;
  
    }
  

我们能很容易写出一层for循环,但是Stream强大的地方在于,他可以穿插很多函数处理
比如:

    Stream.of(1L, 2L, 3L, 4L, 4L, 5L, 5L).distinct().filter(x -> x > 2).reduce(0, Long::sum);
  

我们简单的穿插了distinctfilter操作,我们接下来尝试使用for循环来实现上面的Stream

首先我们知道distinct 需要一个Set来过滤已经存在的,其中reduce需要一个初始量,那就好做了

    long start = 0L;
  
    for (long l : new long[]{1L, 2L, 3L, 4L}) {
  
        if(!set.contains(l)) {
  
            set.add(l);
  
            if(l > 2) {
  
                start += l;
  
            }
  
        }
  
    }
  

00x02 源码探究
#

接下来我们看看Stream内部如何实现这个for循环的,我们可以看到,其实.distinct().filter(x -> x > 2).reduce(0, Long::sum)对于每一层我们都需要能
创建一个Sink,对于这个for循环来说,都是把每个数据,我们把数据从一个sink到其他的sink

所有的Sink都实现了Consumer 接口,其中最重要的接口就是

    void accept(T t);
  

这个消费接口,我们可以理解“吃”数据,它会把我们传给它的值都“消化”掉

当我们在创建.distinct().filter(...)...这些stream的时候,我们做了什么呢,
我们每进行一次中间操作,我们都新建了一个流,其中我们通过upstream 这个变量指向
之前的流

当我们碰到终止操作比如reduce的时候,我们会进行一个回溯,把所有upstream都进行回溯,反过来把一个sink组装起来(每个sink指向它的上游)

// java.util.stream.AbstractPipeline.class
  
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
  
    Objects.requireNonNull(sink);
  
    // 回溯之前的stream流,创建sink,并让当前的sink指向上流
  
    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
  
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
  
    }
  
    return (Sink<P_IN>) sink;
  
}
  

最终我们得到了最上流的sink,以上面为例就是distinct那个sink

接下来我们执行for循环,其中最重要的就是

    // java.util.stream.AbstractPipeline.copyInto 函数
  
    // 执行for循环 其中 传入的sink就是 我们上面得到的像葫芦串一样的sink
  
    spliterator.forEachRemaining(wrappedSink)
  

我们只需要给wrappedSink传入for循环的值就好了,由于每个sink都有其上游的引用,比如说distinct的sink,
他会判断是否已经存贮在sink中,如果没有就往上游传,由于上游也是个sink,所以最终如果不传了或者到最上游了就继续下一for循环的值

0x03
#

总结,我们这边非常浅显的把源代码给介绍了一下,其实要想吃透最好使用debug功能,一行一行代码进行debug,这样就能印象更深刻

## 引用

相关文章

Stream源码(1):如何实现去重
·3 分钟
后端 框架 Java Stream
本篇博客是在看代码的时候看到使用Java8使用Stream去重的妙用,从而对Java如何使用Stream实现几行代码 完成一个可支持并行化的流式计算程序
Dubbo浅探
·3 分钟
后端 框架 Java Dubbo
繁忙的一周终于过去了,加入小影第一周主要是熟悉后端架构,同事们都挺好,自己的基础还是有点弱,前段时间简单的把Spring Cloud 和 Dubbo学习了一下,但是其实对于工作来说,之前学的都是最新的版本,但是其实公司用的版本很老了,所以需要时间去学习老版本
Spring Cloud Alibaba浅探
·2 分钟
后端 框架 Java SpringBoot
花了半天时间把Spring Cloud Alibaba 的Nacos 、 Sentinel 和 Seata简单的使用了一下,下面是我的一些看法
SpringCloud浅析
·5 分钟
后端 框架 Java SpringBoot
最近在学SpringCloud,之前一直对用视频学嗤之以鼻,觉得只有弱者才会这样,但是其实对于一些已经非常常见的技术 比如SpringCloud这种,已经出来很长一段时间了,而且其实非常杂,用视频学起来其实非常快,当然前提是你要三倍速播放,而且 你得把视频配套的代码找到,这样你就能很快的掌握这个。
浅析Spring
·3 分钟
后端 框架 Java SpringBoot
 > Spring核心就是IoC(依赖注入)AoP(面向切面)本篇就基于一个开源项目 tiny-spring 来分析Spring框架到底给我们提供了什么东西
浅析微服务
·5 分钟
后端 框架 Java
这篇博客主要是从web技术发展来探索微服务的起源 要想了解微服务是什么得从web框架出来之后开始讲起,大部分可能不知道微服务,一定知道写web服务的框架,懂Java的可以用Spring Boot一把梭,懂Python的Flask、Django、Tornado也写的飞起