Java编程rabbitMQ实现消息的收发
|
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 本文不介绍amqp和rabbitmq相关知识,请自行网上查阅。 本文基于spring-rabbit中间件来实现消息的发送和接收功能,参见 http://www.rabbitmq.com/tutorials/tutorial-one-java.html 和 http://www.springsource.org/spring-amqp 。 Java编程通过操作rabbitMQ实现消息收发的代码如下,首先是Maven依赖: <dependencies> <!-- for rabbitmq --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>com.caucho</groupId> <artifactId>hessian</artifactId> <version>4.0.7</version> </dependency> </dependencies> 首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象
/**
 * Holder object passed between the application and RabbitMQ.
 *
 * <p>It carries the queue name, the exchange name and the already-encoded
 * event payload. The fields have no setters; the no-arg constructor leaves
 * all of them {@code null}.</p>
 */
public class EventMessage implements Serializable {

    private String queueName;
    private String exchangeName;
    private byte[] eventData;

    /** Creates an empty message whose fields are all {@code null}. */
    public EventMessage() {
    }

    /**
     * Creates a fully populated message.
     *
     * @param queueName    name of the queue
     * @param exchangeName name of the exchange
     * @param eventData    encoded payload; stored by reference, not copied
     */
    public EventMessage(String queueName,String exchangeName,byte[] eventData) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.eventData = eventData;
    }

    /** @return the queue name given at construction, or {@code null} */
    public String getQueueName() {
        return this.queueName;
    }

    /** @return the exchange name given at construction, or {@code null} */
    public String getExchangeName() {
        return this.exchangeName;
    }

    /** @return the payload array held by this message (not a copy), or {@code null} */
    public byte[] getEventData() {
        return this.eventData;
    }

    /** Renders all three fields; the payload is printed via {@link Arrays#toString(byte[])}. */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("EopEventMessage [queueName=");
        text.append(this.queueName);
        text.append(",exchangeName=").append(this.exchangeName);
        text.append(",eventData=").append(Arrays.toString(this.eventData));
        text.append("]");
        return text.toString();
    }
}
为了可以发送和接收这个消息持有对象,我们还需要一个用来序列化和反序列化的工厂
/**
 * Strategy for turning objects into byte arrays and back again.
 */
public interface CodecFactory {
/**
 * Serializes the given object into a byte array.
 *
 * @param obj the object to encode
 * @return the encoded bytes
 * @throws IOException if encoding fails
 */
byte[] serialize(Object obj) throws IOException;
/**
 * Rebuilds an object from a byte array.
 *
 * @param in the encoded bytes
 * @return the decoded object
 * @throws IOException if decoding fails
 */
Object deSerialize(byte[] in) throws IOException;
}
下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式
/**
 * {@link CodecFactory} implementation that encodes and decodes objects with
 * the Hessian library ({@code HessianOutput} / {@code HessianInput}).
 *
 * <p>NOTE(review): {@link #serialize(Object)} frames the payload with
 * {@code startCall()/completeCall()} while {@link #deSerialize(byte[])} reads
 * it with {@code startReply()/completeReply()}. Confirm that these two
 * framings are compatible in the Hessian version in use; the calls are kept
 * exactly as they were so the wire format is unchanged.</p>
 */
public class HessionCodecFactory implements CodecFactory {

    private static final Logger logger = Logger.getLogger(HessionCodecFactory.class);

    /**
     * Serializes {@code obj} into a byte array using Hessian.
     *
     * @param obj the object to encode
     * @return the encoded bytes, never {@code null}
     * @throws IOException if Hessian fails to write the object
     */
    @Override
    public byte[] serialize(Object obj) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
        try {
            final HessianOutput output = new HessianOutput(baos);
            output.startCall();
            output.writeObject(obj);
            output.completeCall();
        } finally {
            // Close the stream that was actually opened, whether or not the
            // HessianOutput wrapper was created successfully.
            closeQuietly(baos);
        }
        return baos.toByteArray();
    }

    /**
     * Decodes an object from bytes produced by {@link #serialize(Object)}.
     *
     * @param in the encoded bytes; {@code null} yields {@code null}
     * @return the decoded object, or {@code null} when {@code in} is {@code null}
     * @throws IOException if the bytes cannot be decoded; failures that are not
     *         already an {@code IOException} are wrapped so that they are not
     *         mistaken for a successfully decoded {@code null}
     */
    @Override
    public Object deSerialize(byte[] in) throws IOException {
        if (in == null) {
            // A message sent without content carries no bytes; there is nothing to decode.
            return null;
        }
        final ByteArrayInputStream bais = new ByteArrayInputStream(in);
        try {
            final HessianInput input = new HessianInput(bais);
            input.startReply();
            final Object obj = input.readObject();
            input.completeReply();
            return obj;
        } catch (final IOException ex) {
            throw ex;
        } catch (final Throwable e) {
            // The broad catch is kept from the original code (presumably because
            // HessianInput.startReply() is declared to throw Throwable - confirm
            // against the Hessian API). Report the failure to the caller instead
            // of silently returning null.
            logger.error("Failed to decode object.",e);
            throw new IOException("Failed to decode object.", e);
        } finally {
            closeQuietly(bais);
        }
    }

    /** Closes the stream and logs, rather than propagates, any failure to do so. */
    private static void closeQuietly(java.io.Closeable stream) {
        try {
            stream.close();
        } catch (final IOException ex) {
            logger.error("Failed to close stream.",ex);
        }
    }
}
接下来就先实现发送功能,新增一个接口专门用来实现发送功能
/**
 * Contract for publishing an event to a queue bound to an exchange.
 */
