<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • java8Stream原理深度解析

    Java8 Stream原理深度解析

    Author:Dorae
    Date:2017年11月2日19:10:39
    轉載請注明出處


    上一篇文章中簡要介紹了Java8的函數式編程,而在Java8中另外一個比較大且非常重要的改動就是Stream。在這篇文章中,將會對流的實現原理進行深度,解析,具體關于如何使用,請參考《Java8函數式編程》。

    常用的流操作

    在深入原理之前,我們有必要知道關于Stream的一些基礎知識,關于Stream的操作分類,如表1-1所示。

    表1-1 Stream的常用操作分類(表格引自這里)

    表1-1

    如表1-1中所示,Stream中的操作可以分為兩大類:中間操作與結束操作,中間操作只是對操作進行了記錄,只有結束操作才會觸發實際的計算(即惰性求值),這也是Stream在迭代大集合時高效的原因之一。中間操作又可以分為無狀態(Stateless)操作與有狀態(Stateful)操作,前者是指元素的處理不受之前元素的影響;后者是指該操作只有拿到所有元素之后才能繼續下去。結束操作又可以分為短路與非短路操作,這個應該很好理解,前者是指遇到某些符合條件的元素就可以得到最終結果;而后者是指必須處理所有元素才能得到最終結果。

    原理探秘

    在探究Stream的執行原理之前,我們先看如下兩段代碼(本文將以code_1為例進行說明):

    code_1

    public static void main(String[] args) {
    	List<String> list = Lists.newArrayList(
    			"bcd", "cde", "def", "abc");
    	List<String> result = list.stream()
    			//.parallel()
    			.filter(e -> e.length() >= 3)
    			.map(e -> e.charAt(0))
    			//.peek(System.out :: println)
    			//.sorted()
    			//.peek(e -> System.out.println("++++" + e))
    			.map(e -> String.valueOf(e))
    			.collect(Collectors.toList());
    	System.out.println("----------------------------");
    	System.out.println(result);
    }
    

    code_2

    public void targetMethod() {
    	List<String> list = Lists.newArrayList(
    			"bcd", "cde", "def", "abc");
    	List<String> result = Lists.newArrayListWithCapacity(list.size());
    	for (String str : list) {
    		if (str.length() >= 3) {
    			char e = str.charAt(0);
    			String tempStr = String.valueOf(e);
    			result.add(tempStr);
    		}
    	}
    	System.out.println("----------------------------");
    	System.out.println(result);
    }
    

    很明顯,在最終結果上而言,code_1與code_2是等價的。那么,Stream是怎么做的呢?顯然不是每次操作都進行迭代,因為這對于執行時間與存儲中間變量來說都將是噩夢。

    要解決的問題

    顯然,如果code_2只對集合迭代了一次,也就是說相當高效。那么這么做有沒有弊端?有!模板代碼、中間變量、不利于并行都是其存在的問題。但是按著code_2的思路可以知道有以下幾個問題需要解決:

    • 如何記錄每次操作?
    • 操作如何疊加?
    • 疊加后的操作如何執行?
    • 最后的結果如何存儲?

    包結構分析

    那么Stream是如何解決的呢?所謂源碼之下,無所遁形。那么,首先來看一下Stream包的結構(如圖1-1所示)。

    圖1-1

    圖1-1 Stream包的結構示意圖

    其中各個部分的主要功能為:

    1. 主要是各種操作的工廠類、數據的存儲結構以及收集器的工廠類等;
    2. 主要用于Stream的惰性求值實現;
    3. Stream的并行計算框架;
    4. 存儲并行流的中間結果;
    5. 終結操作的定義;

    我們單獨把第二部分拎出來用于說明Stream的惰性求值實現,如圖1-2所示,Java8針對Int、long、double進行了優化,主要用于頻繁的拆裝箱。我們以引用類型進行介紹,在圖中已經標為綠色。

    • BaseStream規定了流的基本接口,比如iterator、spliterator、isParallel等;
    • Stream中定義了map、filter、flatmap等用戶關注的常用操作;
    • PipelineHelper主要用于Stream執行過程中相關結構的構建;
    • Head、StatelessOp、StatefulOp為ReferencePipeline中的內部類。

    圖1-2

    圖1-2

    操作如何記錄

    關于操作如何記錄,在JDK源碼注釋中多次用(操作)stage來標識用戶的每一次操作,而通常情況下Stream的操作又需要一個回調函數,所以一個完整的操作是由數據來源、操作、回調函數組成的三元組來表示。而在具體實現中,使用實例化的ReferencePipeline來表示,即圖1-2中的Head、StatelessOp、StatefulOp的實例。

    如code_3、code_4所示為調用stream.map()的關鍵的兩個方法,在用戶
    調用一系列操作后會形成如圖1-3所示的雙鏈表結構。

    圖1-3

    圖1-3

    code_3(ReferencePipeline.map())

    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
    

    code_4(AbstractPipeline.AbstractPipeline())

    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;
    
        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }
    

    如何疊加

    在上一步已經在stage中記錄了每一步操作,此時并沒有執行。但是stage只是保存了當前的操作,并不能確定下一個stage需要何種操作,何種數據,其實JDK為此定義了Sink接口,其中只有begin()、end()、cancellationRequested()、accept()四個接口(如表1-2所示,摘自這里),其中中間操作的子類中包含一個指向下游sink的指針。

    表1-2

    表1-2

    現在轉向code_3,可以看出,在satge鏈中,每一步都包含了opWrapSink()。當調用終結操作時,將會觸發code_5從最后一個stage(終結操作產生的satge)開始,遞歸產生圖1-4所示的結構。

    code_5(AbstractPipeline.wrapSink())

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
    
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }
    

    圖1-4

    圖1-4

    如何執行

    所有的操作已經形成了圖1-4的結構,接下來就會觸發code_6,此時結果就會產生對應的結果啦!

    code_6(AbstractPipelie.copyInto())

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);
    
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
    

    并行原理

    那么,Stream是如何并行執行的呢?其實產生stage鏈的過程和串行并沒有區別,只是在最終執行時進行了相應的調整,我們將code_1改變為code_7

    code_7

    public static void main(String[] args) {
    	List<String> list = Lists.newArrayList(
    			"bcd", "cde", "def", "abc");
    	List<String> result = list.stream()
    			.parallel()
    			.filter(e -> e.length() >= 3)
    			//.map(e -> e.charAt(0))
    			//.peek(System.out :: println)
    			.sorted()
    			//.peek(e -> System.out.println("++++" + e))
    			.map(e -> String.valueOf(e))
    			.collect(Collectors.toList());
    	System.out.println("----------------------------");
    	System.out.println(result);
    }
    

    那么最終產生的stage鏈與sink的結構如圖1-5所示,因為此時stage鏈中有一個有狀態操作(sorted()),也就是說在這里必須處理完所有元素才能進行下一步操作。那么此時無論是并行還是串行,此時都會產生兩個sink鏈,也就是代表了兩次迭代,才產生了最終結果。

    圖1-5

    圖1-5

    那么,究竟是如何并行的呢?其實當調用collect操作時會調用code_8,其中的evaluateParallel()如code_9所示。

    code_8(AbstractPipeline.evaluate())

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
    
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    

    code_9(ReduceOp.evaluateParallel())

    @Override
        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }
    

    其實Stream的并行處理是基于ForkJoin框架的,相關類與接口的結構如圖1-6所示。其中AbstractShortCircuitTask用于處理短路操作,其他相關操作類似,會產生對應的Task。

    圖1-6

    圖1-6

    關于code_8中獲取源Spliterator,如code_10所示,

    code_10(AbstractPipeline.sourceSpliterator())

    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        Spliterator<?> spliterator = null;
        if (sourceStage.sourceSpliterator != null) {
            spliterator = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
        }
        else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }
    
        if (isParallel() && sourceStage.sourceAnyStateful) {
            //如果是并行流并且有stage包含stateful操作
    		//那么就會依次遍歷stage,直到遇到stateful stage時
            int depth = 1;
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                 u != e;
                 u = p, p = p.nextStage) {
    
                int thisOpFlags = p.sourceOrOpFlags;
                if (p.opIsStateful()) {
                    depth = 0;
    
                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        //如果有短路操作,則去除相應標記
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }
    				//盡量以惰性求值的方式進行操作
                    spliterator = p.opEvaluateParallelLazy(u, spliterator);
    
                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                }
                p.depth = depth++;
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }
    
        if (terminalFlags != 0)  {
            // Apply flags from the terminal operation to last pipeline stage
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }
    
        return spliterator;
    }
    

    如何并行執行

    關于各個task就行是如何并行執行,其實最終調用的是code_11所示,對應的流程如圖1-7所示,其中交替fork子節點是為了緩和數據分片不均造成的性能退化。

    code_11(AbstractTask.compute())

    @Override
    public void compute() {
        Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
        long sizeEstimate = rs.estimateSize();
        long sizeThreshold = getTargetSize(sizeEstimate);
        boolean forkRight = false;
        @SuppressWarnings("unchecked") K task = (K) this;
        while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
            K leftChild, rightChild, taskToFork;
            task.leftChild  = leftChild = task.makeChild(ls);
            task.rightChild = rightChild = task.makeChild(rs);
            task.setPendingCount(1);
            if (forkRight) {
                forkRight = false;
                rs = ls;
                task = leftChild;
                taskToFork = rightChild;
            }
            else {
                forkRight = true;
                task = rightChild;
                taskToFork = leftChild;
            }
            taskToFork.fork();
            sizeEstimate = rs.estimateSize();
        }
        task.setLocalResult(task.doLeaf());
        task.tryComplete();
    }
    

    圖1-7

    圖1-7

    影響并行流的因素

    數據大小;源數據結構(分割越容易越好),arraylist、數組比較好,hashSet、treeSet次之,linked最差;裝箱;核的數量(可使用);單元處理開銷(越大越好)

    建議:

    終結操作以外的操作,盡量避免副作用,避免突變基于堆棧的引用,或者在執行過程中進行任何I/O;傳遞給流操作的數據源應該是互不干擾(避免修改數據源)。

    小結

    本文主要探究了Stream的實現原理,并沒有涉及到具體的流操作的用法(讀者可以參考《java8函數式編程》),并且給出了使用Stream的部分建議。

    參考文章

    深入理解Java Stream流水線
    Java 8 Stream探秘
    java.util.stream 庫簡介

    posted @ 2017-11-03 17:37  Dorae  閱讀(37479)  評論(6編輯  收藏  舉報
    轉載請注明出處
    国产美女a做受大片观看