Java编程rabbitMQ实现消息的收发

java实现rAMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

本文不介绍amqp和rabbitmq相关知识,请自行网上查阅

本文是基于spring-rabbit中间件来实现消息的发送接受功能

see http://www.rabbitmq.com/tutorials/tutorial-one-java.html

see http://www.springsource.org/spring-amqp

Java编程通过操作rabbitMQ消息的收发实现代码如下:

<!-- for rabbitmq -->
 	<dependency>
		<groupId>com.rabbitmq</groupId>
		<artifactId>amqp-client</artifactId>
		<version>2.8.2</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.amqp</groupId>
		<artifactId>spring-amqp</artifactId>
		<version>1.1.1.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.amqp</groupId>
		<artifactId>spring-rabbit</artifactId>
		<version>1.1.1.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>com.caucho</groupId>
		<artifactId>hessian</artifactId>
		<version>4.0.7</version>
	</dependency>
 </dependencies>

首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象

public class EventMessage implements Serializable{
	private String queueName;
	private String exchangeName;
	private byte[] eventData;
	public EventMessage(String queueName, String exchangeName, byte[] eventData) {
		this.queueName = queueName;
		this.exchangeName = exchangeName;
		this.eventData = eventData;
	}
	public EventMessage() {
	}
	public String getQueueName() {
		return queueName;
	}
	public String getExchangeName() {
		return exchangeName;
	}
	public byte[] getEventData() {
		return eventData;
	}
	@Override
	public String toString() {
		return "EopEventMessage [queueName=" + queueName + ", exchangeName="
				+ exchangeName + ", eventData=" + Arrays.toString(eventData)
				+ "]";
	}
}

为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂

public interface CodecFactory {
	byte[] serialize(Object obj) throws IOException;
	Object deSerialize(byte[] in) throws IOException;
}

下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式

public class HessionCodecFactory implements CodecFactory {
	private final Logger logger = Logger.getLogger(HessionCodecFactory.class);
	@Override
	public byte[] serialize(Object obj) throws IOException {
		ByteArrayOutputStream baos = null;
		HessianOutput output = null;
		try {
			baos = new ByteArrayOutputStream(1024);
			output = new HessianOutput(baos);
			output.startCall();
			output.writeObject(obj);
			output.completeCall();
		} catch (final IOException ex) {
			throw ex;
		} finally {
			if (output != null) {
				try {
					baos.close();
				} catch (final IOException ex) {
					this.logger.error("Failed to close stream.", ex);
				}
			}
		}
		return baos != null ? baos.toByteArray() : null;
	}
	@Override
	public Object deSerialize(byte[] in) throws IOException {
		Object obj = null;
		ByteArrayInputStream bais = null;
		HessianInput input = null;
		try {
			bais = new ByteArrayInputStream(in);
			input = new HessianInput(bais);
			input.startReply();
			obj = input.readObject();
			input.completeReply();
		} catch (final IOException ex) {
			throw ex;
		} catch (final Throwable e) {
			this.logger.error("Failed to decode object.", e);
		} finally {
			if (input != null) {
				try {
					bais.close();
				} catch (final IOException ex) {
					this.logger.error("Failed to close stream.", ex);
  }
  }
 }
 return obj;
	}
}

接下来就先实现发送功能,新增一个接口专门用来实现发送功能

public interface EventTemplate {
	void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;
	void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
}

SendRefuseException是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为EventMessage

public class DefaultEventTemplate implements EventTemplate {
	private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);
	private AmqpTemplate eventAmqpTemplate;
	private CodecFactory defaultCodecFactory;
//	private DefaultEventController eec;
//	public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
//			CodecFactory defaultCodecFactory, DefaultEventController eec) {
//		this.eventAmqpTemplate = eopAmqpTemplate;
//		this.defaultCodecFactory = defaultCodecFactory;
//		this.eec = eec;
//	}
	public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
		this.eventAmqpTemplate = eopAmqpTemplate;
		this.defaultCodecFactory = defaultCodecFactory;
	}
	@Override
	public void send(String queueName, String exchangeName, Object eventContent)
			throws SendRefuseException {
		this.send(queueName, exchangeName, eventContent, defaultCodecFactory);
	}
	@Override
	public void send(String queueName, String exchangeName, Object eventContent,
			CodecFactory codecFactory) throws SendRefuseException {
		if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
			throw new SendRefuseException("queueName exchangeName can not be empty.");
		}
//		if (!eec.beBinded(exchangeName, queueName))
//			eec.declareBinding(exchangeName, queueName);
		byte[] eventContentBytes = null;
		if (codecFactory == null) {
			if (eventContent == null) {
				logger.warn("Find eventContent is null,are you sure...");
			} else {
				throw new SendRefuseException(
						"codecFactory must not be null ,unless eventContent is null");
			}
		} else {
			try {
				eventContentBytes = codecFactory.serialize(eventContent);
			} catch (IOException e) {
				throw new SendRefuseException(e);
			}
		}
		// 构造成Message
		EventMessage msg = new EventMessage(queueName, exchangeName,
				eventContentBytes);
		try {
			eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
		} catch (AmqpException e) {
			logger.error("send event fail. Event Message : [" + eventContent + "]", e);
			throw new SendRefuseException("send event fail", e);
		}
	}
}

注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明
然后我们再实现接受消息
首先我们需要一个消费接口,所有的消费程序都实现这个类

public interface EventProcesser {
	public void process(Object e);
}

为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器

/**
 * MessageListenerAdapter的Pojo
 * <p>消息处理适配器,主要功能:</p>
 * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来</p>
 * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p>
 *
 */
public class MessageAdapterHandler {
	private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);
	private ConcurrentMap<String, EventProcessorWrap> epwMap;
	public MessageAdapterHandler() {
		this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();
	}
	public void handleMessage(EventMessage eem) {
		logger.debug("Receive an EventMessage: [" + eem + "]");
		// 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值
		if (eem == null) {
			logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");
			return;
		}
		if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
			logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");
			return;
		}
		// 解码,并交给对应的EventHandle执行
		EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());
		if (eepw == null) {
			logger.warn("Receive an EopEventMessage, but no processor can do it.");
			return;
		}
		try {
			eepw.process(eem.getEventData());
		} catch (IOException e) {
			logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);
			return;
		}
	}
	protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {
		if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
			throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
		}
		EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
		EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);
		if (oldProcessorWrap != null) {
			logger.warn("The processor of this queue and exchange exists, and the new one can't be add");
		}
	}
	protected Set<String> getAllBinding() {
		Set<String> keySet = epwMap.keySet();
		return keySet;
	}
	protected static class EventProcessorWrap {
		private CodecFactory codecFactory;
		private EventProcesser eep;
		protected EventProcessorWrap(CodecFactory codecFactory,
				EventProcesser eep) {
			this.codecFactory = codecFactory;
			this.eep = eep;
		}
		public void process(byte[] eventData) throws IOException{
			Object obj = codecFactory.deSerialize(eventData);
			eep.process(obj);
		}
	}
}

这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息

public class MessageErrorHandler implements ErrorHandler{
	private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);
	@Override
	public void handleError(Throwable t) {
		logger.error("RabbitMQ happen a error:" + t.getMessage(), t);
	}
}

接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息

public class EventControlConfig {
	private final static int DEFAULT_PORT = 5672;
	private final static String DEFAULT_USERNAME = "guest";
	private final static String DEFAULT_PASSWORD = "guest";
	private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
	private static final int PREFETCH_SIZE = 1;
	private String serverHost ;
	private int port = DEFAULT_PORT;
	private String username = DEFAULT_USERNAME;
	private String password = DEFAULT_PASSWORD;
	private String virtualHost;
	/**
	 * 和rabbitmq建立连接的超时时间
	 */
	private int connectionTimeout = 0;
	/**
	 * 事件消息处理线程数,默认是 CPU核数 * 2
	 */
	private int eventMsgProcessNum;
	/**
	 * 每次消费消息的预取值
	 */
	private int prefetchSize;

	public EventControlConfig(String serverHost) {
		this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,new HessionCodecFactory());
	}
	public EventControlConfig(String serverHost, int port, String username,
			String password, String virtualHost, int connectionTimeout,
			int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {
		this.serverHost = serverHost;
		this.port = port>0?port:DEFAULT_PORT;
		this.username = username;
		this.password = password;
		this.virtualHost = virtualHost;
		this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;
		this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;
		this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;
	}
	public String getServerHost() {
		return serverHost;
	}
	public int getPort() {
		return port;
	}
	public String getUsername() {
		return username;
	}
	public String getPassword() {
		return password;
	}
	public String getVirtualHost() {
		return virtualHost;
	}
	public int getConnectionTimeout() {
		return connectionTimeout;
	}
	public int getEventMsgProcessNum() {
		return eventMsgProcessNum;
	}
	public int getPrefetchSize() {
		return prefetchSize;
	}
}

具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信

