目录
  1. 1、五种阻塞队列介绍
  2. 2、poll和peek的区别
  3. 3、LinkedBlockingQueue
    1. 3.1、功能
      1. 3.1.1、增加
      2. 3.1.2、删除
    2. 3.2、简单分析
      1. 3.2.1、节点与属性
      2. 3.2.2、插入线程与获取线程的相互通知
      3. 3.2.3、入队与出队操作
      4. 3.2.4、对两把锁的加锁与释放
    3. 3.3、源码解读
      1. 3.3.1、构造函数
      2. 3.3.2、offer(E e)
      3. 3.3.3、put(E e)
      4. 3.3.4、peek()
      5. 3.3.5、poll()
      6. 3.3.6、remove(Object o)
      7. 3.3.7、clear()
      8. 3.3.8、drainTo(Collection c)
阻塞队列之LinkedBlockingQueue分析

1、五种阻塞队列介绍

  • ArrayBlockingQueue
    有界队列,底层使用数组实现,并发控制使用ReentrantLock控制,不管是插入操作还是读取操作,都需要获取锁之后才能执行。
  • LinkedBlockingQueue
    底层基于单向链表实现,既可以当做有界队列,也可以当做无界队列使用。使用两个ReentrantLock实现并发控制:takelock和putlock。
  • SynchronousQueue
    底层使用单向链表实现,只有一个元素,同步的意思是一个写操作必须等到一个读操作之后才返回,指的是读写线程的同步。
  • PriorityBlockingQueue
    带排序的阻塞队列的实现,使用数组进行实现。并发控制使用ReentrantLock,队列为无界队列。
    有初始化参数指定队列大小,但是会自动扩容。使用最小堆来实现排序。
  • DelayedQueue
    DelayedQueue是使用PriorityBlockingQueue和Delayed实现的,内部定义了一个优先级队列,当调用offer的时候,把Delayed对象加入队列中,使用take先把first对象拿出来(peek),如果没有到达阈值,进行await处理。

2、poll和peek的区别

都用于取队列的头结点,poll会删除头结点,peek不会删除头结点。

3、LinkedBlockingQueue

  • 是先进先出队列FIFO。
  • 采用ReentrantLock保证线程安全

3.1、功能

3.1.1、增加

增加有三种方式,前提:队列满

方式 put add offer
特点 一直阻塞 抛异常 返回false
3.1.2、删除

删除有三种方式,前提:队列为空

方式 remove poll take
特点 NoSuchElementException 返回false 阻塞

3.2、简单分析

  • LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。
  • 基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。
  • 头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。

LinkedBlockingQueue继承关系图.png

3.2.1、节点与属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//链表节点内部类
static class Node<E> {
//节点元素
E item;
Node<E> next;
Node(E x) {
item = x;
}
}
//容量界限,如果未设定,则为Integer最大值
private final int capacity;
//当前元素个数
private final AtomicInteger count = new AtomicInteger();
//链表的头:head.item == null
transient Node<E> head;
//链表的尾:last.next == null
private transient Node<E> last;
//take,poll等获取锁
private final ReentrantLock takeLock = new ReentrantLock();
//等待任务的等待队列
private final Condition notEmpty = takeLock.newCondition();
//put,offer等插入锁
private final ReentrantLock putLock = new ReentrantLock();
//等待插入的等待队列
private final Condition notFull = putLock.newCondition();
3.2.2、插入线程与获取线程的相互通知

signalNotEmpty()方法,在插入线程发现队列为空时调用,告知获取线程需要等待。
signalNotFull()方法,在获取线程发现队列已满时调用,告知插入线程需要等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//表示等待take。put/offer调用,否则通常不会锁定takeLock。
private void signalNotEmpty() {
//获取takeLock
final ReentrantLock takeLock = this.takeLock;
//锁定takeLock
takeLock.lock();
try {
//唤醒take线程等待队列
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
}
//表示等待put,take/poll 调用
private void signalNotFull() {
//获取putLock
final ReentrantLock putLock = this.putLock;
//锁定putLock
putLock.lock();
try {
//唤醒插入线程等待队列
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
}
3.2.3、入队与出队操作

enqueue()方法只能在持有 putLock 锁下执行,dequeue()在持有 takeLock 锁下执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//在队列尾部插入
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
//last.next指向当前node
//尾指针后移
last = last.next = node;
}
//移除队列头
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
//保存头指针
Node<E> h = head;
//获取当前链表第一个元素
Node<E> first = h.next;
//头指针的next指向自己
h.next = h; // help GC
//头指针指向第一个元素
head = first;
//获取第一个元素的值
E x = first.item;
//将第一个元素的值置空
first.item = null;
//返回第一个元素的值
return x;
}
3.2.4、对两把锁的加锁与释放

在需要对两把锁同时加锁时,把加锁的顺序与释放的顺序封装成方法,确保所有地方都是一致的。而且获取锁时都是不响应中断的,一直获取直到加锁成功,这就避免了第一把锁加锁成功,而第二把锁加锁失败导致锁不释放的风险。

1
2
3
4
5
6
7
8
9
10
//锁定putLock和takeLock
void fullyLock() {
putLock.lock();
takeLock.lock();
}
//与fullyLock的加锁顺序相反,先解锁takeLock,再解锁putLock
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

3.3、源码解读

简单介绍一下LinkedBlockingQueue中API的源码,如构造方法,新增,获取,删除,drainTo。

3.3.1、构造函数

