qinfengge

qinfengge

醉后不知天在水,满船清梦压星河
github

java实时消息推送(一)

前言#

最近有个业务需求,需要判断用户上传的数据是异常还是正常,如果是异常的情况下需要实时的推送报警消息到用户。
在大部分情况下,通常是客户端(浏览器)主动发送请求到服务端(服务器),告诉它需要什么数据。
但也有一部分情况,需要服务端主动的给客户端推送消息,比如上面的情况,又比如网页端常见的扫码登录

技术选择#

实现服务端主动推送的方法有很多种,需要根据业务进行选择。
简单且常见的方法是,长轮询,和短轮询。听名字就知道简单粗暴。
更进阶的是,SSE 和 websocket。
更深层的就是各种消息队列,如 MQTT。
其他的推送也有短信推送或者 APP 通知,比如极光 Push。

实现方式实现原理模式
短轮询客户端在特定的的时间间隔(如每 1 秒),由客户端对服务端发出请求,然后由服务器返回最新的数据给客户端的浏览器。/
长轮询服务端客户端向服务器发送请求,服务器接到请求后hold住连接,直到有新消息才返回响应信息并关闭连接,客户端处理完响应信息后再向服务器发送新的请求。/
SSE(Server-sent Events)服务端客户端先向服务器注册一个长连接,服务器获取到事件后可以发送消息到已注册的客户端只能由服务器推送
Websocket客户端 / 服务端使用 websocket 需要引入新的依赖,一旦客户端和服务端握手成功,它们就处在同一频道,可以实时的收发消息了全双工,服务端和客户端都既能推送又能接收
Mqtt客户端 / 服务端mqtt 常用在物联网业务中,是一种基于发布 / 订阅模式的轻量级通讯协议,该协议构建在 TCP/IP 协议上。 MQTT 最大的有点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务发布 / 订阅模式,服务端和客户端都既能推送又能接收

对于长 / 短轮询不再赘述,本次主要讲解SSEMQTT方式。

SSE#

在 spring boot 中 SSE 是原生支持的,不需要再导入其他的依赖,这是它的优点,但是它的缺点也很明显,只支持服务端单向推送,只支持高级浏览器(chrome,Firefox 等)。受限于浏览器的限制,每个网页最多只能保持最多 6 个的长连接;更多的连接可能会占用更多的内存和计算资源。
首先创建一个 sse 工具类

@Component
@Slf4j
public class SseEmitterUtils {
    /**
     * 当前连接数
     */
    private static AtomicInteger count = new AtomicInteger(0);

    /**
     * 存储 SseEmitter 信息
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 创建用户连接并返回 SseEmitter
     * @param key userId
     * @return SseEmitter
     */
    public static SseEmitter connect(String key) {
        if (sseEmitterMap.containsKey(key)) {
            return sseEmitterMap.get(key);
        }

        try {
            // 设置超时时间,0表示不过期。默认30秒
            SseEmitter sseEmitter = new SseEmitter(0L);
            // 注册回调
            sseEmitter.onCompletion(completionCallBack(key));
            sseEmitter.onError(errorCallBack(key));
            sseEmitter.onTimeout(timeoutCallBack(key));
            sseEmitterMap.put(key, sseEmitter);
            // 数量+1
            count.getAndIncrement();
            return sseEmitter;
        } catch (Exception e) {
            log.info("创建新的SSE连接异常,当前连接Key为:{}", key);
        }
        return null;
    }

    /**
     * 给指定用户发送消息
     * @param key userId
     * @param message 消息内容
     */
    public static void sendMessage(String key, String message) {
        if (sseEmitterMap.containsKey(key)) {
            try {
                sseEmitterMap.get(key).send(message);
            } catch (IOException e) {
                log.error("用户[{}]推送异常:{}", key, e.getMessage());
                remove(key);
            }
        }
    }

    /**
     * 向同组人发布消息,要求:key + groupId
     * @param groupId 群组id
     * @param message 消息内容
     */
    public static void groupSendMessage(String groupId, String message) {
        if (!CollectionUtils.isEmpty(sseEmitterMap)) {
            sseEmitterMap.forEach((k, v) -> {
                try {
                    if (k.startsWith(groupId)) {
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                } catch (IOException e) {
                    log.error("用户[{}]推送异常:{}", k, e.getMessage());
                    remove(k);
                }
            });
        }
    }

    /**
     * 广播群发消息
     * @param message 消息内容
     */
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("用户[{}]推送异常:{}", k, e.getMessage());
                remove(k);
            }
        });
    }

    /**
     * 群发消息
     * @param message 消息内容
     * @param ids 用户id集合
     */
    public static void batchSendMessage(String message, Set<String> ids) {
        ids.forEach(userId -> sendMessage(userId, message));
    }

    /**
     * 移除连接
     * @param key userId
     */
    public static void remove(String key) {
        sseEmitterMap.remove(key);
        // 数量-1
        count.getAndDecrement();
        log.info("移除连接:{}", key);
    }

    /**
     * 获取当前连接信息
     * @return Map
     */
    public static List<String> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * 获取当前连接数量
     * @return int
     */
    public static int getCount() {
        return count.intValue();
    }

    private static Runnable completionCallBack(String key) {
        return () -> {
            log.info("结束连接:{}", key);
            remove(key);
        };
    }

    private static Runnable timeoutCallBack(String key) {
        return () -> {
            log.info("连接超时:{}", key);
            remove(key);
        };
    }

    private static Consumer<Throwable> errorCallBack(String key) {
        return throwable -> {
            log.info("连接异常:{}", key);
            remove(key);
        };
    }
}