public interface EventTemplate {

    /**
     * Sends an event using the implementation's default codec.
     *
     * @param queueName    name of the target queue; must not be empty
     * @param exchangeName name of the target exchange; must not be empty
     * @param eventContent the event payload
     * @throws SendRefuseException if the event is rejected or cannot be sent
     */
    void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;

    /**
     * Sends an event, encoding the payload with the supplied codec.
     *
     * @param queueName    name of the target queue; must not be empty
     * @param exchangeName name of the target exchange; must not be empty
     * @param eventContent the event payload
     * @param codecFactory codec used to serialize {@code eventContent}
     * @throws SendRefuseException if the event is rejected or cannot be sent
     */
    void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
}
SendRefuseException是自定义的发送失败异常类
/**
 * Default {@link EventTemplate}: serializes the payload, wraps it in an
 * {@link EventMessage} and hands it to an {@code AmqpTemplate}.
 */
public class DefaultEventTemplate implements EventTemplate {

    private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);

    private final AmqpTemplate eventAmqpTemplate;
    private final CodecFactory defaultCodecFactory;

    // Kept commented out on purpose: the article enables this controller-aware
    // constructor later on.
    // private DefaultEventController eec;
    // public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
    // CodecFactory defaultCodecFactory,DefaultEventController eec) {
    // this.eventAmqpTemplate = eopAmqpTemplate;
    // this.defaultCodecFactory = defaultCodecFactory;
    // this.eec = eec;
    // }

    /**
     * @param eopAmqpTemplate     template used to publish messages
     * @param defaultCodecFactory codec used when the caller does not supply one
     */
    public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
        this.eventAmqpTemplate = eopAmqpTemplate;
        this.defaultCodecFactory = defaultCodecFactory;
    }

    /**
     * Sends {@code eventContent} using the codec given at construction time.
     *
     * @throws SendRefuseException if the event is rejected or cannot be sent
     */
    @Override
    public void send(String queueName,String exchangeName,Object eventContent)
            throws SendRefuseException {
        this.send(queueName,exchangeName,eventContent,defaultCodecFactory);
    }

    /**
     * Serializes {@code eventContent} with {@code codecFactory} and publishes it.
     *
     * <p>A {@code null} codec is accepted only when {@code eventContent} is also
     * {@code null}; in that case a message with a {@code null} payload is sent.</p>
     *
     * @throws SendRefuseException if a name is empty, if the codec is missing
     *         for a non-null payload, if serialization fails, or if the
     *         underlying template reports an {@code AmqpException}
     */
    @Override
    public void send(String queueName,String exchangeName,Object eventContent,
            CodecFactory codecFactory) throws SendRefuseException {
        if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
            throw new SendRefuseException("queueName exchangeName can not be empty.");
        }
        // if (!eec.beBinded(exchangeName,queueName))
        // eec.declareBinding(exchangeName,queueName);
        byte[] eventContentBytes = null;
        if (codecFactory == null) {
            if (eventContent == null) {
                logger.warn("Find eventContent is null,are you sure...");
            } else {
                throw new SendRefuseException(
                        "codecFactory must not be null,unless eventContent is null");
            }
        } else {
            try {
                eventContentBytes = codecFactory.serialize(eventContent);
            } catch (IOException e) {
                throw new SendRefuseException(e);
            }
        }
        // Wrap the encoded payload together with its routing information.
        EventMessage msg = new EventMessage(queueName,exchangeName,eventContentBytes);
        try {
            // queueName is passed in the position that Spring AMQP documents as the routing key.
            eventAmqpTemplate.convertAndSend(exchangeName,queueName,msg);
        } catch (AmqpException e) {
            logger.error("send event fail. Event Message : [" + eventContent + "]",e);
            throw new SendRefuseException("send event fail",e);
        }
    }
}
注释的地方稍后会用到,主要是防止数据发送的时候没有事先声明
/**
 * Callback that consumes a single received event.
 */
