博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
eclipse mqtt paho client 处理shared topic, 共享主题问题
阅读量:7043 次
发布时间:2019-06-28

本文共 1745 字,大约阅读时间需要 5 分钟。

hot3.png

问题:

有些mqtt broker,例如 emqttd 支持共享主题。

就是多个client共同消费同一个topic的消息。 非常适合处理大量数据的接收。但是会有一个问题,

服务器端把消息发给client ,但是client端发现自己没有订阅这个topic,就把消息丢了。

因为共享主题会有特殊前缀,例如 $queue/GATEWAY/10001,  服务器端会识别为这是一个共享主题  GATEWAY/10001,但是client端会自认为subscribe的是 $queue/GATEWAY/10001 。

解决:

172554_86Vi_1165943.png

 

/** * 
 * 自定义 MqttClient,继承自 MqttClient * 改动:为了支持 emqtt 的 shared topic 功能, * 当subscribe 这样的 topic时:    $queue/GATEWAY/10001, * 往mqtt broker 发送的 topic 依然是  $queue/GATEWAY/10001 * 但是本地 注册 listener 等,保存的是   GATEWAY/10001 * 注意: subscribe 有很多方法,这里只 重写了其中的一个 * 另外注意:本地运行的时候,因为 eclipse的 paho包 中包含了 签名信息,会有 security exception, * 解决办法是 把本地 jar 修改一下 删除里面  META-INF 的  签名文件,都删掉也可以 * spring 打出来的可执行jar包 不会有这些文件,所以不会报错。本地eclipse中运行才有问题。 * 
* * @author chenhua */public class CustMqttClient extends MqttClient implements Serializable{ private static final long serialVersionUID = 1327828642750763384L; public CustMqttClient(String serverURI, String clientId) throws MqttException { super(serverURI, clientId); } public CustMqttClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException { super(serverURI, clientId, persistence); } public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException { this.subscribe(topicFilters, qos); // add message handlers to the list for this client for (int i = 0; i < topicFilters.length; ++i) { String topicFilter = topicFilters[i]; if (topicFilter.startsWith("$queue/")) { topicFilter = StringUtils.stripStart(topicFilter, "$queue/"); } super.aClient.comms.setMessageListener(topicFilter, messageListeners[i]); } }}

 

ps : 更优雅的合适的解决方案:

https://github.com/yogin16/paho-shared-sub-example

update at 2018-07

转载于:https://my.oschina.net/chen1988/blog/1611310

你可能感兴趣的文章
Fedora22和Ubuntu 配置android SDK的32位库
查看>>
腾讯在线前端规范
查看>>
MySQL查询面试题
查看>>
html 文本输入框效果大汇集
查看>>
Ubuntu12.04安装OpenCV 2.4.1
查看>>
刚刚安装Live Writer
查看>>
页面添加访问统计
查看>>
Mac nginx 403
查看>>
Lock接口的实现类
查看>>
结合源码分析 setTimeout / setInterval / setImmediate / process.nextTick 执行时机
查看>>
jQuery 效果 - 淡入淡出
查看>>
3.6 Struts2上传文件
查看>>
MyISAM InnoDB 区别
查看>>
ruby中的inject
查看>>
stringByTrimmingCharactersInSet与stringByReplaci...
查看>>
JavaSE——常用类
查看>>
Mybatis懒加载
查看>>
记录otter遇到问题
查看>>
Java虚拟机详解01----初识JVM
查看>>
神经网络---前馈神经网络
查看>>