<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • Loading

    Stream常用操作以及原理探索

    Stream常用操作以及原理


    Stream是什么?

    Stream是一個高級迭代器,它不是數據結構,不能存儲數據。它可以用來實現內部迭代,內部迭代相比平常的外部迭代,它可以實現并行求值(高效,外部迭代要自己定義線程池實現多線程來實現高效處理)、惰性求值(中沒有終止操作,中間操作是不會執行的)、短路操作(拿到正確的結果就返回,不需要等到整個過程完成之后)等

    • Stream翻譯過來的意思就是“溪流,流”的意思,而我們剛開始學習java的時候接觸最多的就是IO流,它更像“農夫山泉”,“我們只做大自然的搬運工”,只是將一個文件從這個地方傳到另一個地方,對于文件當中內容不做任何增刪改操作,而Stream就會,也就是將要處理的數據當作流,在管道中進行傳輸,并在管道中的每個節點對數據進行處理,如過濾、排序、轉換等;

    • 通常我們需要處理的數據是以Collection、Array等數據來源;

    • Stream它是Java8中的一個新特性,那關于Java8中的其他新特性內容可以參考這篇文章《Java8新特性實戰》

    • 那既然是Java8的新特性,而且我們也知道Java8大改動之一的就是增加了函數式編程,而Stream就主角,那有關函數式編程是什么,可以參考知乎上的一篇文章《什么是函數式編程?》

    • 既然是函數式編程,所以通常是配合Lambda表達式使用;


    Stream怎么用?

    所有操作分類

    首先Stream的所有操作可分為兩類,一是中間操作二是終止操作

    中間操作:中間操作只是一種標記,只有結束操作才會觸發實際計算

    • 無狀態:指元素的處理不受前面元素的影響;
    • 有狀態:有狀態的中間操作必須等到所有元素處理之后才知道最終結果,比如排序是有狀態操作,在讀取所有元素之前并不能確定排序結果。

    終止操作:顧名思義,就是得出最后計算結果的操作

    • 短路操作:指不用處理全部元素就可以返回結果;
    • 非短路操作:指必須處理所有元素才能得到最終結果。

    此外這里我看到有的地方將collect定義為了中間操作,但通過我看了大部分對Stream的介紹,發現Collect這個收集操作是最終止操作,畢竟這也符合我們平時所用到它的場景,所以還請加以辨別有的文章中提到的collect是中間操作的錯誤解釋。


    常用操作

    以下兩張圖是對stream的常用操作做了一個簡單使用案例,原本流程圖在這Java8新特性

    image-20220624190633854

    image-20220624190537640

    那至于常用操作這塊,本次博客也不在進行過多的細說,因為網上有很多這種使用類型的文章,我常看的有這三篇文章:


    為什么使用Stream?

    聲明式處理數據

    第一個原因我覺得是Stream流可以以聲明式的方式去處理數據,也就是像它其中就有filter、sort這種以及寫好的操作,只需要拿來使用即可,如果我們平時使用for循環,還要在for循環中自己去寫怎么過濾的這些操作,最后才得出自己想要的結果,對比這種命令式的操作

    可以說讓我們代碼更加干凈、簡潔。

    對比for循環

    對于與for循環效率的對比,我覺得和以下內容差不多,但搜尋網上資料來證明某一觀點正確的我目前沒有找到,很多人持有觀點就是“犧牲代碼效率來換取代碼簡潔度”,“Stream的優勢在于有并行處理”,“Stream的效率與for差不多,為了代碼簡潔更偏向Stream”等。

    但是犧牲代碼效率換代碼簡潔度我覺得還是有問題的,不能一概而論。但是函數式編程的優點就是代碼簡潔,多核友好并行處理這是不可否認的。

    • 針對不同的數據結構,Stream流的執行效率是不一樣的
    • 針對不同的數據源,Stream流的執行效率也是不一樣的
    • 對于簡單的數字(list-Int)遍歷,普通for循環效率的確比Stream串行流執行效率高(1.5-2.5倍)。但是Stream流可以利用并行執行的方式發揮CPU的多核優勢,因此并行流計算執行效率高于for循環。
    • 對于list-Object類型的數據遍歷,普通for循環和Stream串行流比也沒有任何優勢可言,更不用提Stream并行流計算。

    雖然在不同的場景、不同的數據結構、不同的硬件環境下。Stream流與for循環性能測試結果差異較大,甚至發生逆轉。但是總體上而言

    • Stream并行流計算 >> 普通for循環 ~= Stream串行流計算 (之所以用兩個大于號,你細品)
    • 數據容量越大,Stream流的執行效率越高。
    • Stream并行流計算通常能夠比較好的利用CPU的多核優勢。CPU核心越多,Stream并行流計算效率越高。
    • 如果數據在1萬以內的話,for循環效率高于foreach和stream;如果數據量在10萬的時候,stream效率最高,其次是foreach,最后是for。另外需要注意的是如果數據達到100萬的話,parallelStream異步并行處理效率最高,高于foreach和for

    處理集合數據

    Stream可以說是Java8中對于處理集合的抽象概念,所以我們經常對集合中的數據采用像SQL這種類似方式去處理;所以經常會用Stream進行遍歷操作,那相較于我們以前寫的嵌套for循環可以說是代碼更加的簡潔,更直觀易讀。當然循環只是循環,而Stream是個流的形式去做處理。那如何去做迭代,那就得看看stream的原理了。

    惰性計算

    惰性計算我們也可以稱作惰性求值或者延遲求值,這種方式在函數式編程中極為常見,也就是當計算出結果后不立馬去返回值,而是在它要被用到的時候來計算;

    在Stream中,我們就可以看作中間操作,比如當要對一個List集合做出Stream操作,比如filter,但是沒有最終操作,它返回的還是一個Stream流。也就是我們可以看作下圖這種方式。

    image-20220625164918253

    與Collection的不同點

    從實現角度比較, Stream和Collection也有眾多不同:

    • 不存儲數據。 流不是一個存儲元素的數據結構。 它只是傳遞源(source)的數據。
    • 功能性的(Functional in nature)。 在流上操作只是產生一個結果,不會修改源。 例如filter只是生成一個篩選后的stream,不會刪除源里的元素。
    • 延遲搜索。 許多流操作, 如filter, map等,都是延遲執行。 中間操作總是lazy的。
    • Stream可能是無界的。 而集合總是有界的(元素數量是有限大小)。 短路操作如limit(n) , findFirst()可以在有限的時間內完成在無界的stream
    • 可消費的(Consumable)。 不是太好翻譯, 意思流的元素在流的聲明周期內只能訪問一次。 再次訪問只能再重新從源中生成一個Stream

    Stream原理

    也許我們會覺得,Stream的實現是每一次去調用函數,它就會進行一次迭代,這肯定是不對的,這樣Stream的效率是很低的。

    其實事實是我們可以通過看源碼來發現它是怎樣迭代的,其實Stream內部是通過流水線(Pipeline)的方式來實現的,基本思想是在迭代的時候順著流水線(Pipeline)盡可能的執行更多的操作,從而避免多次迭代

    也就是說Stream在執行中間操作時僅僅是記錄,當用戶調用終止操作時,會在一個迭代里將已經記錄的操作順著流水線全部執行掉。沿著這個思路,有幾個問題需要解決:

    1. 用戶的操作如何記錄?
    2. 操作如何疊加?
    3. 疊加之后的操作如何執行?

    關鍵問題解決

    以上我們可以知道Stream的完整操作,是一個由<數據來源、操作、回調函數>組成的三元組;

    此外我們還需要知道Stream的相關類與接口的繼承關系。如下圖:

    • 從圖中可以看出我們除了基本類型以外,引用類型是通過實例化的ReferencePipeline來表示
    • 而與ReferencePipeline并行三個類是為其基本類型定制的。

    image-20220625165921940

    1.操作如何記錄?

    • 首先JDK源碼中經常會用stage(階段)來標識一次操作。
    • 其次,Stream操作通常需要一個回調函數(Lambda表達式)

    image-20220625171257401

    從以上我們可以看出,當我們調用stream方法時,最終會去創建一個Head實例來表示操作頭,也就是第一個stage,當調用filter()方法時則會創建中間操作實例StatelessOp(無狀態),接著調用map()方法時則會創建中間操作實例StatelessOp(無狀態),最后調用sort()方法時會創建最終操作實例StatefulOp(有狀態),同樣調用其他操作對應的方法也會生成一個ReferencePipeline實例,通過調用這一系列操作最終形成一個雙向鏈表,即每個Stage都記錄了前一個Stage和本次的操作以及回調函數。

    源碼

    1.調用stream,創建Head實例

    image-20220625173329776

    2.調用filter或map中間操作

    • 這些中間操作以及最終操作都在ReferencePipeline這個類中,它實現其元素類型的中間管道階段或管道源階段的抽象基類。

    下面代碼邏輯就是將回調函數mapper包裝到一個Sink當中。由于Stream.map()是一個無狀態的中間操作,所以map()方法返回了一個StatelessOp內部類對象(一個新的Stream),調用這個新Stream的opWripSink()方法將得到一個包裝了當前回調函數的Sink。

    這個Sink就是下面提到的操作如何疊加方式。

    image-20220625173737720

    2.操作如何疊加?

    從上面我們可以知道Stream通過stage來記錄操作,但stage只保存當前操作,它是不知道怎么操作下一個stage,它又需要什么操作。

    所以要執行的話還需要某種協議將各個stage關聯起來。

    JDK中就是使用Sink(我們可以稱為“匯聚結點”)接口來實現的,Sink接口定義begin()、end()、cancellationRequested()、accept()四個方法,如下表所示。

    方法名 作用
    void begin(long size) 開始遍歷元素之前調用該方法,通知Sink做好準備。
    void end() 所有元素遍歷完成之后調用,通知Sink沒有更多的元素了。
    boolean cancellationRequested() 是否可以結束操作,可以讓短路操作盡早結束。
    void accept(T t) 遍歷元素時調用,接受一個待處理元素,并對元素進行處理。Stage把自己包含的操作和回調方法封裝到該方法里,前一個Stage只需要調用當前Stage.accept(T t)方法就行了。

    Sink接口注釋文檔:

    Consumer的擴展,用于通過流管道的各個階段傳遞值,具有管理大小信息、控制流等的附加方法。在第一次調用Sink上的accept()方法之前,您必須首先調用begin()方法來通知它有數據來了(可選地通知接收器有多少數據來了),并且在所有數據都發送之后,你必須調用end()方法。在調用end()之后,您不應該在沒有再次調用begin() ) 的情況下調用accept() )。 Sink還提供了一種機制,通過該機制,sink 可以合作發出它不希望接收更多數據的信號( cancellationRequested()方法),源可以在向Sink發送更多數據之前輪詢該機制。
    接收器可能處于以下兩種狀態之一:初始狀態和活動狀態。它從初始狀態開始; begin()方法將其轉換為活動狀態, end()方法將其轉換回初始狀態,在該狀態下可以重復使用。數據接受方法(如accept()僅在活動狀態下有效。
    API 注釋:
    流管道由一個源、零個或多個中間階段(例如過濾或映射)和一個終端階段(例如歸約或 for-each)組成。具體來說,考慮管道:
     
         int longestStringLengthStartingWithA
             = strings.stream()
                      .filter(s -> s.startsWith("A"))
                      .mapToInt(String::length)
                      .max();
     
    在這里,我們分為三個階段,過濾、映射和歸約。過濾階段使用字符串并發出這些字符串的子集;映射階段使用字符串并發出整數;歸約階段消耗這些整數并計算最大值。
    Sink實例用于表示此管道的每個階段,無論該階段接受對象、整數、長整數還是雙精度數。 Sink 具有accept(Object) 、 accept(int)等的入口點,因此我們不需要每個原始特化的專用接口。 (對于這種雜食性趨勢,它可能被稱為“廚房水槽”。)管道的入口點是過濾階段的Sink ,它將一些元素“下游”發送到映射階段的Sink ,然后將整數值向下游發送到Sink以進行縮減階段。與給定階段關聯的Sink實現應該知道下一階段的數據類型,并在其下游Sink上調用正確的accept方法。同樣,每個階段都必須實現與其接受的數據類型相對應的正確accept方法。
    Sink.OfInt等特化子類型覆蓋accept(Object)以調用accept的適當原語特化,實現Consumer的適當原語特化,并重新抽象accept的適當原語特化。
    Sink.ChainedInt等鏈子類型不僅實現Sink.OfInt ,還維護了一個表示下游Sink的downstream字段,并實現了begin() 、 end()和cancellationRequested()方法來委托給下游Sink 。大多數中間操作的實現將使用這些鏈接包裝器。例如,上面示例中的映射階段如下所示:
     
         IntSink is = new Sink.ChainedReference<U>(sink) {
             public void accept(U u) {
                 downstream.accept(mapper.applyAsInt(u));
             }
         };
     
    在這里,我們實現Sink.ChainedReference<U> ,這意味著我們期望接收U類型的元素作為輸入,并將下游接收器傳遞給構造函數。因為下一階段需要接收整數,所以我們必須在向下游發送值時調用accept(int)方法。 accept()方法將映射函數從U應用到int并將結果值傳遞給下游Sink 。
        
    interface Sink<T> extends Consumer<T> {}
    

    從上面那張圖中調用ReferencePipeline.map()的方法,我們會發現我們在創建一個ReferencePipeline實例的時候,需要重寫opWrapSink方法來生成對應Sink實例。而且通過閱讀源碼會發現常用的操作都會創建一個ChainedReference實例;

    image-20220625175215959

    有了上面的協議,相鄰Stage之間調用就很方便了,每個Stage都會將自己的操作封裝到一個Sink里,前一個Stage只需調用后一個Stage的accept()方法即可,并不需要知道其內部是如何處理的。

    當然對于有狀態的操作,Sink的begin()end()方法也是必須實現的。比如Stream.sorted()是一個有狀態的中間操作,其對應的Sink.begin()方法可能創建一個乘放結果的容器,而accept()方法負責將元素添加到該容器,最后end()負責對容器進行排序。

    對于短路操作,Sink.cancellationRequested()也是必須實現的,比如Stream.findFirst()是短路操作,只要找到一個元素,cancellationRequested()就應該返回true,以便調用者盡快結束查找。

    Sink的四個接口方法常常相互協作,共同完成計算任務。

    實際上Stream API內部實現的的本質,就是如何重載Sink的這四個接口方法

    3.操作疊加后如何進行執行?

    Sink完美封裝了Stream每一步操作,并給出了[處理->轉發]的模式來疊加操作。這一連串的齒輪已經咬合,就差最后一步撥動齒輪啟動執行。是什么啟動這一連串的操作呢?也許你已經想到了啟動的原始動力就是結束操作(Terminal Operation),一旦調用某個結束操作,就會觸發整個流水線的執行。

    結束操作之后不能再有別的操作,所以結束操作不會創建新的流水線階段(Stage),直觀的說就是流水線的鏈表不會在往后延伸了。結束操作會創建一個包裝了自己操作的Sink,這也是流水線中最后一個Sink,這個Sink只需要處理數據而不需要將結果傳遞給下游的Sink(因為沒有下游)。對于Sink的[處理->轉發]模型,結束操作的Sink就是調用鏈的出口。

    我們再來考察一下上游的Sink是如何找到下游Sink的。

    一種可選的方案是在PipelineHelper中設置一個Sink字段,在流水線中找到下游Stage并訪問Sink字段即可。

    但Stream類庫的設計者沒有這么做,而是設置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來得到Sink,該方法的作用是返回一個新的包含了當前Stage代表的操作以及能夠將結果傳遞給downstream的Sink對象。

    為什么要產生一個新對象而不是返回一個Sink字段?

    這是因為使用opWrapSink()可以將當前操作與下游Sink(上文中的downstream參數)結合成新Sink。試想只要從流水線的最后一個Stage開始,不斷調用上一個Stage的opWrapSink()方法直到最開始(不包括stage0,因為stage0代表數據源,不包含操作),就可以得到一個代表了流水線上所有操作的Sink,用代碼表示就是這樣:

    類PipelineHelper

    image-20220625193452816

    image-20220625181757639

    image-20220625182737377

    類 AbstractPipeline extends PipelineHelper

    • 通過wrapSink方法得到從開始到結束的所有操作并包裝在一個sink里面,然后通過copyInto執行,就相當于整個流水線進行了執行

    • 代碼執行邏輯:首先調用wrappedSink.begin()方法告訴Sink數據即將到來,然后調用spliterator.forEachRemaining()方法對數據進行迭代,最后調用wrappedSink.end()方法通知Sink數據處理結束。

      image-20220625194205588

    image-20220625181910918

    image-20220625182759077

    4.操作結果在哪?

    針對不同類型的返回結果,下表給出了各種有返回結果的Stream結束操作:

    返回類型 對應的結束操作
    boolean anyMatch() allMatch() noneMatch()
    Optional findFirst() findAny()
    歸約結果 reduce() collect()
    數組 toArray()
    1. 對于表中返回boolean或者Optional的操作(Optional是存放 一個 值的容器)的操作,由于值返回一個值,只需要在對應的Sink中記錄這個值,等到執行結束時返回就可以了。
    2. 對于歸約操作,最終結果放在用戶調用時指定的容器中(容器類型通過[收集器](http://www.tnepal.com/CarpenterLee/p/5-Streams API(II).md#收集器)指定)。collect(), reduce(), max(), min()都是歸約操作,雖然max()和min()也是返回一個Optional,但事實上底層是通過調用[reduce()](http://www.tnepal.com/CarpenterLee/p/5-Streams API(II).md#多面手reduce)方法實現的。
    3. 對于返回是數組的情況,毫無疑問的結果會放在數組當中。這么說當然是對的,但在最終返回數組之前,結果其實是存儲在一種叫做Node的數據結構中的。Node是一種多叉樹結構,元素存儲在樹的葉子當中,并且一個葉子節點可以存放多個元素。這樣做是為了并行執行方便。

    參考文章:

    Java 8 Stream原理解析

    深入理解Java Stream流水線

    Java 8 Stream探秘

    Java8 Stream原理深度解析

    梳理

    //例子:List<T> a = b.stream().map(m::getId()).collect(Collectors.toList())
    
    //1.首先調用stream方法,看源碼:
    public interface Collection<E> extends Iterable<E> {
        default Stream<E> stream() {
            return StreamSupport.stream(spliterator(), false);
        }
    }
    //2.進入StreamSupport
    //3.發現用的ReferencePipeline創建的Head頭,進行此次操作記錄
    public final class StreamSupport {
        public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
            Objects.requireNonNull(spliterator);
            return new ReferencePipeline.Head<>(spliterator,
                                                StreamOpFlag.fromCharacteristics(spliterator),
                                                parallel);
        }
    }
    //4.調用中間操作map方法,發現中間操作和最終操作的那些操作都在此
    //5.發現map操作是個StatelessOp(無狀態操作),同時此類繼承于AbstractPipeline,并重寫了opWrapSink方法;
    //6.并通過Sink接口實現相鄰stage直接的連接,來進行操作記錄的疊加
    abstract class ReferencePipeline<P_IN, P_OUT>
            extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
            implements Stream<P_OUT>  {
        
        @Override
        @SuppressWarnings("unchecked")
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            downstream.accept(mapper.apply(u));
                        }
                    };
                }
            };
        }
        
    }
    //7.通過PipelineHelper中的wrapSink接口進行開始到結束的操作記錄包裝到一個Sink中
    abstract class PipelineHelper<P_OUT> {
         abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
    }
    
    abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
            extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
        
            @Override
        	@SuppressWarnings("unchecked")
        	final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
            Objects.requireNonNull(sink);
            for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
                sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
            }
            return (Sink<P_IN>) sink;
        }
        
    }
    //8.通過PipelineHelper中的copyInto接口執行stage
    abstract class PipelineHelper<P_OUT> {
         abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
    }
    
    abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
            extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
        
            @Override
       	    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
            Objects.requireNonNull(wrappedSink);
    
            if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
                wrappedSink.begin(spliterator.getExactSizeIfKnown());
                spliterator.forEachRemaining(wrappedSink);
                wrappedSink.end();
            }
            else {
                copyIntoWithCancel(wrappedSink, spliterator);
            }
        }
        
    }
    //9.最后通過不同類型的操作類型來得出Stream的返回結果
    

    最后

    這些個人想說的話還是留在結尾吧,畢竟放前言好像有點不符,畢竟文章重點也不是這。

    有一段時間沒有寫博客了,還是得自我反省。反省的結果就是人喜歡偷懶,變得不會去對一段時間的學習內容進行一個總結,加之在整個寫博客過程中需要梳理自己的思路,并且還要對自己寫的內容要有一定的正確性判斷,如此寫博客的時間也隨之變長。漸漸地,自己也放松了下來,而這樣導致的最大問題就是自己的知識體系越來越碎,導致自己好像一直在學東西,但同時忘記的速度也在隨之變快,導致自己無法去正確在實踐當中去運用這些所學的技術以及知識點

    上次也說了,會總結設計模式的相關內容,但畢竟這種思想級別的東西,如果不通過理論加實踐,是很難總結出來一些對自己有用的東西的,而且這些內容畢竟放到自己網上博客當中,那就不僅僅是自己在看了,我也不希望有一些和我一樣的菜鳥看完之后被文章所誤導。

    Stream這個東西也算自己平時用的較多的一個東西,所以來進行一個總結。

    文中如有錯誤,請各位大佬及時指出,并請不吝賜教。

    posted @ 2022-06-26 13:02  余月七  閱讀(271)  評論(1編輯  收藏  舉報
    国产美女a做受大片观看