博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
在spring web中启动mqtt
阅读量:6839 次
发布时间:2019-06-26

本文共 13619 字,大约阅读时间需要 45 分钟。

hot3.png

基础配置:使用 Spring Integration 模块集成 MQTT 协议通道

applicationContext-mqtt.xml

    

MQTT 客户端连接工厂:实现 InitializingBean 与 FactoryBean 接口,以便在 Spring 容器启动时校验配置并创建客户端

MqttClientFactoryBean.java

package youway.service.mqtt;import org.eclipse.paho.client.mqttv3.IMqttClient;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttClientPersistence;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.FactoryBean;import org.springframework.beans.factory.InitializingBean;import org.springframework.util.Assert;import org.springframework.util.StringUtils;public class MqttClientFactoryBean implements InitializingBean,FactoryBean
{    private static String TCP_PROTOCOL = "tcp://";    private static String SSL_PROTOCOL = "ssl://";    private String protocol = TCP_PROTOCOL;    private boolean useSsl = false;    private String host; private int port = 1883;    private String clientId = buildClientId();    private MqttClientPersistence mqttClientPersistence;    private String username, password;    private MqttConnectOptions mqttConnectOptions;    private Boolean cleanSession = null;    public MqttClientFactoryBean() {    }        public MqttClientFactoryBean(String host) {        setup(host, this.username, this.password);    }    public MqttClientFactoryBean(String host, String u, String p) {        setup(host, u, p);    }    public MqttClientFactoryBean(String host, int port, String u, String p) {        setup(host, u, p);        this.setPort(port);    }    public void setup(String h, String u, String p) {        setHost(h);        setUsername(u);        setPassword(p);    }    public void setCleanSession(boolean cleanSession) {        this.cleanSession = cleanSession;    }    public void setPassword(String p) {        this.password = p;    }    public void setUsername(String u) {        this.username = u;    }    public void setMqttConnectOptions(MqttConnectOptions mqttConnectOptions) {        this.mqttConnectOptions = mqttConnectOptions;    }    public void setClientId(String c) {        this.clientId = c;    }    public void setProtocol(String protocol) {        this.protocol = protocol;    }    public void setUseSsl(boolean useSsl) {        this.useSsl = useSsl;    }    public void setHost(String host) {        this.host = host;    }        public String getHost() { return host; }    public void setPort(int port) {        this.port = port;    }    public void setMqttClientPersistence(MqttClientPersistence mqttClientPersistence) {        this.mqttClientPersistence = mqttClientPersistence;    }    @Override    public IMqttClient getObject() throws Exception {        String serverUri = 
buildServerUri();        MqttClient client = this.mqttClientPersistence == null ?                new MqttClient(serverUri, clientId) :                new MqttClient(serverUri, clientId, mqttClientPersistence);        MqttConnectOptions connectOptions = this.buildMqttConnectionOptions();        if (null != connectOptions) {            client.connect(connectOptions);        } else {            client.connect();        }        return client;    }    @Override    public Class
 getObjectType() {        return IMqttClient.class;    }    @Override    public boolean isSingleton() {        return true;    }    @Override    public void afterPropertiesSet() throws Exception {        Assert.hasText(this.protocol, String.format("you must specify a non-null protocol value (either %s or %s)", SSL_PROTOCOL, TCP_PROTOCOL));        Assert.isTrue(this.protocol.equalsIgnoreCase(SSL_PROTOCOL) || this.protocol.equalsIgnoreCase(TCP_PROTOCOL), "");        Assert.hasText(this.clientId, "your clientId must be non-null");        Assert.hasText(this.host, "you must specify a valid host");        Assert.isTrue(this.port > 0, "you must specify a valid port");        boolean connectionOptionsAreCorrectlySpecified =                this.mqttConnectOptions != null && weShouldCreateConnectionOptions();        Assert.isTrue(!connectionOptionsAreCorrectlySpecified,                String.format("you must specify an instance of %s for the 'buildMqttConnectionOptions' attribute" +                        " OR any of the following options ('cleanSession', 'username', 'password'), but not both!", MqttConnectOptions.class.getName()));    }    protected String buildServerUri() {        if (this.useSsl) {            this.protocol = SSL_PROTOCOL;        }        return this.protocol + this.host + ":" + this.port;    }    protected boolean weShouldCreateConnectionOptions() {        return (this.cleanSession != null || StringUtils.hasText(this.username) || StringUtils.hasText(this.password));    }    protected String buildClientId() {        String user = System.getProperty("user.name");        int totalLength = 23;        int userLength = user.length();        if (userLength > 10) {            user = user.substring(0, 10);        }        String clientId = user + System.currentTimeMillis();        Assert.isTrue(clientId.length() <= totalLength);        return clientId;    }    protected MqttConnectOptions buildMqttConnectionOptions() {        MqttConnectOptions connectOptions = 
null;        if (weShouldCreateConnectionOptions()) {            connectOptions = new MqttConnectOptions();            connectOptions.setCleanSession(this.cleanSession);            connectOptions.setUserName(this.username);            connectOptions.setPassword(this.password.toCharArray());        } else if (this.mqttConnectOptions != null) {            connectOptions = this.mqttConnectOptions;        }        return connectOptions;    }}

