前面提到的各种BlockingQueue对读或者写都是锁上整个队列,在并发量大的时候,各种锁是比较耗资源和耗时间的,而前面的SynchronousQueue虽然不会锁住整个队列,但它是一个没有容量的“队列”,那么有没有这样一种队列,它即可以像其他的BlockingQueue一样有容量又可以像SynchronousQueue一样不会锁住整个队列呢?有!答案就是LinkedTransferQueue。
LinkedTransferQueue是基于链表的FIFO无界阻塞队列,它出现在JDK7中。Doug Lea 大神说LinkedTransferQueue是一个聪明的队列。它是ConcurrentLinkedQueue、SynchronousQueue (公平模式下)、无界的LinkedBlockingQueues等的超集。既然这么牛逼,那势必要弄清楚其中的原理了。
LinkedTransferQueue
看源码之前我们先稍微了解下它的原理,这样看源码就会有迹可循了。
LinkedTransferQueue采用一种预占模式。什么意思呢?有就直接拿走,没有就占着这个位置直到拿到或者超时或者中断。即消费者线程到队列中取元素时,如果发现队列为空,则会生成一个null节点,然后park住等待生产者。后面如果生产者线程入队时发现有一个null元素节点,这时生产者就不会入列了,直接将元素填充到该节点上,唤醒该节点的线程,被唤醒的消费者线程拿东西走人。是不是有点儿SynchronousQueue的味道?
结构
LinkedTransferQueue与其他的BlockingQueue一样,同样继承AbstractQueue类,但是它实现了TransferQueue,TransferQueue接口继承BlockingQueue,所以TransferQueue算是对BlockingQueue一种扩充,该接口提供了一整套的transfer接口:
相对于其他的BlockingQueue,LinkedTransferQueue就多了上面几个方法。这几个方法在LinkedTransferQueue中起到了核心作用。
LinkedTransferQueue定义的变量如下:
Node节点
Node节点由四个部分构成:
- isData:表示该节点是存放数据还是获取数据
- item:存放数据,isData为false时,该节点为null,为true时,匹配后,该节点会置为null
- next:指向下一个节点
- waiter:park住消费者线程,线程就放在这里
结构如下:
源码如下:
节点Node为LinkedTransferQueue的内部类,其内部结构和公平方式的SynchronousQueue差不多,里面也同样提供了一些很重要的方法。
put操作
LinkedTransferQueue提供了add、put、offer三类方法,用于将元素插入队列中,如下:
由于LinkedTransferQueue是无界的,不会阻塞,所以在调用xfer方法是传入的是ASYNC,同时直接返回true.
take操作
LinkedTransferQueue提供了poll、take方法用于出列元素:
这里和put操作有点不一样,take()方法传入的是SYNC,阻塞。poll()传入的是NOW,poll(long timeout, TimeUnit unit)则是传入TIMED。
tranfer操作
实现TransferQueue接口,就要实现它的方法:
xfer()
通过上面几个核心方法的源码我们清楚可以看到,最终都是调用xfer()方法,该方法接受四个参数,item或者null的E,put操作为true、take操作为false的havaData,how(有四个值NOW, ASYNC, SYNC, or TIMED,分别表示不同的操作),超时nanos。
整个算法的核心就是寻找匹配节点找到了就返回,否则就入队(NOW直接返回):
- matched。判断匹配条件(isData不一样,本身没有匹配),匹配后就casItem,然后unpark匹配节点的waiter线程,如果是reservation则叫人收货,data则叫null收货。
- unmatched。如果没有找到匹配节点,则根据传入的how来处理,NOW直接返回,其余三种先入对,入队后如果是ASYNC则返回,SYNC和TIMED则会阻塞等待匹配。
其实相当于SynchronousQueue来说,这个处理逻辑还是比较简单的。
如果没有找到匹配节点,且how != NOW会入队,入队则是调用tryAppend方法:
tryAppend方法是将S节点添加到tail上,然后返回其前驱节点。好吧,我承认这段代码我看的有点儿晕!!!
加入队列后,如果how还不是ASYNC则调用awaitMatch()方法阻塞等待:
整个awaitMatch过程和SynchronousQueue的awaitFulfill没有很大区别,不过在自旋过程会调用Thread.yield();这是干嘛?
在awaitMatch过程中,如果线程中断了,或者超时了则会调用unsplice()方法去除该节点:
主体流程已经完成,这里总结下:
- 无论是入对、出对,还是交换,最终都会跑到xfer(E e, boolean haveData, int how, long nanos)方法中,只不过传入的how不同而已
- 如果队列不为空,则尝试在队列中寻找是否存在与该节点相匹配的节点,如果找到则将匹配节点的item设置e,然后唤醒匹配节点的waiter线程。如果是reservation则叫人收货,data则叫null收货
- 如果队列为空,或者没有找到匹配的节点且how != NOW,则调用tryAppend()方法将节点添加到队列的tail,然后返回其前驱节点
- 如果节点的how != NOW && how != ASYNC,则调用awaitMatch()方法阻塞等待,在阻塞等待过程中和SynchronousQuque的awaitFulfill()逻辑差不多,都是先自旋,然后判断是否需要自旋,如果中断或者超时了则将该节点从队列中移出
实例
这段摘自JAVA 1.7并发之LinkedTransferQueue原理理解。感觉看完上面的源码后,在结合这个例子会有更好的了解,掌握。
1:Head->Data Input->Data
Match: 根据他们的属性 发现 cannot match ,因为是同类的
处理节点: 所以把新的data放在原来的data后面,然后head往后移一位,Reservation同理
HEAD=DATA->DATA
2:Head->Data Input->Reservation (取数据)
Match: 成功match,就把Data的item变为reservation的值(null,有主了),并且返回数据。
处理节点: 没动,head还在原地
HEAD=DATA(用过)
3:Head->Reservation Input->Data(放数据)
Match: 成功match,就把Reservation的item变为Data的值(有主了),并且叫waiter来取
处理节点: 没动
HEAD=RESERVATION(用过)