MQTT から WebSocket 経由で IO データを受信してグラフ表示、XBee-MQTT Gateway (その2)

●概要

この記事では、前の記事で説明した  MQTT ブローカに登録されている XBee IO サンプリングデータを、WebSocket 経由で取得してグラフ表示をする Webアプリケーションを作成します。

リモート XBee デバイスから JSON 形式で MQTT ブローカに送信された IO サンプリングデータを、Web アプリケーション中から購読(MQTT Subscribe) します。MQTT からメッセージを受信したら、メッセージ内容からデータを取り出してグラフ表示します。MQTT ブローカからは連続してメッセージを受信しますので、グラフは最新のデータで更新されてスクロール表示されます。

下記は、XBee (Device5) のAD0 に接続した光センサの値をグラフ表示した様子です。

●MQTT ブローカ mosquitto を WebSocket 対応版にする

前回の記事までで使用していた MQTT ブローカ (mosquitto) は、RaspberryPi 上でバイナリパッケージをインストールしたものを使用していました。mosquitto の最新バージョン ver1.4.2 では WebSocket をサポートしているのですが、残念ながら記事を作成していた時点では最新バージョンのバイナリパッケージをインストールできませんでした。そのため、mosquitto のホームページから直接ソースをダウンロードしてコンパイルすることにします。

mosquitto-1.4.2.tar.gz ファイルを RaspberryPi にダウンロードして適当な場所に展開します。

デフォルトのビルド設定では WebSocket が有効にされていないので、config.mk ファイルを修正して使えるようにします。ファイル中でコメントアウトされた下記の部分を

#WITH_WEBSOCKETS:=no

以下の様に変更します

WITH_WEBSOCKETS:=yes

その後、make コマンドを実行するとコンパイルされて最新のバージョンの mosquitto が作成されます。実行する RaspberryPi の環境によっては、幾つかの足りないライブラリやパッケージを RaspberryPi にインストールする必要がでてくると思います。この場合でも コンパイルエラーで表示されるメッセージを手がかりにネットで検索すると、必要なパッケージやライブラリが簡単に見つかります。あきらめずにコンパイルに挑戦して下さい。(ライブラリをインストールしたときにはライブラリキャッシュの更新を忘れずに!)

無事にコンパイルが完了したら、既存の mosquitto パッケージをアンインストールしてプロセスを削除してください。その後、”src” ディレクトリに移動して先ほどコンパイルして作成した mosquitto を起動します。

起動時にはデフォルトの TCP/IP ポート 1883 の他に、WebSocket を使用するコンフィギュレーションファイルを指定します。下記の内容のファイルをホームディレクトリにmosquitto.conf の名前で作成しました。ここではWebSocket のポート番号を 9090 にしていますが別の値を指定しても構いません。

以前のバージョンの mosquitto で使用していたユーザーアカウントファイルを、 password_file パラメータで再利用しています。MQTT アカウントの認証を省略する場合には、このパラメータは削除してください。

listener 1883

listener 9090 127.0.0.1
protocol websockets

password_file /etc/mosquitto/password_file

作成したコンフィギュレーションファイルを使用して mosquitto MQTT ブローカを起動するまでの様子は以下になります。

mosquitto を起動したときのメッセージを確認してください。TCP/IP のポート 1883 と WebSocket のポート 9090 がそれぞれ ”listen” 状態になっていれば大丈夫です。

これで WebSocket も使用可能な MQTT ブローカ(mosquitto) の準備が完了しました。DeviceServer を再起動して MQTTT エンドポイントを接続してください。その後、前のバージョンの時と同様に XBee からの IO サンプリングデータを mosquitto_sub コマンドで確認してください。

●Webアプリケーション作成

ここからは、Web アプリケーションを作成します。最初に index.html ファイルを下記の内容で作成します。ここで紹介する Web アプリのファイルは、最新の DeviceServer インストールキットに含まれています。DeviceServer が動作している PC の “C:\Program Files (x86)\AllBlueSystem\WebRoot\web_api_sample\mqtt_xbee_chart” フォルダに Webアプリケーションに必要なファイルが格納されています。

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<meta name="viewport" content="width=device-width, initial-scale=1" />
		<title>MQTT-XBee ゲートウェイ ADCグラフ</title>
		<link rel="stylesheet" href="libs/css/themes/default/jquery.mobile-1.4.5.min.css" />
		<link rel="stylesheet" href="libs/css/jquery.jqplot.css" />
		<script src="libs/js/jquery.js"></script>
		<script src="libs/js/jquery.mobile-1.4.5.js"></script>
		<script src="libs/js/mqttws31.js"></script>
		<script src="libs/js/jquery.jqplot.js"></script>
	</head>
	<body>

		<div data-role="page" id="setup_page">
			<div data-role="header" data-position="inline">
				<h3>MQTTブローカ接続設定</h3>
			</div>
			<div role="main" class="ui-content">
				<label for="host_name">MQTT broker host name</label>
				<input id="host_name" value="127.0.0.1" type="text" placeholder="hostname or IP address (WebSocket)" data-clear-btn="true"/>
				<label for="port_number">MQTT broker WebSocket port</label>
				<input id="port_number" value="9090" type="number" placeholder="port number (WebSocket)" data-clear-btn="true"/>
				<label for="mqtt_topic">Subscribe topic</label>
				<input id="mqtt_topic" value="/+/+/io" type="text" placeholder="subscribe topic ex: /xbee/NodeIdentifier/io" data-clear-btn="true"/>
				<div><h3>&nbsp</h3></div>
				<div class="ui-grid-b">
					<div class="ui-block-b">
							<a class="ui-btn ui-btn-inline ui-icon-check ui-btn-icon-left " id="connect_btn" >Connect</a>
					</div>
				</div>
			</div>
			<div data-role="footer">
				<h3>Copyright(c) 2015 All Blue System</h3>
			</div>
		</div>

		<div data-role="page" id="chart_page">
			<div data-role="header" data-position="inline">
				<a data-icon="back" id="chart_view_prev_btn">接続設定</a>
				<h3>グラフ表示</h3>
			</div>
			<div id="chart1" style="height:500px;width:100%;"></div>
			<div data-role="footer">
				<h3>Copyright(c) 2015 All Blue System</h3>
			</div>
		</div>

		<div data-role="page" id="error_quit_dialog">
			<div data-role="header" data-theme="b">
				<h1>*MQTT BROKER ERR*</h1>
			</div>
			<div role="main" class="ui-content">
				<h2>エラーが発生しました</h2>
				<p>MQTT ブローカ接続処理中にエラーが発生しました。接続設定を見直して下さい</p>
				<p><a data-role="button" id="error_ok_btn" class="ui-btn ui-shadow ui-btn-a ui-icon-check ui-btn-icon-left">OK</a></p>
			</div>
		</div>

		<script src="main.js"></script>

    </body>
</html>

index.html ファイルでは jQuery-mobile フレームワークを使用して Webアプリの画面とメッセージダイアログを定義しています。

Webアプリで最初に表示される “設定画面ページ”では MQTT ブローカ WebSocket に接続するためのパラメータを入力します。”Connect” ボタンを押すと MQTT ブローカに接続します。

MQTT ブローカ接続後に表示されるのが “グラフ表示ページ” です。グラフ表示には jplot ライブラリを使用しています。

index.html ファイル中から参照している mqttws31.js は Paho プロジェクトで公開されている JavaScript 版の MQTT クライアント・ライブラリです。これを利用して WebSocket 経由で MQTT ブローカに接続します。

次に、メインのアプリケーションロジックを実現している JavaScript ファイル(main.js)は以下になります。

//////////////////////////////////////
// グラフ表示ページ
//////////////////////////////////////

var plot1;
var ad0 = new Array();
var ad1 = new Array();
var ad2 = new Array();
var ad3 = new Array();
var ad4 = new Array();
var ad5 = new Array();

// グラフを右端から開始するために、初期データをシリーズ変数に格納する
for (var i = 0; i <= 120; i++){
	ad0.push(null);
	ad1.push(null);
	ad2.push(null);
	ad3.push(null);
	ad4.push(null);
	ad5.push(null);
}

// グラフ表示フォーマット
var chartOptions = {
	legend: { show: true, location: "nw" },
	title: "-- no data --",
	series: [{label: "AD0" },{label: "AD1" },{label: "AD2" },{label: "AD3" },
				{label: "AD4" },{label: "AD5" }],
	seriesColors: ["#000000","#A64F08","#FF0000","#FF8000","#FFFF00","#04B404"],
	seriesDefaults: {
		rendererOptions: { smooth: true },
    	pointLabels: { show: true },
		lineWidth:3,
		showMarker:false,
    	breakOnNull: true
	},
	axes: {
		xaxis: {
			label: "(past) sample",
			min: 0,
			max: 120,
			ticks: [[1,"120"],[11,"110"],[21,"100"],[31,"90"],[41,"80"],[51,"70"],
					[61,"60"],[71,"50"],[81,"40"],[91,"30"],[101,"20"],[111,"10"],[121,"0"]],
			pad: 0
		},
		yaxis: {
			label: "XBee-ADC",
			min: 0,
			max: 1023,
			tickOptions: { formatString: '%d'},
			ticks: [0,100,200,300,400,500,600,700,800,900,1000,1023]
		},
	}
};