然后用工具类实现几个接口,以便客户端实现订阅和服务端主动推送消息。

@RequestMapping("/sse")
@RestController
@Slf4j
@CrossOrigin
public class SSEEmitterController {

    /**
     * 创建连接
     * @param id 用户id
     * @return SseEmitter
     */
    @GetMapping(path = "/subscribe/{id}", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter subscribe(@PathVariable String id) {
        return SseEmitterUtils.connect(id);
    }


    /**
     * 向指定用户推送消息
     * @param id 用户id
     * @param content 推送内容
     */
    @PostMapping(path = "/push")
    public void push(String id, String content) {
        SseEmitterUtils.sendMessage(id, content);
    }


    /**
     * 向指定群组推送消息
     * @param groupId 群组id
     * @param content 推送内容
     */
    @PostMapping(path = "/groupPush")
    public void groupPush(String groupId, String content) {
        SseEmitterUtils.groupSendMessage(groupId, content);
    }


    /**
     * 广播消息
     * @param content 推送内容
     */
    @PostMapping(path = "/pushAll")
    public void pushAll(String content) {
        SseEmitterUtils.batchSendMessage(content);
    }

    /**
     * 关闭连接
     * @param id 用户id
     * @param request 请求
     */
    @DeleteMapping(path = "/close/{id}")
    public void close(@PathVariable String id, HttpServletRequest request) {
        request.startAsync();
        SseEmitterUtils.remove(id);
    }

}

最后你可以使用下面的 HTML 页面进行测试

<!DOCTYPE html>
<html lang="en">

<head>
    <title>SSE</title>
    <meta charset="UTF-8">
    <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js" type="text/javascript"></script>
    <script>
        if (window.EventSource) {

            let sources = [];
            // 创建连接  
            for (let i = 1; i < 10; i++) {
                let id = "id_" + i;
                sources[i] = new EventSource('http://localhost:8008/sse/subscribe/' + id);
            }

            /** 
             * 连接一旦建立,就会触发open事件 
             * 另一种写法:source.onopen = function (event) {} 
             */
            // 连接打开事件
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('open', function (e) {
                    setMessageInnerHTML(id + "连接打开")
                    console.log(id + "连接打开");
                });
            });

            /** 
             * 客户端收到服务器发来的数据 
             * 另一种写法:source.onmessage = function (event) {} 
             */
            // 消息事件 
            sources.forEach(source => {
                let id = source.url.split('/').pop();
                source.addEventListener('message', function (e) {
                    setMessageInnerHTML(id + "收到消息:" + e.data)
                    console.log(id + "收到消息:" + e.data);
                });
            });

            /** 
             * 如果发生通信错误(比如连接中断),就会触发error事件 
             * 另一种写法:source.onerror = function (event) {} 
             */
            // 错误处理
            sources.forEach(source => {

                let id = source.url.split('/').pop();

                source.addEventListener('error', function (e) {

                    if (e.readyState === EventSource.CLOSED) {
                        setMessageInnerHTML(id + "连接关闭")
                        console.log(id + "连接关闭");
                    } else {
                        setMessageInnerHTML(id + "连接错误:", e)
                        console.log(id + "连接错误:", e);
                    }

                });

            });
        } else {
            setMessageInnerHTML("浏览器不支持SSE");
        }

        // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据 
        window.onbeforeunload = function () {
            source.close();
            const httpRequest = new XMLHttpRequest();
            httpRequest.open('GET', 'http://localhost:8008/sse/close/' + id, true);
            httpRequest.send();
            console.log("close");
        };

        // 将消息显示在网页上 
        function setMessageInnerHTML(innerHTML) {
            $("#contentDiv").append("<br/>" + innerHTML);
        } 
    </script>
</head>

<body>
    <div>
        <div>
            <div id="contentDiv" style="height:800px; width:1000px; overflow:scroll; background:#ccc;">
            </div>
        </div>
    </div>
</body>

</html>

然后调用接口进行测试

image

可以看到,浏览器最终只保留了 6 个连接,其它的都被丢弃掉。

MQTT#

要实现 mqtt 需要在服务端添加下面的依赖