订阅方法类

MqttMessageHandler.java

package youway.service.mqtt;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.joda.time.DateTime;
import org.springframework.integration.Message;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.util.Assert;

import youway.service.mqtt.msg.IMsgHandle;
import youway.service.mqtt.msg.impl.AaaMsgHandle;
import youway.service.mqtt.msg.impl.BbbMsgHandle;

import com.fuzhi.util.SpringContextUtils;

/**
 * Message handler that publishes outgoing Spring Integration messages to an MQTT topic and,
 * acting as the client's {@link MqttCallback}, dispatches incoming MQTT messages to
 * topic-specific {@link IMsgHandle} beans.
 */
public class MqttMessageHandler extends AbstractMessageHandler implements MqttCallback {

    /** Pause between reconnect attempts, so a broker outage does not turn into a busy loop. */
    private static final long RECONNECT_DELAY_MS = 3000L;

    private IMqttClient client;
    private String topic;
    private boolean messagesRetained;
    private QualityOfService qualityOfService = QualityOfService.AT_LEAST_ONCE;

    public MqttMessageHandler() {
    }

    public MqttMessageHandler(IMqttClient client, String topic) {
        setClient(client);
        setTopic(topic);
    }

    /** Verifies that client, topic and quality of service have been configured. */
    @Override
    protected void onInit() throws Exception {
        Assert.notNull(this.client, String.format(
                "you must specify a valid %s instance! ",
                MqttClient.class.getName()));
        Assert.hasText(this.topic, "you must specify a 'topic'");
        Assert.notNull(this.qualityOfService, String.format(
                "you must specify a non-null instance of the %s enum.",
                QualityOfService.class.getName()));
    }

    public void setClient(IMqttClient client) {
        this.client = client;
    }

    public void setQualityOfService(QualityOfService qualityOfService) {
        this.qualityOfService = qualityOfService;
    }

