学会Pulsar Consumer的使用方式

目录
  • 1、使用前准备
  • 2、PulsarClient
  • 3、Producer
  • 4、Consumer
    • 4.1 第一次使用:
    • 4.2 第二次使用:
    • 4.3 第三次使用:
    • 4.4 第四次使用:
    • 4.5 第五次使用:
      • 重试机制源码分析
    • 4.6 第六次使用

1、使用前准备

引入依赖:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.6.1</version>
</dependency>

2、PulsarClient

在尝试使用Producer和Consumer前,我们先讲一下Pulsar客户端,因为不管是Producer还是Consumer,都是依靠PulsarClient来创建的:

/**
 * Pulsar工具类
 * @author winfun
 **/
public class PulsarUtils {

    /**
     * 根据serviceUrl创建PulsarClient
     * @param serviceUrl 服务地址
     * @return 客户端
     * @throws PulsarClientException 异常
     */
    public static PulsarClient createPulsarClient(String serviceUrl) throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();
    }
}

我们这里简单使用,只借用ServiceUrl创建客户端,其实还有很多比较重要的参数,下面稍微列举一下:

  • ioThreads:Set the number of threads to be used for handling connections to brokers (default: 1 thread)
  • listenerThreads:Set the number of threads to be used for message listeners (default: 1 thread). 一条线程默认只为一个消费者服务
  • enableTcpNoDelay:No-delay features make sure packets are sent out on the network as soon as possible

3、Producer

Producer这里我们也先简单使用,只负责往指定Topic发送消息,其他功能不用,例如异步发送、延时发送等

/**
 * 初次使用Pulsar生产者,无任何封装
 * @author winfun
 **/
public class FirstProducerDemo {

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://127.0.0.1:6650")
                .build();

        ProducerBuilder<String> productBuilder = client.newProducer(Schema.STRING).topic("winfun/study/test-topic")
                .blockIfQueueFull(Boolean.TRUE).batchingMaxMessages(100).enableBatching(Boolean.TRUE).sendTimeout(3, TimeUnit.SECONDS);

        Producer<String> producer = productBuilder.create();
        for (int i = 0; i < 100; i++) {
            producer.send("hello"+i);;
        }
        producer.close();
    }
}

4、Consumer

下面我们将比较详细地介绍消费者的使用方式,因为这里能拓展的东西稍微多一点,下面开始使用旅程。

4.1 第一次使用:

我们利用PulsarClient创建Consumer;接着在死循环中利用Consumer#receive方法接收消息然后进行消费。

/**
 * 初次使用Pulsar消费者,无任何封装
 * @author winfun
 **/
@Slf4j
public class FirstConsumerDemo {

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://127.0.0.1:6650")
                .build();

        /**
         * The subscribe method will auto subscribe the consumer to the specified topic and subscription.
         * One way to make the consumer listen on the topic is to set up a while loop.
         * In this example loop, the consumer listens for messages, prints the contents of any received message, and then acknowledges that the message has been processed.
         * If the processing logic fails, you can use negative acknowledgement to redeliver the message later.
         */
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("winfun/study/test-topic")
                .subscriptionName("my-subscription")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
        // 死循环接收
        while (true){
            Message<String> message = consumer.receive();
            String msgContent = message.getValue();
            log.info("接收到消息: {}",msgContent);
            consumer.acknowledge(message);
        }
    }
}

4.2 第二次使用:

上面我们可以看到,我们是利用死循环来保证及时消费,但是这样会导致主线程;所以下面我们可以使用Pulsar提供的MessageListener,即监听器,当消息来了,会回调监听器指定的方法,从而避免阻塞主线程。

/**
 * 使用MessageListener,避免死循环代码&阻塞主线程
 * @author winfun
 **/
