意见箱
恒创运营部门将仔细参阅您的意见和建议,必要时将通过预留邮箱与您保持联络。感谢您的支持!
意见/建议
提交建议

源码角度了解阻塞队列之SynchronousQueue

来源:恒创科技 编辑:恒创科技编辑部
2024-01-28 10:12:59
源码角度了解阻塞队列之SynchronousQueue

SynchronousQueue是一个同步队列,它没有任何的容量,插入操作都必须等待另一个线程的相应删除操作

从它的构造方法中我们可以看到,可以指定是否为公平的队列,如果是公平的使用队列,如果不是公平的,使用栈来存储

put()方法
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

我们可以看到它的put()方法没有什么逻辑,主要调用Transfererd的transfer()方法来放入元素


源码角度了解阻塞队列之SynchronousQueue

transfer()方法

Transfererd是一个抽象类,它的实现类有TransferQueue和TransferStack,分别是公平队列的实现和非公平队列的实现

我们先看一下TransferQueue的transfer()方法的实现逻辑

TransferQueue的transfer()方法
E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // can't wait
                return null;
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // failed to link in
                continue;

            advanceTail(t, s);              // swing tail and wait
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

代码有点长,我们分析一下:

isData代表是生产者模式还是消费者模式,如果传入的元素e是空的话显然是消费者模式 for循环,初始化QNode的头结点和尾结点,如果为空的话说明没有初始化好,继续for循环进行初始化 如果头尾节点相同或者是尾部节点模式和传入的模式相同的话,添加到尾部 新建节点,添加到尾部,然后调用awaitFulfill()方法进入阻塞状态 当线程唤醒发现在等待队列中第一个的时候,返回元素 如果模式不相同的话,比如来的是消费者,而等待队列全是生产者,这时候与队列中的第一个元素进行配的,配对成功出队列,同时调用LockSupport.unpark(m.waiter)来唤醒生产者 TransferStack的transfer()方法 同样判断是否为同种模式,如果是同种模式,入栈 阻塞 如果不是同种模式,进行配对一起出栈 take()方法

SynchronousQueue的take()方法也是调用transferer的transfer()方法来获取元素,然后返回元素

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }
总结

SynchronousQueue有公平队列和非公平队列两种实现方式,公平队列采用队列实现先进先出,非公平队列采用栈实现,先进后出,它的最大特点就是生产者来了需要阻塞,当对应的消费者来之后匹配成功才会被唤醒,这个SynchronousQueue产生阻塞,效率不高,一般不怎么使用

至此,我们的阻塞队列的实现基本介绍了,有ArrayBlockingQueue、LinkedBlockingQueue、PriorityQueue、DelayQueue和SynchronousQueue

❤️ 感谢大家

如果你觉得这篇内容对你挺有有帮助的话:

欢迎关注我❤️,点赞
上一篇: #yyds干货盘点# 面试必刷TOP101:没有重复项数字的全排列 下一篇: 手机怎么远程登录云服务器?