// MQTTブローカとの接続が途中で切れた
function onConnectionLost(responseObject) {
	if (responseObject.errorCode !== 0) {
		console.log("onConnectionLost:"+responseObject.errorMessage);
		$("body").pagecontainer("change","#error_quit_dialog",{ transition: "pop",role:"dialog" });
	}
}

// MQTTブローカからメッセージを受信した
function onMessageArrived(message) {
	console.log("onMessageArrived:"+ message.destinationName + " " + message.payloadString);
	// 既存のシリーズデータ(プロットデータ)を左シフトする
	if(ad0.length > chartOptions.axes.xaxis.max) {
		ad0.shift();
		ad1.shift();
		ad2.shift();
		ad3.shift();
		ad4.shift();
		ad5.shift();
	}
	// MQTT ブローカから WebSocket 経由で受信した JSON 文字列をシリーズ変数に格納する
	// メッセージフォーマット:
	//      {"ad0":<ad0_value>, .. ,"ad1":<ad1_value>, ... ,"dio0":<dio0_value>,"dio1":<dio1_value>,... }
	// 該当チャンネルのデータが存在しないときには Null が格納されてグラフには表示されない
	var recv_data;
	try {
		recv_data = $.parseJSON(message.payloadString);
		ad0.push(recv_data.ad0);
		ad1.push(recv_data.ad1);
		ad2.push(recv_data.ad2);
		ad3.push(recv_data.ad3);
		ad4.push(recv_data.ad4);
		ad5.push(recv_data.ad5);
	}catch(e){
		console.log("invalid JSON :" +message.payloadString);
	}
	// 再描画時のメモリリーク対策用 jqplot.replot()使用NG
	if (plot1) {
		plot1.destroy();
	}
	// グラフにシリーズデータをプロット
	chartOptions.title = "XBee-MQTT Gateway ADC samples [ " + message.destinationName + " ]";
	plot1 = $.jqplot("chart1",[ad0,ad1,ad2,ad3,ad4,ad5],chartOptions);
}

// グラフ表示ページが表示された
$(document).on("pageshow","#chart_page",function(event){
	// 再描画時のメモリリーク対策用 jqplot.replot()使用NG
	if (plot1) {
		plot1.destroy();
	}
	// 既存のシリーズデータまたは、初期値の空白グラフを表示する
	plot1 = $.jqplot("chart1",[ad0,ad1,ad2,ad3,ad4,ad5],chartOptions);
});

// グラフ表示ページの "接続設定" ボタンが操作された
$("#chart_view_prev_btn").on( "click",function(event, ui){
	$("body").pagecontainer("change","#setup_page", { transition: "none" });
});

//////////////////////////////////////
// セットアップページ
//////////////////////////////////////

var mqtt_client;
var topic;

// MQTTクライアント接続毎にユニークなID を生成
function get_clientid() {
  function s4() {
    return Math.floor((1 + Math.random()) * 0x10000).toString(16).substring(1);
  }
  return 'mqtt-' + s4() + '-' + s4() + '-' + s4();
}

// MQTTブローカに接続した
function onConnect() {
	console.log("connect to MQTT broker");
	// センサーデータのトピック購読
	mqtt_client.subscribe(topic);
	// グラフ画面ページに移動する
	$("body").pagecontainer("change","#chart_page", { transition: "pop" });
}

// MQTTブローカとの接続に失敗した
function onConnectFailure(responseObject) {
	console.log("onConnectFailure:"+responseObject.errorMessage);
	$("body").pagecontainer("change","#error_quit_dialog", { transition: "pop",role:"dialog" });
}

// セットアップページの "Connect" ボタンを押した
$( "#connect_btn" ).on( "click", function(event, ui){
	var mqtt_host = $("#host_name").val();
	var mqtt_port = parseInt($("#port_number").val());
	topic = $("#mqtt_topic").val();
	// MQTT Client 作成
	mqtt_client = new Paho.MQTT.Client(mqtt_host,mqtt_port,"/mqtt",get_clientid());
	mqtt_client.onConnectionLost = onConnectionLost;
	mqtt_client.onMessageArrived = onMessageArrived;
	// MQTT ブローカ WebSocket に接続
	mqtt_client.connect({ onSuccess:onConnect, onFailure:onConnectFailure });
});