@Slf4j
public class SecondConsumerDemo {

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
        /**
         * If you don't want to block your main thread and rather listen constantly for new messages, consider using a MessageListener.
         *
         */
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("winfun/study/test-topic")
                .subscriptionName("my-subscription")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .messageListener((MessageListener<String>) (consumer1, msg) -> {

                    /**
                     * 当接收到一个新的消息,就会回调 MessageListener的receive方法。
                     * 消息将会保证按顺序投放到单个消费者的同一个线程,因此可以保证顺序消费
                     * 除非应用程序或broker崩溃,否则只会为每条消息调用此方法一次
                     * 应用程序负责调用消费者的确认方法来确认消息已经被消费
                     * 应用程序负责处理消费消息时可能出现的异常
                     */
                    log.info("接收到消息:{}",msg.getValue());
                    try {
                        consumer1.acknowledge(msg);
                    } catch (PulsarClientException e) {
                        e.printStackTrace();
                    }
                }).subscribe();
    }
}

4.3 第三次使用:

上面利用监听器来解决死循环代码和阻塞主线程问题;但是我们可以发现,每次消费都是单线程,当一个消息消费完才能进行下一个消息的消费,这样会导致消费效率非常的低;

如果如果追求高吞吐量,不在乎消息消费的顺序性,那么我们可以接入线程池;一有消息来就丢进线程池中,这样不但可以支持异步消费,还能保证消费的效率非常的高。

/**
 * MessageListener 内使用线程池进行异步消费
 * @author winfun
 **/
@Slf4j
public class ThirdConsumerDemo {