public interface EventProcesser {
/**
 * Handles one event.
 *
 * @param e the event object; in this file it is the value returned by
 *          {@code CodecFactory.deSerialize} for the received payload
 */
public void process(Object e);
}
为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器
/**
 * POJO delegate for a MessageListenerAdapter.
 * <p>Message handling adapter. Main responsibilities:</p>
 * <p>1. Bind each kind of message to its processor and cache the binding
 * locally, e.g. all messages of queue01 + exchange01 are handled by
 * processor A.</p>
 * <p>2. Dispatch each received message to the processor registered for it.</p>
 */
public class MessageAdapterHandler {

    private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);

    /** Registered processors, keyed by {@code queueName + "|" + exchangeName}. */
    private final ConcurrentMap<String,EventProcessorWrap> epwMap;

    public MessageAdapterHandler() {
        this.epwMap = new ConcurrentHashMap<String,EventProcessorWrap>();
    }

    /**
     * Dispatches a received message to the processor registered for its queue
     * and exchange. Messages that are {@code null}, lack a queue or exchange
     * name, have no registered processor, or cannot be decoded are logged and
     * dropped.
     *
     * @param eem the received message; may be {@code null}
     */
    public void handleMessage(EventMessage eem) {
        logger.debug("Receive an EventMessage: [" + eem + "]");
        // A null message can show up in some abnormal situations; ignore it.
        if (eem == null) {
            logger.warn("Receive an null EventMessage,it may product some errors,and processing message is canceled.");
            return;
        }
        final String queueName = eem.getQueueName();
        final String exchangeName = eem.getExchangeName();
        if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
            logger.warn("The EventMessage's queueName and exchangeName is empty,this is not allowed,and processing message is canceled.");
            return;
        }
        // Look up the processor, then decode the payload and hand it over.
        final EventProcessorWrap eepw = epwMap.get(bindingKey(queueName, exchangeName));
        if (eepw == null) {
            logger.warn("Receive an EopEventMessage,but no processor can do it.");
            return;
        }
        try {
            eepw.process(eem.getEventData());
        } catch (IOException e) {
            logger.error("Event content can not be Deserialized,check the provided CodecFactory.",e);
        }
    }

    /**
     * Registers a processor for the given queue and exchange. If one is
     * already registered for that pair, the existing one is kept and a
     * warning is logged.
     *
     * @param queueName    queue name; must not be empty
     * @param exchangeName exchange name; must not be empty
     * @param processor    processor that consumes decoded events; must not be {@code null}
     * @param codecFactory codec used to decode payloads; must not be {@code null}
     * @throws IllegalArgumentException if any argument is empty or {@code null}
     */
    protected void add(String queueName,String exchangeName,EventProcesser processor,CodecFactory codecFactory) {
        if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
            throw new IllegalArgumentException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
        }
        final EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
        final EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(bindingKey(queueName, exchangeName),epw);
        if (oldProcessorWrap != null) {
            logger.warn("The processor of this queue and exchange exists,and the new one can't be add");
        }
    }

    /**
     * Returns the keys of all registered bindings, each in the form
     * {@code queueName + "|" + exchangeName}.
     *
     * @return a read-only view of the binding keys, so callers cannot remove
     *         registrations through it
     */
    protected Set<String> getAllBinding() {
        return java.util.Collections.unmodifiableSet(epwMap.keySet());
    }

    /** Builds the map key under which a queue/exchange pair is registered. */
    private static String bindingKey(String queueName, String exchangeName) {
        return queueName + "|" + exchangeName;
    }

    /** Pairs a processor with the codec needed to decode its payloads. */
    protected static class EventProcessorWrap {

        private final CodecFactory codecFactory;
        private final EventProcesser eep;

        protected EventProcessorWrap(CodecFactory codecFactory,EventProcesser eep) {
            this.codecFactory = codecFactory;
            this.eep = eep;
        }

        /**
         * Decodes the payload and passes the result to the wrapped processor.
         *
         * @param eventData the encoded payload
         * @throws IOException if the codec fails to decode the payload
         */
        public void process(byte[] eventData) throws IOException{
            final Object obj = codecFactory.deSerialize(eventData);
            eep.process(obj);
        }
    }
}
(编辑:安卓应用网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
