qinfengge

qinfengge

醉后不知天在水,满船清梦压星河
github

java实时消息推送(二)

之前写实时推送(一)的时候,是用的 rabbitmq 的 mqtt 插件。在当时它能满足我的需求,现在有一个新的需求,远程唤醒,判断设备是否在线,如果当前时间设备在线,则表示可以被远程唤醒。
一开始,我还是想在 rabbitmq 的基础上找找看有没有什么 API 能够获取到 mqtt 的在线设备。毕竟消息队列也是用的这个,没必要再增加中间件。然后我发现 mqtt 有很多约定系统主题,这些系统主题维护了 mqtt broker 的状态。
但正如我所说,系统主题是约定的配置,并非强制性的,且 rabbitmq 中的 mqtt 实现也只是作为插件补充。这是我在官方仓库提的discussions

没办法,看来只能引入专业的 mqtt 中间件了,这里我选了EMQX, 主要是文档确实详细。

安装#

这里我选择使用 WSL2 的 Docker 安装,安装命令很简单

$ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.0.0

安装成功后可以直接使用 http://localhost:18083 访问控制面板,默认的账号名是 admin,密码是 public

image

配置#

安装完成后还需要在程序中进行配置,因为版本切换成 mqtt V5 了,所以依赖和配置也要更改下

依赖#

 <!--mqtt依赖包-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
            <version>1.2.5</version>
        </dependency>

配置文件#

# mqtt协议配置
spring:
  mqtt:
    url: tcp://127.0.0.1:1883
    username: admin
    password: admin
    clientId: serverClientId
    #发布的主题--MQTT-默认的消息推送主题,实际可在调用接口时指定
    pubTopic: testTopic
    #订阅的主题
    subTopic: testTopic,remote-wake
    completionTimeout: 30000

因为 emqx 默认是允许匿名连接的,所以用户名和密码可以省略。

生产者#

接着创建生产者的配置类

/**
 * @author lza
 * @date 2023/11/24-10:24
 **/


@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProviderConfig {

    private final MqttProperties mqttProperties;

    /**
     * 客户端对象
     */
    private MqttClient providerClient;

    /**
     * 在bean初始化后连接到服务器
     * @author xct
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客户端连接服务端
     * @author xct
     * @date 2021/7/30 16:01
     */
    @SneakyThrows
    public void connect(){
        //创建MQTT客户端对象
        providerClient = new MqttClient(mqttProperties.getUrl(), "providerClient", new MemoryPersistence());
        //连接设置
        MqttConnectionOptions options = new MqttConnectionOptions();
        //是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
        //设置为true表示每次连接到服务端都是以新的身份
        options.setCleanStart(true);
        //设置连接用户名
        options.setUserName(mqttProperties.getUsername());
        //设置连接密码
        options.setPassword(mqttProperties.getPassword().getBytes());
        //设置超时时间,单位为秒
        options.setConnectionTimeout(100);
        //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        options.setKeepAliveInterval(20);
        //设置自动重连
        options.setAutomaticReconnect(true);

        //设置回调
        providerClient.setCallback(new MqttProviderCallBack());
        providerClient.connect(options);
    }

    /**
     * 发布消息
     *
     * @param qos      服务质量等级
     *                 0只会发送一次,不管成不成功
     *                 1未成功会继续发送,直到成功,可能会收到多次
     *                 2未成功会继续发送,但会保证只收到一次
     * @param retained 保留标志
     *                 如果设置为true,服务端必须存储这个应用消息和它的服务质量等级,当有订阅者订           阅这个主题时,会把消息推送给这个订阅者
     *                 但服务端对同一个主题只会保留一条retained消息(最后收到的那条)
     * @param topic    主题
     * @param message  消息
     * @author xct
     * @date 2021/7/30 16:27
     */
    @SneakyThrows
    public void publish(int qos, boolean retained, String topic, String message) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主题目的地,用于发布/订阅消息
        MqttTopic mqttTopic = providerClient.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度。
        //用于在以非阻塞方式(在后台运行)执行发布时跟踪消息的传递进度
        MqttToken token;
        //将指定消息发布到主题,但不等待消息传递完成。返回的token可用于跟踪消息的传递状态。
        //一旦此方法干净地返回,消息就已被客户端接受发布。当连接可用时,将在后台完成消息传递。
        token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

主要做的事情就是根据配置文件的连接参数创建 mqtt 客户端对象,然后创建了一个 publish 推送消息的方法。
因为在创建生产者客户端对象时指定了使用 MqttProviderCallBack 做回调函数,所以还需要创建这个回调类

生产者回调#

生产者的回调方法按需实现

/**
 * @author lza
 * @date 2023/11/24-10:34
 **/

public class MqttProviderCallBack implements MqttCallback {



    @Override
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        System.out.println("生产者:与服务器断开连接");
    }

    @Override
    public void mqttErrorOccurred(MqttException e) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }

    @Override
    public void deliveryComplete(IMqttToken iMqttToken) {
        MqttClientInterface client = iMqttToken.getClient();
        System.out.println(client.getClientId() + "发布消息成功!");
    }

    @Override
    public void connectComplete(boolean b, String s) {
    }

    @Override
    public void authPacketArrived(int i, MqttProperties mqttProperties) {

    }


}

