`

如何实现ActiveMq的Topic的持久订阅

阅读更多

原文地址:http://www.mytju.com/classcode/news_readNews.asp?newsID=486

 

(1)使用queue,即队列时,每个消息只有一个消费者,所以,持久化很简单,只要保存到数据库即可

。然后,随便一个消费者取走处理即可。某个消费者关掉一阵子,也无所谓。

(2)使用topic,即订阅时,每个消息可以有多个消费者,就麻烦一些。

首先,假设消费者都是普通的消费者,
------------------------
<1>activemq启动后,发布消息1,可惜,现在没有消费者启动着,也就是没有消费者进行了订阅。那么

,这个消息就被抛弃了。

<2>消费者1启动了,连接了activemq,进行了订阅,在等待消息~~

activemq发布消息2,OK,消费者1收到,并进行处理。消息抛弃。

<3>消费者2也启动了,连接了activemq,进行了订阅,在等待消息~~

activemq发布消息3,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。

<4>消费者1关掉了。

activemq发布消息4,OK,消费者2收到,并进行处理。消息抛弃。

<5>消费者1又启动了。

activemq发布消息5,OK,消费者1,消费者2都收到,并进行处理。消息抛弃。
-----------------------------
总结一下:
activemq只是向当前启动的消费者发送消息。
关掉的消费者,会错过很多消息,并无法再次接收这些消息。

如果发送的消息是重要的用户同步数据,错过了,用户数据就不同步了。

那么,如何让消费者重新启动时,接收到错过的消息呢?

答案是持久订阅。

(3)普通的订阅,不区分消费者,场地里有几个人头,就扔几个馒头。
持久订阅,就要记录消费者的名字了。
张三说,我是张三,有馒头给我留着,我回来拿。
李四说,我是李四,有馒头给我留着,我回来拿。
activemq就记下张三,李四两个名字。

那么,分馒头时,还是一个人头给一个馒头。
分完了,一看张三没说话,说明他不在,给他留一个。
李四说话了,那就不用留了。

张三回来了,找activemq,一看,这不张三吧,快把他的馒头拿来。
可能是一个馒头,也可能是100个馒头,就看张三离开这阵子,分了多少次馒头了。

activemq区分消费者,是通过clientID和订户名称来区分的。

 


// 创建connection
connection = connectionFactory.createConnection();
connection.setClientID("bbb"); //持久订阅需要设置这个。
connection.start();

// 创建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

// 创建destination
Topic topic = session.createTopic("userSyncTopic"); //Topic名称

//MessageConsumer consumer = session.createConsumer(topic); //普通订阅
MessageConsumer consumer = session.createDurableSubscriber(topic,"bbb"); //持久订阅

 



(4)还有一点,消息的生产者,发送消息时用使用持久模式
MessageProducer producer = ...;
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
不设置,默认就是持久的

(5)使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。

(6)activemq的设置在conf/activemq.xml中,默认消息是保存在data/kahadb中,重启activemq消息不会丢。

可以访问http://localhost:8161/admin/index.jsp
查看当前的队列、Topic和持久订户的信息、发送消息等等,很方便。

可以复制activemq-jdbc.xml中的内容过来,修改一下,就可以把消息保存在其它数据库中了。

 

-------------------------------------------这是一条分割线-----------------------------------

关于ActiveMQ的持久化:

1、消息模型

ActiveMQ的有两种消息模型:一是点对点,二是发布/订阅模式。

点对点在ActiveMQ中的具体实现就是Queue,发布/订阅则是Topic。

2、ActiveMQ的持久化

ActiveMQ持久化就是在ActiveMQ崩溃时修复后,原消息数据仍未丢失,具体的实现是ActiveMQ使用文件系统或者数据库对消息进行存储,这样在AMQ恢复后可以根据存储的消息进行消息数据的恢复。

3、Queue

点对点消息,一方发送,一方接收。已经持久化的消息,一旦接收方已经成功的消费掉,则从持久化介质中去掉。

4、Topic

发布/订阅方式,已经持久化的消息,只有订阅者成功消费后才被清除。其中,该方式的持久化,一方面包括订阅者的持久化,是指订阅者由于某些原因断开连接,再重新连接之后,能够获取到连接断开期间的消息,即不错过消息;另一方面,是指ActiveMQ发生异常恢复后,发布方的消息仍旧存在。

-------------------------------------------这又是一条分割线--------------------------------

持久化方式:

1、JMS Queue消息的持久化

  Queue的持久化实现比较简单,只需要在发送消息的时候指定为持久化消息即可:

  

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

 

 

2、JMS  Topic消息的持久化

  Topic消息进行持久化,需要发布方、订阅方都进行持久化。

  消息的发布方需要设置消息为持久化消息:

 

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

  消息的订阅方需要设置持久化接收(订阅方需要指定自己的ClientID):

 

 

connection.setClientID("clientid");
consumer=session.createDurableSubscriber(topic, "clientid");

 

3、MQTT Topic消息的持久化

  消息发布方:

 

mqtt.setClientId("clientid");
mqtt.setCleanSession(false);

  消息订阅方:

 

 

mqtt.setClientId("clientid");
mqtt.setCleanSession(false);

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics