java并发包消息队列

    xiaoxiao2023-08-17  121

    消息队列常用于有生产者和消费者两类角色的多线程同步场景

     

    BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。

    主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。

             插入:

                       1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常

            2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.

            3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续.

             读取:

            4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

            5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

             其他

    int remainingCapacity();返回队列剩余的容量,在队列插入和获取的时候,不要瞎搞,数 据可能不准

    boolean remove(Object o); 从队列移除元素,如果存在,即移除一个或者更多,队列改    变了返回true

    public boolean contains(Object o); 查看队列是否存在这个元素,存在返回true

    int drainTo(Collection<? super E> c); 传入的集合中的元素,如果在队列中存在,那么将     队列中的元素移动到集合中

    int drainTo(Collection<? super E> c, int maxElements); 和上面方法的区别在于,制定了移   动的数量

    案例:

    package blockingqueue;

     

    import java.util.concurrent.BlockingQueue;

     

    public class Consumer implements Runnable {

        BlockingQueue<String> queue;

       

        public Consumer(BlockingQueue<String> queue) {

          this.queue = queue;

       }

       

       @Override

       public void run() {

          try {

             String consumer = Thread.currentThread().getName();

             System.out.println(consumer);

             //如果队列为空,会阻塞当前线程

             String temp = queue.take();

             System.out.println(consumer + "消费者  get a product:" + temp);

          } catch (Exception e) {

             e.printStackTrace();

          }

       }

     

    }

    package blockingqueue;

     

    import java.util.concurrent.BlockingQueue;

     

    public class Producer implements Runnable {

       BlockingQueue<String> queue;   

        public Producer(BlockingQueue<String> queue) { 

            this.queue = queue

        }   

        @Override 

        public void run() { 

            try

                String temp = "A Product, 生产线程:" 

                        + Thread.currentThread().getName(); 

                queue.put(temp);//如果队列是满的话,会阻塞当前线程 

                System.out.println("生产者 I have made a product: " 

                     + Thread.currentThread().getName());

            } catch (InterruptedException e) { 

                e.printStackTrace(); 

            } 

        }

    }

    package blockingqueue;

     

    import java.util.concurrent.BlockingQueue;

    import java.util.concurrent.LinkedBlockingQueue;

     

    public class Test {

     

       public static void main(String[] args) throws Exception {

          BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);

          // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

          // 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE

          // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

          Consumer consumer = new Consumer(queue);

          Producer producer = new Producer(queue);

          for (int i = 0; i < 3; i++) {

             new Thread(producer, "Producer" + (i + 1)).start();

          }

          for (int i = 0; i < 5; i++) {

             new Thread(consumer, "Consumer" + (i + 1)).start();

          }

         

          Thread.sleep(5000);

         

          new Thread(producer, "Producer" + (5)).start();

       }

    }

     

    BlockingQueue有四个具体的实现类,常用的两种实现类为:

     

    1、ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。

     

    2、LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。

             LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

     

    LinkedBlockingQueue和ArrayBlockingQueue区别:

     

    LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.

     

    生产者消费者的示例代码:

    见代码  TestBlockingQueue  TestBlockingQueueConsumer   TestBlockingQueueProducer

    package blockingqueue;

     

    import java.util.Random;

    import java.util.concurrent.BlockingQueue;

     

    public class TestBlockingQueueProducer implements Runnable {

       BlockingQueue<String> queue;

       Random random = new Random();

     

       public TestBlockingQueueProducer(BlockingQueue<String> queue) {

          this.queue = queue;

       }

     

       @Override

       public void run() {

     

          for (int i = 0; i < 10; i++) {

             try {

                Thread.sleep(random.nextInt(10));

                String task = Thread.currentThread().getName() + " made a product " + i;

     

                System.out.println(task);

                queue.put(task);  //阻塞方法

             } catch (InterruptedException e) {

                 

                e.printStackTrace();

             }

     

          }

       }

    }

    package blockingqueue;

     

    import java.util.Random;

    import java.util.concurrent.BlockingQueue;

     

    public class TestBlockingQueueConsumer implements Runnable {

       BlockingQueue<String> queue;

        Random random = new Random();

       

        public TestBlockingQueueConsumer(BlockingQueue<String> queue){ 

            this.queue = queue; 

        }       

        @Override 

        public void run() { 

            try { 

              Thread.sleep(random.nextInt(10));

              System.out.println(Thread.currentThread().getName()+ "trying...");

                String temp = queue.take();//如果队列为空,会阻塞当前线程 

                int remainingCapacity = queue.remainingCapacity();

                System.out.println(Thread.currentThread().getName() + " get a job " +temp);

                // System.out.println("队列中的元素个数: "+ remainingCapacity);

            } catch (InterruptedException e) { 

                e.printStackTrace();

            } 

        }

    }

    package blockingqueue;

     

    import java.util.concurrent.BlockingQueue;

    import java.util.concurrent.LinkedBlockingQueue;

     

    public class TestBlockingQueue {

     

       public static void main(String[] args) {

          BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);

          // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

          // 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE

          // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

          TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue);

          TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue);

          for (int i = 0; i < 3; i++) {

             new Thread(producer, "Producer" + (i + 1)).start();

          }

          for (int i = 0; i < 5; i++) {

             new Thread(consumer, "Consumer" + (i + 1)).start();

          }

       }

    }

     

    最新回复(0)