// セットアップページが表示された
$(document).on("pageshow","#setup_page",function(event){
	// 既存の MQTT Client オブジェトを削除する(もしあれば)
	if (mqtt_client) {
		try{
			// ConnectionLost 発生時には disconnect() で例外が発生するが無視する。
			mqtt_client.disconnect();
		}catch(e){
			console.log("disconnect() failed");
		}
		delete mqtt_client;
	}
});

// エラーダイアログから復帰する場合はセットアップページに戻る
$( "#error_ok_btn" ).on( "click", function(event, ui){
	session_token = "";
	$("body").pagecontainer("change","#setup_page", { transition: "pop" });
});

設定画面で入力されたパラメータで MQTT ブローカに接続します。

接続に成功すると、設定画面で指定したトピックを購読します。デフォルトでは “/+/+/io” になっていますので XBee, XBee-ZB から送信される全てのリモート IO サンプリングデータを受信することになります。

MQTT ブローカからトピック・メッセージを受信すると、JSON 文字列をオブジェクトに変換した後、個々の AD サンプリング値を取り出して配列に格納します。配列はグラフ表示するときの X 軸方向の目盛り分のサイズを確保しています。もし最新のサンプリングデータを格納するときに、配列のサイズを超える場合には古いデータを順に削除します。これによってグラフ表示したときに左スクロールするように見えます。

最新のサンプリングデータを配列に格納したら、jqplot ライブラリを使用してグラフ表示します。MQTT ブローカからメッセージを受信する毎にこれらの動作を繰り返します。

●Webアプリを動作させる

早速 Web アプリを起動してみます。DeviceServer をインストールしていた場合には “http://<IPアドレス>/web_api_sample/mqtt_xbee_chart/index.html” をWebブラウザで開いて下さい。

この Webアプリは DeviceServer 側の Web API 機能を一切使用しませんので、”mqtt_xbee_chart” フォルダごと他の Webサーバーに配置して動作させることもできます。WebSocket では Webブラウザで実行中の JavaScript からのネットワークアクセスを、配信元サーバーと同一ドメインに制限する “Same Origin Policy” から除外されています。このためMQTT ブローカが動作している RaspberryPi 以外のWebサーバーに配置しても大丈夫です。”index.html” ファイルをダブルクリックして直接 “file” 形式のURI でアクセスしても動作します。

Web アプリを起動すると最初に設定画面が表示されます。

MQTT ブローカのホスト名(IP アドレス) と WebSocket ポート番号を入力します。ここで指定するポート番号は DeviceServer 側から XBee IO サンプリングデータを Publish している TCP/IP ポート番号(1883) ではありませんので注意してください。必ず、この記事の最初で mosquitto.conf に設定した WebSocket ポート番号を指定します。

“Connect” ボタンを押すと、MQTT ブローカに接続してトピック購読を開始します。

グラフ画面です。この画面では IO サンプリングデータ中に含まれる AD0 .. AD5 までの IOデータをプロットします。存在しない ADxデータ項目はグラフ表示されません。実際にデータが到着すると下記の様なグラフが表示されます。

複数のリモート XBee からのデータを同時に受信すると、グラフで表示するデータ内容が混在してしまいます。その場合には “接続設定” ボタンを押して設定画面に戻って、トピック文字列を設定し直してください。たとえばトピックを “/xbee/Device5/io” にすると、XBee(Series1) でかつ NodeIdentifier が “Device5″ のサンプリングデータのみをグラフ表示させることができます。

●動作例の動画

MQTT ブローカから受信した IO サンプリングデータをグラフ表示しているときの動画を載せましたのでご覧ください。(音量注意)

それではまた。

 

リモート XBee IO データを MQTT Broker に送信、XBee-MQTT Gateway (その1)

●概要

今回の記事では、リモートから送信したXBee IO サンプリングデータを MQTT ブローカに自動送信するシステムを構築します。

XBee(Series1) や XBee-ZB(Series2) デバイスには A/D や DIO の値を定期的に送信するIOサンプリング機能が予め備わっています。このシステムでは XBee から送信された IO サンプリングデータを JSON 形式に変換して、MQTT ブローカに送信するシステムを構築します。

リモートに設置した複数の XBee(Series1) や XBee-ZB(Series2) デバイスから IO サンプリングパケットは、DeviceServer に接続したXBee デバイスで受信します。

Series1 の場合にはスター接続の中央に配置されたコントローラデバイスで、Series2 の場合には Coordinator デバイスが DeviceServer に接続されています。DeviceServer で受信したIO サンプリングパケットはイベントハンドラ中で解析して、JSON 文字列に変換した後 MQTT ブローカに Publish メッセージとして配信します。

