跳转至

mqtt开源服务器(mosca)

什么是Mosca

Mosca是MQTT在Node.js中的一个Broker的开源实现,换句话说就是使用js语言编写的MQTT的服务端软件

如何使用

windows下安装时,首先需要安装node.js,这里不详细说明如何安装,打开cmd然后输入

npm install mosca --save
新建一个node_mqtt_server的文件夹,创建mqtt-server.js文件 代码如下
var mosca = require('mosca');

var ascoltatore = {
  //using ascoltatore
  //type: 'mongo',
  //url: 'mongodb://localhost:27017/mqtt',
  //pubsubCollection: 'ascoltatori',
  //mongo: {}
};

var settings = {
  port: 1883,
  http: {
    port: 3000,
    bundle: true,
    static: './'
  }
};

var server = new mosca.Server(settings);

server.on('clientConnected', function(client) {
    console.log('client connected', client.id);
});

// fired when a message is received
server.on('published', function(packet, client) {
  console.log('Published', packet.payload);
});

server.on('ready', setup);

// fired when the mqtt server is ready
function setup() {
  console.log('Mosca server is up and running');
}
然后在此文件夹下打开cmd 输入node mqtt-server.js 若显示下图所示,则说明本地mqtt服务端搭建完成

数据持久化

接下来我们需要实现数据的持久化这里我们使用mysql来实现 示例代码

var mosca = require('mosca');
var mysql  = require('mysql');  


//连接数据库
var connection = mysql.createConnection({     
 host     : 'localhost',       
 user     : '用户名',              
 password : '密码',       
 port: '3306',                   
 database: 'demo', 
}); 
connection.connect();
/////////
var MqttServer = new mosca.Server({  
    port: 1883
});  
MqttServer.on('clientConnected', function(client){  
    console.log('client connected', client.id);  
});  
/** 
 *  * 监听MQTT主题消息 
 *   **/  
MqttServer.on('ready', function(){  
    console.log('mqtt is running...');  
    //MqttServer.authenticate = authenticate;  
});  
MqttServer.on('published', function(packet, client) {  
    var topic = packet.topic;  
    var message=packet.payload;
    var arr=message.toString().split("*");
    var  addSqltemp = 'INSERT INTO temperature(tempDevice, tempClient, tempValue) VALUES (?,?,?)';
    var  addSqlhumi = 'INSERT INTO humidity(humiDevice, humiClient, humiValue) VALUES (?,?,?)';
    if(arr[2]=='1')
    {
         var tharr=arr[3].split("/");
         var  addSqlParamstemp = [arr[0],arr[1],tharr[0]];
         var  addSqlParamshumi = [arr[0],arr[1],tharr[1]];
         console.log('paras',"deviceid="+arr[0]+",clientID="+arr[1]+",temprature="+tharr[0]+",humi="+tharr[1]);

         connection.query(addSqlhumi,addSqlParamshumi,function (err, result) {
            if(err)
            {
             console.log('[INSERT ERROR] - ',err.message);
             return;
            }        
        //    console.log('--------------------------INSERT----------------------------');
        //    console.log('INSERT ID:',result.insertId);        
        //    console.log('INSERT ID:',result);        
        //    console.log('-----------------------------------------------------------------\n\n');  
        });

         connection.query(addSqltemp,addSqlParamstemp,function (err, result) {
            if(err)
            {
             console.log('[INSERT ERROR] - ',err.message);
             return;
            }        
        //    console.log('--------------------------INSERT----------------------------');
        //    console.log('INSERT ID:',result.insertId);        
        //    console.log('INSERT ID:',result);        
        //    console.log('-----------------------------------------------------------------\n\n');  
        });
    }
    console.log('message-arrived--->','topic ='+topic+',message = '+ packet.payload.toString());  
  });