<input id="ohw05"></input>
  • <table id="ohw05"><menu id="ohw05"></menu></table>
  • <var id="ohw05"></var>
  • <code id="ohw05"><cite id="ohw05"></cite></code>
    <label id="ohw05"></label>
    <var id="ohw05"></var>
  • 一個關于wait/notify與鎖關系的探究

      wait/notify 機制是解決生產者消費者問題的良藥。它的核心邏輯是基于條件變量的鎖機制處理。所以,它們到底是什么關系?wait()時是否需要持有鎖? notify()是否需要持有鎖?先說答案:都需要持有鎖。

      wait需要持有鎖的原因是,你肯定需要知道在哪個對象上進行等待,如果不持有鎖,將無法做到對象變更時進行實時感知通知的作用。與此同時,為了讓其他線程可以操作該值的變化,它必須要先釋放掉鎖,然后在該節點上進行等待。不持有鎖而進行wait,可能會導致長眠不起。而且,如果不持有鎖,則當wait之后的操作,都可能是錯的,因為可能這個數據已經過時,其實也叫線程不安全了。總之,一切為了安全,單獨的wait做不成這事。

      notify需要持有鎖的原因是,它要保證線程的安全,只有它知道數據變化了,所以它有權力去通知其他線程數據變化。而且通知完之后,不能立即釋放鎖,即必須在持有鎖的情況下進行通知,否則notify后續的工作的線程安全性將無法保證,盡量它是在lock的范圍內,但卻因為鎖釋放,將導致不可預期的結果。而且在notify的時候,并不能真正地將對應的線程喚醒,即不能從操作系統層面喚醒線程,因為此時當前通知線程持有鎖,而此時如果將其他等待線程喚醒,它們將立即參與到鎖的競爭中來,而這時的競爭是一定會失敗的,這可能會導致被喚醒的線程立即又進入等待隊列,更糟糕的是它可能再也不會被喚醒 了。所以不能將在持有鎖的時,將對應的線程真正喚醒,我們看到的notify只是從語言上下文級別,將它從等待隊列轉移到同步隊列而已,對此操作系統一無所知。

     

    1. 實驗驗證

      我們通過一個實驗來看一下,wait/和notify是否會在持有鎖的情況下進行。

        private ReentrantLock mainLock = new ReentrantLock();
    
        @Test
        public void testWaitNotify() throws InterruptedException {
            Condition c1 = mainLock.newCondition();
            Condition c3 = mainLock.newCondition();
    
            CountDownLatch t1StartLatch = new CountDownLatch(2);
            Thread t1 = new Thread(() -> {
                mainLock.lock();
                try {
                    System.out.println(LocalDateTime.now() + " - t1 start");
                    c1.await();
                    System.out.println(LocalDateTime.now() + " - t1 c1 await out");
                    // 過早通知問題,導致無法測試下一步
    //                c3.await();
    //                System.out.println(LocalDateTime.now() + " - t1 c2 await out");
                    t1StartLatch.await();
                    System.out.println(LocalDateTime.now() + " - t1 sleeping");
                    SleepUtil.sleepMillis(10_000L);
                    c1.signalAll();
                    c3.signalAll();
                    System.out.println(LocalDateTime.now() + " - t1 notified, sleeping again");
                    SleepUtil.sleepMillis(10_000L);
                    System.out.println(LocalDateTime.now() + " - t1 out");
                }
                catch (Exception e) {
                    System.err.println("t1 exception ");
                    e.printStackTrace();
                }
                finally {
                    mainLock.unlock();
                }
            }, "t1");
            Thread t2 = new Thread(() -> {
                mainLock.lock();
                try {
                    t1StartLatch.countDown();
                    System.out.println(LocalDateTime.now() + " - t2 c1 signal");
                    c1.signalAll();
                    System.out.println(LocalDateTime.now() + " - t2 wait");
                    c1.await();
                    System.out.println(LocalDateTime.now() + " - t2 out");
                }
                catch (Exception e) {
                    System.err.println("t2 exception ");
                    e.printStackTrace();
                }
                finally {
                    mainLock.unlock();
                }
            }, "t2");
            Thread t3 = new Thread(() -> {
                mainLock.lock();
                try {
                    t1StartLatch.countDown();
                    System.out.println(LocalDateTime.now() + " - t3 c3 signal");
                    c3.signalAll();
                    System.out.println(LocalDateTime.now() + " - t3 wait");
                    c3.await();
                    System.out.println(LocalDateTime.now() + " - t3 out");
                }
                catch (Exception e) {
                    System.err.println("t2 exception ");
                    e.printStackTrace();
                }
                finally {
                    mainLock.unlock();
                }
            }, "t3");
            t1.start();
            t2.start();
            t3.start();
            t1.join();
            System.out.println(LocalDateTime.now() + " - main t1 out");
            t2.join();
            System.out.println(LocalDateTime.now() + " - main t2 out");
            t3.join();
            System.out.println(LocalDateTime.now() + " - main t3 out");
        }

      大概意思是,針對同一個鎖,wait之后,是否可以被其他線程進入臨界區?如果wait之前不通知進入,wait之后能進入,說明wait依賴于鎖,而且會釋放當前鎖。notify之后,wait()是否會立即執行,如果必須等到notify的模塊完成后,才執行,說明notify是必須要依賴于鎖的。

      結果如下:

    2022-03-27T20:09:43.588 - t1 start
    2022-03-27T20:09:43.603 - t2 c1 signal
    2022-03-27T20:09:43.603 - t2 wait
    2022-03-27T20:09:43.603 - t3 c3 signal
    2022-03-27T20:09:43.603 - t3 wait
    2022-03-27T20:09:43.603 - t1 c1 await out
    2022-03-27T20:09:43.603 - t1 sleeping
    2022-03-27T20:09:53.605 - t1 notified, sleeping again
    2022-03-27T20:10:03.612 - t1 out
    2022-03-27T20:10:03.612 - t2 out
    2022-03-27T20:10:03.612 - main t1 out
    2022-03-27T20:10:03.612 - t3 out
    2022-03-27T20:10:03.612 - main t2 out
    2022-03-27T20:10:03.612 - main t3 out
    
    
    2022-03-27T20:11:39.982 - t1 start
    2022-03-27T20:11:39.982 - t2 c1 signal
    2022-03-27T20:11:39.982 - t2 wait
    2022-03-27T20:11:39.982 - t3 c3 signal
    2022-03-27T20:11:39.982 - t3 wait
    2022-03-27T20:11:39.982 - t1 c1 await out
    2022-03-27T20:11:39.982 - t1 sleeping
    2022-03-27T20:11:49.989 - t1 notified, sleeping again
    2022-03-27T20:11:59.990 - t1 out
    2022-03-27T20:11:59.990 - t2 out
    2022-03-27T20:11:59.990 - main t1 out
    2022-03-27T20:11:59.990 - t3 out
    2022-03-27T20:11:59.990 - main t2 out
    2022-03-27T20:11:59.990 - main t3 out

      

    2. wait/notify 的實現機制

      我們以AQS的實現機制為線索,探索wait/notify機制。它在喚醒操作隊列時,設置狀態為 SIGNAL , 但它實際不執行操作系統喚醒。

            //     java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
            /**
             * Moves all threads from the wait queue for this condition to
             * the wait queue for the owning lock.
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
    
            // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignalAll
            /**
             * Removes and transfers all nodes.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
        /**
         * Transfers a node from a condition queue onto sync queue.
         * Returns true if successful.
         * @param node the node
         * @return true if successfully transferred (else the node was
         * cancelled before signal)
         */
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            // 不到萬不得已,不會真正喚醒等待中的隊列,從而滿足notify無法將線程喚醒的作用,或者說線程仍然在操作系統的等待隊列上
            // 它只是將當前線程移動到本語文的同步隊列中,以下線程下次運行過來時可以通過該限制
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
        
        /**
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
            // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
            /**
             * Implements interruptible condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with saved state as argument,
             *      throwing IllegalMonitorStateException if it fails.
             * <li> Block until signalled or interrupted.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * </ol>
             */
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                // 進來等待隊列,先釋放鎖,此時進入線程不安全狀態
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                // 此判斷只是本語文級別的等待隊列限制
                // notify 時只能滿足這個條件,而不會將線程從操作系統掛起隊列中喚醒,即不會進行 LockSupport.unpark()
                while (!isOnSyncQueue(node)) {
                    // 交由操作系統進行線程掛起
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // 重新進行鎖的獲取,嘗試
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
        /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    // 獲取當鎖,則替換head后返回
                    // 而 tryAcquire() 則由各自策略實現
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 如果獲取不到鎖,則重新進入操作系統等待隊列
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

      所以,總結:

    1. wait將會釋放持有的鎖;
    2. wait將會加入到語言級別的等待隊列,同時也會提交給操作系統的等待隊列,做到真正的線程掛起;
    3. wait將會在被操作系統喚醒后,重新進行新一輪的鎖獲取嘗試,返回時已攜帶回原有的鎖,從外部看起來就像鎖一直都在一樣;
    4. notify不會真正的喚醒等待的線程,而只是將各等待線程從語言級別的等待隊列移出,到語言級別的同步隊列;
    5. notify只有在極端情況下,才會做到線程的真正喚醒作用,比如中斷,但這被喚醒的線程將無法正常進行業務操作,所以也是安全的;
    6. 只有在整體的鎖在進行 unlock() 的時候,才會喚醒線程,使其重新參與鎖的競爭;

      

    3. lock/unlock 流程

      同樣的AQS的實現為線索,lock/unlock 流程如下:

        // java.util.concurrent.locks.ReentrantLock#lock
        /**
         * Acquires the lock.
         *
         * <p>Acquires the lock if it is not held by another thread and returns
         * immediately, setting the lock hold count to one.
         *
         * <p>If the current thread already holds the lock then the hold
         * count is incremented by one and the method returns immediately.
         *
         * <p>If the lock is held by another thread then the
         * current thread becomes disabled for thread scheduling
         * purposes and lies dormant until the lock has been acquired,
         * at which time the lock hold count is set to one.
         */
        public void lock() {
            sync.lock();
        }
        
            // java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
        /**
         * Acquires in exclusive mode, ignoring interrupts.  Implemented
         * by invoking at least once {@link #tryAcquire},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquire} until success.  This method can be used
         * to implement method {@link Lock#lock}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                // 同上wait時的鎖爭搶操作
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        
        // java.util.concurrent.locks.ReentrantLock#unlock
        /**
         * Attempts to release this lock.
         *
         * <p>If the current thread is the holder of this lock then the hold
         * count is decremented.  If the hold count is now zero then the lock
         * is released.  If the current thread is not the holder of this
         * lock then {@link IllegalMonitorStateException} is thrown.
         *
         * @throws IllegalMonitorStateException if the current thread does not
         *         hold this lock
         */
        public void unlock() {
            sync.release(1);
        }
        
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#release
        /**
         * Releases in exclusive mode.  Implemented by unblocking one or
         * more threads if {@link #tryRelease} returns true.
         * This method can be used to implement method {@link Lock#unlock}.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryRelease} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @return the value returned from {@link #tryRelease}
         */
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                // 直接喚醒頭節點(真正的喚醒)
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
        
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
        /**
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            // 真正喚醒線程,只有一個線程將被喚醒
            if (s != null)
                LockSupport.unpark(s.thread);
        }

      總結: lock/unlock 是一個真正的上鎖解鎖操作,上鎖時如未成功,則進行park()進行操作系統掛起,解鎖時將頭節點unpark()交由操作系統調度。

     

    4. 喚醒多個等待線程

      如何喚醒多個等待線程?共享鎖有這個需求,其實也是notifyAll 的表面語義所在。

        // java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
        /**
         * Releases in shared mode.  Implemented by unblocking one or more
         * threads if {@link #tryReleaseShared} returns true.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryReleaseShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         * @return the value returned from {@link #tryReleaseShared}
         */
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
        /**
         * Release action for shared mode -- signals successor and ensures
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        // 喚醒頭節點
                        unparkSuccessor(h);
                    }
                    // 因為上一頭節點剛剛被設置為0,說明正在執行中,設置當前head為 PROPAGATE
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                // 即盡量只設置一個 head 節點即可
                // 除非在這期間發生變更
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
        /**
         * Acquires in shared mode, aborting if interrupted.  Implemented
         * by first checking interrupt status, then invoking at least once
         * {@link #tryAcquireShared}, returning on success.  Otherwise the
         * thread is queued, possibly repeatedly blocking and unblocking,
         * invoking {@link #tryAcquireShared} until success or the thread
         * is interrupted.
         * @param arg the acquire argument.
         * This value is conveyed to {@link #tryAcquireShared} but is
         * otherwise uninterpreted and can represent anything
         * you like.
         * @throws InterruptedException if the current thread is interrupted
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly
        /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // 共享式鎖的傳播性質實現
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        
        // java.util.concurrent.locks.AbstractQueuedSynchronizer#setHeadAndPropagate
        /**
         * Sets head of queue, and checks if successor may be waiting
         * in shared mode, if so propagating if either propagate > 0 or
         * PROPAGATE status was set.
         *
         * @param node the node
         * @param propagate the return value from a tryAcquireShared
         */
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            /*
             * Try to signal next queued node if:
             *   Propagation was indicated by caller,
             *     or was recorded (as h.waitStatus either before
             *     or after setHead) by a previous operation
             *     (note: this uses sign-check of waitStatus because
             *      PROPAGATE status may transition to SIGNAL.)
             * and
             *   The next node is waiting in shared mode,
             *     or we don't know, because it appears null
             *
             * The conservatism in both of these checks may cause
             * unnecessary wake-ups, but only when there are multiple
             * racing acquires/releases, so most need signals now or soon
             * anyway.
             */
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                // 遞歸進行喚醒下一線程節點,從而級聯喚醒
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
        /**
         * Release action for shared mode -- signals successor and ensures
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

      總結: 多個線程的喚醒,主要是使用了級聯喚醒的機制,在做共享鎖時,根據現有的情況,進行喚醒下一線程。而當線程調度很快或算法不確定時,就會給人一種所有線程一起被喚醒工作的效果。

    posted @ 2022-03-28 06:08  等你歸去來  閱讀(259)  評論(0編輯  收藏  舉報
    国产美女a做受大片观看