⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 小姐姐味道 「小姐姐养的狗」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

ArrayList和LinkedList有什么区别?

这种侮辱人的问题,默认就把这两者限定在了同一个场景之中,它甚至连八股文都算不上。

一旦你被问到这种问题,也证明面试基本上泡汤了--面试官已经实在是找不到其他问题与你交流了。

你Over了。

但当我们细看一下LinkedList的class定义,就会发现,它并不像是ArrayList的那样具有纯洁的列表精神。

public class ArrayList<E> extends AbstractList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable

//VS

public class LinkedList<E>
extends AbstractSequentialList<E>
implements List<E>, Deque<E>, Cloneable, java.io.Serializable

LinkedList除了能够当作普通的列表,它还是一个Deque。双向链表,听着就比较唬人,这就是一个既能当做队列、又能当做栈的存在。

有了这种双重Buff的叠加,LinkedList的应用场景比ArrayList丰富的多。除了能做最简单的LRU缓存,LinkedList在刷题的时候也是充满了正能量。

王者ConcurrentLinkedQueue,一个阻塞的双向队列,它的基本操作方法有:(3[基本]x2[异常与返回值]+4[阻塞加超时])x3[队头队尾]=5x2x3=30,足足有30个方法。看完上面的文章,这30个方法可以很快手到擒来。

不过我们今天要聊的一个重点,是使用Deque来实现更快的延迟队列。

延迟队列

如果你想要某些数据延迟一段时间再进行处理,或者要再某段时间内按照分组进行一些计算,那延迟队列无疑是非常合适的。

在Java的Concurrent包里,就静悄悄的躺着DelayQueue。只要你的数据实现了Delayed接口,那么就可以放心大胆的把它们往里面塞。

可惜的是,DelayQueue的底层存储,使用的是PriorityQueue。

PriorityQueue是堆实现的,offer和poll数据的时间复杂度是O(logN)。这就意味着,当DelayQueue中的数据比较多的时候,它的性能就会下降。

除了把数据分片,使用多个DelayQueue来完成工作,我们有没有速度更快的方法?比如把PriorityQueue使用LinkedList来替换?

这要看场景。

如果你向DelayQueue中添加数据,是与当前添加的时间相关的,那完全可以使用LinkedList来替换PriorityQueue。

关键代码

要了解DelayQueue的运行原理,我们可以需要先看一下Delayed接口。任何想要存储到DelayQueue中的数据,都需要实现这个接口。

其中,getDelay就是用来判断当前数据是否超时的方法。而compareTo,则是PriorityQueue用来排序的,如果我们是按照当前塞入数据的,则compareTo方法就不是必要的。

public long getDelay(@NotNull TimeUnit unit) {
return MAX_CACHE_DUAL + this.enqueueTimeNs - System.nanoTime();
}
public int compareTo(@NotNull Delayed o) {
long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return (d == 0) ? 0 : (d < 0 ? -1 : 1);
}

按照以上的思路,我们把DelayQueue的代码拷贝一份,仅保留关键代码,如下。

public class LightweightDelayedQueue<E extends Delayed> {
private final transient ReentrantLock lock = new ReentrantLock();
private final LinkedList<E> q = new LinkedList<>();
private final Condition available = lock.newCondition();
private Thread leader;

public void put(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (; ; ) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}

public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
for (E first;
n < maxElements
&& (first = q.peek()) != null
&& first.getDelay(NANOSECONDS) <= 0; ) {
c.add(first); // In this order, in case add() throws.
q.poll();
++n;
}
return n;
} finally {
lock.unlock();
}
}
}

主要方法

轻量级的延迟队列,如果一旦采用了LinkedList,那么它的入队、出队方法,就都变成了O(1)的时间复杂度。在延迟队列中的数据增加时,时间复杂度也能维持不变,可以说是速度快的连兔子都追不上了。

一般,在java中,put和take方法,都是代表阻塞性方法。比如,take方法,我们可以将其安全的阻塞在某个线程上,而不用担心太多的资源浪费。

final Thread thread = Thread.currentThread();
while (!this.close && !thread.isInterrupted()) {
Data data = q.take();
}

这一切都是Condition办到的,它和wait、notify的作用是一样的。

当我们通过put方法添加新的数据到队列中,会通过signal方法,来通知等待的线程获取数据。

相同的,如果take方法发现队列中的数据为空,它将进入等待状态。如果有数据,也并不是马上把这些数据取出来,因为数据还没到期。比如最老的数据还剩下5秒才到期,代码也对这种情况进行了处理,它会尝试awaitNanos对应的时间。

这样,我们就可以使用这种简单的轮询方式来实现延迟队列,而不需要单独的线程去检测队列中的数据。

增加take方法的效率

但是这样还不够。

当数据量比较大的时候,队列的数据可能有多条已经到期。如果我们通过take方法来一条一条获取的话,效率自然不如批量获取高。

drainTo方法,可以一股脑的把到期的数据转移到其他的集合中,但它并不是一个阻塞性的方法。

我们可以先使用take来阻塞线程,然后再批量取出所有数据。

下面代码,会一次性获取100条数据作为一个批量。

final Data takeItem = q.take();
final List<Data> buckets = new ArrayList<>(100);
q.drainTo(buckets, 99);
buckets.add(takeItem);

End

实际上,我们的某个业务,当采用LinkedList来替代PriorityQueue,并进行批量操作后,CPU的使用直接降低了1/3。

Deque是xjjdog最喜欢的一个接口。每当使用offerFirst、offerLast这样精准的API进行操作,都会体验到多巴胺的乐趣。LinkedList作为它的儿子,自然拥有了这些所有的功能。

当我们使用concurrent包里的基本API,对这些淳朴的工具进行封装,它们就会焕发出新的活力。

文章目录
  1. 1. 延迟队列
  2. 2. 关键代码
  3. 3. 主要方法
  4. 4. 增加take方法的效率
  5. 5. End