加入收藏 | 设为首页 | 会员中心 | 我要投稿 安卓应用网 (https://www.0791zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

Java编程rabbitMQ实现消息的收发

发布时间:2020-05-23 08:00:43 所属栏目:Java 来源:互联网
导读:java实现rAMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

java实现rAMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

本文不介绍amqp和rabbitmq相关知识,请自行网上查阅

本文是基于spring-rabbit中间件来实现消息的发送接受功能

see http://www.rabbitmq.com/tutorials/tutorial-one-java.html

see http://www.springsource.org/spring-amqp

Java编程通过操作rabbitMQ消息的收发实现代码如下:

<!-- for rabbitmq -->
 	<dependency>
		<groupId>com.rabbitmq</groupId>
		<artifactId>amqp-client</artifactId>
		<version>2.8.2</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.amqp</groupId>
		<artifactId>spring-amqp</artifactId>
		<version>1.1.1.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.amqp</groupId>
		<artifactId>spring-rabbit</artifactId>
		<version>1.1.1.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>com.caucho</groupId>
		<artifactId>hessian</artifactId>
		<version>4.0.7</version>
	</dependency>
 </dependencies>

首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象

public class EventMessage implements Serializable{
	private String queueName;
	private String exchangeName;
	private byte[] eventData;
	public EventMessage(String queueName,String exchangeName,byte[] eventData) {
		this.queueName = queueName;
		this.exchangeName = exchangeName;
		this.eventData = eventData;
	}
	public EventMessage() {
	}	
	public String getQueueName() {
		return queueName;
	}
	public String getExchangeName() {
		return exchangeName;
	}
	public byte[] getEventData() {
		return eventData;
	}
	@Override
	public String toString() {
		return "EopEventMessage [queueName=" + queueName + ",exchangeName="
				+ exchangeName + ",eventData=" + Arrays.toString(eventData)
				+ "]";
	}
}

为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂

public interface CodecFactory {
	byte[] serialize(Object obj) throws IOException;
	Object deSerialize(byte[] in) throws IOException;
}

下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式

public class HessionCodecFactory implements CodecFactory {
	private final Logger logger = Logger.getLogger(HessionCodecFactory.class);
	@Override
	public byte[] serialize(Object obj) throws IOException {
		ByteArrayOutputStream baos = null;
		HessianOutput output = null;
		try {
			baos = new ByteArrayOutputStream(1024);
			output = new HessianOutput(baos);
			output.startCall();
			output.writeObject(obj);
			output.completeCall();
		} catch (final IOException ex) {
			throw ex;
		} finally {
			if (output != null) {
				try {
					baos.close();
				} catch (final IOException ex) {
					this.logger.error("Failed to close stream.",ex);
				}
			}
		}
		return baos != null ? baos.toByteArray() : null;
	}
	@Override
	public Object deSerialize(byte[] in) throws IOException {
		Object obj = null;
		ByteArrayInputStream bais = null;
		HessianInput input = null;
		try {
			bais = new ByteArrayInputStream(in);
			input = new HessianInput(bais);
			input.startReply();
			obj = input.readObject();
			input.completeReply();
		} catch (final IOException ex) {
			throw ex;
		} catch (final Throwable e) {
			this.logger.error("Failed to decode object.",e);
		} finally {
			if (input != null) {
				try {
					bais.close();
				} catch (final IOException ex) {
					this.logger.error("Failed to close stream.",ex);
  }
  }
 }
 return obj;
	}
}

接下来就先实现发送功能,新增一个接口专门用来实现发送功能

public interface EventTemplate {
	void send(String queueName,Object eventContent) throws SendRefuseException;
	void send(String queueName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
}

SendRefuseException是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为EventMessage

public class DefaultEventTemplate implements EventTemplate {
	private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);
	private AmqpTemplate eventAmqpTemplate;
	private CodecFactory defaultCodecFactory;
//	private DefaultEventController eec;
//	public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,//			CodecFactory defaultCodecFactory,DefaultEventController eec) {
//		this.eventAmqpTemplate = eopAmqpTemplate;
//		this.defaultCodecFactory = defaultCodecFactory;
//		this.eec = eec;
//	}
	public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
		this.eventAmqpTemplate = eopAmqpTemplate;
		this.defaultCodecFactory = defaultCodecFactory;
	}
	@Override
	public void send(String queueName,Object eventContent)
			throws SendRefuseException {
		this.send(queueName,exchangeName,eventContent,defaultCodecFactory);
	} 
	@Override
	public void send(String queueName,CodecFactory codecFactory) throws SendRefuseException {
		if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
			throw new SendRefuseException("queueName exchangeName can not be empty.");
		}
//		if (!eec.beBinded(exchangeName,queueName))
//			eec.declareBinding(exchangeName,queueName);
		byte[] eventContentBytes = null;
		if (codecFactory == null) {
			if (eventContent == null) {
				logger.warn("Find eventContent is null,are you sure...");
			} else {
				throw new SendRefuseException(
						"codecFactory must not be null,unless eventContent is null");
			}
		} else {
			try {
				eventContentBytes = codecFactory.serialize(eventContent);
			} catch (IOException e) {
				throw new SendRefuseException(e);
			}
		}
		// 构造成Message
		EventMessage msg = new EventMessage(queueName,eventContentBytes);
		try {
			eventAmqpTemplate.convertAndSend(exchangeName,queueName,msg);
		} catch (AmqpException e) {
			logger.error("send event fail. Event Message : [" + eventContent + "]",e);
			throw new SendRefuseException("send event fail",e);
		}
	}
}