    public static void main(String[] args) throws PulsarClientException {

        Executor executor = new ThreadPoolExecutor(
                10,
                10,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100)
        );
        PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
        /**
         * If you don't want to block your main thread and rather listen constantly for new messages, consider using a MessageListener.
         *
         */
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("winfun/study/test-topic")
                .subscriptionName("my-subscription")
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .messageListener((MessageListener<String>) (consumer1, msg) -> {
                    /**
                     * MessageListener还是保证了接收的顺序性
                     * 但是利用线程池进行异步消费后不能保证消费顺序性
                     */
                    executor.execute(() -> handleMsg(consumer1, msg));
                }).subscribe();
    }

    /**
     * 线程池异步处理
     * @param consumer 消费者
     * @param msg 消息
     */
    public static void handleMsg(Consumer consumer, Message msg){

        ThreadUtil.sleep(RandomUtil.randomInt(3),TimeUnit.SECONDS);
        log.info("接收到消息:{}",msg.getValue());
        try {
            consumer.acknowledge(msg);
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}

4.4 第四次使用:

我们可以发现,在上面的三个例子中,如果在调用Consumer#acknowledge方法前,因为代码问题导致抛异常了,我们是没有做处理的,那么会导致消费者会一直重试没有被确认的消息。

那么我们此时需要接入Pulsar提供的死信队列:当Consumer消费消息时抛异常,并达到一定的重试次数,则将消息丢入死信队列;但需要注意的是,单独使用死信队列,Consumer的订阅类型需要是 Shared/Key_Shared;否则不会生效。

/**
 * 超过最大重试次数,进入死信队列
 * @author: winfun
 **/
@Slf4j
public class FourthConsumerDemo {

    public static void main(String[] args) throws PulsarClientException {
        /**
         * 如果指定了死信队列策略,但是没指定死信队列
         * 死信队列:String.format("%s-%s-DLQ", topic, this.subscription)
         * 这里的this.subscription为上面指定的 subscriptionName。
         *
         * 一般在生产环境,会将pulsar的自动创建topic功能给关闭掉,所以上线前,记得先提工单创建指定的死信队列。
         *
         * 重点信息:
         *  如果是单单使用死信队列,subscriptionType为 Shared/Key_Shared,否则死信队列不生效。
         */
        PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("winfun/study/test-topic")
                .subscriptionName("my-subscription")
                .receiverQueueSize(100)
                .ackTimeout(1, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Key_Shared)
                .negativeAckRedeliveryDelay(1,TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        //可以指定最大重试次数,最大重试三次后,进入到死信队列
                        .maxRedeliverCount(3)
                        //可以指定死信队列
                        .deadLetterTopic("winfun/study/test-topic-dlq3")
                        .build())
                .messageListener((MessageListener<String>) (consumer1, msg) -> {
                    log.info("接收到队列「{}」消息:{}",msg.getTopicName(),msg.getValue());

                    if (msg.getValue().equals("hello3")) {
                        throw new RuntimeException("hello3消息消费失败!");
                    }else {
                        try {
                            consumer1.acknowledge(msg);
                        } catch (PulsarClientException e) {
                            e.printStackTrace();
                        }
                    }
                }).subscribe();
    }
}

4.5 第五次使用:

死信队列一般是不做消费的,我们会关注死信队列的情况,从而作出下一步的动作。
而且,一般做消息重试,我们不希望在原Topic中做重试,这样会影响原有消息的消费进度。

那么我们可以同时使用重试队列和死信队列。
当代码抛出异常时,我们可以捕获住,然后调用Consumer#reconsumeLater方法,将消息丢入重试队列;当消息重试指定次数后还无法正常完成消费,即会将消息丢入死信队列。

/**
 * 重试队列
 * @author winfun
 **/
@Slf4j
public class FifthConsumerDemo {

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
        /**
         * 注意点:
         * 1、使用死信策略,但是没有指定重试topic和死信topic名称
         *      死信队列:String.format("%s-%s-DLQ", topic, this.subscription)
         *      重试队列:String.format("%s-%s-RETRY", topic, this.subscription)
         *      这里的this.subscription为上面指定的 subscriptionName。
         *
         * 2、是否限制订阅类型
         *      同时开启重试队列和死信队列,不限制subscriptionType只能为Shared/Key_Shared;
         *      如果只是单独使用死信队列,需要限制subscriptionType为Shared
         *
         * 3、重试原理
         *      如果使用重试队列,需要保证 enableRetry 是开启的,否则调用 reconsumeLater 方法时会抛异常:org.apache.pulsar.client.api.PulsarClientException: reconsumeLater method not support!
         *      如果配置了重试队列,consumer会同时监听原topic和重试topic,consumer的实现类对应是:MultiTopicsConsumerImpl
         *      如果消费消息时调用了 reconsumeLater 方法,会将此消息丢进重试topic
         *      如果在重试topic重试maxRedeliverCount次后都无法正确ack消息,即将消息丢到死信队列。
         *      死信队列需要另起Consumer进行监听消费。
         *
         * 4、直接抛异常
         *      如果我们不是业务层面上调用 reconsumeLater 方法来进行重试,而是代码层面抛异常了,如果subscriptionType不为Shared/Key_Shared,消息无法进入重试队列和死信队列,是当前消费者无限在原topic进行消费。
         *      而如果如果subscriptionType为Shared/Key_Shared,则消息进行maxRedeliverCount次消费后,会直接进入到死信队列,此时不会用到重试队列。
         *      因此,重试队列是仅仅针对 reconsumeLater 方法的,而不针对异常的重试。
         */
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("winfun/study/test-retry-topic")
                .subscriptionName("my-subscription")
                .receiverQueueSize(100)
                .ackTimeout(1, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .negativeAckRedeliveryDelay(1,TimeUnit.SECONDS)
                .enableRetry(true)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                                          //可以指定最大重试次数,最大重试三次后,进入到死信队列
                                          .maxRedeliverCount(3)
                                          .retryLetterTopic("winfun/study/test-retry-topic-retry")
                                          //可以指定死信队列
                                          .deadLetterTopic("winfun/study/test-retry-topic-dlq")
                                          .build())
                .messageListener((MessageListener<String>) (consumer1, msg) -> {
                    log.info("接收到队列「{}」消息:{}",msg.getTopicName(),msg.getValue());

                    if (msg.getValue().equals("hello3")) {
                        try {
                            consumer1.reconsumeLater(msg,1,TimeUnit.SECONDS);
                        } catch (PulsarClientException e) {
                            e.printStackTrace();
                        }
                        //throw new RuntimeException("hello3消息消费失败!");
                    }else {
                        try {
                            consumer1.acknowledge(msg);
                        } catch (PulsarClientException e) {
                            e.printStackTrace();
                        }
                    }
                }).subscribe();
    }
}

