Queue 存入 和 取出 是线程安全的吗?

作者&投稿:寇炕 (若有异议请与网页底部的电邮联系)
linkedblockingqueue是线程安全的么~

数据结构
链表节点
既然是链表,那么肯定少不了节点,节点自然包括节点内容和next指针。jdk开发人员,设计的节点是这样的:
static class Node {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node next;
Node(E x) { item = x; }
}


在这里,用到了java范型的机制,用来保存不同类型的对象。
上述节点,提供了一个构造函数,用来传入需要保存的内容,这里的构造函数没有判断传入参数是否合法,因为在所有public方法中,已经判断过了,这里无需进行再次判断。

链表的指针
LInkedBlockingQueue中的链表,包含头指针和尾指针,其中:
头指针用来管理元素出队,和 take(), poll(), peek() 三个操作关联
尾指针用来管理元素入队,和 put(), offer() 两个操作关联
具体的数据结构定义如下:
private transient Node head; /* 头结点, 头节点不保存数据信息 */
private transient Node last; /* 尾节点, 尾节点保存最新入队的数据信息 */


链表的容量和大小
LinkedBlockingQueue是有大小限制的,当队列满后不能继续入队,同时,也有一个变量记录当前队列中的元素数量:
private final int capacity; /* 队列容量一般使用中,构造LinkedBlockingQueue时,需要传入当前队列大小,如果不传入,默认是Integer.MAX_VALUE */
private final AtomicInteger count = new AtomicInteger(0); // 队列当前大小

注意:这里的count对象,是原子类型,而不是一般的int类型,与ArrayBlockingQueue中的不符,这是因为LinkedBlockingQueue使用读和写两把锁来控制并发操作,读和写可能同时修改count字段的值,而ArrayBlockingQueue只有一把锁用于控制读写操作,所以count对象是普通的,线程不安全的类型

控制并发的lock和condition
LinkedBlockingQueue中,读和写分别由两把锁控制,两把锁分别管理head节点和last节点的操作,如下所示:
private final ReentrantLock takeLock = new ReentrantLock(); /* 读锁 */
private final Condition notEmpty = takeLock.newCondition(); /* 读锁对应的条件 */
private final ReentrantLock putLock = new ReentrantLock(); /* 写锁 */
private final Condition notFull = putLock.newCondition(); /* 写锁对应的条件 */

jdk文档中,解释说,这两把锁的控制,是“two lock queue”算法的一种实现,但具体操作与其有些差异(A variant of the "two lock queue" algorithm.)
关于two lock queue可以参考:http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html

关键代码解读
入队和出队的核心操作
入队和出队的核心操作,就是对于链表头结点和尾节点的操作,与我们大学学习的数据结构基本一致。因为这些操作,都是private方法,外部已经进行了正确的同步,所以这些方法中,不带任何加锁和解锁的操作。
入队的代码如下所示:
private void enqueue(E x) {
last = last.next = new Node(x);
}
上述代码其实是将三行写成了一行,为了方便学习,这里把其拆开:
private void enqueue(E x) {
Node newNode = new Node(x); /* 新建一个Node对象,此对象的数据部分是新元素的指针,next指针为null */
last.next = newNode ; /* 将目前list节点的next指针指向新对象 */
last =last.next; /* 将last指针向后移动一个元素,指向新的尾端 */
}
出队的代码如下所示:
private E dequeue() {
Node h = head; /* 记录目前头节点的指针 */
Node first = h.next; /* 得到头结点后的第一个节点,即需要出队的数据节点 */
/* 将需要出队的数据的尾节点设置为自己,让此对象变为孤立对象,GC可以进行回收,更重要的是,如果 迭代器引用此节点,迭代器可以通过判断next是否等于自己,来了解迭代器的下一个节点是否应该重定向为头节点 */
h.next = h;
head = first; /* 将头指针指向新的头节点 */
E x = first.item; /* 获取数据元素的内容 */
/* 将头节点所在数据元素设置为null,因为头节点的数据已出队,如果此时再持有其引用,可能造成内存泄漏 */
first.item = null;
return x;
}


入队的public方法
入队的方法有两种,一种是阻塞的方法,另一种是非阻塞的方法,其中:
put()算法,为阻塞算法,直到队列有空余时,才能为队列加入新元素
offer()算法为非阻塞算法,如果队列已满,立即返回或等待一会再返回,通过返回值ture或false,标记本次入队操作是否成功
put的操作算法如下所示:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); /* 如果完成当前入队操作后,队列依然有剩余的空间,那么再唤醒另一个等待入队的线程 */
} finally {
putLock.unlock();
}
if (c == 0) /* 如果入队前,队列大小为空,那么唤醒一个等待出队的线程 */
signalNotEmpty();
}

offer的算法与put类似,这里不再赘述。

出队的public方法
出队的方法与入队类似,也分为阻塞和非阻塞两种,其中:
take()算法为阻塞算法,直到队列有非空时,才将允许调用线程取出数据
poll()算法为非阻塞算法,如果队列为空,立即返回或等待一会再返回,通过返回值ture或false,标记本次出队操作是否成功
peek()算法比较特殊,只返回队列中的第一个元素,既不出队,也不阻塞,如果没有元素,就返回null
task操作的算法如下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal(); /* 如果完成当前入队操作后,队列依然有剩余的元素,那么再唤醒另一个等待出队的线程 */
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull(); /* 如果出队前,队列是满的,那么出队后,队列就空了,需要通知一个等待入队的线程 */
return x;
}

其余算法,与上述算法类似,这里不再赘述。

多线程安全的迭代器
LinkedBlockingQueue的迭代器,是多线程安全的,在获取元素之前,会对上述读锁和写锁同时加锁,同时,为了防止死锁,读锁和写锁的加解锁顺序,也是经过设计的,代码如下:
void fullyLock() {
putLock.lock(); // 先加写锁
takeLock.lock(); // 再加读锁
}

void fullyUnlock() {
takeLock.unlock(); /* 先解锁读锁 */
putLock.unlock(); /* 再解锁写锁 */
}

LinkedBlockingQueue的迭代器中,保存了以下内容:
private Node current; /* 迭代器的下一个位置 */
private Node lastRet; /* 当前迭代器的位置 */
private E currentElement; /* 当前需要返回的元素内容 */

刚看到代码时,觉得好像只要一个指向当前位置的指针就行了,干嘛这么麻烦呢,但JDK的开发人员考虑的比我们周全多了,这三个参数,足以应付任何多线程的问题:
首先,保存了当前需要返回的内容,可以保证在当前节点移除的情况下,迭代器的next()方法,也能返回当前指向的内容,即使先调用hasNext()方法,其他线程删除了当前对象,那么next()方法也可以保证返回正确对象
其次,如果在迭代器中,调用remove()方法,删除了当前对象,那么 lastRet方法就用上了,可以通过再次遍历列表,找到需要删除的对象,并将其删除,同时为了防止remove()方法被调用两次,在删除时,会将 lastRet设置为null,如果只有这一个指针,那么remove()之后,这个迭代器就啥也干不了了
最后,current保存了迭代器的下一个指向的位置,调用hasNext()时,可以立即直到是否还有空余对象,更重要的是,如果在迭代器创建后,其他线程多次调用了出队的方法,可能导致 lastRet和current都变成悬挂的指针了,这时,只要判断current的next是否为自己,就可以知道自己是否已经被出队,是否需要重定向current的位置
关于迭代器的代码精髓,就是上面的描述了,具体代码,不再赘述。

对锁的精巧使用和思考
LinkedBlockingQueue将读和写操作分离,可以让读写操作在不干扰对方的情况下,完成各自的功能,提高并发吞吐量。
在写这篇文章时,我曾经考虑过,如果使用java内置的同步机制,即 synchronized 关键字进行此类读写锁控制,但实际上实现不了,因为java对象在wait和notify时,需要对lock变量加锁,这样就失去了双锁的优势,同时会导致死锁。

防止内存泄漏
设计链表,最大的一点,就是不能出现内存泄漏。
LinkedBlockingQueue在这点上已经做的很优秀,每次移除节点,都将节点的内容字段设置为null,迭代器也是如此,确保不会发生内存泄漏。

  LinkedTransferQueue是一个聪明的队列,他是ConcurrentLinkedQueue,
  SynchronousQueue (in “fair” mode), and unbounded LinkedBlockingQueue的超集。
  LinkedTransferQueue实现了一个重要的接口TransferQueue,该接口含有下面几个重要方法:
  1. transfer(E e)
  若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。
  2. tryTransfer(E e)
  若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;
  若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
  3. tryTransfer(E e, long timeout, TimeUnit unit)
  若当前存在一个正在等待获取的消费者线程,会立即传输给它; 否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉,
  若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
  4. hasWaitingConsumer()
  判断是否存在消费者线程
  5. getWaitingConsumerCount()
  获取所有等待获取元素的消费线程数量