<!--mqtt依赖包-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

然后需要安装中间件,比如EMQX,又或者直接使用 RabbitMq。需要注意的是 rabbitMq 不支持 QOS2 级别的消息等级

image

关于 mqtt 的 qos 消息质量等级可以看下表的解释

QoS 级别描述适用场景
0最多一次交付,消息不会被确认或重传适用于不重要的数据传输,如传感器数据等
1至少一次交付,确保消息至少被传递一次,但可能重复传递适用于需要确保消息传递,但允许重复传递的场景
2只有一次交付,确保消息仅被传递一次,不允许重复传递适用于需要确保消息精确传递,且不允许重复传递的场景

下面以使用 rabbitmq 中间件为例
配置 mqtt 协议信息

server:
  port: 8008

spring:
  application:
    name: mqtt测试项目
  mqtt:
    url: tcp://127.0.0.1:1883
    username: guest
    password: guest
    clientId: serverClientId
      #发布的主题--MQTT-默认的消息推送主题,实际可在调用接口时指定
    pubTopic: testTopic
      #订阅的主题
    subTopic: gps-topic,oil-topic,broadcast-topic,fault-topic
    completionTimeout: 3000

然后创建配置文件的实体类映射

@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {

    /**
     * RabbitMQ连接用户名
     */
    private String username;
    /**
     * RabbitMQ连接密码
     */
    private String password;
    /**
     * 推送主题
     */
    private String pubTopic;
    /**
     * RabbitMQ的MQTT连接地址
     */
    private String url;

    /**
     * RabbitMQ的MQTT连接客户端ID
     */
    private String clientId;

    /**
     * 订阅主题
     */
    private String subTopic;

    /**
     * 超时时间
     */
    private Integer completionTimeout;
}

接着创建一个消费者

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConsumer {

    private final MqttProperties mqttProperties;

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getUrl(), mqttProperties.getClientId(),
                        mqttProperties.getSubTopic().split(","));
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        //设置消息质量:0->至多一次;1->至少一次;2->只有一次
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                MessageHeaders headers = message.getHeaders();
                log.info("headers: {}", headers);
                String topic = Objects.requireNonNull(headers.get(MqttHeaders.RECEIVED_TOPIC)).toString();
                log.info("订阅主题为: {}", topic);
                String[] topics = mqttProperties.getSubTopic().split(",");
                for (String t : topics) {
                    if (t.equals(topic)) {
                        log.info("订阅主题为:{};接收到该主题消息为:{}",topic, message.getPayload());
                    }
                }
            }

        };
    }
}

再创建一个生产者

@Configuration
@Slf4j
@RequiredArgsConstructor
public class MqttProvider {

    private final MqttProperties mqttProperties;

    @Bean
    @SneakyThrows
    public MqttPahoClientFactory mqttClientFactory() {

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttProperties.getUrl()});
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        // 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,
        // 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息
        options.setCleanSession(false);
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(10);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        // 断开后重连,但这个方法并没有重新订阅的机制
        // 在尝试重新连接之前,它将首先等待1秒,对于每次失败的重新连接尝试,延迟将加倍,直到达到2分钟,此时延迟将保持在2分钟。
        options.setAutomaticReconnect(true);
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getPubTopic());
        // 设置推送时的消息质量:0->至多一次;1->至少一次;2->只有一次
        // 在rabbitmq 中 qos2会被降级为qos1
        messageHandler.setDefaultQos(1);
        // 设置是否保留消息,设置为true时保留消息会在每次重连时发送
        // 除非发送一条内容空白的新的保留信息才能清除
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

再创建一个网关接口用于发送消息

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * 发送消息到默认topic
     */
    void sendToMqtt(String payload);

    /**
     * 发送消息到指定topic
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * 发送消息到指定topic并设置QOS
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

最后同样编写一个接口用于发送消息

@RestController
@RequiredArgsConstructor
@RequestMapping("/mqtt")
public class MqttController {

    private final MqttGateway mqttGateway;

    @PostMapping("/sendToDefaultTopic")
    public void sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
    }

    @PostMapping("/sendToTopic")
    public void sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
    }
}

可以使用第三方 mqtt 软件进行测试,例如mqttx
安装完成后,打开软件创建一个连接

image

需要注意的是

  1. 每次连接的 ClinetId 应该是不同且唯一的,相同的 ClinetId 可能会因连接挤掉而丢失消息
  2. 服务器连接协议可以是 tcp,也可以是 mqtt
  3. 如果是 rabbitmq,需要配置用户的权限
  4. 如果是 rabbitmq,版本请选择 3.1.1
  5. 如果想要消费者离线后仍能收到生产者的消息,请关闭 Clean Session。

连接成功后,调用接口发送消息订阅者就能收到了

image

springboot 整合 mqtt 实现消息发送和消费,以及客户端断线重连之后的消息恢复

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.