当前位置:主页 > java教程 > Spring integration在Springboot集成Mqtt

如何使用Spring integration在Springboot中集成Mqtt详解

发布:2023-03-25 10:45:01 59


为找教程的网友们整理了相关的编程文章,网友戈文敏根据主题投稿了本篇教程内容,涉及到spring、integration集成mqtt、集成mqtt、mqtt通信、Spring integration在Springboot集成Mqtt相关内容,已被718网友关注,下面的电子资料对本篇知识点有更加详尽的解释。

Spring integration在Springboot集成Mqtt

前言

接触Mqtt最开始用的是SpingMVC 引入 Mqtt Client Jar包自己实现收发重连等操作,而现在则使用Spring Integration 进行集成。其复杂度和代码简洁度天差地别。所以特别写下此文章算是对自己的一个记录。也希望观看到此文章的人少走一点弯路,哪怕只有一步。

当然有精力的话最好还是使用Client自己实现一遍对自己来说也是一种提高,生产环境,我建议使用Spring Integration 集成

关于Spring Intergration

介绍关于Spring Integration 的一些概念性的东西,如果只想知道怎么集成可跳过该部分

Spring Integration provides an extension of the Spring programming model to support the well known Enterprise Integration Patterns. It enables lightweight messaging within Spring-based applications and supports integration with external systems through declarative adapters. Those adapters provide a higher level of abstraction over Spring’s support for remoting, messaging, and scheduling.

大致意思是

  • Spring Integration 为许多著名的企业级集成方案提供了扩展接口
  • 它能够在基于Sping的应用中进行轻量级消息传递
  • 支持声明式适配器与发布系统进行集成(见Mqtt send消息的实现)
  • 这些适配器在 Spring对远程调用,消息传递和系统调度提供了更高层次的抽象

Spring Intergration核心组件

Message(消息)

Message是对消息的包装,在Spring 系统中传递的任何消息都会被包装为Message。可以理解为是Spring Integration消息传递的基本单位

Message Channel(消息管道)

消息管道:Message 在Message Channel中进行传递,生产者向管道中投递消息,消费者从管道中取出消息。Spring Integration 支持两种消息传递模型,point-to-point(点对点模型),Publish-subscribe(发布订阅模型)有多种管道类型。

本次使用点对点模型,消息管道类型DirectChannel

Message Endpoint(消息切入点)

消息切入点:消息在管道中流动那必定会有某些流入或流出的点亦或是在某个位置(即特定函数)需要对消息进行处理,过滤,格式转换等。这些点即为Message Endpoint(实际为某些处理函数),例如消息发送,消息接收都是Message Endpoint。

Message Transformer(消息转化器)

消息转化器:是将消息进行特定转换例如将一个 Object 序列化为 Json 字符串

Message Filter

消息过滤器,过滤掉特定消息。例如在管道中发送的含username 和 age 属性的 User 对象,如果当前消息(一个User实例的包装)的age < 18则将其过滤掉,那么处在过滤器之后的消费者将无法接收到 age < 18的User对象。当然过滤条件不仅是消息负载的属性,也可以是消息本身的属性。

Message Router(消息路由)

消息路由:向管道投递消息时可由消息路由根据路由规则选择投递给那个管道

Splitter(分割器)

分割器:它从一个输入管道接收一条消息并将其分割为多条消息,再将每条消息发送到不同的输出管道上

Aggregator(聚合器)

聚合器:与分割器功能刚好相反

Service Activator

Service Activator: 它是一个用于将系统服务实例接入到消息系统的泛型切入点,该切入点必须配置输入管道。其返回值可是消息类型也可以是一个消息处理器,当返回值为消息类型时需要指定输出管道,即在该切入点对消息加工处理后再发送到指定的输出管道,如果返回值为消息处理器。那么消息交由消息处理器进行处理。下文中会为Mqtt消息出站配置Service Activator并且 返回值为消息处理器

Channel Adapter(管道适配器)

管道适配器:因为外部协议有无数种,消息适配器则用于连接不同协议的外部系统。从外部系统读入数据并对数据进行处理最终与Spring Integration 内部的消息系统适配。例如将要进行Mqtt集成,那么就需要一个Mqtt的管道适配器,事实上也确实有一个,下文中将会看到。

开始集成

依赖管理工具使用Gardle

引入spring-integration-mqtt依赖

implementation "org.springframework.integration:spring-integration-mqtt:5.4.6"

创建Mqtt配置类

