RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知(实例代码)

在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。

由于互联网通信的不可靠性,例如双方网络、服务器、应用等因素的影响,不管是同步返回、异步通知、主动查询报文都可能出现超时无响应、报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制。
例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台、后台通知的,以后台异步通知结果为准),但由于前台跳转、后台结果通知都可能失效,因此还以定时补单+请求方主动查询接口作为辅助手段。

常见的补单操作,任务调度策略一般设定30秒、60秒、3分钟、6分钟、10分钟调度多次(以自己业务需要),如果调度接收到响应确认报文,补单成功,则中止对应订单的调度任务;如果超过补单上限次数,则停止补单,避免无谓的资源浪费。请求端随时可以发起请求报文查询对应订单的状态。在日常开发中,对于网站前端来说,支付计费中心对于订单请求信息的处理也是通过消息同步返回、异步通知+主动补偿查询相结合的机制,其中对于订单的异步通知,目前的通知策略为3s、30s、60s、120s、180、300s的阶梯性通知。返回成功情况下就不继续通知了,本来打算使用将失败的消息写到数据库等待发送,然后每秒查询数据库获取消息通知前端。但觉得这样的处理方式太粗暴。

存在以下缺点:

1 、每秒请求有点儿浪费资源;

2 、通知方式不稳定;

3 、无法承受大数据量等等

所以最终打算使用rabbitmq的消息延迟+死信队列来实现。消息模型如下:

producer发布消息,通过exchangeA的消息会被分发到QueueA,Consumer监听queueA,一旦有消息到来就被消费,这边的消费业务就是通知前端,如果通知失败,就创建一个延迟队列declareQueue,设置每个消息的ttl然后通过declare_exchange将消息分发到declare_queue,因为declare_queue没有consumer并且declare_queue中的消息设置了ttl,当ttl到期后,将通过DEX路由到queueA,被重新消费。代码如下:DeclareQueue.java

package org.delayQueue;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DeclareQueue {
	public static String EXCHANGE_NAME = "notifyExchange";
	public static void init() {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setPort(5672);
		Connection connection = null;
		try {
			connection = factory.newConnection();
			Channel channel = connection.createChannel();
			channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
			String routingKey = "AliPaynotify";
			String message = "http://localhost:8080/BossCenter/payGateway/notifyRecv.jsp?is_success=T¬ify_id=4ab9bed148d043d0bf75460706f7774a¬ify_time=2014-08-29+16%3A22%3A02¬ify_type=trade_status_sync&out_trade_no=1421712120109862&total_fee=424.42&trade_no=14217121201098611&trade_status=TRADE_SUCCESS";
			channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
			System.out.println(" [x] Sent :" + message);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (Exception ignore) {
				}
			}
		}
	}
	public static void main(String args[]) {
		init();
}

DeclareConsumer.java