LinkedBlockingQueue有三个构造方法,其中无参构造尽量少用,因为容量为Integer的最大值,操作不当会出现内存溢出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
//参数校验
if (capacity <= 0) throw new IllegalArgumentException();
//设置容量
this.capacity = capacity;
//首尾节点指向一个空节点
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
//获取putLock
final ReentrantLock putLock = this.putLock;
//锁定
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
3.3.2、offer(E e)

将给定的元素设置到队列中,如果设置成功返回true, 否则返回false。 e的值不能为空,否则抛出空指针异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//如果可以在不超过队列容量的情况下立即插入指定的元素到队列的尾部,成功后返回true,如果队列已满,返回false。当使用容量受限的队列时,此方法通常比方法BlockingQueue#add更可取,后者只能通过抛出异常才能插入元素。
public boolean offer(E e) {
//非空判断
if (e == null) throw new NullPointerException();
//计数器
final AtomicInteger count = this.count;
//如果队列已满,直接返回插入失败
if (count.get() == capacity)
return false;
int c = -1;
//新建节点
Node<E> node = new Node<E>(e);
//获取插入锁
final ReentrantLock putLock = this.putLock;
//锁定
putLock.lock();
try {
//如果队列未满
if (count.get() < capacity) {
//插入队列
enqueue(node);
//计数
c = count.getAndIncrement();
//还有空余空间
if (c + 1 < capacity)
//唤醒插入线程
notFull.signal();
}
} finally {
//解锁
putLock.unlock();
}
//如果队列为空
if (c == 0)
//通知获取线程阻塞
signalNotEmpty();
//返回成功或者插入失败
return c >= 0;
}
3.3.3、put(E e)

将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void put(E e) throws InterruptedException {
//不可以插入空元素
if (e == null) throw new NullPointerException();

//所有put/take/etc中的约定都是预先设置本地var
//除非设置,否则保持计数为负数表示失败。
int c = -1;
//新建节点
Node<E> node = new Node<E>(e);
//获取putLock
final ReentrantLock putLock = this.putLock;
//获取计数器
final AtomicInteger count = this.count;
//可中断加锁,即在锁获取过程中不处理中断状态,而是直接抛出中断异常,由上层调用者处理中断。
putLock.lockInterruptibly();
try {
/*
* 注意count在wait守卫线程中使用,即使它没有被锁保护。
* 这是因为count只能在此时减少(所有其他put都被锁定关闭),
* 如果它从容量更改,我们(或其他一些等待put)将收到信号。
* 类似地,count在其他等待守卫线程中的所有其他用途也是如此。
*/
//只要当前队列已满
while (count.get() == capacity) {
//通知插入线程等待
notFull.await();
}
//插入队列
enqueue(node);
//数量加1
c = count.getAndIncrement();
//如果队列增加1个元素还未满
if (c + 1 < capacity)
//唤醒插入进程
notFull.signal();
} finally {
//解锁
putLock.unlock();
}
//如果队列中没有元素了
if (c == 0)
//通知获取线程等待
signalNotEmpty();
}
3.3.4、peek()

非阻塞的获取队列中的第一个元素,不出队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public E peek() {
//队列为空,直接返回
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//获取第一个元素,非哨兵
Node<E> first = head.next;
//元素为空,返回null
if (first == null)
return null;
else
//返回第一个元素值
return first.item;
} finally {
takeLock.unlock();
}
}
3.3.5、poll()

非阻塞的获取队列中的值,未获取到返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E poll() {
final AtomicInteger count = this.count;
//队列为空,直接返回
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//队列非空,获取队列中元素
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
3.3.6、remove(Object o)

从队列中移除指定的值。将两把锁都锁定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public boolean remove(Object o) {
//不支持null
if (o == null) return false;
//锁定两个锁
fullyLock();
try {
//迭代队列
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//通过equals方法匹配待删除元素
if (o.equals(p.item)) {
//移除p节点
unlink(p, trail);
//成功
return true;
}
}
//失败
return false;
} finally {
//解锁
fullyUnlock();
}
}
// 将内部节点p与前一个跟踪断开连接
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
//p节点内容置空
p.item = null;
//trail节点的next指向p的next
trail.next = p.next;
//如果p是队尾
if (last == p)
//trail变为队尾
last = trail;
//如果队列已满
if (count.getAndDecrement() == capacity)
//通知插入线程阻塞
notFull.signal();
}
3.3.7、clear()

清空队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//原子性地从队列中删除所有元素。此调用返回后,队列将为空。
public void clear() {
//锁定
fullyLock();
try {
//清空数据,帮助垃圾回收
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
//如果容量为0
if (count.getAndSet(0) == capacity)
//唤醒插入线程
notFull.signal();
} finally {
//解锁
fullyUnlock();
}
}
3.3.8、drainTo(Collection c)

将队列中值,全部移除,并发设置到给定的集合中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public int drainTo(Collection<? super E> c, int maxElements) {
//各种判断
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
//锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//获取要转移的数量
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
//组装集合
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}

tencent.jpg

文章作者: ClawHub
文章链接: https://www.clawhub.club/posts/2019/12/18/%E9%AB%98%E5%B9%B6%E5%8F%91/%E9%98%BB%E5%A1%9E%E9%98%9F%E5%88%97%E4%B9%8BLinkedBlockingQueue%E5%88%86%E6%9E%90/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 ClawHub的博客
打赏
  • 微信
  • 支付宝
扫一扫关注ClawHub公众号,专注Java、技术分享、面试资源。