MQTT から WebSocket 経由で IO データを受信してグラフ表示、XBee-MQTT Gateway (その2)

●概要

この記事では、前の記事で説明した  MQTT ブローカに登録されている XBee IO サンプリングデータを、WebSocket 経由で取得してグラフ表示をする Webアプリケーションを作成します。

リモート XBee デバイスから JSON 形式で MQTT ブローカに送信された IO サンプリングデータを、Web アプリケーション中から購読(MQTT Subscribe) します。MQTT からメッセージを受信したら、メッセージ内容からデータを取り出してグラフ表示します。MQTT ブローカからは連続してメッセージを受信しますので、グラフは最新のデータで更新されてスクロール表示されます。

下記は、XBee (Device5) のAD0 に接続した光センサの値をグラフ表示した様子です。

●MQTT ブローカ mosquitto を WebSocket 対応版にする

前回の記事までで使用していた MQTT ブローカ (mosquitto) は、RaspberryPi 上でバイナリパッケージをインストールしたものを使用していました。mosquitto の最新バージョン ver1.4.2 では WebSocket をサポートしているのですが、残念ながら記事を作成していた時点では最新バージョンのバイナリパッケージをインストールできませんでした。そのため、mosquitto のホームページから直接ソースをダウンロードしてコンパイルすることにします。

mosquitto-1.4.2.tar.gz ファイルを RaspberryPi にダウンロードして適当な場所に展開します。

デフォルトのビルド設定では WebSocket が有効にされていないので、config.mk ファイルを修正して使えるようにします。ファイル中でコメントアウトされた下記の部分を

#WITH_WEBSOCKETS:=no

以下の様に変更します

WITH_WEBSOCKETS:=yes

その後、make コマンドを実行するとコンパイルされて最新のバージョンの mosquitto が作成されます。実行する RaspberryPi の環境によっては、幾つかの足りないライブラリやパッケージを RaspberryPi にインストールする必要がでてくると思います。この場合でも コンパイルエラーで表示されるメッセージを手がかりにネットで検索すると、必要なパッケージやライブラリが簡単に見つかります。あきらめずにコンパイルに挑戦して下さい。(ライブラリをインストールしたときにはライブラリキャッシュの更新を忘れずに!)

無事にコンパイルが完了したら、既存の mosquitto パッケージをアンインストールしてプロセスを削除してください。その後、”src” ディレクトリに移動して先ほどコンパイルして作成した mosquitto を起動します。

起動時にはデフォルトの TCP/IP ポート 1883 の他に、WebSocket を使用するコンフィギュレーションファイルを指定します。下記の内容のファイルをホームディレクトリにmosquitto.conf の名前で作成しました。ここではWebSocket のポート番号を 9090 にしていますが別の値を指定しても構いません。

以前のバージョンの mosquitto で使用していたユーザーアカウントファイルを、 password_file パラメータで再利用しています。MQTT アカウントの認証を省略する場合には、このパラメータは削除してください。

listener 1883

listener 9090 127.0.0.1
protocol websockets

password_file /etc/mosquitto/password_file

作成したコンフィギュレーションファイルを使用して mosquitto MQTT ブローカを起動するまでの様子は以下になります。

mosquitto を起動したときのメッセージを確認してください。TCP/IP のポート 1883 と WebSocket のポート 9090 がそれぞれ ”listen” 状態になっていれば大丈夫です。

これで WebSocket も使用可能な MQTT ブローカ(mosquitto) の準備が完了しました。DeviceServer を再起動して MQTTT エンドポイントを接続してください。その後、前のバージョンの時と同様に XBee からの IO サンプリングデータを mosquitto_sub コマンドで確認してください。

●Webアプリケーション作成

ここからは、Web アプリケーションを作成します。最初に index.html ファイルを下記の内容で作成します。ここで紹介する Web アプリのファイルは、最新の DeviceServer インストールキットに含まれています。DeviceServer が動作している PC の “C:\Program Files (x86)\AllBlueSystem\WebRoot\web_api_sample\mqtt_xbee_chart” フォルダに Webアプリケーションに必要なファイルが格納されています。

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<meta name="viewport" content="width=device-width, initial-scale=1" />
		<title>MQTT-XBee ゲートウェイ ADCグラフ</title>
		<link rel="stylesheet" href="libs/css/themes/default/jquery.mobile-1.4.5.min.css" />
		<link rel="stylesheet" href="libs/css/jquery.jqplot.css" />
		<script src="libs/js/jquery.js"></script>
		<script src="libs/js/jquery.mobile-1.4.5.js"></script>
		<script src="libs/js/mqttws31.js"></script>
		<script src="libs/js/jquery.jqplot.js"></script>
	</head>
	<body>

		<div data-role="page" id="setup_page">
			<div data-role="header" data-position="inline">
				<h3>MQTTブローカ接続設定</h3>
			</div>
			<div role="main" class="ui-content">
				<label for="host_name">MQTT broker host name</label>
				<input id="host_name" value="127.0.0.1" type="text" placeholder="hostname or IP address (WebSocket)" data-clear-btn="true"/>
				<label for="port_number">MQTT broker WebSocket port</label>
				<input id="port_number" value="9090" type="number" placeholder="port number (WebSocket)" data-clear-btn="true"/>
				<label for="mqtt_topic">Subscribe topic</label>
				<input id="mqtt_topic" value="/+/+/io" type="text" placeholder="subscribe topic ex: /xbee/NodeIdentifier/io" data-clear-btn="true"/>
				<div><h3>&nbsp</h3></div>
				<div class="ui-grid-b">
					<div class="ui-block-b">
							<a class="ui-btn ui-btn-inline ui-icon-check ui-btn-icon-left " id="connect_btn" >Connect</a>
					</div>
				</div>
			</div>
			<div data-role="footer">
				<h3>Copyright(c) 2015 All Blue System</h3>
			</div>
		</div>

		<div data-role="page" id="chart_page">
			<div data-role="header" data-position="inline">
				<a data-icon="back" id="chart_view_prev_btn">接続設定</a>
				<h3>グラフ表示</h3>
			</div>
			<div id="chart1" style="height:500px;width:100%;"></div>
			<div data-role="footer">
				<h3>Copyright(c) 2015 All Blue System</h3>
			</div>
		</div>

		<div data-role="page" id="error_quit_dialog">
			<div data-role="header" data-theme="b">
				<h1>*MQTT BROKER ERR*</h1>
			</div>
			<div role="main" class="ui-content">
				<h2>エラーが発生しました</h2>
				<p>MQTT ブローカ接続処理中にエラーが発生しました。接続設定を見直して下さい</p>
				<p><a data-role="button" id="error_ok_btn" class="ui-btn ui-shadow ui-btn-a ui-icon-check ui-btn-icon-left">OK</a></p>
			</div>
		</div>

		<script src="main.js"></script>

    </body>
</html>

index.html ファイルでは jQuery-mobile フレームワークを使用して Webアプリの画面とメッセージダイアログを定義しています。

Webアプリで最初に表示される “設定画面ページ”では MQTT ブローカ WebSocket に接続するためのパラメータを入力します。”Connect” ボタンを押すと MQTT ブローカに接続します。

MQTT ブローカ接続後に表示されるのが “グラフ表示ページ” です。グラフ表示には jplot ライブラリを使用しています。

index.html ファイル中から参照している mqttws31.js は Paho プロジェクトで公開されている JavaScript 版の MQTT クライアント・ライブラリです。これを利用して WebSocket 経由で MQTT ブローカに接続します。

次に、メインのアプリケーションロジックを実現している JavaScript ファイル(main.js)は以下になります。

//////////////////////////////////////
// グラフ表示ページ
//////////////////////////////////////

var plot1;
var ad0 = new Array();
var ad1 = new Array();
var ad2 = new Array();
var ad3 = new Array();
var ad4 = new Array();
var ad5 = new Array();

// グラフを右端から開始するために、初期データをシリーズ変数に格納する
for (var i = 0; i <= 120; i++){
	ad0.push(null);
	ad1.push(null);
	ad2.push(null);
	ad3.push(null);
	ad4.push(null);
	ad5.push(null);
}

// グラフ表示フォーマット
var chartOptions = {
	legend: { show: true, location: "nw" },
	title: "-- no data --",
	series: [{label: "AD0" },{label: "AD1" },{label: "AD2" },{label: "AD3" },
				{label: "AD4" },{label: "AD5" }],
	seriesColors: ["#000000","#A64F08","#FF0000","#FF8000","#FFFF00","#04B404"],
	seriesDefaults: {
		rendererOptions: { smooth: true },
    	pointLabels: { show: true },
		lineWidth:3,
		showMarker:false,
    	breakOnNull: true
	},
	axes: {
		xaxis: {
			label: "(past) sample",
			min: 0,
			max: 120,
			ticks: [[1,"120"],[11,"110"],[21,"100"],[31,"90"],[41,"80"],[51,"70"],
					[61,"60"],[71,"50"],[81,"40"],[91,"30"],[101,"20"],[111,"10"],[121,"0"]],
			pad: 0
		},
		yaxis: {
			label: "XBee-ADC",
			min: 0,
			max: 1023,
			tickOptions: { formatString: '%d'},
			ticks: [0,100,200,300,400,500,600,700,800,900,1000,1023]
		},
	}
};

// MQTTブローカとの接続が途中で切れた
function onConnectionLost(responseObject) {
	if (responseObject.errorCode !== 0) {
		console.log("onConnectionLost:"+responseObject.errorMessage);
		$("body").pagecontainer("change","#error_quit_dialog",{ transition: "pop",role:"dialog" });
	}
}

// MQTTブローカからメッセージを受信した
function onMessageArrived(message) {
	console.log("onMessageArrived:"+ message.destinationName + " " + message.payloadString);
	// 既存のシリーズデータ(プロットデータ)を左シフトする
	if(ad0.length > chartOptions.axes.xaxis.max) {
		ad0.shift();
		ad1.shift();
		ad2.shift();
		ad3.shift();
		ad4.shift();
		ad5.shift();
	}
	// MQTT ブローカから WebSocket 経由で受信した JSON 文字列をシリーズ変数に格納する
	// メッセージフォーマット:
	//      {"ad0":<ad0_value>, .. ,"ad1":<ad1_value>, ... ,"dio0":<dio0_value>,"dio1":<dio1_value>,... }
	// 該当チャンネルのデータが存在しないときには Null が格納されてグラフには表示されない
	var recv_data;
	try {
		recv_data = $.parseJSON(message.payloadString);
		ad0.push(recv_data.ad0);
		ad1.push(recv_data.ad1);
		ad2.push(recv_data.ad2);
		ad3.push(recv_data.ad3);
		ad4.push(recv_data.ad4);
		ad5.push(recv_data.ad5);
	}catch(e){
		console.log("invalid JSON :" +message.payloadString);
	}
	// 再描画時のメモリリーク対策用 jqplot.replot()使用NG
	if (plot1) {
		plot1.destroy();
	}
	// グラフにシリーズデータをプロット
	chartOptions.title = "XBee-MQTT Gateway ADC samples [ " + message.destinationName + " ]";
	plot1 = $.jqplot("chart1",[ad0,ad1,ad2,ad3,ad4,ad5],chartOptions);
}

// グラフ表示ページが表示された
$(document).on("pageshow","#chart_page",function(event){
	// 再描画時のメモリリーク対策用 jqplot.replot()使用NG
	if (plot1) {
		plot1.destroy();
	}
	// 既存のシリーズデータまたは、初期値の空白グラフを表示する
	plot1 = $.jqplot("chart1",[ad0,ad1,ad2,ad3,ad4,ad5],chartOptions);
});

// グラフ表示ページの "接続設定" ボタンが操作された
$("#chart_view_prev_btn").on( "click",function(event, ui){
	$("body").pagecontainer("change","#setup_page", { transition: "none" });
});

//////////////////////////////////////
// セットアップページ
//////////////////////////////////////

var mqtt_client;
var topic;

// MQTTクライアント接続毎にユニークなID を生成
function get_clientid() {
  function s4() {
    return Math.floor((1 + Math.random()) * 0x10000).toString(16).substring(1);
  }
  return 'mqtt-' + s4() + '-' + s4() + '-' + s4();
}

// MQTTブローカに接続した
function onConnect() {
	console.log("connect to MQTT broker");
	// センサーデータのトピック購読
	mqtt_client.subscribe(topic);
	// グラフ画面ページに移動する
	$("body").pagecontainer("change","#chart_page", { transition: "pop" });
}

// MQTTブローカとの接続に失敗した
function onConnectFailure(responseObject) {
	console.log("onConnectFailure:"+responseObject.errorMessage);
	$("body").pagecontainer("change","#error_quit_dialog", { transition: "pop",role:"dialog" });
}