重试机制源码分析

关于重试机制,其实是比较有意思的,下面我们会简单分析一下源码。

1.判断是否开启重试机制,如果没有开启重试机制,则直接抛异常

public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
    // 如果没开启重试机制,直接抛异常
    if (!this.conf.isRetryEnable()) {
        throw new PulsarClientException("reconsumeLater method not support!");
    } else {
        try {
            // 当然了,reconsumeLaterAsync里面也会判断是否开启重试机制
            this.reconsumeLaterAsync(message, delayTime, unit).get();
        } catch (Exception var7) {
            Throwable t = var7.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException)t;
            } else {
                throw new PulsarClientException(t);
            }
        }
    }
}

还有我们可以发现,pulsar很多方法是支持同步和异步的,而同步就是直接调用异步方法,再后调用get()方法进行同步阻塞等待即可。

2.调用 reconsumeLaterAsunc 方法,接着调用 get() 进行同步阻塞等待结果

public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
    if (!this.conf.isRetryEnable()) {
        return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
    } else {
        try {
            return this.doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit);
        } catch (NullPointerException var6) {
            return FutureUtil.failedFuture(new InvalidMessageException(var6.getMessage()));
        }
    }
}

3.调用 doReconsumeLater 方法

我们知道,在 Pulsar 的 Consumer 中,可以支持多 Topic 监听,而如果我们加入了重试机制,默认是同个 Consumer 同时监听原队列和重试队列,所以 Consumer 接口的实现此时为 MultiTopicsConsumerImpl,而不是 ComsumerImpl。
那我们看看 MultiConsumerImpl 的 doReconsumeLater 是如何进行重新消费的:

protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType, Map<String, Long> properties, long delayTime, TimeUnit unit) {
    MessageId messageId = message.getMessageId();
    Preconditions.checkArgument(messageId instanceof TopicMessageIdImpl);
    TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl)messageId;
    if (this.getState() != State.Ready) {
        return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
    } else {
        MessageId innerId;
        if (ackType == AckType.Cumulative) {
            Consumer individualConsumer = (Consumer)this.consumers.get(topicMessageId.getTopicPartitionName());
            if (individualConsumer != null) {
                innerId = topicMessageId.getInnerMessageId();
                return individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit);
            } else {
                return FutureUtil.failedFuture(new NotConnectedException());
            }
        } else {
            ConsumerImpl<T> consumer = (ConsumerImpl)this.consumers.get(topicMessageId.getTopicPartitionName());
            innerId = topicMessageId.getInnerMessageId();
            return consumer.doReconsumeLater(message, ackType, properties, delayTime, unit).thenRun(() -> {
                this.unAckedMessageTracker.remove(topicMessageId);
            });
        }
    }
}
  • 首先判断客户端是否为准备状态
  • 接着判断 AckType 是累计的还是单独的,如果是累计的话,subscriptionType 一定要是 exclusive
  • 不管是累计还是单独的,最后都是调用 ConsumerImpl 的 doReconsumerLater 方法
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType, Map<String, Long> properties, long delayTime, TimeUnit unit) {
    MessageId messageId = message.getMessageId();
    if (messageId instanceof TopicMessageIdImpl) {
        messageId = ((TopicMessageIdImpl)messageId).getInnerMessageId();
    }

    Preconditions.checkArgument(messageId instanceof MessageIdImpl);
    if (this.getState() != State.Ready && this.getState() != State.Connecting) {
        this.stats.incrementNumAcksFailed();
        PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + this.getState());
        if (AckType.Individual.equals(ackType)) {
            this.onAcknowledge(messageId, exception);
        } else if (AckType.Cumulative.equals(ackType)) {
            this.onAcknowledgeCumulative(messageId, exception);
        }

        return FutureUtil.failedFuture(exception);
    } else {
        if (delayTime < 0L) {
            delayTime = 0L;
        }
        // 如果 retryLetterProducer 为null,则尝试创建
        if (this.retryLetterProducer == null) {
            try {
                this.createProducerLock.writeLock().lock();
                if (this.retryLetterProducer == null) {
                    this.retryLetterProducer = this.client.newProducer(this.schema).topic(this.deadLetterPolicy.getRetryLetterTopic()).enableBatching(false).blockIfQueueFull(false).create();
                }
            } catch (Exception var28) {
                log.error("Create retry letter producer exception with topic: {}", this.deadLetterPolicy.getRetryLetterTopic(), var28);
            } finally {
                this.createProducerLock.writeLock().unlock();
            }
        }
        // 如果 retryLetterProcuder 不为空,则尝试将消息丢进重试队列中
        if (this.retryLetterProducer != null) {
            try {
                MessageImpl<T> retryMessage = null;
                String originMessageIdStr = null;
                String originTopicNameStr = null;
                if (message instanceof TopicMessageImpl) {
                    retryMessage = (MessageImpl)((TopicMessageImpl)message).getMessage();
                    originMessageIdStr = ((TopicMessageIdImpl)message.getMessageId()).getInnerMessageId().toString();
                    originTopicNameStr = ((TopicMessageIdImpl)message.getMessageId()).getTopicName();
                } else if (message instanceof MessageImpl) {
                    retryMessage = (MessageImpl)message;
                    originMessageIdStr = ((MessageImpl)message).getMessageId().toString();
                    originTopicNameStr = ((MessageImpl)message).getTopicName();
                }

                SortedMap<String, String> propertiesMap = new TreeMap();
                int reconsumetimes = 1;
                if (message.getProperties() != null) {
                    propertiesMap.putAll(message.getProperties());
                }

                // 如果包含 RECONSUMETIMES,则最递增
                if (propertiesMap.containsKey("RECONSUMETIMES")) {
                    reconsumetimes = Integer.valueOf((String)propertiesMap.get("RECONSUMETIMES"));
                    ++reconsumetimes;
                // 否则先加入「原始队列」和「原始messageId」信息
                } else {
                    propertiesMap.put("REAL_TOPIC", originTopicNameStr);
                    propertiesMap.put("ORIGIN_MESSAGE_IDY_TIME", originMessageIdStr);
                }
                // 加入重试次数信息
                propertiesMap.put("RECONSUMETIMES", String.valueOf(reconsumetimes));
                // 加入延时时间信息
                propertiesMap.put("DELAY_TIME", String.valueOf(unit.toMillis(delayTime)));
                TypedMessageBuilder typedMessageBuilderNew;
                // 判断是否超过最大重试次数,如果还未超过,则重新投放到重试队列
                if (reconsumetimes <= this.deadLetterPolicy.getMaxRedeliverCount()) {
                    typedMessageBuilderNew = this.retryLetterProducer.newMessage().value(retryMessage.getValue()).properties(propertiesMap);
                    if (delayTime > 0L) {
                        typedMessageBuilderNew.deliverAfter(delayTime, unit);
                    }

                    if (message.hasKey()) {
                        typedMessageBuilderNew.key(message.getKey());
                    }
                    // 发送延时消息
                    typedMessageBuilderNew.send();
                    // 确认当前消息
                    return this.doAcknowledge(messageId, ackType, properties, (TransactionImpl)null);
                }
                // 先忽略
                this.processPossibleToDLQ((MessageIdImpl)messageId);
                // 判断 deadLetterProducer 是否为null,如果为null,尝试创建
                if (this.deadLetterProducer == null) {
                    try {
                        if (this.deadLetterProducer == null) {
                            this.createProducerLock.writeLock().lock();
                            this.deadLetterProducer = this.client.newProducer(this.schema).topic(this.deadLetterPolicy.getDeadLetterTopic()).blockIfQueueFull(false).create();
                        }
                    } catch (Exception var25) {
                        log.error("Create dead letter producer exception with topic: {}", this.deadLetterPolicy.getDeadLetterTopic(), var25);
                    } finally {
                        this.createProducerLock.writeLock().unlock();
                    }
                }
                // 如果 deadLetterProducer 不为null
                if (this.deadLetterProducer != null) {
                    // 加入「原始队列」信息
                    propertiesMap.put("REAL_TOPIC", originTopicNameStr);
                    // 加入「原始MessageId」信息
                    propertiesMap.put("ORIGIN_MESSAGE_IDY_TIME", originMessageIdStr);
                    typedMessageBuilderNew = this.deadLetterProducer.newMessage().value(retryMessage.getValue()).properties(propertiesMap);
                    // 将消息内容发往死信队列中
                    typedMessageBuilderNew.send();
                    // 确认当前消息
                    return this.doAcknowledge(messageId, ackType, properties, (TransactionImpl)null);
                }
            } catch (Exception var27) {
                log.error("Send to retry letter topic exception with topic: {}, messageId: {}", new Object[]{this.deadLetterProducer.getTopic(), messageId, var27});
                Set<MessageId> messageIds = new HashSet();
                messageIds.add(messageId);
                this.unAckedMessageTracker.remove(messageId);
                this.redeliverUnacknowledgedMessages(messageIds);
            }
        }

        return CompletableFuture.completedFuture((Object)null);
    }
}

