问题:
有些mqtt broker,例如 emqttd 支持共享主题。
就是多个client共同消费同一个topic的消息。 非常适合处理大量数据的接收。但是会有一个问题,
服务器端把消息发给client ,但是client端发现自己没有订阅这个topic,就把消息丢了。
因为共享主题会有特殊前缀,例如 $queue/GATEWAY/10001, 服务器端会识别为这是一个共享主题 GATEWAY/10001,但是client端会自认为subscribe的是 $queue/GATEWAY/10001 。
解决:
/** ** 自定义 MqttClient,继承自 MqttClient * 改动:为了支持 emqtt 的 shared topic 功能, * 当subscribe 这样的 topic时: $queue/GATEWAY/10001, * 往mqtt broker 发送的 topic 依然是 $queue/GATEWAY/10001 * 但是本地 注册 listener 等,保存的是 GATEWAY/10001 * 注意: subscribe 有很多方法,这里只 重写了其中的一个 * 另外注意:本地运行的时候,因为 eclipse的 paho包 中包含了 签名信息,会有 security exception, * 解决办法是 把本地 jar 修改一下 删除里面 META-INF 的 签名文件,都删掉也可以 * spring 打出来的可执行jar包 不会有这些文件,所以不会报错。本地eclipse中运行才有问题。 ** * @author chenhua */public class CustMqttClient extends MqttClient implements Serializable{ private static final long serialVersionUID = 1327828642750763384L; public CustMqttClient(String serverURI, String clientId) throws MqttException { super(serverURI, clientId); } public CustMqttClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException { super(serverURI, clientId, persistence); } public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException { this.subscribe(topicFilters, qos); // add message handlers to the list for this client for (int i = 0; i < topicFilters.length; ++i) { String topicFilter = topicFilters[i]; if (topicFilter.startsWith("$queue/")) { topicFilter = StringUtils.stripStart(topicFilter, "$queue/"); } super.aClient.comms.setMessageListener(topicFilter, messageListeners[i]); } }}
ps : 更优雅的合适的解决方案:
https://github.com/yogin16/paho-shared-sub-example
update at 2018-07