    public void setMessagesRetained(boolean messagesRetained) {
        this.messagesRetained = messagesRetained;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTopic() {
        return topic;
    }

    public IMqttClient getClient() {
        return client;
    }

    /**
     * Publishes the message payload to the configured topic, then (re)subscribes to the
     * inbound topics with this object as callback.
     *
     * @param message message whose payload must be a {@code byte[]}
     * @throws Exception if the payload has the wrong type or the MQTT client call fails
     */
    @Override
    protected void handleMessageInternal(Message<?> message) throws Exception {
        Object payload = message.getPayload();
        Assert.isTrue(payload instanceof byte[], String.format(
                "the payload for %s must be of type byte[]", getClass().getName()));
        byte[] payloadOfBytes = (byte[]) payload;

        // NOTE(review): the QoS level is taken from the enum ordinal; this is only correct
        // if QualityOfService declares its constants in MQTT QoS order (0, 1, 2) - confirm.
        client.publish(this.topic, payloadOfBytes,
                this.qualityOfService.ordinal(), this.messagesRetained);

        // Register the callback before subscribing so that no message delivered right after
        // the subscription is acknowledged can arrive while no callback is installed.
        client.setCallback(this);
        // NOTE(review): subscriptions use "a"/"b" while getHandle() dispatches on "/a"/"/b".
        // In MQTT these are different topics; one of the two spellings is probably wrong.
        client.subscribe("a");
        client.subscribe("b");
    }

    /**
     * Retries {@code connect()} until the client is connected again, then re-sends the
     * initial message through {@link MqttService#initMessage(String)}.
     *
     * <p>The loop checks {@code isConnected()} and sleeps between attempts; the previous
     * version spun without delay and could never finish if the client was already connected,
     * because {@code connect()} then keeps failing.
     *
     * @param cause reason the connection was lost, as reported by the client
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Handle reconnection.
        logger.debug("开始重连......");
        String time = (new DateTime()).toString();

        // NOTE(review): connect() is called without options, so any credentials or session
        // settings used for the original connection are not re-applied here.
        while (!client.isConnected()) {
            try {
                client.connect();
            } catch (MqttException e) {
                // MqttSecurityException is a subclass and is handled here as well.
                logger.warn("MQTT reconnect attempt failed, retrying in "
                        + RECONNECT_DELAY_MS + " ms", e);
                try {
                    Thread.sleep(RECONNECT_DELAY_MS);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt for the owner of this thread and stop retrying.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        MqttService.initMessage("重连:" + time);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Nothing to do on delivery confirmation.
    }

    /**
     * Decodes the payload as UTF-8 and hands it to the handler registered for the topic.
     * Failures are logged and not propagated.
     *
     * @param topic topic the message arrived on
     * @param msg   received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage msg) {
        try {
            String content = new String(msg.getPayload(), StandardCharsets.UTF_8);
            logger.debug("主题:" + topic + "  内容:" + content);

            IMsgHandle msgHandle = getHandle(topic);
            if (msgHandle == null) {
                logger.warn("No message handler registered for topic '" + topic + "'; message ignored");
                return;
            }
            msgHandle.handle(topic, content);
            logger.debug("..................消息处理完成................");
        } catch (Exception e) {
            logger.error("Failed to process MQTT message on topic '" + topic + "'", e);
        }
    }

    /**
     * Looks up the message handler bean that corresponds to the received topic.
     *
     * @param topic topic the message arrived on
     * @return the handler for {@code topic}, or {@code null} if the topic is not mapped
     */
    public IMsgHandle getHandle(String topic) {
        IMsgHandle msgHandle = null;
        if (topic == null) {
            // switch on a null String would throw; treat it as "not mapped".
            return msgHandle;
        }
        switch (topic) {
        case "/a":
            msgHandle = SpringContextUtils.getBean("AaaMsgHandle", AaaMsgHandle.class);
            break;
        case "/b":
            msgHandle = SpringContextUtils.getBean("BbbMsgHandle", BbbMsgHandle.class);
            break;
        default:
            break;
        }
        return msgHandle;
    }
}

客户端总体调用处理

MqttService.java

package youway.service.mqtt;

import static org.slf4j.LoggerFactory.getLogger;

import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.slf4j.Logger;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;

/**
 * Static entry points for sending messages over MQTT.
 *
 * <p>Each entry point owns its own application context built from {@link MqttConfiguration}
 * and sends the content, encoded as UTF-8 bytes, to a named {@link MessageChannel} bean that
 * is expected to be declared in {@code applicationContext-mqtt.xml}.
 */
public class MqttService {

    private static final Logger logger = getLogger(MqttService.class);

    private static AnnotationConfigApplicationContext context1 =
            new AnnotationConfigApplicationContext(MqttConfiguration.class);
    private static AnnotationConfigApplicationContext context2 =
            new AnnotationConfigApplicationContext(MqttConfiguration.class);
    private static AnnotationConfigApplicationContext context3 =
            new AnnotationConfigApplicationContext(MqttConfiguration.class);

    private static final String TOPIC_CTRL_MSG = "/ctrlMsg";
    private static final String TOPIC_HOST_MSG = "/hostMsg";

    /**
     * Sends the initial message on the {@code initMsgChannel} channel.
     *
     * @param content text to send
     */
    public static void initMessage(String content) {
        context1 = reConnect(context1);
        logger.debug("------------initMessage start---------------");
        send(context1, "initMsgChannel", content);
    }

    /**
     * Sends a control-plan message on the {@code ctrlMsgChannel} channel.
     *
     * @param content text to send
     */
    public static void sendCtrlMsg(String content) {
        context2 = reConnect(context2);
        logger.debug("------------sendCtrlMsg ---------------");
        send(context2, "ctrlMsgChannel", content);
    }

