由于最近公司在开发一款后台与安卓的更新系统,经过再三研究之后,也是选择Mqtt这个目前流行的框架。为了能够让项目运营起来,最终虽说是选择ActiveMQ。
但在这个过程中,也是发现Apollo作为服务器也是相当不错。当然对于后者已经被apace放弃,不过今天还是和大家整理一下SpringBoot+MQTT+apollo实现订阅发布功能的全过程。
对于项目首先需要用到的前提东西,比如Apollo如何下载,以及MQTT测试工具在这里就不多说。如果真的不懂私聊Damon吧,在这里就不浪费时间。
对于项目,首先你所需要引入maven包:

pom.xml

<!-- MQTT -->
<dependency>
<groupId>
org.springframework.integration
</groupId>
<artifactId>
spring-integration-stream
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.integration
</groupId>
<artifactId>
spring-integration-mqtt
</artifactId>
</dependency>
其目标就是将MQTT用在项目组中
接着就是项目yml文件的配置,使用properties,以葫芦画瓢就行了:

applicaiton.yml

mqtt:

  username: admin

  password: password

  host-url: tcp://127.0.0.1:8161  
# 你自己服务器的地址和端口,这个需要改
  clientID: test1       
# 这个改不改随意,但不同的客户端肯定不能一样
  default-topic: home/garden/fountain      
# 默认主题
  timeout: 100

  keepalive: 100


# Tomcat
server:

  tomcat:

    uri-encoding: UTF-8

    max-threads: 1000

    min-spare-threads: 30

  port: 8088

注意host-url,这就是你apollo的地址
来到第三步,此时就是项目内的文件:

MqttConfig文件

@Component
@ConfigurationProperties
(
"mqtt"
)

@Setter
@Getter
publicclassMqttConfig
{

@Autowired
private
 MqttPushClient mqttPushClient;


/**

     * 用户名

     */

// @Value("username")
private
 String username;

/**

     * 密码

     */

private
 String password;

/**

     * 连接地址

     */

private
 String hostUrl;

/**

     * 客户Id

     */

private
 String clientID;

/**

     * 默认连接话题

     */

private
 String defaultTopic;

/**

     * 超时时间

     */

privateint
 timeout;

/**

     * 保持连接数

     */

privateint
 keepalive;


@Bean
public MqttPushClient getMqttPushClient()
{

        System.out.println(
"hostUrl: "
+ hostUrl);

        System.out.println(
"clientID: "
+ clientID);

        System.out.println(
"username: "
+ username);

        System.out.println(
"password: "
+ password);

        System.out.println(
"timeout: "
+timeout);

        System.out.println(
"keepalive: "
+ keepalive);

        mqttPushClient.connect(hostUrl, clientID, username, password, timeout, keepalive);

// 以/#结尾表示订阅所有以test开头的主题
        mqttPushClient.subscribe(defaultTopic, 
0
);

return
 mqttPushClient;

    }

}

目的就是配置所对应的消息
第四步就是发布以及订阅等功能:

MqttPushClient