消费者#

创建完生产者后,还需要有个消费者

/**
 * @author lza
 * @date 2023/11/24-10:43
 **/

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttConsumerConfig {

    private final MqttProperties mqttProperties;

    /**
     * 客户端对象
     */
    public MqttClient consumerClient;

    /**
     * 在bean初始化后连接到服务器
     * @author xct
     * @date 2021/7/30 16:48
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客户端连接服务端
     * @author xct
     * @date 2021/7/30 16:01
     */
    @SneakyThrows
    public void connect(){
        //创建MQTT客户端对象
        consumerClient = new MqttClient(mqttProperties.getUrl(), "consumerClient", new MemoryPersistence());
        //连接设置
        MqttConnectionOptions options = new MqttConnectionOptions();
        //是否清空session,设置为false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
        //设置为true表示每次连接到服务端都是以新的身份
        options.setCleanStart(true);
        //设置连接用户名
        options.setUserName(mqttProperties.getUsername());
        //设置连接密码
        options.setPassword(mqttProperties.getPassword().getBytes());
        //设置超时时间,单位为秒
        options.setConnectionTimeout(100);
        //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
        options.setKeepAliveInterval(20);
        //设置自动重连
        options.setAutomaticReconnect(true);

        //设置回调
        consumerClient.setCallback(new MqttConsumerCallBack(this));
        consumerClient.connect(options);

        //订阅主题
        //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
        int[] qos = {1,1};
        //主题
        String[] topics = mqttProperties.getSubTopic().split(",");
        //订阅主题
        consumerClient.subscribe(topics,qos);
    }

    /**
     * 断开连接
     *
     * @author xct
     * @date 2021/8/2 09:30
     */
    @SneakyThrows
    public void disConnect() {
        consumerClient.disconnect();
    }


    /**
     * 订阅主题
     *
     * @param topic 主题
     * @param qos   消息等级
     * @author xct
     * @date 2021/7/30 17:12
     */
    @SneakyThrows
    public void subscribe(String topic, int qos) {
        consumerClient.subscribe(topic, qos);
    }
}

消费者同样需要使用连接信息创建一个消费者的客户端实例,也同样需要指定消费者的回调函数,不同的是消费者的方法是 subscribe 订阅方法和 disConnect 断开连接方法。

消费者回调#

/**
 * @author lza
 * @date 2023/11/24-10:55
 **/
public class MqttConsumerCallBack implements MqttCallback {

    private final MqttConsumerConfig consumerConfig;

    public MqttConsumerCallBack(MqttConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    @Override
    public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
        System.out.println("消费者;与服务器断开连接");
    }

