java结合WebSphere MQ实现接收队列文件功能
首先我们先来简单介绍下websphere mq以及安装使用简介
websphere mq : 用于传输信息 具有跨平台的功能。
1 安装websphere mq 并启动
2 websphere mq 建立 queue Manager (如:MQSI_SAMPLE_QM)
3 建立queue 类型选择 Local类型 的 (如lq )
4 建立channels 类型选择Server Connection (如BridgeChannel)
接下来,我们来看实例代码:
MQFileReceiver.java package com.mq.dpca.file; import java.io.File; import java.io.FileOutputStream; import com.ibm.mq.MQEnvironment; import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; import com.ibm.mq.constants.MQConstants; import com.mq.dpca.msg.MQConfig; import com.mq.dpca.util.ReadCmdLine; import com.mq.dpca.util.RenameUtil; /** * * MQ分组接收文件功能 * 主动轮询 */ public class MQFileReceiver { private MQQueueManager qmgr; // 连接到队列管理器 private MQQueue inQueue; // 传输队列 private String queueName = ""; // 队列名称 private String host = ""; // private int port = 1414; // 侦听器的端口号 private String channel = ""; // 通道名称 private String qmgrName = ""; // 队列管理器 private MQMessage inMsg; // 创建消息缓冲 private MQGetMessageOptions gmo; // 设置获取消息选项 private static String fileName = null; // 接收队列上的消息并存入文件 private int ccsid = 0; private static String file_dir = null; /** * 程序的入口 * * @param args */ public static void main(String args[]) { MQFileReceiver mfs = new MQFileReceiver(); //初始化连接 mfs.initproperty(); //接收文件 mfs.runGoupReceiver(); //获取shell脚本名 // String shellname = MQConfig.getValueByKey(fileName); // if(shellname!=null&&!"".equals(shellname)){ // //调用shell // ReadCmdLine.callShell(shellname); // }else{ // System.out.println("have no shell name,Only receive files."); // } } public void runGoupReceiver() { try { init(); getGroupMessages(); qmgr.commit(); System.out.println("\n Messages successfully Receive "); } catch (MQException mqe) { mqe.printStackTrace(); try { System.out.println("\n Backing out Transaction "); qmgr.backout(); System.exit(2); } catch (Exception e) { e.printStackTrace(); System.exit(2); } } catch (Exception e) { e.printStackTrace(); System.exit(2); } } /** * 初始化服务器连接信息 * * @throws Exception */ private void init() throws Exception { /* 为客户机连接设置MQEnvironment属性 */ MQEnvironment.hostname = host; MQEnvironment.channel = channel; MQEnvironment.port = port; /* 连接到队列管理器 */ qmgr = new MQQueueManager(qmgrName); /* 设置队列打开选项以输 */ int opnOptn = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_FAIL_IF_QUIESCING; /* 打开队列以输 */ inQueue = qmgr.accessQueue(queueName, opnOptn, null, null, null); } /** * 接受文件的主函数 * * @throws Exception */ public void getGroupMessages() { /* 设置获取消息选项 */ gmo = new MQGetMessageOptions(); gmo.options = MQConstants.MQGMO_FAIL_IF_QUIESCING; gmo.options = gmo.options + MQConstants.MQGMO_SYNCPOINT; /* 等待消息 */ gmo.options = gmo.options + MQConstants.MQGMO_WAIT; /* 设置等待时间限制 */ gmo.waitInterval = 5000; /* 只获取消息 */ gmo.options = gmo.options + MQConstants.MQGMO_ALL_MSGS_AVAILABLE; /* 以辑顺序获取消息 */ gmo.options = gmo.options + MQConstants.MQGMO_LOGICAL_ORDER; gmo.matchOptions = MQConstants.MQMO_MATCH_GROUP_ID; /* 创建消息缓冲 */ inMsg = new MQMessage(); try { FileOutputStream fos = null; /* 处理组消息 */ while (true) { try { inQueue.get(inMsg, gmo); if (fos == null) { try { fileName = inMsg.getStringProperty("fileName"); String fileName_full = null; fileName_full = file_dir + RenameUtil.rename(fileName); fos = new FileOutputStream(new File(fileName_full)); int msgLength = inMsg.getMessageLength(); byte[] buffer = new byte[msgLength]; inMsg.readFully(buffer); fos.write(buffer, 0, msgLength); /* 查看是否是最后消息标识 */ char x = gmo.groupStatus; if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) { System.out.println("Last Msg in Group"); break; } inMsg.clearMessage(); } catch (Exception e) { System.out .println("Receiver the message without property,do nothing!"); inMsg.clearMessage(); } } else { int msgLength = inMsg.getMessageLength(); byte[] buffer = new byte[msgLength]; inMsg.readFully(buffer); fos.write(buffer, 0, msgLength); /* 查看是否是最后消息标识 */ char x = gmo.groupStatus; if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) { System.out.println("Last Msg in Group"); break; } inMsg.clearMessage(); } } catch (Exception e) { char x = gmo.groupStatus; if (x == MQConstants.MQGS_LAST_MSG_IN_GROUP) { System.out.println("Last Msg in Group"); } break; } } if (fos != null) fos.close(); } catch (Exception e) { System.out.println(e.getMessage()); } } public void initproperty() { MQConfig config = new MQConfig().getInstance(); if (config.getMQ_MANAGER() != null) { qmgrName = config.getMQ_MANAGER(); queueName = config.getMQ_QUEUE_NAME(); channel = config.getMQ_CHANNEL(); host = config.getMQ_HOST_NAME(); port = Integer.valueOf(config.getMQ_PROT()); ccsid = Integer.valueOf(config.getMQ_CCSID()); file_dir = config.getFILE_DIR(); } } }
赞 (0)