Java 使用 MQTT

    xiaoxiao2025-01-22  52

    MQTT 服务器(broker)的搭建请参考:https://blog.csdn.net/qq_29350001/article/details/76680646

     

    <dependency>     <groupId>org.eclipse.paho</groupId>     <artifactId>mqtt-client</artifactId>     <version>0.4.0</version> </dependency>

    <!-- https://mvnrepository.com/artifact/org.fusesource.mqtt-client/mqtt-client --> <dependency>     <groupId>org.fusesource.mqtt-client</groupId>     <artifactId>mqtt-client</artifactId>     <version>1.14</version> </dependency> public class MQTTSubscribe implements MqttCallback {

        //public static final String HOST = "tcp://10.0.0.250:1884";     public static final String HOST = "tcp://127.0.0.1:61613";

        public static final String TOPIC = "gl";     private static final String clientid = "subscriber";     private MqttClient client;     private MqttConnectOptions options;     private String userName = "admin";     private String passWord = "password";     private ScheduledExecutorService scheduler;

        public void startReconnect() {         scheduler = Executors.newSingleThreadScheduledExecutor();         scheduler.scheduleAtFixedRate(new Runnable() {             public void run() {                 if (!client.isConnected()) {                     try {                         client.connect(options);                         System.out.println("重连成功");                     } catch (MqttSecurityException e) {                         e.printStackTrace();                     } catch (MqttException e) {                         e.printStackTrace();                     }                 }             }         }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);     }

        private void start() {         try {             client = new MqttClient(HOST, clientid, new MemoryPersistence());             options = new MqttConnectOptions();             options.setCleanSession(false);             options.setUserName(userName);             options.setPassword(passWord.toCharArray());             options.setConnectionTimeout(10);             options.setKeepAliveInterval(20);             client.setCallback(this); //            MqttTopic topic = client.getTopic(TOPIC); //            options.setWill(topic, "close".getBytes(), 2, true);             client.connect(options);             int[] Qos = {2};             String[] topic1 = {TOPIC};             client.subscribe(topic1, Qos);         } catch (Exception e) {             e.printStackTrace();         }     }

        public static void main(String[] args) throws MqttException {         MQTTSubscribe client = new MQTTSubscribe();         client.start();     }

        public void connectionLost(Throwable cause) {         startReconnect();     }

        @Override     public void deliveryComplete(IMqttDeliveryToken token) {         try {             System.out.println("deliveryComplete---------" + token.isComplete());         } catch (Exception e) {             e.printStackTrace();         }     }

        @Override     public void messageArrived(String topic, MqttMessage message) throws Exception {         System.out.println("Message arrived on topic:" + topic);         System.out.println("Message arrived on QoS:" + message.getQos());         System.out.println("Message arrived on content:" + new String(message.getPayload()));     } }

    public class MQTTPublish implements MqttCallback {

        //public static final String HOST = "tcp://10.0.0.250:1884";     public static final String HOST = "tcp://127.0.0.1:61613";

        public static final String TOPIC = "gl";     private static final String clientid = "publisher";

        private static final String str = "法国恢复和";     private static MqttClient client;

        private MqttTopic topic;     private String userName = "admin";     private String passWord = "password";

        private MqttMessage message;     MqttConnectOptions options = new MqttConnectOptions();

        private ScheduledExecutorService scheduler;

        public void startReconnect() {         scheduler = Executors.newSingleThreadScheduledExecutor();         scheduler.scheduleAtFixedRate(new Runnable() {             public void run() {                 if (!client.isConnected()) {                     try {                         client.connect(options);                         System.out.println("重连成功");                     } catch (MqttSecurityException e) {                         e.printStackTrace();                     } catch (MqttException e) {                         e.printStackTrace();                     }                 }             }         }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);     }

        public MQTTPublish() throws MqttException {         client = new MqttClient(HOST, clientid, new MemoryPersistence());         connect();     }

        private void connect() {         options.setCleanSession(false);         options.setUserName(userName);         options.setPassword(passWord.toCharArray());         options.setConnectionTimeout(10);         options.setKeepAliveInterval(20);         try {             client.setCallback(this);

                client.connect(options);             topic = client.getTopic(TOPIC);         } catch (Exception e) {             e.printStackTrace();         }     }

        public void publish(MqttMessage message) throws MqttPersistenceException, MqttException {         MqttDeliveryToken token = topic.publish(message);         token.waitForCompletion();         System.out.println("Token is complete:" + token.isComplete());     }

        public static void main(String[] args) throws MqttException {         MQTTPublish mqttpub = new MQTTPublish();         mqttpub.message = new MqttMessage();         mqttpub.message.setQos(2);//设置qos,决定消息到达次数,如果是1,消息重复的不会到达         mqttpub.message.setRetained(true);         mqttpub.message.setPayload(str.getBytes());         mqttpub.publish(mqttpub.message); //        System.out.println("Ratained state:" + mqttpub.message.isRetained()); // //        client.disconnect(); //        System.out.println("Disconnected"); //        System.exit(0);     }

        @Override     public void connectionLost(Throwable arg0) {         startReconnect();     }

        public void deliveryComplete(IMqttDeliveryToken token) {         try {             System.out.println("deliveryComplete---------" + token.isComplete());         } catch (Exception e) {             e.printStackTrace();         }     }

        @Override     public void messageArrived(String arg0, MqttMessage arg1) throws Exception {         // TODO Auto-generated method stub

        } }

    原文:https://blog.csdn.net/qq_37838223/article/details/80495245  

    最新回复(0)