@Configuration
public class MqttConfig {
    /**
     *  以下属性将在配置文件中读取
     **/
    //mqtt Broker 地址
    private String[] uris;
    //连接用户名
    private String username;
    //连接密码
    private String password;
    //入站Client ID
    private String inClientId;
    //出站Client ID
    private String outClientId;
    //默认订阅主题
    private String defaultTopic;

    public void setUris(String[] uris) {
        this.uris = uris;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public void setInClientId(String inClientId) {
        this.inClientId = inClientId;
    }

    public void setOutClientId(String outClientId) {
        this.outClientId = outClientId;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }
}

这里需要注意为什么创建两个client ID,Spring Integration 在集成的时候入站与出站消息处理并不使用同一个连接,所以如果clien ID相同,将会出现Mqtt反复重连现象,实为 mqtt 出入站连接交替踢对方下线。

修改配置文件 application.yml

mqtt:
  uris: tcp://ip:port
  username: user
  password: pwd
  in-client-id: ${random.value} # 随机值,使出入站 client ID 不同
  out-client-id: ${random.value}
  default-topic: defaultTopic

在MqttConfig上使用注解 @ConfigurationProperties(prefix = "mqtt")将配置文件中属性注入到MqttConfig中,但别忘记在启动类上使用@EnableConfigurationProperties启用属性注入。

@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
    .....
}

创建三个管道

这三个管道分别用于处理入站消息,出站消息,错误消息

@Bean
public MessageChannel mqttOutboundChannel(){
    return new DirectChannel();
}
@Bean
public MessageChannel mqttInboundChannel(){
    return new DirectChannel();
}
@Bean
public MessageChannel errorChannel(){
    return new DirectChannel();
}

添加 Mqtt 客户端工厂

@Bean
public MqttPahoClientFactory mqttPahoClientFactory(){
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(uris);
    options.setUserName(username);
    options.setPassword(password.toCharArray());
    factory.setConnectionOptions(options);
    return factory;
}

添加 Mqtt 管道适配器

@Bean
public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
    return new MqttPahoMessageDrivenChannelAdapter(inClientId,factory,defaultTopic);
}

添加消息生产者

@Bean
public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    //入站投递给入站管道
    adapter.setOutputChannel(mqttInboundChannel());
    adapter.setErrorChannel(errorChannel());
    adapter.setQos(0);
    return adapter;
}

添加出站处理器

出站处理器是一个Service Activator

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
    MqttPahoMessageHandler handler =
            new MqttPahoMessageHandler(outClientId,factory);
    handler.setAsync(true);
    handler.setConverter(new DefaultPahoMessageConverter());
    handler.setDefaultTopic(defaultTopic);
    return handler;
}

添加消息接收器

通过前文的配置,当Mqtt 订阅主题产生消息时会通过 MessageProducer(本例中是一个管道适配器)将消息投递到入站管道中,所以当需要接收并处理Mqtt消息时只需要从入站管道中取出消息即可。取出消息即可使用前文的 Endpoint 本例使用Service Activator

创建一个接收类,自定义任意类型。重点是使用其内部的方法,将其注册为Endpoint

@Component
public class Receiver {
    @Bean
    //使用ServiceActivator 指定接收消息的管道为 mqttInboundChannel,投递到mqttInboundChannel管道中的消息会被该方法接收并执行
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        return message -> {
            System.out.println(message.getPayload());
        };
    }
}

至此,整个服务即可接收Mqtt broker的消息了。默认接收的主题为配置文件中指定的 "defaultTopic"。

添加消息发送器

那么消息发送器无疑是向出站管道投递消息即可。如何实现,Spring Integration 提供了 @MessagingGateway注解,该注解提供一个defaultRequestChannel属性用于指定出站管道。如下

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSender {
    void sendToMqtt(String text);

    void sendWithTopic(@Header(MqttHeaders.TOPIC) String topic, String text);

    void sendWithTopicAndQos(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String text);
}

此接口内函数参数将作为消息的负载被包装成为消息并投递到出站管道中。同时可以看到,方法参数可以注解的方式自定义消息发送的 topic qos retain 等属性。

至此集成完成,用户可调用MqttSender发布消息或是冲Receiver中接收并处理消息

自定义编解码器

前文中添加消息生产者和添加出站处理器代码中都可以看到setConverter(new DefaultPahoMessageConverter());方法,该方法用与对消息负载进行编解码。多数情况下我们的消息都是可编码的。我在消息传递过程中使用的是json编码。下面我将以json编码为例,展示如何自定义编解码器