Queue本身并不是线程安全的, 有两种方法来保证线程安全:
1. 手动加锁。
Queue myCollection = new Queue();
lock(myCollection.SyncRoot)
{
foreach (object item in myCollection)
{
// Insert your code here.
}
}

2. 调用Synchronized方法,这时候得到的就是一个线程安全的Queue
Queue mySyncdQ = Queue.Synchronized( myQ );


如皋市18546668363: Queue 存入 和 取出 是线程安全的吗? -
塞舍抗腮: Queue本身并不是线程安全的, 有两种方法来保证线程安全: 1. 手动加锁. Queue myCollection = new Queue(); lock(myCollection.SyncRoot) {foreach (object item in myCollection){// Insert your code here.} }2. 调用Synchronized方法,这时候得到的就是一个线程安全的Queue Queue mySyncdQ = Queue.Synchronized( myQ );

如皋市18546668363: c#中的queue是线程安全的吗 -
塞舍抗腮: 通过集合枚举在本质上不是一个线程安全的过程.甚至在对集合进行同步处理时,其他线程仍可以修改该集合,这会导致枚举数引发异常.若要在枚举过程中保证线程安全,可以在整个枚举过程中锁定集合,或者捕捉由于其他线程进行的更改而引发的异常.

如皋市18546668363: c# 指定线程执行 -
塞舍抗腮: 你上面这些语句本来没必要加锁的,lock仅在实际操作数据的时候才需要,举个例子,你上面的additem和removeitem本来合成合成一个方法的,比如就叫add_del_teim(Player player,boolean flag) { lock(obj) { if(flag) //增加 else //删除 } } 同样象上面...

如皋市18546668363: python基于queue多线程怎么退出不了 -
塞舍抗腮: 添加数据很容易.Queue本身就是有锁的.建议在主线程(主程序中)添加,并处理其它线程的监督和处理结果的收集. 难的是取数据的线程.这个线程在取不到数据时,应该循环取,直到获取程序退出的通知. 使用threading里的Thread类.Queue通过初始...

如皋市18546668363: python 的内置数据结构是线程安全的吗 -
塞舍抗腮: Queue模块提供了一个适用于多线程编程的先进先出数据结构,可以用来安全的传递多线程信息.它本身就是线程安全的,使用put和get来处理数据,不会产生对一个数据同时读写的问题,所以是安全的.

如皋市18546668363: java 中怎么使用queue -
塞舍抗腮: Queue接口与List、Set同一级别,都是继承了Collection接口.LinkedList实现了Queue接 口.Queue接口窄化了对LinkedList的方法的访问权限(即在方法中的参数类型如果是Queue时,就完全只能访问Queue接口所定义的方法 了,...

如皋市18546668363: stl中的queue怎么访问队列中某个元素 -
塞舍抗腮: 分析: queue:队列 特性:只能访问首尾元素 访问接口:push,pop,front,back,queue(复制构造函数),size,empty 结论: 只能访问首尾元素(访问方式看接口),中间元素无法访问 如果要访问中间元素,请用其他容器, vector、list等都可以

如皋市18546668363: python有1000个url请求 放到queue 再十个线程 这样输出的数据是乱的 必须加锁么 -
塞舍抗腮: 题主的问题表述不清,尤其十个线程做了哪些工作没说明,我从字面猜测,是把1000个url放在队列里,然后十个线程从队列中取出url,请求之后,调用输出函数.先说队列的问题,通常的设计里,queue只在添加元素,摘取元素的时候内部加锁...

如皋市18546668363: C++实现线程同步的几种方式 -
塞舍抗腮: 首先,互斥量这种线程相关的内容是平台相关的,我假设你用的是windows平台开发.其次,说明一下我的开发环境,vs2008,控制台程序,空的工程.最后给你贴代码,分文件来看.===头文件queuenode.h======你需要的节点数据可能不是...

本站内容来自于网友发表,不代表本站立场,仅表示其个人看法,不对其真实性、正确性、有效性作任何的担保
相关事宜请发邮件给我们
© 星空见康网