// セットアップページの "Connect" ボタンを押した
$( "#connect_btn" ).on( "click", function(event, ui){
	var mqtt_host = $("#host_name").val();
	var mqtt_port = parseInt($("#port_number").val());
	topic = $("#mqtt_topic").val();
	// MQTT Client 作成
	mqtt_client = new Paho.MQTT.Client(mqtt_host,mqtt_port,"/mqtt",get_clientid());
	mqtt_client.onConnectionLost = onConnectionLost;
	mqtt_client.onMessageArrived = onMessageArrived;
	// MQTT ブローカ WebSocket に接続
	mqtt_client.connect({ onSuccess:onConnect, onFailure:onConnectFailure });
});

// セットアップページが表示された
$(document).on("pageshow","#setup_page",function(event){
	// 既存の MQTT Client オブジェトを削除する(もしあれば)
	if (mqtt_client) {
		try{
			// ConnectionLost 発生時には disconnect() で例外が発生するが無視する。
			mqtt_client.disconnect();
		}catch(e){
			console.log("disconnect() failed");
		}
		delete mqtt_client;
	}
});

// エラーダイアログから復帰する場合はセットアップページに戻る
$( "#error_ok_btn" ).on( "click", function(event, ui){
	session_token = "";
	$("body").pagecontainer("change","#setup_page", { transition: "pop" });
});

設定画面で入力されたパラメータで MQTT ブローカに接続します。

接続に成功すると、設定画面で指定したトピックを購読します。デフォルトでは “/+/+/io” になっていますので XBee, XBee-ZB から送信される全てのリモート IO サンプリングデータを受信することになります。

MQTT ブローカからトピック・メッセージを受信すると、JSON 文字列をオブジェクトに変換した後、個々の AD サンプリング値を取り出して配列に格納します。配列はグラフ表示するときの X 軸方向の目盛り分のサイズを確保しています。もし最新のサンプリングデータを格納するときに、配列のサイズを超える場合には古いデータを順に削除します。これによってグラフ表示したときに左スクロールするように見えます。

最新のサンプリングデータを配列に格納したら、jqplot ライブラリを使用してグラフ表示します。MQTT ブローカからメッセージを受信する毎にこれらの動作を繰り返します。

●Webアプリを動作させる

早速 Web アプリを起動してみます。DeviceServer をインストールしていた場合には “http://<IPアドレス>/web_api_sample/mqtt_xbee_chart/index.html” をWebブラウザで開いて下さい。

この Webアプリは DeviceServer 側の Web API 機能を一切使用しませんので、”mqtt_xbee_chart” フォルダごと他の Webサーバーに配置して動作させることもできます。WebSocket では Webブラウザで実行中の JavaScript からのネットワークアクセスを、配信元サーバーと同一ドメインに制限する “Same Origin Policy” から除外されています。このためMQTT ブローカが動作している RaspberryPi 以外のWebサーバーに配置しても大丈夫です。”index.html” ファイルをダブルクリックして直接 “file” 形式のURI でアクセスしても動作します。

Web アプリを起動すると最初に設定画面が表示されます。

MQTT ブローカのホスト名(IP アドレス) と WebSocket ポート番号を入力します。ここで指定するポート番号は DeviceServer 側から XBee IO サンプリングデータを Publish している TCP/IP ポート番号(1883) ではありませんので注意してください。必ず、この記事の最初で mosquitto.conf に設定した WebSocket ポート番号を指定します。

“Connect” ボタンを押すと、MQTT ブローカに接続してトピック購読を開始します。

グラフ画面です。この画面では IO サンプリングデータ中に含まれる AD0 .. AD5 までの IOデータをプロットします。存在しない ADxデータ項目はグラフ表示されません。実際にデータが到着すると下記の様なグラフが表示されます。

複数のリモート XBee からのデータを同時に受信すると、グラフで表示するデータ内容が混在してしまいます。その場合には “接続設定” ボタンを押して設定画面に戻って、トピック文字列を設定し直してください。たとえばトピックを “/xbee/Device5/io” にすると、XBee(Series1) でかつ NodeIdentifier が “Device5″ のサンプリングデータのみをグラフ表示させることができます。

●動作例の動画

MQTT ブローカから受信した IO サンプリングデータをグラフ表示しているときの動画を載せましたのでご覧ください。(音量注意)

それではまた。

 

コメントは受け付けていません。