複数のリモート XBee デバイスからの IO サンプリングデータを、 MQTT ブローカの購読側で簡単に選別できるように、トピック名をデバイス名(NodeIdentifier) を含んだ毎にユニークな文字列 “/<”xbee” | “zb”>/<NodeIdentifier>/”io” にして送信します。またメッセージ内容は JSON 形式で下記のフォーマットで送信します

{“ad0″:<ad0_value>, .. ,”ad1″:<ad1_value>, … ,”dio0″:<dio0_value>,”dio1″:<dio1_value>,… }

JSON 形式にすることで、IO サンプリングデータを MQTT メッセージ購読側のアプリで利用しやすくなります。また、XBee デバイスからは複数の IO 値が1つのサンプリングパケットにまとめて送信されますので、MQTT ブローカに送信するときも IO ポート毎にトピックを分解しないほうが元のデータを正しく表現することができると思います。

●MQTT ブローカを準備

センサデータを配信する MQTT ブローカをセットアップします。今回は RaspberryPi にインストールした mosquitto を使用します。

セットアップの方法は此方の記事を参照にしてください。

これ以外にも LAN やインターネット上に設置された MQTT ブローカを使用できます。送信するセンサデータは JSON 文字列で数十バイト程度なので、もし利用する MQTT ブローカでペイロードサイズの制限があったとしても引っかかることはないと思います。

ただ、XBee  IO サンプリングデータの送信頻度を百ミリ秒未満にする場合にはMQTT ブローカ間とのネットワーク遅延などを考慮してください。

●リモート側XBee デバイスを設置

ここではリモート側の XBee デバイスに XBee(Series1) を使用していますが、XBee-ZB(Series2)を使用することもできます。光センサ(フォトダイオード) を XBee デバイスの AD0 に接続しています。

フォトダイオードは 浜松ホトニクス S9648 を 負荷抵抗 10KΩで使用しています。またフィルタ用に 0.1μ コンデンサを並列に接続しています。XBee の Vcc と GND の他に、Vref 端子を忘れないように配線してください(Series1 の場合)。

●DeviceServer 側のエンドポイントを作成

DeviceServer 側に MQTT ブローカとの接続を管理するためのエンドポイントを作成します。今回のアプリケーションでは、エンドポイントを XBee から受信した IO サンプリングデータを MQTT ブローカに送信(Publish) する時のみに使用しています。

DeviceServer のサーバー設定プログラムを起動します。

“次へ” ボタンを押して DeviceServer の設定を行います。

MQTT タブを選択して、MQTT クライアント機能の設定とエンドポイントの作成を行います。ここで “MQTT 機能を有効にする” にチェックを付けます。KeepAliveTimer はデフォルトの 60秒のままにしておいて下さい。ここで設定した間隔で、DeviceServer 側から MQTT ブローカに対して PINGREQ メッセージを定期的に送信します。また、エンドポイントのソケット接続でエラーを検出したときには再接続処理が自動で行われます。

“追加” ボタンを押して、新規のエンドポイントを作成します。

エンドポイントのタイトルを上記のように設定します。Broker のホスト名と TCP/IP ソケット・ポート番号をここで入力します。その他の設定は空白のままで構いません。

このエンドポイントは XBee のIO サンプリングデータを MQTT ブローカに送信するためだけに使用する予定なので、”起動時に購読するトピック” 項目などは空白のままで大丈夫です。

“OK” を押してエンドポイントを作成した後、サーバー設定プログラムの “次へ” ボタンを押すとDeviceServer が再起動します。同時に、エンドポイントに設定した MQTT ブローカへの接続が自動的に開始されます。

●XBee のイベントハンドラスクリプト設定

XBee デバイスから IO サンプリングデータを受信したときに、DeviceServer 側で実行されるイベントハンドラスクリプトを設定します。XBee(Series1) の場合には XBEE_IO_DATA.lua が自動起動される Lua スクリプトです。

このスクリプトを修正して、MQTT ブローカに JSON 形式で IO データを送信するようにします。スクリプトの内容は以下になります。最新の DeviceServer インストールキットでインストールすると、下記の内容が既にコメントで囲まれた状態で記述されていますので簡単に設定できます。

file_id = "XBEE_IO_DATA"