注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明
然后我们再实现接受消息
首先我们需要一个消费接口,所有的消费程序都实现这个类

public interface EventProcesser {
	public void process(Object e);
}

 为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器

/**
 * MessageListenerAdapter的Pojo
 * <p>消息处理适配器,主要功能:</p>
 * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来</p>
 * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p>
 * 
 */
public class MessageAdapterHandler {
	private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);
	private ConcurrentMap<String,EventProcessorWrap> epwMap;
	public MessageAdapterHandler() {
		this.epwMap = new ConcurrentHashMap<String,EventProcessorWrap>();
	}
	public void handleMessage(EventMessage eem) {
		logger.debug("Receive an EventMessage: [" + eem + "]");
		// 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值
		if (eem == null) {
			logger.warn("Receive an null EventMessage,it may product some errors,and processing message is canceled.");
			return;
		}
		if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
			logger.warn("The EventMessage's queueName and exchangeName is empty,this is not allowed,and processing message is canceled.");
			return;
		}
		// 解码,并交给对应的EventHandle执行
		EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());
		if (eepw == null) {
			logger.warn("Receive an EopEventMessage,but no processor can do it.");
			return;
		}
		try {
			eepw.process(eem.getEventData());
		} catch (IOException e) {
			logger.error("Event content can not be Deserialized,check the provided CodecFactory.",e);
			return;
		}
	}
	protected void add(String queueName,EventProcesser processor,CodecFactory codecFactory) {
		if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
			throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
		}
		EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
		EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName,epw);
		if (oldProcessorWrap != null) {
			logger.warn("The processor of this queue and exchange exists,and the new one can't be add");
		}
	}
	protected Set<String> getAllBinding() {
		Set<String> keySet = epwMap.keySet();
		return keySet;
	}
	protected static class EventProcessorWrap {
		private CodecFactory codecFactory;
		private EventProcesser eep;
		protected EventProcessorWrap(CodecFactory codecFactory,EventProcesser eep) {
			this.codecFactory = codecFactory;
			this.eep = eep;
		}
		public void process(byte[] eventData) throws IOException{
			Object obj = codecFactory.deSerialize(eventData);
			eep.process(obj);
		}
	}
}

(编辑:安卓应用网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读