Java多线程 Producer and Consumer设计模式

目录
  • producer是生产者的意思:指生产数据的线程,
  • consumer是消费者的意思:指的是使用数据的线程
public class ProducerThread extends Thread {

    private final static Random random = new Random(System.currentTimeMillis());
    private final static AtomicInteger counter = new AtomicInteger(0);
    private final MessageQueue messageQueue;

    public ProducerThread(MessageQueue messageQueue, int seq) {
        super("Producer-" + seq);
        this.messageQueue = messageQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Message message = new Message("Message-" + counter.getAndIncrement());
                messageQueue.put(message);
                System.out.println(Thread.currentThread().getName() + " put message " + message.getData());
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                break;
            }
        }

    }
}
public class ConsumerThread extends Thread {

    private final static Random random = new Random(System.currentTimeMillis());
    private final MessageQueue messageQueue;

    public ConsumerThread(MessageQueue messageQueue, int seq) {
        super("Consumer-" + seq);
        this.messageQueue = messageQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Message message = messageQueue.take();
                System.out.println(Thread.currentThread().getName() + " take a message " + message.getData());
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                break;
            }
        }

    }

}
public class Message {

    public Message(String data) {
        this.data = data;
    }

    private String data;

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

}
public class MessageQueue {
    private final static int DEFAULT_MAX_LIMIT = 100;
    private final LinkedList<Message> queue;
    private final int limit;

    public MessageQueue() {
        this(DEFAULT_MAX_LIMIT);
    }

    public MessageQueue(final int limit) {
        this.limit = limit;
        this.queue = new LinkedList<>();
    }

    public void put(final Message message) throws InterruptedException {
        synchronized (queue) {
            while (queue.size() > limit) {
                queue.wait();
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }

    public Message take() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait();
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }

    public int getMaxLimit() {
        return this.limit;
    }

    public int getMessageSize() {
        synchronized (queue) {
            return queue.size();
        }
    }
}
public class ProducerAndConsumerClient {

    public static void main(String[] args) {
        final MessageQueue messageQueue = new MessageQueue();

        new ProducerThread(messageQueue, 1).start();
        new ProducerThread(messageQueue, 2).start();
        new ProducerThread(messageQueue, 3).start();
        new ConsumerThread(messageQueue, 1).start();
        new ConsumerThread(messageQueue, 2).start();
    }
}

Producer-1 put message Message-0
Producer-3 put message Message-2
Producer-2 put message Message-1
Consumer-1 take a message Message-0
Consumer-2 take a message Message-1
Producer-2 put message Message-3
Consumer-1 take a message Message-2
Producer-2 put message Message-4
Consumer-2 take a message Message-3
Producer-3 put message Message-5
Producer-3 put message Message-6
Producer-3 put message Message-7
Consumer-1 take a message Message-4
Producer-2 put message Message-8
Consumer-2 take a message Message-5
Producer-3 put message Message-9
Producer-1 put message Message-10
Producer-1 put message Message-11
Producer-2 put message Message-12
省略...

到此这篇关于Java多线程 Producer and Consumer设计模式的文章就介绍到这了,更多相关Java多线程 Producer、Consumer内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java多线程之同步工具类CountDownLatch

    前言: CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行.例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行. 1 CountDownLatch主要方法 void await():如果当前count大于0,当前线程将会wait,直到count等于0或者中断. PS:当count等于0的时候,再去调用await() , 线程将不会阻塞,而是立即运行.后面可以通过源码分析得到. boolean await(long t

  • Java多线程之读写锁分离设计模式

    主要完成任务: 1.read read 并行化 2.read write 不允许 3.write write 不允许 public class ReaderWorker extends Thread { private final SharedData data; public ReaderWorker(SharedData data) { this.data = data; } @Override public void run() { while (true) { try { char[]

  • Java多线程 Guarded Suspension设计模式

    目录 1.Guarded Suspension模式的结构 2. Guarded Suspension模式的简单实现 前言: Guarded Suspension意为保护暂停,其核心思想是仅当服务进程准备好时,才提供服务.设想一种场景,服务器可能会在很短时间内承受大量的客户端请求,客户端请求的数量可能超过服务器本身的即时处理能力,而服务端程序又不能丢弃任何一个客户请求.此时,最佳的处理方案莫过于让客户端要求进行排队,由服务端程序一个接一个处理.这样,既保证了所有的客户端请求均不丢失,同时也避免了服

  • Java多线程之同步工具类CyclicBarrier

    目录 1 CyclicBarrier方法说明 2 CyclicBarrier实例 3 CyclicBarrier源码解析 CyclicBarrier构造函数 await方法 nextGeneration的源码 breakBarrier源码 isBroken方法 reset方法 getNumberWaiting方法 前言: CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到达到某个公共屏障点.与CountDownLatch不同的是该barrier在释放线程等待后可以重用,所以

  • Java多线程 自定义线程池详情

    主要介绍: 1.任务队列 2.拒绝策略(抛出异常.直接丢弃.阻塞.临时队列) 3.init( min ) 4.active 5.max min<=active<=max package chapter13; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; public class SimpleThreadPool { public final static DiscardPolicy

  • Java多线程之同步工具类Exchanger

    目录 1 Exchanger 介绍 2 Exchanger 实例 exchange等待超时 3 实现原理 1 Exchanger 介绍 前面分别介绍了CyclicBarrier.CountDownLatch.Semaphore,现在介绍并发工具类中的最后一个Exchange. Exchanger 是一个用于线程间协作的工具类,Exchanger用于进行线程间的数据交换,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据.这两个线程通过exchange 方法交换数据,如果第一个线程先执行e

  • Java多线程之Future设计模式

    目录 Future -> 代表的是未来的一个凭据 AsynFuture -> Future具体实现类 FutureService -> 桥接Future和FutureTask FutureTask -> 将你的调用逻辑进行了隔离 Future -> 代表的是未来的一个凭据 public interface Future<T> { T get() throws InterruptedException; } AsynFuture -> Future具体实现类

  • Java多线程之Semaphore实现信号灯

    目录 1 Semaphore的主要方法 2 实例讲解 实现单例模式 3 源码解析 构造方法 获取许可 释放许可 减小许可数量 获取剩余许可数量 前言: Semaphore是计数信号量.Semaphore管理一系列许可证.每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证:每个release方法增加一个许可证,这可能会释放一个阻塞的acquire方法.然而,其实并没有实际的许可证这个对象,Semaphore只是维持了一个可获得许可证的数量. Semaphore可以维护当前访问自

  • Java多线程之条件对象Condition

    1 简介 Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法. 不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的:而Condition是需要与"互斥锁"/"共享锁&

  • Java多线程 ThreadLocal原理解析

    目录 1.什么是ThreadLocal变量 2.ThreadLocal实现原理 3.内存泄漏问题 4.使用场景 1)存储用户Session 2)解决线程安全的问题 3)使用ThreadLocal重新设计一个上下文设计模式 4)ThreadLocal注意事项 脏数据 内存泄漏 父子线程共享线程变量 1.什么是ThreadLocal变量 ThreadLoal 变量,线程局部变量,同一个 ThreadLocal 所包含的对象,在不同的 Thread 中有不同的副本. 这里有几点需要注意: 因为每个 T

随机推荐