    @Override
    public void mqttErrorOccurred(MqttException e) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.printf("接收消息主题 : %s%n",s);
        System.out.printf("接收消息Qos : %d%n",mqttMessage.getQos());
        System.out.printf("接收消息内容 : %s%n",new String(mqttMessage.getPayload()));
        System.out.printf("接收消息retained : %b%n",mqttMessage.isRetained());

        // 设置mqttV5 请求响应模式
        List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
        if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
            MqttProperties properties = new MqttProperties();
            if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
                MqttMessage responseMessage = new MqttMessage();
                properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
                responseMessage.setProperties(properties);
                responseMessage.setPayload("设备未连接".getBytes());
                responseMessage.setQos(1);
                responseMessage.setRetained(false);
                consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
            }
        }
    }

    @Override
    public void deliveryComplete(IMqttToken iMqttToken) {

    }

    @Override
    public void connectComplete(boolean b, String s) {

    }

    @Override
    public void authPacketArrived(int i, MqttProperties mqttProperties) {

    }
}

消费者的回调中最重要的就是获取消息的方法 messageArrived 当消费者订阅的主题接受到消息时就会进入这个方法。

如果一切正确的话,你可以在 emqx 的后台管理页面看到这 2 个客户端,生产者和消费者已经上线了。

image

请求响应#

你也许已经看到在消费者的 messageArrived 回调方法中有请求响应的注释,这也是为什么要把 mqtt 的版本依赖从 3.1.1 切换到 5 的原因。

请求响应模式是 mqtt V5 的一大版本特性,你可以查看这篇介绍文档,和这篇包含有趣例子的文档

通俗点讲,我们都知道 HTTP 请求是明显的请求 / 响应模型。前端通过接口请求后端,后端处理完数据后把结果返回给前端,无论失败还是成功,前端总是能够获得返回值。
而在 mqtt 的推送订阅模型中,因为生产者和消费者互不关心的背景(因为 mqtt 通常用在物联网场景中,而物联网的大部分场景都会对生产者或消费者过分宽容)。想象一下,你订阅了一个公众号,你不必关注这个公众号什么时候给你推送,你只需要保证不会漏收公众号推送的消息。
举个例子,假如你有一个温度传感器放在房间中,且是联网的,你在外面想要通过手机 APP 获取传感器的当前温度。

image

有点像 HTTP 请求吧,不同的是后端的数据变成传感器采集的数据了。如果不用请求响应,当然也可以做,比如创建 2 个主题 A 和 B,请求时把消息发送到 A,然后设备订阅 A,采集完数据后再把数据发送到 B 主题让 APP 订阅。

而请求响应怎么做的呢?发送消息到 A 时,直接指定响应的主题,设备一收到消息,一看上面加粗写着,有事到 B 找我,直接就把数据返回到 B 主题了。有点像在邮局寄信的流程。

MQTTX中,连接 mqtt 时选择版本为 5 就可以进行请求响应测试了

image

里面增加了用户属性用作判断,对应的是消费者的回调方法

// 设置mqttV5 请求响应模式
        List<UserProperty> userProperties = mqttMessage.getProperties().getUserProperties();
        if("action".equals(userProperties.get(0).getKey()) && "remoteWake".equals(userProperties.get(0).getValue())){
            MqttProperties properties = new MqttProperties();
            if(mqttMessage.getProperties()!=null && StringUtils.hasText(mqttMessage.getProperties().getResponseTopic())){
                MqttMessage responseMessage = new MqttMessage();
                properties.setCorrelationData(mqttMessage.getProperties().getCorrelationData());
                responseMessage.setProperties(properties);
                responseMessage.setPayload("设备未连接".getBytes());
                responseMessage.setQos(1);
                responseMessage.setRetained(false);
                consumerConfig.consumerClient.publish(mqttMessage.getProperties().getResponseTopic(),responseMessage);
            }
        }

如果用户属性 action 的值为 remoteWake, 则触发响应模式,获取到消息中的响应主题并自动回复。

上下线通知#