--[[

******************************************************************************
* イベントハンドラスクリプト実行時間について                                 *
******************************************************************************

一つのスクリプトの実行は長くても数秒以内で必ず終了するようにしてください。
処理に時間がかかると、イベント処理の終了を待つサーバー側でタイムアウトが発生します。

また、同時実行可能なスクリプトの数に制限があるため、他のスクリプトの実行開始が
待たされる原因にもなります。

頻繁には発生しないイベントで、処理時間がかかるスクリプトを実行したい場合は
スクリプトを別に作成して、このイベントハンドラ中から script_fork_exec() を使用して
別スレッドで実行することを検討してください。

******************************************************************************

XBEE_IO_DATA スクリプト起動時に渡される追加パラメータ

---------------------------------------------------------------------------------
キー値			値		            									値の例
---------------------------------------------------------------------------------

APIType			フレームデータ中のAPI Type(16進数2桁)					83

SourceAddress	フレームデータ中のSourceAddress
				16bit アドレスの場合(16進数4桁)							0A01
				64bit アドレスの場合(16進数16桁)						0013A200404AC397

SerialNumber	XBee デバイスの SerialNumber
				DeviceServer に保持されたマスターファイルを使用して、
				SourceAddress から変換した値が設定される。				0013A200404AC397

NodeIdentifier	XBee デバイスの NodeIdentifier。
				DeviceServer に保持されたマスターファイルを使用して、
				SourceAddress から変換した値が設定される。				Device1
				マスターにNodeIdentifier未登録の場合は"" が設定される

RSSI			フレームデータ中のRSSI(16進数2桁)						45

Options			フレームデータ中Options(16進数2桁)						00

SAMPLE_COUNT	I/O データのサンプル数									1

SAMPLE_DIO		I/O データ中のサンプル対象となったDIOビット番号リスト
				(10進数、カンマ区切り)									0,1,4

SAMPLE_ADC		I/O データ中のサンプル対象となったADCビット番号リスト
				(10進数、カンマ区切り)									2,3

SAMPLE_<Sample#>_<"DIO"|"ADC">_<Bit#>

				I/O サンプルデータ値。
				DIO の場合は、High で "1"、Low で "0"。					1
				ADC の場合は 10進数。									1023

				<Sample#> には 最大、SAMPLE_COUNT まで 1から順番に
				インクリメントされた値が入る。

				<"DIO"|"ADC">は、I/O サンプルデータが ADC
				もしくは、DIO のどちらであるかを示す。

				<Bit#>は、サンプルデータのビット番号。10進数。

		例:"SAMPLE_1_ADC_0" は、第一サンプルデータ中の #0番ポートのADC変換値を示す。

]]

--[[
log_msg("start..",file_id)
for key,val in pairs(g_params) do
	log_msg(string.format("g_params[%s] = %s",key,val),file_id)
end
]]

--[[
********************************************************************

XBee->MQTT Gateway 機能の実装例

リモート XBee から送信された I/O パケットデータを MQTT に登録します。
トピック名: "/xbee/<NodeIdentifier>/io"
メッセージ文字列:
 {"ad0":<ad0_value>, .. ,"ad1":<ad1_value>, ... ,"dio0":<dio0_value>,"dio1":<dio1_value>,... }

XBee の IO 設定で有効になっている A/D, DIO ポート値のみが上記のフォーマット
で格納されます。

送信先 MQTT ブローカは、サーバー設定プログラムを使用して
エンドポイントを登録して下さい。下記スクリプト中の end_point 変数に
エンドポイントのタイトル文字列または ClientID を設定します。

XBee デバイスの "Sample Before TX" の設定値を1以上にした場合には
最後にサンプリングしたデータのみを MQTT に送信しています

********************************************************************
]]

local end_point = "センサーデータ登録"
local qos = 1
local topic = "/xbee/" .. g_params["NodeIdentifier"] .. "/io"

local ad_list = csv_to_tbl(g_params["SAMPLE_ADC"])
local dio_list = csv_to_tbl(g_params["SAMPLE_DIO"])
local json_str = '{'
local first = true
for i,v in ipairs(ad_list) do
	if first then
		first = false
	else
		json_str = json_str .. ","
	end
	json_str = json_str .. string.format('"ad%s":%s',v,
				g_params["SAMPLE_" .. g_params["SAMPLE_COUNT"] .. "_ADC_" .. v])
end
for i,v in ipairs(dio_list) do
	if first then
		first = false
	else
		json_str = json_str .. ","
	end
	json_str = json_str .. string.format('"dio%s":%s',v,
				g_params["SAMPLE_" .. g_params["SAMPLE_COUNT"] .. "_DIO_" .. v])
end
json_str = json_str .. '}'
mqtt_publish(end_point,topic,json_str,qos)

上記スクリプト中の “end_point” 変数に設定している名前が、先にサーバー設定プログラムで作成したエンドポイントのタイトル名と一致していることを確認してください。

XBEE_IO_DATA イベントハンドラスクリプトでは、リモートの XBee(Series1) デバイスから送信された IO サンプリングパケットは自動的に解析されて、既にスクリプトパラメータに格納されtた状態で起動されます。