    /**
     * Sends a host-information message on the {@code hostMsgChannel} channel.
     *
     * @param content text to send
     */
    public static void sendHostMsg(String content) {
        context3 = reConnect(context3);
        logger.debug("------------sendHostMsg ---------------");
        send(context3, "hostMsgChannel", content);
    }

    /** Looks up the named channel in the given context and sends the content as UTF-8 bytes. */
    private static void send(AnnotationConfigApplicationContext context,
            String channelName, String content) {
        MessageChannel messageChannel = context.getBean(channelName, MessageChannel.class);
        // Explicit charset: the receiving side decodes with UTF-8, while the no-arg
        // getBytes() depends on the platform default.
        messageChannel.send(
                MessageBuilder.withPayload(content.getBytes(StandardCharsets.UTF_8)).build());
    }

    @Configuration
    @ImportResource("classpath*:/applicationContext-mqtt.xml")
    public static class MqttConfiguration {

        @Bean
        public MqttClientFactoryBean mqttClientFactoryBean() {
            return new MqttClientFactoryBean("mqttx.gzdfbz.com");
        }

        @Bean
        public MqttMessageHandler mqttInitMsg(IMqttClient client) {
            return new MqttMessageHandler(client, "/status");
        }

        @Bean
        public MqttSendingMessageHandler sendCtrlMsg(IMqttClient client) {
            return new MqttSendingMessageHandler(client, TOPIC_CTRL_MSG);
        }

        @Bean
        public MqttSendingMessageHandler sendHostMsg(IMqttClient client) {
            return new MqttSendingMessageHandler(client, TOPIC_HOST_MSG);
        }
    }

    /**
     * Returns a usable context, creating a new one while the supplied reference is
     * {@code null}.
     *
     * <p>The result must be stored by the caller. The previous version assigned the new
     * context to its own parameter, so the caller's field was never updated.
     *
     * @param context current context reference, may be {@code null}
     * @return {@code context} if non-null, otherwise a newly created context
     */
    private static AnnotationConfigApplicationContext reConnect(
            AnnotationConfigApplicationContext context) {
        while (context == null) {
            logger.debug("-----reConnect()--------");
            context = new AnnotationConfigApplicationContext(MqttConfiguration.class);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }
        return context;
    }
}

在监听器中加入启动方法

WebRootPathListener.java

package youway.web.listener;import javax.servlet.ServletContextEvent;import org.joda.time.DateTime;import org.springframework.web.context.ContextLoaderListener;import youway.service.mqtt.MqttService;import youway.util.IConstants;/** * 获得webroot的物理路径. * @author youway * */public class WebRootPathListener extends ContextLoaderListener {	public void contextDestroyed(ServletContextEvent sce) {          }        public void contextInitialized(ServletContextEvent sce) {          String webRootPath = sce.getServletContext().getRealPath("/");          System.setProperty("webRoot.path" , webRootPath);          try {			System.out.println("MQTT线程启动......");			String time = new DateTime().toString("yyyy-MM-dd HH:mm");			MqttService.initMessage(time);		} catch (Exception e) {			e.printStackTrace();		}    } }

转载于:https://my.oschina.net/youway/blog/521197

你可能感兴趣的文章
test.log文件传输到另一台服务器上 --rsync
查看>>
linux 系统管理之磁盘阵列RAID和压缩命令
查看>>
Widgets must be created in the GUI thread
查看>>
JQuery Highcharts图表控件使用说明
查看>>
python基础教程
查看>>
linux命令:function脚本编程之函数
查看>>
Linux性能监控之CPU利用率
查看>>
第九节 VMware View 6.0 菜鸟入门 连接服务器的安装和部署
查看>>
spring容器加载完毕做一件事情(利用ContextRefreshedEvent事件)
查看>>
C# 文件操作详解(二)---------FileInfo类
查看>>
Windows Server 2012系列---文件服务器资源管理器FSRM(2)
查看>>
JPA注解
查看>>
LogMiner详细讲解
查看>>
03.17基本控件的使用
查看>>
ElementaryOS 安装PhpStorm
查看>>
nutch与起点R3集成之笔记(二)
查看>>
ThinkPHP 统计查询
查看>>
厚黑学
查看>>
C++异常处理机制之一
查看>>
CentOS 5.2安装
查看>>