请求响应只是一个扩展,最初的需求是获取设备是否在线。
已经更换了 mqtt 中间件,那可以使用系统主题了吧?
答案是可以,但没必要。因为 emqx 提供了更优雅的方式。
最简单的方法,直接使用 emqx 的企业版

数据存储的主要使用场景包括将客户端上下线状态,订阅主题信息,消息内容,消息抵达后发送消息回执等操作记录到 Redis、MySQL、PostgreSQL、MongoDB、Cassandra 等各种数据库中。用户也可以通过订阅相关主题的方式来实现类似的功能,但是在企业版中内置了对这些持久化的支持;相比于前者,后者的执行效率更高,也能大大降低开发者的工作量。

第二种方法,也是推荐的方法,就是使用webhook来维护一个上下线通知。

首先,创建一个接口,用于获取 webhook 通知

    private final static String CLIENT_IDS = "clientIds";
    private final static String CONNECTED = "client_connected";
    private final static String DISCONNECTED = "client_disconnected";
    
/**
     * emqx webhook钩子,用于监听客户端上下线
     * @param vo 上下线vo
     */
    @SaIgnore
    @PostMapping("/onlineOrOffline")
    public void onlineOrOffline(@RequestBody OnlineOrOfflineVo vo) {
        System.err.println("客户端:" + vo.getClientid() +
            ",动作:" + vo.getAction() +
            ",原因:" + vo.getReason());
        List<Object> list = RedisUtils.getCacheList(CLIENT_IDS);
        if (vo.getAction().equals(CONNECTED)) {
            list.add(vo.getClientid());
            // 先删除原有的值
            RedisUtils.deleteKeys(CLIENT_IDS);
            // 去重
            ArrayList<Object> distinct = CollUtil.distinct(list);
            RedisUtils.setCacheList(CLIENT_IDS, distinct);
        } else if (vo.getAction().equals(DISCONNECTED)){
            list.remove(vo.getClientid());
            // 先删除原有的值
            RedisUtils.deleteKeys(CLIENT_IDS);
            RedisUtils.setCacheList(CLIENT_IDS, list);
        }

    }

去重的原因是,想象一下,假如 A 客户端上线,此时 redis 里面有 A,接着,服务 down 掉了,然后重启,A 再次上线,那此时 redis 里面就有 2 个 A 了。

然后在 emqx 的插件中选择 webhook 插件开启

image

接着进入 docker 容器内更改插件配置

docker exec -it emqx bash
cd /etc/plugins

image

修改插件

vi emqx_web_hook.conf

最主要的修改文件开头的 Webhook URL,改成接口地址

image

接着修改下面的通知规则

image

去掉前面的注释即可,这 2 个就是上下线通知的规则

web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}

完成后,客户端上下线就能看到通知了

image

但这还不够完美,假如 mqtt 中间件下线了,那此时 webhook 和 redis 保存的在线设备就没用了啊。

别急,emqx 提供了HTTP API,可以获取指定设备是否在线

事实上,API 中还提供了获取集群下所有客户端的信息的接口,但是是分页的,而且一旦有大量的客户端就不好搞了。所以还是用 webhook 来维护一个上下线通知,再使用接口判断指定客户端是否在线比较好,能满足绝大部分的需求。

/**
     * 使用emqx API 检测客户端是否在线
     * @param clientId 客户端id
     * @return 是否在线
     */
    @SaIgnore
    @GetMapping("/checkClientStatus/{clientId}")
    public R<Boolean> checkClientStatus(@PathVariable String clientId) {
        // 发送GET请求
        String url = "http://localhost:18083/api/v4/clients/" + clientId;
        HttpRequest request = HttpUtil.createGet(url);
        request.basicAuth("admin", "public");

        HttpResponse response = request.execute();

        ClientResponseDto dto = JSONUtil.toBean(response.body(), ClientResponseDto.class);
        return R.ok(!ObjectUtil.isEmpty(dto.getData()) && dto.getData().get(0).getConnected());
    }

EMQX 文档
EMQX 安装使用和部分坑
MQTT EMQX 中如何监听客户端上下线?并在业务中正常使用

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.