mqtt开源服务器(mosca)
什么是Mosca
Mosca是MQTT在Node.js中的一个Broker的开源实现,换句话说就是使用js语言编写的MQTT的服务端软件
如何使用
windows下安装时,首先需要安装node.js,这里不详细说明如何安装,打开cmd然后输入
npm install mosca --save
var mosca = require('mosca');
var ascoltatore = {
//using ascoltatore
//type: 'mongo',
//url: 'mongodb://localhost:27017/mqtt',
//pubsubCollection: 'ascoltatori',
//mongo: {}
};
var settings = {
port: 1883,
http: {
port: 3000,
bundle: true,
static: './'
}
};
var server = new mosca.Server(settings);
server.on('clientConnected', function(client) {
console.log('client connected', client.id);
});
// fired when a message is received
server.on('published', function(packet, client) {
console.log('Published', packet.payload);
});
server.on('ready', setup);
// fired when the mqtt server is ready
function setup() {
console.log('Mosca server is up and running');
}
数据持久化
接下来我们需要实现数据的持久化这里我们使用mysql来实现 示例代码
var mosca = require('mosca');
var mysql = require('mysql');
//连接数据库
var connection = mysql.createConnection({
host : 'localhost',
user : '用户名',
password : '密码',
port: '3306',
database: 'demo',
});
connection.connect();
/////////
var MqttServer = new mosca.Server({
port: 1883
});
MqttServer.on('clientConnected', function(client){
console.log('client connected', client.id);
});
/**
* * 监听MQTT主题消息
* **/
MqttServer.on('ready', function(){
console.log('mqtt is running...');
//MqttServer.authenticate = authenticate;
});
MqttServer.on('published', function(packet, client) {
var topic = packet.topic;
var message=packet.payload;
var arr=message.toString().split("*");
var addSqltemp = 'INSERT INTO temperature(tempDevice, tempClient, tempValue) VALUES (?,?,?)';
var addSqlhumi = 'INSERT INTO humidity(humiDevice, humiClient, humiValue) VALUES (?,?,?)';
if(arr[2]=='1')
{
var tharr=arr[3].split("/");
var addSqlParamstemp = [arr[0],arr[1],tharr[0]];
var addSqlParamshumi = [arr[0],arr[1],tharr[1]];
console.log('paras',"deviceid="+arr[0]+",clientID="+arr[1]+",temprature="+tharr[0]+",humi="+tharr[1]);
connection.query(addSqlhumi,addSqlParamshumi,function (err, result) {
if(err)
{
console.log('[INSERT ERROR] - ',err.message);
return;
}
// console.log('--------------------------INSERT----------------------------');
// console.log('INSERT ID:',result.insertId);
// console.log('INSERT ID:',result);
// console.log('-----------------------------------------------------------------\n\n');
});
connection.query(addSqltemp,addSqlParamstemp,function (err, result) {
if(err)
{
console.log('[INSERT ERROR] - ',err.message);
return;
}
// console.log('--------------------------INSERT----------------------------');
// console.log('INSERT ID:',result.insertId);
// console.log('INSERT ID:',result);
// console.log('-----------------------------------------------------------------\n\n');
});
}
console.log('message-arrived--->','topic ='+topic+',message = '+ packet.payload.toString());
});