public interface EventController {
	/**
	 * 控制器启动方法
	 */
	void start();
	/**
	 * 获取发送模版
	 */
	EventTemplate getEopEventTemplate();
	/**
	 * 绑定消费程序到对应的exchange和queue
	 */
	EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);
	/*in map, the key is queue name, but value is exchange name*/
	EventController add(Map<String,String> bindings, EventProcesser eventProcesser);
}

它的实现类如下:

/**
 * 和rabbitmq通信的控制器,主要负责:
 * <p>1、和rabbitmq建立连接</p>
 * <p>2、声明exChange和queue以及它们的绑定关系</p>
 * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p>
 * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p>
 * @author yangyong
 *
 */
public class DefaultEventController implements EventController {
	private CachingConnectionFactory rabbitConnectionFactory;
	private EventControlConfig config;
	private RabbitAdmin rabbitAdmin;
	private CodecFactory defaultCodecFactory = new HessionCodecFactory();
	private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container
	private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();
	private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定
	//queue cache, key is exchangeName
	private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>();
	//queue cache, key is queueName
	private Map<String, Queue> queues = new HashMap<String, Queue>();
	//bind relation of queue to exchange cache, value is exchangeName | queueName
	private Set<String> binded = new HashSet<String>();
	private EventTemplate eventTemplate; // 给App使用的Event发送客户端
	private AtomicBoolean isStarted = new AtomicBoolean(false);
	private static DefaultEventController defaultEventController;
	public synchronized static DefaultEventController getInstance(EventControlConfig config){
		if(defaultEventController==null){
			defaultEventController = new DefaultEventController(config);
		}
		return defaultEventController;
	}
	private DefaultEventController(EventControlConfig config){
		if (config == null) {
			throw new IllegalArgumentException("Config can not be null.");
		}
		this.config = config;
		initRabbitConnectionFactory();
		// 初始化AmqpAdmin
		rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
		// 初始化RabbitTemplate
		RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
		rabbitTemplate.setMessageConverter(serializerMessageConverter);
		eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);
	}
	/**
	 * 初始化rabbitmq连接
	 */
	private void initRabbitConnectionFactory() {
		rabbitConnectionFactory = new CachingConnectionFactory();
		rabbitConnectionFactory.setHost(config.getServerHost());
		rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());
		rabbitConnectionFactory.setPort(config.getPort());
		rabbitConnectionFactory.setUsername(config.getUsername());
		rabbitConnectionFactory.setPassword(config.getPassword());
		if (!StringUtils.isEmpty(config.getVirtualHost())) {
			rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());
		}
	}
	/**
	 * 注销程序
	 */
	public synchronized void destroy() throws Exception {
		if (!isStarted.get()) {
			return;
		}
		msgListenerContainer.stop();
		eventTemplate = null;
		rabbitAdmin = null;
		rabbitConnectionFactory.destroy();
	}
	@Override
	public void start() {
		if (isStarted.get()) {
			return;
		}
		Set<String> mapping = msgAdapterHandler.getAllBinding();
		for (String relation : mapping) {
			String[] relaArr = relation.split("\\|");
			declareBinding(relaArr[1], relaArr[0]);
		}
		initMsgListenerAdapter();
		isStarted.set(true);
	}
	/**
	 * 初始化消息监听器容器
	 */
	private void initMsgListenerAdapter(){
		MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);
		msgListenerContainer = new SimpleMessageListenerContainer();
		msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);
		msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
		msgListenerContainer.setMessageListener(listener);
		msgListenerContainer.setErrorHandler(new MessageErrorHandler());
		msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 设置每个消费者消息的预取值
		msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());
		msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数
		msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()]));
		msgListenerContainer.start();
	}
	@Override
	public EventTemplate getEopEventTemplate() {
		return eventTemplate;
	}
	@Override
	public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {
		return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
	}

	public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {
		msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
		if(isStarted.get()){
			initMsgListenerAdapter();
		}
		return this;
	}
	@Override
	public EventController add(Map<String, String> bindings,
			EventProcesser eventProcesser) {
		return add(bindings, eventProcesser,defaultCodecFactory);
	}

	public EventController add(Map<String, String> bindings,
			EventProcesser eventProcesser, CodecFactory codecFactory) {
		for(Map.Entry<String, String> item: bindings.entrySet())
			msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);
		return this;
	}
	/**
	 * exchange和queue是否已经绑定
	 */
	protected boolean beBinded(String exchangeName, String queueName) {
		return binded.contains(exchangeName+"|"+queueName);
	}
	/**
	 * 声明exchange和queue已经它们的绑定关系
	 */
	protected synchronized void declareBinding(String exchangeName, String queueName) {
		String bindRelation = exchangeName+"|"+queueName;
		if (binded.contains(bindRelation)) return;

		boolean needBinding = false;
		DirectExchange directExchange = exchanges.get(exchangeName);
		if(directExchange == null) {
			directExchange = new DirectExchange(exchangeName, true, false, null);
			exchanges.put(exchangeName, directExchange);
			rabbitAdmin.declareExchange(directExchange);//声明exchange
			needBinding = true;
		}
		Queue queue = queues.get(queueName);
		if(queue == null) {
			queue = new Queue(queueName, true, false, false);
			queues.put(queueName, queue);
			rabbitAdmin.declareQueue(queue);	//声明queue
			needBinding = true;
		}
		if(needBinding) {
			Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange
			rabbitAdmin.declareBinding(binding);//声明绑定关系
			binded.add(bindRelation);
		}
	}
}

搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO

@SuppressWarnings("serial")
public class People implements Serializable{
	private int id;
	private String name;
	private boolean male;
	private People spouse;
	private List<People> friends;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public boolean isMale() {
		return male;
	}
	public void setMale(boolean male) {
		this.male = male;
	}
	public People getSpouse() {
		return spouse;
	}
	public void setSpouse(People spouse) {
		this.spouse = spouse;
	}
	public List<People> getFriends() {
		return friends;
	}
	public void setFriends(List<People> friends) {
		this.friends = friends;
	}
	@Override
	public String toString() {
		// TODO Auto-generated method stub
		return "People[id="+id+",name="+name+",male="+male+"]";
	}
}

建立单元测试

public class RabbitMqTest{
	private String defaultHost = "127.0.0.1";
	private String defaultExchange = "EXCHANGE_DIRECT_TEST";
	private String defaultQueue = "QUEUE_TEST";
	private DefaultEventController controller;
	private EventTemplate eventTemplate;
	@Before
	public void init() throws IOException{
		EventControlConfig config = new EventControlConfig(defaultHost);
		controller = DefaultEventController.getInstance(config);
		eventTemplate = controller.getEopEventTemplate();
		controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());
		controller.start();
	}
	@Test
	public void sendString() throws SendRefuseException{
		eventTemplate.send(defaultQueue, defaultExchange, "hello world");
	}
	@Test
	public void sendObject() throws SendRefuseException{
		eventTemplate.send(defaultQueue, defaultExchange, mockObj());
	}
	@Test
	public void sendTemp() throws SendRefuseException, InterruptedException{
		String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange
		String tempQueue = "QUEUE_TEST_TEMP";//以前未声明的queue
		eventTemplate.send(tempQueue, tempExchange, mockObj());
		//发送成功后此时不会接受到消息,还需要绑定对应的消费程序
		controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());
	}
	@After
	public void end() throws InterruptedException{
		Thread.sleep(2000);
	}
	private People mockObj(){
		People jack = new People();
		jack.setId(1);
		jack.setName("JACK");
		jack.setMale(true);

		List<People> friends = new ArrayList<>();
		friends.add(jack);
		People hanMeiMei = new People();
		hanMeiMei.setId(1);
		hanMeiMei.setName("韩梅梅");
		hanMeiMei.setMale(false);
		hanMeiMei.setFriends(friends);

		People liLei = new People();
		liLei.setId(2);
		liLei.setName("李雷");
		liLei.setMale(true);
		liLei.setFriends(friends);
		liLei.setSpouse(hanMeiMei);
		hanMeiMei.setSpouse(liLei);
		return hanMeiMei;
	}
	class ApiProcessEventProcessor implements EventProcesser{
		@Override
		public void process(Object e) {//消费程序这里只是打印信息
			Assert.assertNotNull(e);
			System.out.println(e);
			if(e instanceof People){
				People people = (People)e;
				System.out.println(people.getSpouse());
				System.out.println(people.getFriends());
			}
		}
	}
}

源码地址请点击这里

总结

以上就是本文关于java实现rabbitmq消息的发送接受的全部内容,希望对大家有所帮助。

感谢大家对本站的支持。

(0)

