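SynchronousQueue Source Code Walkthrough

Introduction

SynchronousQueue is an implementation of BlockingQueue. Because its semantics are unusual, it is rarely used directly in everyday code. It shows up mainly in the design of thread pools.

Quoting the Javadoc from the JDK SynchronousQueue source: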

A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
Synchronous queues are similar to rendezvous channels used in CSP and Ada. They are well suited for handoff designs, in which an object running in one thread must sync up with an object running in another thread in order to hand it some information, event, or task.
This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order.

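In other words:

  • An insert cannot complete on its own: put() waits until another thread takes the element, and offer() fails unless a consumer is already waiting. Likewise, every take must wait for a matching insert.
  • The queue has no internal capacity to store elements, so peek() always returns null and iteration yields nothing.
  • The head of the queue is the element that the first queued inserting thread is trying to add. If there is no such thread, no element is available for removal.
  • An optional fairness policy orders the waiting producer and consumer threads. The default is non-fair.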
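The properties above can be checked through the public API alone. The following is a minimal sketch; the class name `SyncQueueBasics` and its helper methods are illustrative, not part of the JDK. With no consumer waiting, `offer` has nobody to hand the element to and fails. `peek`, `size` and `isEmpty` always report an empty queue, and `put` returns only after another thread's `take` has received the element.

```java
import java.util.concurrent.SynchronousQueue;

// Illustrative sketch: observe SynchronousQueue semantics via its public API only.
class SyncQueueBasics {

    /** Non-blocking offer with no consumer waiting: there is nobody to hand the item to. */
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        return q.offer("x");
    }

    /** The queue never stores elements, so peek/size/isEmpty always report "empty". */
    static String describeEmptyQueue() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        return q.peek() + " " + q.size() + " " + q.isEmpty();
    }

    /** put() in this thread meets take() in a second thread (a rendezvous). */
    static String handOff(String item) {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = q.take();      // blocks until a producer arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            q.put(item);                     // blocks until the consumer has taken the item
            consumer.join();                 // join() makes received[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer()); // false
        System.out.println(describeEmptyQueue());   // null 0 true
        System.out.println(handOff("hello"));       // hello
    }
}
```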

How it works

Start with the constructors:

private transient volatile Transferer<E> transferer;

/**
 * Creates a {@code SynchronousQueue} with nonfair access policy.
 */
public SynchronousQueue() {
    this(false);
}

/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

The class is built around a single Transferer, whose job is to transfer (hand off) elements between threads:

abstract static class Transferer<E> {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract E transfer(E e, boolean timed, long nanos);
 }

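Every put and take in SynchronousQueue goes through this object. As the Javadoc says, when the first argument of transfer is non-null, the call hands that element to a consumer. When it is null, the caller is a consumer waiting for an element offered by a producer. The remaining arguments control the timeout: timed says whether the call may time out, and nanos is the timeout in nanoseconds. A null return means the call failed because of a timeout or an interrupt.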
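The effect of `timed` and `nanos` can be observed from outside. In the JDK 8 source, `poll()` calls `transfer(null, true, 0)` and `poll(timeout, unit)` calls `transfer(null, true, unit.toNanos(timeout))`. The sketch below (the class name `TransferTimeoutDemo` is illustrative) assumes no producer ever arrives, so both calls return null: the first immediately, the second only after the timeout has elapsed.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the timed/nanos arguments of transfer() as seen via the public API.
// JDK 8 mapping: poll() -> transfer(null, true, 0); poll(t, u) -> transfer(null, true, u.toNanos(t)).
class TransferTimeoutDemo {

    /** timed == true, nanos == 0: returns null at once because no producer is waiting. */
    static String pollNow() {
        return new SynchronousQueue<String>().poll();
    }

    /** timed == true, nanos > 0: waits out the timeout, then returns null. Returns elapsed ms. */
    static long timedPollElapsedMillis(long timeoutMillis) {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        long start = System.nanoTime();
        try {
            String item = q.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item != null) throw new IllegalStateException("no producer exists, item must be null");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println(pollNow()); // null
        System.out.println(timedPollElapsedMillis(50) + " ms");
    }
}
```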

The rest of this section analyzes the non-fair case, i.e. TransferStack:

static final class TransferStack<E> extends Transferer<E> {
    /*
     * This extends Scherer-Scott dual stack algorithm, differing,
     * among other ways, by using "covering" nodes rather than
     * bit-marked pointers: Fulfilling operations push on marker
     * nodes (with FULFILLING bit set in mode) to reserve a spot
     * to match a waiting node.
     */
    // SNode modes
    /* Modes for SNodes, ORed together in node fields */
    // take: the node is a waiting consumer
    /** Node represents an unfulfilled consumer */
    static final int REQUEST    = 0;
    // put: the node is a waiting producer
    /** Node represents an unfulfilled producer */
    static final int DATA       = 1;
    // the node is in the middle of matching (fulfilling) another, unfulfilled node
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    static final int FULFILLING = 2;

    // checks whether the FULFILLING bit (0b10) is set in m
    /** Returns true if m has fulfilling bit set. */
    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    /** Node class for TransferStacks. */
    static final class SNode {
        volatile SNode next;        // next node in stack
        volatile SNode match;       // the node matched to this
        // the waiting thread, i.e. the thread this node represents
        volatile Thread waiter;     // to control park/unpark
        Object item;                // data; or null for REQUESTs
        // REQUEST or DATA, possibly with the FULFILLING bit set
        int mode;
        // Note: item and mode fields don't need to be volatile
        // since they are always written before, and read after,
        // other volatile/atomic operations.

        SNode(Object item) {
            this.item = item;
        }

        boolean casNext(SNode cmp, SNode val) {
            return cmp == next &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        /**
         * Tries to match node s to this node, if so, waking up thread.
         * Fulfillers call tryMatch to identify their waiters.
         * Waiters block until they have been matched.
         *
         * @param s the node to match
         * @return true if successfully matched to s
         */
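        // Try to match: CAS this node's match from null to s, then wake its waiter.
        // s is the caller's fulfilling node, while this is the node waiting in the stack,
        // so a successful match records the fulfilling node s in the waiting node's match field.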
        boolean tryMatch(SNode s) {
            if (match == null &&
                UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                Thread w = waiter;
                if (w != null) {    // waiters need at most one unpark
                    waiter = null;
                    LockSupport.unpark(w);
                }
                return true;
            }
            return match == s;
        }

        /**
         * Tries to cancel a wait by matching node to itself.
         */
        // a node whose match points to itself is cancelled
        void tryCancel() {
            UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
        }

        boolean isCancelled() {
            return match == this;
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long matchOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = SNode.class;
                matchOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("match"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /** The head (top) of the stack */
    // top of the stack (head node of the linked list)
    volatile SNode head;

    boolean casHead(SNode h, SNode nh) {
        return h == head &&
        UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    }

    /**
     * Creates or resets fields of a node. Called only from transfer
     * where the node to push on stack is lazily created and
     * reused when possible to help reduce intervals between reads
     * and CASes of head and to avoid surges of garbage when CASes
     * to push nodes fail due to contention.
     */
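    // creates node s if needed (or reuses it) and links it in front of next;
    // transfer then pushes it onto the stack with casHead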
    static SNode snode(SNode s, Object e, SNode next, int mode) {
        if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }
/**
 * Puts or takes an item.
 */
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    /*
     * Basic algorithm is to loop trying one of three actions:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */
    SNode s = null; // constructed/reused as needed
    // e == null means take (REQUEST); otherwise put (DATA)
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        if (h == null || h.mode == mode) {  // empty or same-mode
            // timed with a non-positive timeout: the caller may not wait
            if (timed && nanos <= 0) {      // can't wait
                // pop a cancelled head and retry
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    // otherwise fail immediately
                    return null;
            // otherwise (untimed, or positive timeout) wrap the request in a node and push it
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // spin or block until a complementary node matches s
                SNode m = awaitFulfill(s, timed, nanos);
                // awaitFulfill returned s itself: the wait was cancelled
                // (timeout or interrupt), so unlink s and return null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                // Otherwise s was matched. If the head is the fulfiller sitting directly
                // on top of s, help pop both nodes (the fulfiller may already have done so)
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                // take returns the producer's item (m.item); put returns its own item (s.item)
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            // head has the complementary mode and is not fulfilling
            // a cancelled head is popped before retrying
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // push this request as a FULFILLING node on top of the waiting head
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    // m == null: every waiter below s is gone (e.g. cancelled and cleaned
                    // by other threads), so pop s, discard it and restart the outer loop
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    // try to match m: on success pop both s and m and return;
                    // otherwise unlink m and try the next node
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        // m was cancelled before it could be matched: unlink it
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            // the head is already fulfilling: help it match and pop using the same
            // steps as the inner loop above, then retry
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
/**
 * Spins/blocks until node s is matched by a fulfill operation.
 *
 * @param s the waiting node
 * @param timed true if timed wait
 * @param nanos timeout value
 * @return matched node, or s if cancelled
 */
// spin/block until node s is matched by a fulfiller, or s is cancelled
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    /*
     * When a node/thread is about to block, it sets its waiter
     * field and then rechecks state at least one more time
     * before actually parking, thus covering race vs
     * fulfiller noticing that waiter is non-null so should be
     * woken.
     *
     * When invoked by nodes that appear at the point of call
     * to be at the head of the stack, calls to park are
     * preceded by spins to avoid blocking when producers and
     * consumers are arriving very close in time.  This can
     * happen enough to bother only on multiprocessors.
     *
     * The order of checks for returning out of main loop
     * reflects fact that interrupts have precedence over
     * normal returns, which have precedence over
     * timeouts. (So, on timeout, one last check for match is
     * done before giving up.) Except that calls from untimed
     * SynchronousQueue.{poll/offer} don't check interrupts
     * and don't wait at all, so are trapped in transfer
     * method rather than calling awaitFulfill.
     */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel();
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        else if (s.waiter == null)
            // record the current thread as s's waiter so it can park on the next iteration
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            // untimed wait: park with no timeout
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}
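The three-way test at the top of `transfer` is easier to follow with concrete numbers. The sketch below copies the mode constants (they are private to `TransferStack`, so the class `StackModes` is purely illustrative) and restates the branch selection, assuming a non-null head that is not cancelled. A caller's own mode is always `REQUEST` (0) or `DATA` (1), whereas a fulfilling node carries 2 or 3, so a fulfilling head can never pass the `h.mode == mode` check and always falls through to the "help a fulfiller" branch.

```java
// Illustrative copy of TransferStack's mode constants (not public JDK API).
class StackModes {
    static final int REQUEST    = 0; // unfulfilled consumer
    static final int DATA       = 1; // unfulfilled producer
    static final int FULFILLING = 2; // bit OR'ed in by a node that is matching another node

    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    /** Branch taken by a caller with `mode` (REQUEST or DATA), given a non-null, non-cancelled head. */
    static String branch(int headMode, int mode) {
        if (headMode == mode) return "push and wait";                          // same mode
        if (!isFulfilling(headMode)) return "push fulfilling node and match";  // complementary waiter
        return "help the fulfiller";                                            // head is already matching
    }

    public static void main(String[] args) {
        System.out.println(FULFILLING | REQUEST);            // 2
        System.out.println(FULFILLING | DATA);               // 3
        System.out.println(branch(DATA, DATA));              // push and wait
        System.out.println(branch(DATA, REQUEST));           // push fulfilling node and match
        System.out.println(branch(FULFILLING | DATA, DATA)); // help the fulfiller
    }
}
```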

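That is the stack-based Transferer. Read alongside its comments, the overall logic is reasonably clear, but reasoning about it from the point of view of several concurrent threads is considerably harder. The stack implementation achieves non-fairness through LIFO ordering, while the queue implementation (TransferQueue) uses FIFO ordering for the fair case. Interested readers can study the queue implementation on their own.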
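The LIFO/FIFO difference can be observed with two consumers that start waiting one after the other. This sketch (the class `FairnessDemo` is illustrative) waits until each consumer thread is parked inside `take()` before starting the next, then offers "a" and "b". With `fair = true` the first consumer should receive "a". With the JDK 8 `TransferStack` shown above, the second consumer's node sits on top of the stack, so the non-fair run is expected to print `[b, a]`. Polling `Thread.getState()` is only a test device, and other JDK versions may implement the non-fair mode differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Illustrative sketch: which waiting consumer is served first in fair vs non-fair mode.
class FairnessDemo {

    /** Returns [item received by the 1st consumer, item received by the 2nd consumer]. */
    static List<String> run(boolean fair) {
        SynchronousQueue<String> q = new SynchronousQueue<>(fair);
        String[] got = new String[2];
        Thread first = consumer(q, got, 0);
        Thread second = consumer(q, got, 1);
        first.start();
        awaitParked(first);   // the first consumer's node is enqueued/pushed and it is parked
        second.start();
        awaitParked(second);
        // offer() succeeds here only because consumers are already waiting
        if (!q.offer("a") || !q.offer("b")) throw new IllegalStateException("offer failed");
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Arrays.asList(got[0], got[1]);
    }

    private static Thread consumer(SynchronousQueue<String> q, String[] got, int slot) {
        return new Thread(() -> {
            try {
                got[slot] = q.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Spins until t is parked inside take() (state WAITING). */
    private static void awaitParked(Thread t) {
        while (t.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
    }

    public static void main(String[] args) {
        System.out.println("fair:     " + run(true)); // fair:     [a, b]
        System.out.println("non-fair: " + run(false));
    }
}
```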

Use cases

SynchronousQueue is used mainly in thread pools, for example:

  • JDK CachedThreadPool
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    }
    
  • Dubbo FixedThreadPool
    public class FixedThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    
    }
    

Here is how the thread pool source operates on the queue:

  • Submitting a task
    workQueue.offer(command)
    
    The corresponding SynchronousQueue method:
    
    public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
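        // hands e to a consumer that is already waiting; otherwise fails at once
        // (timed with 0 ns, so no waiting node is pushed)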
            return transferer.transfer(e, true, 0) != null;
    }
    
  • A pool thread fetching a task
    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
    
    The corresponding SynchronousQueue methods:
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E e = transferer.transfer(null, true, unit.toNanos(timeout));
            if (e != null || !Thread.interrupted())
                    return e;
            throw new InterruptedException();
    }
    
    public E take() throws InterruptedException {
            E e = transferer.transfer(null, false, 0);
            if (e != null)
                    return e;
            Thread.interrupted();
            throw new InterruptedException();
     }
    // both block: poll(timeout) for at most the timeout, take() until an item arrives or the thread is interrupted
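From the pool's side, `offer` succeeds only when a worker is already blocked in `poll`. The sketch below (the class `WorkerReuseDemo` is illustrative) runs two tasks one after the other on a CachedThreadPool-style executor. It records worker threads through a custom `ThreadFactory` solely so it can wait until the idle worker is parked in `poll(keepAliveTime, ...)`. The second `execute` is then handed to that idle worker, so the pool is expected to contain a single thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: an idle worker waiting in poll() lets offer() succeed, so no new thread is needed.
class WorkerReuseDemo {

    static int poolSizeAfterTwoSequentialTasks() {
        List<Thread> created = new CopyOnWriteArrayList<>();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            created.add(t);                      // remember workers so we can inspect their state
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
        try {
            CountDownLatch firstDone = new CountDownLatch(1);
            pool.execute(firstDone::countDown);  // no worker yet: offer() fails, a thread is created
            firstDone.await();
            Thread worker = created.get(0);
            // after its task, the worker calls workQueue.poll(keepAlive, ...) and parks with a timeout
            while (worker.getState() != Thread.State.TIMED_WAITING) {
                Thread.yield();
            }
            CountDownLatch secondDone = new CountDownLatch(1);
            pool.execute(secondDone::countDown); // offer() now hands the task to the polling worker
            secondDone.await();
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterTwoSequentialTasks()); // 1
    }
}
```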
    

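So why do these thread pools choose SynchronousQueue?

First, CachedThreadPool targets workloads that are not sensitive to how many threads are used: it prefers to start as many threads as needed so that tasks are executed as quickly as possible. Dubbo's FixedThreadPool (when queues is 0) likewise puts throughput first: every task is either executed right away or rejected.

What the two have in common is that they serve high-concurrency workloads, with many tasks that should start executing immediately rather than wait in a queue.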
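Both behaviors follow from `offer` failing whenever no worker is idle. The sketch below (the class `HandoffPoolDemo` is illustrative) keeps every task blocked on a latch so that no worker ever becomes idle. The CachedThreadPool-style configuration then starts one thread per task, while a fixed-size pool with a SynchronousQueue, similar to Dubbo's FixedThreadPool with `queues == 0`, rejects the overflow. It uses the standard `ThreadPoolExecutor.AbortPolicy` instead of Dubbo's `AbortPolicyWithReport`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: every task blocks on a latch, so no worker is idle and offer() always fails.
class HandoffPoolDemo {

    /** CachedThreadPool-style settings: each failed offer makes the pool start a new thread. */
    static int cachedPoolSize(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();   // let the blocked tasks finish
            pool.shutdown();
        }
    }

    /** Fixed-size pool + SynchronousQueue: a failed offer with no spare thread means rejection. */
    static int rejectedCount(int threads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads,
                0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++;    // no idle worker, no queue slot, no spare thread: task is dropped
                }
            }
            return rejected;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(cachedPoolSize(3));   // 3
        System.out.println(rejectedCount(2, 5)); // 3
    }
}
```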

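Why not LinkedBlockingQueue(1) or ArrayBlockingQueue(1)?

A LinkedBlockingQueue (LBQ) or ArrayBlockingQueue (ABQ) with capacity 1 looks as if it could hand elements over in a similar way, but SynchronousQueue is designed solely for passing elements and has no storage at all. Its handoff is therefore faster than either of them, since both must store each element and then remove it again. In a thread pool the behavior also differs: offer() on an empty capacity-1 queue succeeds even when no worker is idle, so the task waits in the queue instead of prompting a new thread or a rejection.

In a simple single-producer, single-consumer scenario on a dual-core machine, SynchronousQueue's throughput is about 20 times that of a LinkedBlockingQueue or ArrayBlockingQueue with capacity 1.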
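To try such a comparison locally, a crude single-producer/single-consumer harness like the one below can be used. It is a rough sketch (the class `HandoffTiming` is illustrative), not a rigorous benchmark: it has no JIT warm-up control and no JMH, and results depend heavily on hardware and JDK version, so its numbers should not be read as confirming or refuting the 20x figure. It also checks that every item arrives in order.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Illustrative, crude timing harness: one producer thread, the caller acts as the consumer.
class HandoffTiming {

    /** Passes the integers 0..n-1 from a producer thread to the caller; returns elapsed nanos. */
    static long timeHandoffs(BlockingQueue<Integer> q, int n) {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    q.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        long start = System.nanoTime();
        producer.start();
        try {
            for (int i = 0; i < n; i++) {
                int v = q.take();
                if (v != i) throw new IllegalStateException("out of order: " + v);
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        int n = 200_000;
        System.out.println("SynchronousQueue       : " + timeHandoffs(new SynchronousQueue<>(), n) / 1_000_000 + " ms");
        System.out.println("ArrayBlockingQueue(1)  : " + timeHandoffs(new ArrayBlockingQueue<>(1), n) / 1_000_000 + " ms");
        System.out.println("LinkedBlockingQueue(1) : " + timeHandoffs(new LinkedBlockingQueue<>(1), n) / 1_000_000 + " ms");
    }
}
```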

References

implementation-of-blockingqueue-what-are-the-differences-between-synchronousque