分析了一波,我们可以看到和上面代码的注释描述的基本一致。

4.6 第六次使用

上面我们提到,当Consumer指定了重试队列,Consumer会同时监听原Topic和重试Topic,那么如果我们想多个Consumer消费重试Topic时,需要将Consumer的订阅类型指定为 Shared/Key_Shared,让重试队列支持多Consumer监听消费,提升重试队列的消费效率。

/**
 * 重试队列-Shared
 * @author winfun
 **/
@Slf4j
public class SixthConsumerDemo {

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
        /**
         * 因为如果指定了重试策略,Consumer会同时监听「原队列」和「重试队列」
         * 即如果我们想「重试队列」可以让多个 Consumer 监听,从而提高消费能力,那么 Consumer 需指定为 Shared 模式。
         */
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("winfun/study/test-retry-topic")
                .subscriptionName("my-subscription")
                .receiverQueueSize(100)
                .ackTimeout(1, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Shared)
                .negativeAckRedeliveryDelay(1,TimeUnit.SECONDS)
                .enableRetry(true)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                                          //可以指定最大重试次数,最大重试三次后,进入到死信队列
                                          .maxRedeliverCount(3)
                                          .retryLetterTopic("winfun/study/test-retry-topic-retry")
                                          //可以指定死信队列
                                          .deadLetterTopic("winfun/study/test-retry-topic-dlq")
                                          .build())
                .messageListener((MessageListener<String>) (consumer1, msg) -> {
                    log.info("接收到队列「{}」消息:{}",msg.getTopicName(),msg.getValue());

                    if (msg.getValue().contains("1") || msg.getValue().contains("2") || msg.getValue().contains("3")) {
                        try {
                            consumer1.reconsumeLater(msg,1,TimeUnit.SECONDS);
                        } catch (PulsarClientException e) {
                            e.printStackTrace();
                        }
                        //throw new RuntimeException("hello3消息消费失败!");
                    }else {
                        try {
                            consumer1.acknowledge(msg);
                        } catch (PulsarClientException e) {
                            e.printStackTrace();
                        }
                    }
                }).subscribe();
    }
}

/**
 * 监听重试队列-Shared订阅模式
 * @author winfun
 **/
@Slf4j
public class RetryConsumerDemo {

    public static void main(String[] args) throws PulsarClientException {

        PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
        Consumer<String> deadLetterConsumer = client.newConsumer(Schema.STRING)
                .topic("winfun/study/test-retry-topic-retry")
                .subscriptionName("my-subscription2")
                .receiverQueueSize(100)
                .ackTimeout(1, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Shared)
                .messageListener((MessageListener<String>) (consumer1, msg) -> {
                    log.info("接收到队列「{}」消息:{}",msg.getTopicName(),msg.getValue());
                    try {
                        consumer1.acknowledge(msg);
                    } catch (PulsarClientException e) {
                        e.printStackTrace();
                    }
                }).subscribe();
    }
}