相关推荐

  • java远程连接调用Rabbitmq的实例代码

    本文介绍了java远程连接调用Rabbitmq,分享给大家,希望此文章对各位有所帮助. 打开IDEA创建一个maven工程(Java就可以了). pom.xml文件如下 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apac

  • Java编程rabbitMQ实现消息的收发

    java实现rAMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全. RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.Actio

  • rabbitmq(中间消息代理)在python中的使用详解

    在之前的有关线程,进程的博客中,我们介绍了它们各自在同一个程序中的通信方法.但是不同程序,甚至不同编程语言所写的应用软件之间的通信,以前所介绍的线程.进程队列便不再适用了:此种情况便只能使用socket编程了,然而不同程序之间的通信便不再像线程进程之间的那么简单了,要考虑多种情况(比如其中一方断线另一方如何处理:消息群发,多个程序之间的通信等等),如果每遇到一次程序间的通信,便要根据不同情况编写不同的socket,还要维护.完善这个socket这会使得编程人员的工作量大大增加,也使得程序更易崩溃

  • Springcloud整合stream,rabbitmq实现消息驱动功能

    springcloud整合stream,rabbitmq实现消息驱动功能 1.代码实现: 创建项目stream 添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/>

  • java编程实现邮件定时发送的方法

    本文实例讲述了java编程实现邮件定时发送的方法.分享给大家供大家参考,具体如下: 最近做项目时客户提出了一个需求:系统定时发送E-mail到其客户,达到通知的效果.先将实例分享给大家,如果确实有一些帮助的话,请大家来点掌声! 首先介绍java定时器(java.util.Timer)有定时执行计划任务的功能,通过设定定时器的间隔时间,会自动在此间隔时间后执行预先安排好的任务(java.util. TimerTask) 如: 每隔一个小时执行任务 timer.schedule(TimerTask,

  • Java编程调用微信接口实现图文信息推送功能

    本文实例讲述了Java编程调用微信接口实现图文信息等推送功能.分享给大家供大家参考,具体如下: Java调用微信接口工具类,包含素材上传.获取素材列表.上传图文消息内的图片获取URL.图文信息推送. 微信图文信息推送因注意html代码字符串中将双引号(")替换成单引号('),不然信息页面中包含图片将无法显示且图片后面的内容也不会显示 官方文档:http://mp.weixin.qq.com/wiki/home/ StringBuilder sb=new StringBuilder(); sb.a

  • RabbitMQ .NET消息队列使用详解

    本文实例为大家分享了RabbitMQ .NET消息队列使用方法,供大家参考,具体内容如下 首先下载安装包,我都环境是win7 64位: 去官网下载 otp_win64_19.0.exe  和rabbitmq-server-3.6.3.exe安装好 然后开始编程了: (1)创建生产者类: class Program { private static void Main() { //建立RabbitMQ连接和通道 var connectionFactory = new ConnectionFacto

  • Java编程中10个最佳的异常处理技巧

    在实践中,异常处理不单单是知道语法这么简单.编写健壮的代码是更像是一门艺术,在本文中,将讨论Java异常处理最佳实践.这些Java最佳实践遵循标准的JDK库,和几个处理错误和异常的开源代码.这还是一个提供给java程序员编写健壮代码的便利手册.Java 编程中异常处理的最佳实践 这里是我收集的10个Java编程中进行异常处理的10最佳实践.在Java编程中对于检查异常有褒有贬,强制处理异常是一门语言的功能.在本文中,我们将尽量减少使用检查型异常,同时学会在Java编程中使用检查型VS非检查型异常

  • Java编程—在测试中考虑多态

    面向对象编程有三大特性:封装.继承.多态. 封装隐藏了类的内部实现机制,可以在不影响使用的情况下改变类的内部结构,同时也保护了数据.对外界而已它的内部细节是隐藏的,暴露给外界的只是它的访问方法. 继承是为了重用父类代码.两个类若存在IS-A的关系就可以使用继承.,同时继承也为实现多态做了铺垫.那么什么是多态呢?多态的实现机制又是什么?请看我一一为你揭开: 所谓多态就是指程序中定义的引用变量所指向的具体类型和通过该引用变量发出的方法调用在编程时并不确定,而是在程序运行期间才确定,即一个引用变量倒底

  • Java编程异常处理最佳实践【推荐】

    Java中的异常处理不是一个简单的话题.初学者很难理解,甚至有经验的开发人员也会花几个小时来讨论应该如何抛出或处理这些异常.这就是为什么大多数开发团队都有自己的异常处理的规则和方法.如果你是一个团队的新手,你可能会惊讶于这些方法与你之前使用过的那些方法有多么不同.常见的异常类型: NullPointerException -空指针引用异常 ClassCastException-类型强制转换异常 lllegalArgumentException-传递非法参数异常 ArithmeticExcepti

  • Java编程Post数据请求和接收代码详解

    这两天在做http服务端请求操作,客户端post数据到服务端后,服务端通过request.getParameter()进行请求,无法读取到数据,搜索了一下发现是因为设置为text/plain模式才导致读取不到数据 urlConn.setRequestProperty("Content-Type","text/plain; charset=utf-8"); 若设置为以下方式,则通过request.getParameter()可以读取到数据 urlConn.setReq

随机推荐