そのため、このスクリプトパラメータを利用してMQTT に送信する JSON 文字列を作成します。XBee デバイスの ADC, DIO ポートが有効になっていると、各々のポート値がスクリプトパラメータに設定されていますので、これを連結して JSON 形式に変換します。変換するときの詳しいフォーマットはスクリプト中のコメントを参照してください。

JSON 形式になったIO サンプリングデータは mqtt_publish() ライブラリ関数を使用して、MQTT ブローカに送信(Publish) します。このとき、トピック名には “/xbee/<NodeIdentifier>/io” を使用することで、購読側でデバイスを選択し易くしています。

続いて、XBee-ZB(Series2) のイベントハンドラスクリプト(ZB_IO_DATA.lua) も同様に設定します。

file_id = "ZB_IO_DATA"

--[[

******************************************************************************
* イベントハンドラスクリプト実行時間について                                 *
******************************************************************************

一つのスクリプトの実行は長くても数秒以内で必ず終了するようにしてください。
処理に時間がかかると、イベント処理の終了を待つサーバー側でタイムアウトが発生します。

また、同時実行可能なスクリプトの数に制限があるため、他のスクリプトの実行開始が
待たされる原因にもなります。

頻繁には発生しないイベントで、処理時間がかかるスクリプトを実行したい場合は
スクリプトを別に作成して、このイベントハンドラ中から script_fork_exec() を使用して
別スレッドで実行することを検討してください。

******************************************************************************

ZB_IO_DATA スクリプト起動時に渡される追加パラメータ

---------------------------------------------------------------------------------
キー値			値		            									値の例
---------------------------------------------------------------------------------
FrameType		フレームデータ中のFrame Type
				(16進数2桁)												92

SourceAddress	フレームデータ中のSourceAddress
				64bit アドレス(16進数16桁)								0013A200404AC397

NetworkAddress	フレームデータ中の SourceNetworkAddress
				16bit アドレス(16進数4桁)								D565

NodeIdentifier	XBee デバイスの NodeIdentifier。
				DeviceServerのマスターファイルを検索して設定される。	Node1
				マスターにNodeIdentifier未登録の場合は"" が設定される

DeviceType		XBee デバイスの Device Type
				DeviceServerのマスターファイルを検索して設定される。	01
				マスターにDeviceType未登録の場合は"" が設定される
				8bit値(16進数2桁)
				00: coordinator
                01: router
                02: end device

DeviceTypeID	XBee デバイスの Device Type Identifier
				DeviceServerのマスターファイルを検索して設定される。
				マスターにDeviceTypeID未登録の場合は"" が設定される
				32bit値(16進数8桁)										00030000

ReceiveOptions	フレームデータ中 ReceiveOptions							01
				8bit値(16進数2桁)

SAMPLE_COUNT	I/O データのサンプル数									1

SAMPLE_DIO		I/O データ中のサンプル対象となったDIOビット番号リスト
				(10進数、カンマ区切り)									0,1,4

SAMPLE_ADC		I/O データ中のサンプル対象となったADCビット番号リスト
				(10進数、カンマ区切り)									2,3

SAMPLE_<Sample#>_<"DIO"|"ADC">_<Bit#>

				I/O サンプルデータ値。
				DIO の場合は、High で "1"、Low で "0"。					1
				ADC の場合は 10進数。									1023

				<Sample#> には 最大、SAMPLE_COUNT まで 1から順番に
				インクリメントされた値が入る。

				<"DIO"|"ADC">は、I/O サンプルデータが ADC
				もしくは、DIO のどちらであるかを示す。

				<Bit#>は、サンプルデータのビット番号。10進数。

		例:"SAMPLE_1_ADC_0" は、第一サンプルデータ中の #0番ポートのADC変換値を示す。

]]

--[[
log_msg("start..",file_id)
for key,val in orderedPairs(g_params) do
	log_msg(string.format("g_params[%s] = %s",key,val),file_id)
end
]]

--[[
********************************************************************

XBee->MQTT Gateway 機能の実装例

リモート XBee から送信された I/O パケットデータを MQTT に登録します。
トピック名: "/zb/<NodeIdentifier>/io"
メッセージ文字列:
 {"ad0":<ad0_value>, .. ,"ad1":<ad1_value>, ... ,"dio0":<dio0_value>,"dio1":<dio1_value>,... }

XBee の IO 設定で有効になっている A/D, DIO ポート値のみが上記のフォーマット
で格納されます。

送信先 MQTT ブローカは、サーバー設定プログラムを使用して
エンドポイントを登録して下さい。下記スクリプト中の end_point 変数に
エンドポイントのタイトル文字列または ClientID を設定します。

********************************************************************
]]