到此,我们已经将Consmuer的几种使用方式都尝试了一遍,可以说基本包含了常用的操作;但是我们可以发现,如果我们每次新建一个Consumer都需要写一堆同样的代码,那其实挺麻烦的,又不好看;并且,现在我们大部分项目都是基于 SpringBoot 来做的,而 SpringBoot 也没有一个比较大众的Starter。

所以接下来的计划就是,自己写一个编写一个关于Pulsar的SpringBoot Starter,这个组件不会特别复杂,但是会支持 Producer 和 Cousnmer 的自动配置,并且支持 Consumer 上面提到的几个点:MessageListener 监听、线程池异步并发消费、重试机制等。

到此这篇关于学会Pulsar Consumer的使用方式的文章就介绍到这了,更多相关Pulsar Consumer 使用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析

    简介 通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces.topics的元数据信息. pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink Maven <dependency> <groupId>io.streamnative.connectors</groupId> <artifactId>pu

  • Docker 部署单机版 Pulsar 和集群架构 Redis(开发神器)的方法

    一.前言: 现在互联网的技术架构中,不断出现各种各样的中间件,例如 MQ.Redis.Zookeeper,这些中间件在部署的时候一般都是以主从架构或者集群的架构来部署,公司一般都会在开发环境.测试环境和生产环境各部署一套. 当我们开发的时候,一般就会连着开发环境.但是呢,一般公司的开发环境都只能在内网使用,当我们回家了,除非公司提供有 VPN,不然就没办法使用了.有时候我们是有VPN了,但是开发起来还是很不方便.例如我们现在的 MQ 中间件使用的是 Pulsar,但是 Pulsar 的 tena

  • Java Predicate及Consumer接口函数代码实现解析

    Predicate函数编程 Predicate功能判断输入的对象是否符合某个条件.官方文档解释到:Determines if the input object matches some criteria. 了解Predicate接口作用后,在学习Predicate函数编程前,先看一下Java 8关于Predicate的源码: @FunctionalInterface public interface Predicate<T> { /** * Evaluates this predicate o

  • 学会Pulsar Consumer的使用方式

    目录 1.使用前准备 2.PulsarClient 3.Producer 4.Consumer 4.1 第一次使用: 4.2 第二次使用: 4.3 第三次使用: 4.4 第四次使用: 4.5 第五次使用: 重试机制源码分析 4.6 第六次使用 1.使用前准备 引入依赖: <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId>

  • 一文带你快速学会JDBC及获取连接的五种方式

    目录 1. JDBC基本介绍 2. JDBC快速入门 2.1 JDBC程序编写步骤 2.2 案例演示 2.2.1 前置工作,在数据库中建立对应表 2.2.2 前置工作,导入MySQL数据库的对应jar包 3. 相关类的介绍 3.1 Statement 3.2 ResultSet[结果集] 3.3 PreparedStatement 4. 关闭资源 5. 获取数据库连接的五种方式 方式一 方式二 方式三 方式四 方式五 总结 快速学会JDBC及获取连接的五种方式 1. JDBC基本介绍 JDBC为

  • SpringBoot整合Pulsar的实现示例

    目录 一.添加pom.xml依赖 二.Pulsar 参数类 三.Pulsar 配置类 四.不同消费数据类型的监听器 五.Pulsar的核心服务类 六.Pulsar整合Spring Cloud 一.添加pom.xml依赖 <parent>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-parent</artifactId>  

  • SpringCloud Nacos + Ribbon 调用服务的实现方式(两种)

    目录 1.代码方式调用 1.1创建服务提供者:Provider 1.2创建服务调用者:Consumer 2.注解方式调用 注解实现原理分析 总结 在 Nacos 中,服务调用主要是通过 RestTemplate + Ribbon 实现的,RestTemplate 是 Spring 提供的 Restful 请求实现类,而 Ribbon 是客户端负载均衡器,通过 Ribbon 可以获取服务实例的具体信息(IP 和端口号),之后再通过 RestTemplate 加服务实例的具体信息就可以完成一次服务调

  • 如何成为一名黑客全系列说明第1/2页

    什么是黑客? Jargon File 包含了一大堆关于"hacker"这个词的定义,大部分与技术高超和热衷解决问题及超越极限 有关.但如果你只想知道如何 成为 一名黑客,那么只有两件事情确实相关.这可以追溯到几十年前第一 台分时小型电脑诞生, ARPAnet 实验也刚展开的年代,那时有一个由程序设计专家和网络名人所组成的,  具有分享特点的文化社群.这种文化的成员创造了 "hacker" 这个名词.黑客们建立了 Internet.黑 客们发明出了现在使用的 UNIX

  • 如何成为一名黑客

    内容一览 为什么会有这份文档? 什么是黑客? 黑客应有的态度 黑客的基本技能 黑客文化中的地位 黑客和书呆子(Nerd)的联系 风格的意义 其它资源 FAQ(常问问题解答) 作为Jargon File的编辑和一些其他有名的类似性质文章的作者,我经常收到充满热情的网络新手的email提问(确实如此) "我如何才能成为一名出色的黑客?"非常奇怪的是似乎没有任何的FAQ或者Web形式的文档来说明这个十分重要的问题,因此我写了一份. 如果你现在读的是这份文档的离线拷贝,那么请注意当前最新版本(

  • JAVA的发展前景如何好不好自学

    Java前景如何?我负责任地说,Java非常有前景,因为使用Java的开发场景就非常非常多,可以说是多不胜数. 我刚参加工作的时候,使用Java开发网站应用,用JSP和Servlet,那时候J2EE已经被认为过重了,新的Java应用框架层出不穷,还用Java的Java Applet功能做过网页,当时就感觉Java这种语言非常全能. 后来,我去了Motorola,在Motorola开发一个手机平台,目标场景内是用Java开发手机应用,很不幸,这个项目中途夭折了(值得一提的是,这个项目解散之后不少美

  • 一篇文章学会两种将python打包成exe的方式

    目录 前言 详细步骤 图形窗口打包 总结 前言 python 可以做网站应用,也可以做客户端应用.但是客户端应用需要运行 py 脚本,如果用户不懂 python 就是一件比较麻烦的事情.幸好 pyton 有第三方模块可以将脚本可以转成 exe 执行. 有些人可能要问了既然可以做成网站,为啥还要做成客户端的,直接部署到服务器给客户不就可以了吗?小编的回答是当然是为了追小姐姐呀.在公司给小姐姐写点 python 脚本打包成 exe 减轻上班的工作量.再弄出点 bug,一来二去不就会产生故事了? py

  • 快速学会Dubbo的配置环境及相关配置

    目录 1. Dubbo相关概念 1.1 自动服务发现工作原理 2. 启动文件 2.1 zookeeper-3.4.11\bin\zkServer.cmd 2.2 zookeeper-3.4.11\bin\zkCli.cmd 2.3 java -jar dubbo-admin-0.0.1-SNAPSHOT.jar 2.4 dubbo-monitor-simple-2.0.0\assembly.bin\start.bat 3. 修改项目文件 3.1 在pom.xml文件里导入相关依赖 3.2 在ap

  • 学会在Java中使用Optional功能

    目录 前言 Nullity Optional Class 客户责任 null Optional Objects 重要方法 创建方法 of ofNullable empty 实例方法 isPresent&isEmpty get orElse系列 orElseThrow系列 ifPresent系列 map flatMap filter 何时使用 返回值 字段 参数 替代方案 null 空对象 例外情况 结论 前言 尽管存在争议,但Optiont极大地改进了Java应用程序的设计.在本文中,我们将了解

随机推荐