博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ + Camel 实现消息路由
阅读量:2815 次
发布时间:2019-05-13

本文共 2959 字,大约阅读时间需要 9 分钟。

 
本文的目的是在 broker 端实现消息的路由分发,通俗点讲就是,根据消息的特征将消息分发到不同的 queue 或者 topic 上。要实现消息路由,最简单的方式是在 activemq 提供的 xml 配置文件下面构建路由规则。
所使用的版本:
  • ActiveMQ 5.6.0
  • Camel 2.9.2
在 ActiveMQ 的每个发行版的 conf 目录下包含了很多的示例 xml 配置文件,它们是对 ActiveMQ 重要 feature 的配置举例,比如使用jdbc作持久化机制时针对不同数据库的配置、如何进行权限配置、如何配置 network broker 等等。在研究ActiveMQ的某项特征时,参考下这里面的配置会节省很多时间。本文将要实现的功能用到了 conf 目录下的 activemq.xml 和 camel.xml 两个文件。
activemq.xml 是启动 broker 时缺省的配置文件。如果要使用 camel,我们首先需要将 activemq.xml 中的 “import camel.xml” 的那行注释去掉。
      <import resource="camel.xml"/>
camel.xml 中的 camelContext 中包含了一个简单的路由规则:
Java代码  
  1. <route>  
  2.         <description>Example Camel Route</description>  
  3.         <from uri="activemq:example.A"/>  
  4.         <to uri="activemq:example.B"/>  
  5. </route>  
 
其中 uri 的格式为 activemq:[queue:|topic:]destinationName 在不指定 queue/topic 的情况下,默认为 queue。
它实现的功能很简单:所有发送到队列 example.A 上的消息都将转发到队列 example.B。
camel允许我们在activemq的broker端根据Message的特征实现路由分发。为此,我们需要一种可以描述路由规则的语言,而camel支持很多这样的语言(http://camel.apache.org/languages.html)。我选用了 Bean language,这种语言允许我们通过在bean中定义的过滤方法(返回类型为boolean)对消息进行过滤。并且,可以用spring下bean的定义方式,在xml文件下注册这个bean。
Java代码  
  1. import javax.jms.JMSException;  
  2. import org.apache.camel.Body;  
  3. import org.apache.camel.Header;  
  4.   
  5. public class CamelBean {  
  6.     public boolean isEven(@Header("JMSType")String jmsType) throws JMSException {  
  7.         return "even".equals(jmsType);  
  8.     }  
  9.       
  10.     public boolean isOdd(@Header("JMSType")String jmsType) throws JMSException {  
  11.         return "odd".equals(jmsType);  
  12.     }  
  13.       
  14.     public boolean matchBody(@Body String body) {  
  15.         if (body.contains("aaa"))  
  16.             return true;  
  17.         return false;  
  18.     }  
  19. }  
 
我在CamelBean中示例了三种过滤方法。 isEven和isOdd都是通过消息头的JMSType作判断的,而matchBody方法则是检查实际的消息体中是否包含“aaa”关键字。
为了使用这个bean,我需要将该类打到jar包里,然后放到activemq主目录的lib路径下面。
下一步要做的就是在camel.xml文件下注册这个bean。
Java代码  
  1. <bean id="camelBean" class="com.example.activemq.CamelBean"></bean>  
最后,也是最关键的部分,即将这个bean与camel的路由机制关联。为此我们需要在 route下面使用 filter。配置如下:
Xml代码  
  1. <route>  
  2.            <description>Example Camel Route</description>  
  3.            <from uri="activemq:example.A"/>  
  4.     <filter>  
  5.         <method ref="camelBean" method="isEven" />  
  6.         <to uri="activemq:example.even"/>  
  7.     </filter>  
  8.     <filter>  
  9.         <method ref="camelBean" method="isOdd" />  
  10.         <to uri="activemq:example.odd"/>  
  11.     </filter>  
  12.     <filter>  
  13.         <method ref="camelBean" method="matchBody" />  
  14.         <to uri="activemq:example.aaa"/>  
  15.     </filter>  
  16. </route>  
 
这段配置实现的功能是:
检查消息头的JMSType值,如果是“even”就将消息的一个副本发送到example.even队列(odd,情况同理);如果检查消息内容发现“aaa”关键字,则发送一个消息副本到example.aaa队列。注意,同一份消息是可能会被分发到多个队列的。
这种配置下,每个消息到达broker后,broker端要做多次检查,次数取决于filter的个数。因此,如果filter很多,而且broker接收到的消息量很大的话,性能上会有一定的影响。
附:创建并发送消息的部分示例代码:
Java代码  
  1. private void sendMessage() {  
  2.         try {  
  3.             Queue queue = session.createQueue("example.A");  
  4.             final MessageProducer producer = session.createProducer(queue);  
  5.             producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
  6.               
  7.             TextMessage message = session.createTextMessage("aaabbbbb");  
  8.               
  9.             if (ai.incrementAndGet() %2 == 0) {  
  10.                 message.setJMSType("even");  
  11.             } else {  
  12.                 message.setJMSType("odd");  
  13.             }  
  14.               
  15.             producer.send(message);  
  16.             producer.close();  
  17. //          session.close();  
  18.         } catch (JMSException e) {  
  19.             // TODO Auto-generated catch block  
  20.             e.printStackTrace();  
  21.         }  
  22.     } 

转载地址:http://mckhd.baihongyu.com/

你可能感兴趣的文章
python技术点
查看>>
安装vue搭建移端项目1
查看>>
px pt rpx rem
查看>>
微擎开发001
查看>>
php打印错误信息
查看>>
h5下载预览pdf,excel
查看>>
js视频播放器DPlayer,video.js
查看>>
实例解读什么是Redis缓存穿透、缓存雪崩和缓存击穿
查看>>
php 实现redis 订阅发布
查看>>
redis基础1
查看>>
php-resque消息队列
查看>>
Redis2
查看>>
tp3.2分层
查看>>
文字转音频-百度在线
查看>>
Laravel使用migrate对数据表的迁移
查看>>
PHPer学啥 路线 笔记1
查看>>
redis过期键删除策略 雪崩
查看>>
laravel-admin 管理平台获取当前登陆用户信息
查看>>
mysql分库分表实战及php代码操作完整实例
查看>>
为什么重写HashCode()和equal()
查看>>