//协议对象
public class User {
    private String username;
    private String password;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

实现自己的编解码器

代码如下,详见注释

@Component
public class AlMingConverter implements MqttMessageConverter {
    private final  static Logger log = LoggerFactory.getLogger(AlMingConverter.class);
    private int defaultQos = 0;
    private boolean defaultRetain = false;
    ObjectMapper om = new ObjectMapper();
    //入站消息解码
    @Override
    public Message<User> toMessage(String topic, MqttMessage mqttMessage) {
        
        User protocol = null;
        try {
            protocol = om.readValue(mqttMessage.getPayload(), User.class);
        } catch (IOException e) {
            if (e instanceof JsonProcessingException) {
                System.out.println();
                log.error("Converter only support json string");
            }
        }
        assert protocol != null;
        MessageBuilder<User> messageBuilder = MessageBuilder
                .withPayload(protocol);
        //使用withPayload初始化的消息缺少头信息,将原消息头信息填充进去
        messageBuilder.setHeader(MqttHeaders.ID, mqttMessage.getId())
                .setHeader(MqttHeaders.RECEIVED_QOS, mqttMessage.getQos())
                .setHeader(MqttHeaders.DUPLICATE, mqttMessage.isDuplicate())
                .setHeader(MqttHeaders.RECEIVED_RETAINED, mqttMessage.isRetained());
        if (topic != null) {
            messageBuilder.setHeader(MqttHeaders.TOPIC, topic);
        }
        return messageBuilder.build();
    }
    //出站消息编码
    @Override
    public Object fromMessage(Message<?> message, Class<?> targetClass) {
        MqttMessage mqttMessage = new MqttMessage();
        String msg = null;
        try {
            msg = om.writeValueAsString(message.getPayload());
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        assert msg != null;
        mqttMessage.setPayload(msg.getBytes(StandardCharsets.UTF_8));
        //这里的 mqtt_qos ,和 mqtt_retained 由 MqttHeaders 此类得出不可以随便取,如需其他属性自行查找
        Integer qos = (Integer) message.getHeaders().get("mqtt_qos");
        mqttMessage.setQos(qos == null ? defaultQos : qos);
        Boolean retained = (Boolean) message.getHeaders().get("mqtt_retained");
        mqttMessage.setRetained(retained == null ? defaultRetain : retained);
        return mqttMessage;
    }
    //此方法直接拿默认编码器的来用的,照抄即可
    @Override
    public Message<?> toMessage(Object payload, MessageHeaders headers) {
        Assert.isInstanceOf(MqttMessage.class, payload,
                () -> "This converter can only convert an 'MqttMessage'; received: " + payload.getClass().getName());
        return this.toMessage(null, (MqttMessage) payload);
    }

    public void setDefaultQos(int defaultQos) {
        this.defaultQos = defaultQos;
    }

    public void setDefaultRetain(boolean defaultRetain) {
        this.defaultRetain = defaultRetain;
    }
}

修改MqttConfig

将自定义编解码器注入到MqttConfig中。

private final AlMingConverter alMingConverter;
@Autowired
public MqttConfig(AlMingConverter alMingConverter) {
    this.alMingConverter = alMingConverter;
}

将原来所有DefaultPahoMessageConverter实例更换为alMingConverter

...
adapter.setConverter(alMingConverter);
...
handler.setConverter(alMingConverter);

修改MqttSender

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSender {
    void sendToMqtt(User user);

    void sendWithTopic(@Header(MqttHeaders.TOPIC) String topic, User user);

    void sendWithTopicAndQos(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, User user);
}

Receiver修改

此时Reveiver的消息负载虽然编译时为 Object ,但运行时为User可安全的进行 Object 到 User 的强转然后进行后续操作

@Component
public class Receiver {
    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler handleMessage() {
        return message -> {
            User user = (User) message.getPayload();
            System.out.println(user.getUsername()+":"+user.getPassword());
        };
    }
}

动态修改订阅主题

前文中发布订阅的主题都是默认主题,即在配置文件中指定的主题。生产环境肯定不止使用一个主题。那么就需要运行时动态修改主题。Spring Integration也确实为此提供了支持。上文中向Bean容器中注册的管道适配器(MqttPahoMessageDrivenChannelAdapter)提供了addTopic removeTopic等方法可用于运行时修改主题。所以可以创建一个MqttService专门用于添加和删除主题。

但注意只有Spring Integration 4.1以上版本可以这么使用

@Service
public class MqttServiceImpl implements MqttService { //MqttService是自己定义的,仅包含如下方法均已重写
    MqttPahoMessageDrivenChannelAdapter adapter;
    @Autowired
    public MqttServiceImpl(MqttPahoMessageDrivenChannelAdapter adapter) {
        this.adapter = adapter;
    }