local end_point = "センサーデータ登録"
local qos = 1
local topic = "/zb/" .. g_params["NodeIdentifier"] .. "/io"

local ad_list = csv_to_tbl(g_params["SAMPLE_ADC"])
local dio_list = csv_to_tbl(g_params["SAMPLE_DIO"])
local json_str = '{'
local first = true
for i,v in ipairs(ad_list) do
	if first then
		first = false
	else
		json_str = json_str .. ","
	end
	json_str = json_str .. string.format('"ad%s":%s',v,
				g_params["SAMPLE_" .. g_params["SAMPLE_COUNT"] .. "_ADC_" .. v])
end
for i,v in ipairs(dio_list) do
	if first then
		first = false
	else
		json_str = json_str .. ","
	end
	json_str = json_str .. string.format('"dio%s":%s',v,
				g_params["SAMPLE_" .. g_params["SAMPLE_COUNT"] .. "_DIO_" .. v])
end
json_str = json_str .. '}'
mqtt_publish(end_point,topic,json_str,qos)

スクリプトの内容は XBee(Series1) とほぼ同じですが、MQTT ブローカに送信するときのトピック名を “/zb/<NodeIdentifier>/io” に変更しています。

イベントハンドラスクリプトが完成したら、今回の XBee-MQTT Gateway の機能はほぼ実現していることになります。

●リモート側XBee デバイスの IO 設定

次に、光センサを接続したリモート側 XBee デバイスの IO 設定を変更して、IO サンプリングデータが送信されるようにします。

DeviceServer のクライアントを起動してログインします。

ログインしたら、”XBee” ツールボタンを押してPAN 内の XBee デバイス一覧を表示させます。

光センサを接続しているリモート側XBee (Device5) をダブルクリックするか、または選択した後に “設定変更” ツールボタンを押します。現在の XBee 設定値がリモートからロードされた後、詳細設定画面が表示されます。

ここで DIO0 ポートタブを選択して、ADC にチェックをつけます。これで AD0 が有効になりました。次に “Sample Before TX” を 1にして “Sample Rate” を 200ms に設定します。これによって XBee デバイスが 200ms 毎に IO サンプリングを実行して、その結果を DeviceServer に接続した XBee に送信するようになります。

その後 DeviceServer 側では、先ほど設定したイベントハンドラスクリプト中から MQTT ブローカに IO データが JSON 形式で送信されます。

複数の XBee デバイスからの IO サンプリングデータを、同時に MQTT ブローカに送信することもできます。その場合にはそれぞれの XBee デバイスの IO 設定を同様に変更してください。

“OK” を押すと同時に、リモート側 XBee デバイスの IO サンプリングが開始されて MQTT ブローカに送信されるようになります。

●RaspberryPi のコンソール上で、MQTT メッセージを購読して IO データを確認

ここで、MQTT ブローカに送信されている IO サンプリングを見てみましょう。RaspberryPi にログインして MQTT ブローカからメッセージを購読します。

mosquitto_sub コマンドを使用して、MQTT ブローカからメッセージを受信している様子です。XBee デバイスで有効になっている全ての IO 値が JSON 形式で送信されています。光センサに手をかざすと ad0 の値が変化するのを確認できます。

●動作例の動画

XBee-MQTT gateway が動作しているときの動画を載せましたのでご覧ください。(音量注意)

今回は、XBee データを MQTT に変換する例を紹介しましたが、同様のスクリプトを用意することで DeviceServer で取得した様々なセンサデータを MQTT に送信することが可能になります。

また、今回のシステムのセンサデータとは逆方向の Gateway も簡単に作成できます。DeviceServer 側に作成したエンドポイントで MQTT のブローカから特定のトピックを購読して、そのデータを元にリモート側の XBee のポートを変更させることもできます。この時には、MQTT ブローカからトピック・メッセージを受信するときに自動実行される MQTT_PUBLISH イベントハンドラに処理を記述します。

ここで紹介した DeviceServer の機能は、ABS-9000 DeviceServer インストールキットをダウンロードして、直ぐに使用することができます。イベントハンドラで使用したスクリプトファイルも全て格納されていますので是非お試しください。デモライセンスが添付されていますので直ちに使用可能です。こちらから最新のインストールキットやセットアップマニュアル、ユーザーマニュアルをダウンロードできます。

次の記事 “XBee-MQTT Gateway (その2)” では、今回作成したシステムで MQTT ブローカに配信されているセンサデータを WebSocket 経由で受信して、グラフ表示するWebアプリを紹介する予定です。

それではまた。