@Component
publicclassMqttPushClient
{

privatestaticfinal
 Logger logger = LoggerFactory.getLogger(MqttPushClient
.class)
;


@Autowired
private
 PushCallback pushCallback;


privatestatic
 MqttClient client;


privatestatic MqttClient getClient()
{

return
 client;

    }


privatestaticvoidsetClient(MqttClient client)
{

        MqttPushClient.client = client;

    }


/**

     * 客户端连接

     *

     * 
@param
 host      ip+端口

     * 
@param
 clientID  客户端Id

     * 
@param
 username  用户名

     * 
@param
 password  密码

     * 
@param
 timeout   超时时间

     * 
@param
 keepalive 保留数

     */

publicvoidconnect(String host, String clientID, String username, String password, int timeout, int keepalive)
{

        MqttClient client;

try
 {

            client = 
new
 MqttClient(host, clientID, 
new
 MemoryPersistence());

            MqttConnectOptions options = 
new
 MqttConnectOptions();

            options.setCleanSession(
true
);

            options.setUserName(username);

            options.setPassword(password.toCharArray());

            options.setConnectionTimeout(timeout);

            options.setKeepAliveInterval(keepalive);

            MqttPushClient.setClient(client);

try
 {

                client.setCallback(pushCallback);

                client.connect(options);

            } 
catch
 (Exception e) {

                e.printStackTrace();

            }

        } 
catch
 (Exception e) {

            e.printStackTrace();

        }

    }


/**

     * 发布

     *

     * 
@param
 qos         连接方式

     * 
@param
 retained    是否保留

     * 
@param
 topic       主题

     * 
@param
 pushMessage 消息体

     */

publicvoidpublish(int qos, boolean retained, String topic, String pushMessage)
{

        MqttMessage message = 
new
 MqttMessage();

        message.setQos(qos);

        message.setRetained(retained);

        message.setPayload(pushMessage.getBytes());

        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);

if
 (
null
 == mTopic) {

            logger.error(
"topic not exist"
);

        }

        MqttDeliveryToken token;

try
 {

            token = mTopic.publish(message);

            token.waitForCompletion();

        } 
catch
 (MqttPersistenceException e) {

            e.printStackTrace();

        } 
catch
 (MqttException e) {

            e.printStackTrace();

        }

    }


/**

     * 订阅某个主题

     *

     * 
@param
 topic 主题

     * 
@param
 qos   连接方式

     */

publicvoidsubscribe(String topic, int qos)
{

        logger.info(
"开始订阅主题"
 + topic);

try
 {

            MqttPushClient.getClient().subscribe(topic, qos);

        } 
catch
 (MqttException e) {

            e.printStackTrace();

        }

    }

}

订阅主题以及发布的方式等内容更多编写
最后在搞个测试看看我们的结果是否正确:

TestController

@RestController
@RequestMapping
(
"/"
)

publicclassTestController
{


@Autowired
private
 MqttPushClient mqttPushClient;


@GetMapping
(value = 
"/publishTopic"
)

public String publishTopic()
{

        String topicString = 
"home/garden/fountain"
;

        mqttPushClient.publish(
0
false
, topicString, 
"测试一下发布消息"
);

return"ok"
;

    }

// 发送自定义消息内容(使用默认主题)
@RequestMapping
(
"/publishTopic/{data}"
)

public String test1(@PathVariable("data") String data) 
{

        String topicString = 
"home/garden/fountain"
;

        mqttPushClient.publish(
0
,
false
,topicString, data);

return"ok"
;

    }


// 发送自定义消息内容,且指定主题
@RequestMapping
(
"/publishTopic/{topic}/{data}"
)

public String test2(@PathVariable("topic") String topic, @PathVariable("data") String data) 
{

        mqttPushClient.publish(
0
,
false
,topic, data);

return"ok"
;

    }

}

如此一来就OK!
你可以使用MQTT.fx进行测试。用Postman发出,就能够查看最终的结果。在这里,因为时间的原因就不多说,有啥有趣的问题,咱们可以一同探讨。
希望你希望,Damon将会不断的分享各种有趣的开发小故事给大家娱乐。下一期,或在POI实现导出导入或者是ActiveMQ进行选择。
 关注公众号:Java后端编程,回复下面关键字 
要Java学习完整路线,回复  路线 
缺Java入门视频,回复 视频 
要Java面试经验,回复  面试 
缺Java项目,回复: 项目 
进Java粉丝群: 加群 
PS:如果觉得我的分享不错,欢迎大家随手点赞、在看。
(完)
加我"微信获取一份 最新Java面试题资料
请备注:666不然不通过~
最近好文
最近面试BAT,整理一份面试资料Java面试BAT通关手册,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。
获取方式:关注公众号并回复 java 领取,更多内容陆续奉上。
明天见(。・ω・。)ノ♡
(感谢阅读,希望对你所有帮助)
blog.csdn.net/qq_41689567/article/details/106774202
继续阅读
阅读原文