    @Override
    public void addTopic(String topic) {
        String[] topics = adapter.getTopic();
        if(!Arrays.asList(topics).contains(topic)){
            adapter.addTopic(topic,0);
        }
    }

    @Override
    public void removeTopic(String topic) {
        adapter.removeTopic(topic);
    }
}

增加或删除主题时,注入该服务调用addTopic,removeTopic即可

End&附录

最终MqttConfig完整代码

@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
    /**
     *  以下属性将在配置文件中读取
     **/
    //mqtt Broker 地址
    private String[] uris;
    //连接用户名
    private String username;
    //连接密码
    private String password;
    //入站Client ID
    private String inClientId;
    //出站Client ID
    private String outClientId;
    //默认订阅主题
    private String defaultTopic;

    private final AlMingConverter alMingConverter;
    @Autowired
    public MqttConfig(AlMingConverter alMingConverter) {
        this.alMingConverter = alMingConverter;
    }

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(uris);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(inClientId,factory,defaultTopic);
    }
    public void setUris(String[] uris) {
        this.uris = uris;
    }
    @Bean
    public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(alMingConverter);
        //入站投递的通道
        adapter.setOutputChannel(mqttInboundChannel());
        adapter.setErrorChannel(errorChannel());
        adapter.setQos(0);
        return adapter;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory factory){
        MqttPahoMessageHandler handler =
                new MqttPahoMessageHandler(outClientId,factory);
        handler.setAsync(true);
        handler.setConverter(alMingConverter);
        handler.setDefaultTopic(defaultTopic);
        return handler;
    }
    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }
    @Bean
    public MessageChannel mqttInboundChannel(){
        return new DirectChannel();
    }
    @Bean
    public MessageChannel errorChannel(){
        return new DirectChannel();
    }
    public void setUsername(String username) {
        this.username = username;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public void setInClientId(String inClientId) {
        this.inClientId = inClientId;
    }

    public void setOutClientId(String outClientId) {
        this.outClientId = outClientId;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }
}

最后

到此这篇关于如何使用Spring integration在Springboot中集成Mqtt的文章就介绍到这了,更多相关Spring integration在Springboot集成Mqtt内容请搜索码农之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持码农之家!


参考资料

相关文章

  • Springdoc替换swagger的实现步骤分解

    发布:2023-04-02

    最近在spring看到的,spring要对api文档动手了,有些人说swagger不好用,其实也没那么不好用,有人说代码还是有点侵入性,这倒是真的,我刚试了springdoc可以说还是有侵入性但是也可以没有侵入性,这就看你对文档有什么要求了


  • 解析Spring Boot实例通过接口安全退出

    发布:2020-03-17

    这篇文章主要介绍了Spring Boot 实例代码之通过接口安全退出的相关资料,需要的朋友可以参考下


  • SpringBoot中实现跨域的5种方式小结

    SpringBoot中实现跨域的5种方式小结

    发布:2023-01-24

    给网友们整理关于SpringBoot的教程,这篇文章主要介绍了SpringBoot 中实现跨域的5种方式小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧


  • Maven工程搭建spring boot+spring mvc+JPA的步骤详解

    发布:2019-11-15

    本篇文章主要介绍了Maven工程搭建spring boot+spring mvc+JPA的示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧


  • Spring Cloud Alibaba使用Sentinel实现接口限流的方法

    发布:2020-02-09

    这篇文章主要介绍了Spring Cloud Alibaba使用Sentinel实现接口限流,本文详细的介绍了Sentinel组件的用法以及接口限流,感兴趣的可以了解一下


  • SpringBoot生产快速禁用Swagger2的具体方法

    发布:2019-06-06

    这篇文章主要介绍了SpringBoot在生产快速禁用Swagger2的方法步骤,使用注解关闭Swagger2,避免接口重复暴露,非常具有实用价值,需要的朋友可以参考下


  • Springboot轻量级的监控组件SpringbootAdmin

    发布:2023-04-05

    这篇文章主要为大家介绍了Springboot轻量级的监控组件SpringbootAdmin使用示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪


  • spring中使用mybatis plus连接sqlserver的方法实现

    发布:2022-10-26

    为网友们分享了关于spring的教程,这篇文章主要介绍了spring中使用mybatis plus连接sqlserver的方法实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧


网友讨论