package org.delayQueue;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class DeclareConsumer {
	public static String EXCHANGE_NAME = "notifyExchange";
	public static String QU_declare_15S = "Qu_declare_15s";
	public static String EX_declare_15S = "EX_declare_15s";
	public static String ROUTINGKEY = "AliPaynotify";
	public static Connection connection = null;
	public static Channel channel = null;
	public static Channel DECLARE_15S_CHANNEL = null;
	public static String declare_queue = "init";
	public static String originalExpiration = "0";
	public static void init() throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		factory.setPort(5672);
		connection = factory.newConnection();
		channel = connection.createChannel();
		DECLARE_15S_CHANNEL = connection.createChannel();
	}
	public static void consume() {
		try {
			channel.exchangeDeclare(EXCHANGE_NAME, "topic");
			final String queueName = channel.queueDeclare().getQueue();
			channel.queueBind(queueName, EXCHANGE_NAME, ROUTINGKEY);
			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
			final Consumer consumer = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
					String message = new String(body, "UTF-8");
					Map<String, Object> headers = properties.getHeaders();
					if (headers != null) {
						List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
						System.out.println("xDeath--- > " + xDeath);
						if (xDeath != null && !xDeath.isEmpty()) {
							Map<String, Object> entrys = xDeath.get(0);
							// for(Entry<String, Object>
							// entry:entrys.entrySet()){
							// System.out.println(entry.getKey()+":"+entry.getValue());
							// }
							originalExpiration = entrys.get("original-expiration").toString();
						}
					}
					System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'" + "time" + System.currentTimeMillis());
					HttpClient httpClient = new DefaultHttpClient();
					HttpPost post = new HttpPost(message);
					HttpResponse response = httpClient.execute(post);
					BufferedReader inreader = null;
					if (response.getStatusLine().getStatusCode() == 200) {
						inreader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "UTF-8"));
						StringBuffer responseBody = new StringBuffer();
						String line = null;
						while ((line = inreader.readLine()) != null) {
							responseBody.append(line);
						if (!responseBody.equals("success")) {
							// putDeclre15s(message);
							if (originalExpiration.equals("0")) {
								putDeclreQueue(message, 3000, QU_declare_15S);
							}
							if (originalExpiration.equals("3000")) {
								putDeclreQueue(message, 30000, QU_declare_15S);
							if (originalExpiration.equals("30000")) {
								putDeclreQueue(message, 60000, QU_declare_15S);
							if (originalExpiration.equals("60000")) {
								putDeclreQueue(message, 120000, QU_declare_15S);
							if (originalExpiration.equals("120000")) {
								putDeclreQueue(message, 180000, QU_declare_15S);
							if (originalExpiration.equals("180000")) {
								putDeclreQueue(message, 300000, QU_declare_15S);
							if (originalExpiration.equals("300000")) {
//								channel.basicConsume(QU_declare_300S,true, this);
								System.out.println("finish notify");
					} else {
						System.out.println(response.getStatusLine().getStatusCode());
				}
			};
			channel.basicConsume(queueName, true, consumer);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
		}
	static Map<String, Object> xdeathMap = new HashMap<String, Object>();
	static List<Map<String, Object>> xDeath = new ArrayList<Map<String, Object>>();
	static Map<String, Object> xdeathParam = new HashMap<String, Object>();
	public static void putDeclre15s(String message) throws IOException {
		channel.exchangeDeclare(EX_declare_15S, "topic");
		Map<String, Object> args = new HashMap<String, Object>();
		args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange
		AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
		builder.expiration("3000").deliveryMode(2);// 设置消息TTL
		AMQP.BasicProperties properties = builder.build();
		channel.queueDeclare(QU_declare_15S, false, false, false, args);
		channel.queueBind(QU_declare_15S, EX_declare_15S, ROUTINGKEY);
		channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());
		System.out.println("send message in QA_DEFERRED_15S" + message + "time" + System.currentTimeMillis());
	public static void putDeclreQueue(String message, int mis, String queue) throws IOException {
		builder.expiration(String.valueOf(mis)).deliveryMode(2);// 设置消息TTL
		channel.queueDeclare(queue, false, false, false, args);
		channel.queueBind(queue, EX_declare_15S, ROUTINGKEY);
		System.out.println("send message in " + queue + message + "time============" + System.currentTimeMillis());
	public static void main(String args[]) throws Exception {
		init();
		consume();
}

消息通过dlx转发的情况下,header头部会带有x-death的一个数组,里面包含消息的各项属性,比如说消息成为死信的原因reason,original-expiration这个字段表示消息在原来队列中的过期时间,根据这个值来确定下一次通知的延迟时间应该是多少秒。运行结果如下:

到此这篇关于RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知的文章就介绍到这了,更多相关RabbitMQ 延迟队列实现订单支付内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RabbitMQ 实现延迟队列的两种方式详解

    目录 1. 用插件 1.1 安装插件 1.2 消息收发 2. DLX 实现延迟队列 2.1 延迟队列实现思路 2.2 案例 3. 小结 定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如: 在电商项目中,当我们下单之后,一般需要

  • Rabbitmq延迟队列实现定时任务的方法

    场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列.基于优先

  • RabbitMQ死信机制实现延迟队列的实战

    目录 延迟队列 应用场景 Time To Live(TTL) Dead Letter Exchanges(DLX) 延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 应用场景 三方支付,扫码支付调用上游的扫码接口,当扫码有效期过后去调用查询接口查询结果.实现方式:每当一笔扫码支付请求后,立即将此订单号放入延迟队列中(RabbitMQ),队列过期时间为二维码有效期,此队列

  • RabbitMQ延迟队列及消息延迟推送实现详解

    这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能. 12306 购票支付确认页面.我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 3

  • RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知(实例代码)

    在第三方支付中,例如支付宝.或者微信,对于订单请求,第三方支付系统采用的是消息同步返回.异步通知+主动补偿查询的补偿机制. 由于互联网通信的不可靠性,例如双方网络.服务器.应用等因素的影响,不管是同步返回.异步通知.主动查询报文都可能出现超时无响应.报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制.例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台.后台通知的,

  • 手把手带你掌握SpringBoot RabbitMQ延迟队列

    目录 1. 简介 2. 安装插件 3. 实现延迟队列 3.1 引入所需依赖 3.2 application.yaml 3.3 RabbitConfig 3.4 Producer 3.5 Consumer 3.6 测试代码 3.7 启动测试 1. 简介 我们在上一篇博文中遗留了一个小问题,就是虽然TTL + DLX能实现延迟队列的功能,但是有两个问题. 首先业务场景为:比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知. 通过给

  • Ajax异步上传文件实例代码分享

    非常不多说,直接给大家上干货,写的不好还请见谅. 具体代码如下所示: <!DOCTYPE html> <html> <head> <meta name="viewport" content="width=device-width" /> <script src="~/Scripts/jquery-1.8.2.min.js"></script> <title>Ind

  • JavaScript异步上传图片文件的实例代码

    html: <form action="url" enctype="multipart/form-data" id="myform" method="post"> <input accept="image/*" id="addfile" type="file" /> </form> jquery: $("#addfile&

  • Ajax表单异步上传文件实例代码(包括文件域)

    1.起因 做前台页面时,需要调用WebAPI的Post请求,发送一些字段和文件(相当于把表单通过ajax异步发送出去,得到返回结果),然后得到返回值判断是否成功. 2.尝试 先是尝试了一下 "jQuery Form Plugin" ,这玩意就是的巨大的坑,实现他和jquery1.9.2兼容性就不是太好,好不容易把$.browser的问题解决了,发现用他上传文件得不到返回值. $("#view").submit( $("#view").ajaxSu

  • C#实现rabbitmq 延迟队列功能实例代码

    最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就

  • 如何通过Python实现RabbitMQ延迟队列

    最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好.因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列.功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1.Time To Live(TTL)消息超时机制:2.Dead Letter Exchanges(DLX)死信队列.下面将具体描述实现原理以及实现代 延迟

  • vue+ElementUI实现订单页动态添加产品数据效果实例代码

    这两天学习了ElementUI基于vue2.0开发学习,这个知识点挺多的,而且很重要,所以,今天添加一点小笔记. 使用vue2.0(ElementUI基于vue2.0)+ElementUI(饿了么出品)实现的在订单页面动态添加产品的效果,并自动计算总价.代码直接保存为html文档,使用浏览器打开即可查看效果. 效果图: <html> <head> <meta charset="UTF-8"> <meta http-equiv="X-U

随机推荐