订阅消息的工具类
import cn.ac.iie.securer.bean.properties.SubscriberConfiguration; import cn.ac.iie.securer.util.AESUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.time.LocalDateTime; /** * @Author: stsahana * @Date: 2018-12-13 15:46 **/ @Slf4j @Configuration public class MqttInbountUtil { final int COMPLETION_TIMEOUT = 10000; @Autowired SubscriberConfiguration subscriberConfiguration;//配置类 /** * 订阅者工厂类 * @return */ @Bean public MqttPahoClientFactory mqttSubscriberFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(subscriberConfiguration.getBroker().split(",")); options.setUserName(subscriberConfiguration.getUsername()); options.setPassword(subscriberConfiguration.getPassword().toCharArray()); options.setCleanSession(false); options.setWill("willTopic", "willTopicMessage".getBytes(), 2, false); factory.setConnectionOptions(options); return factory; } /** * 创建一个channel * @return */ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 由工厂类创建一个实例 * @return */ @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(subscriberConfiguration.getBroker(), subscriberConfiguration.getClientId(), mqttSubscriberFactory(), subscriberConfiguration.getTopic().split(",")); adapter.setCompletionTimeout(COMPLETION_TIMEOUT); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(subscriberConfiguration.getQos()); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 使用创建的通道收消息 * @return */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { //处理逻辑 log.debug("处理请求" + message.getPayload().toString()); } }; } }生产者
/** * @Author: stsahana * @Date: 2018-12-13 16:01 **/ @Component public class MqttOutboundUtil { @Autowired PublisherConfiguration publisherConfiguration; @Bean public MqttPahoClientFactory mqttPublisherFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(publisherConfiguration.getBroker().split(",")); options.setUserName(publisherConfiguration.getUsername()); options.setPassword(publisherConfiguration.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(publisherConfiguration.getClientId(), mqttPublisherFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(publisherConfiguration.getTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } } /** * @Author: stsahana * @Date: 2018-12-13 16:07 **/ @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MyGateway {//使用mqttOutboundChannel来发送数据 void sendToMqtt(String data); } @AuthoWired MyGateway gateway; gateway.sendToMqtt("hello world");