paho-mqttで回線を切るとどうなるか

vimのことを書こうと思いましたが、どうもいいネタが思い浮かばないのでMQTTの話にします。

paho-mqttでテスト用のサブスクライブスクリプトを動かして、途中でネットワークのコネクションを切るとどうなるか、という実験です。

サブスクライバは下記のようなものです。これはpaho-mqttのページにあったテスト用のスクリプトです。

test_subscriber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
topic = "TOPIC/YOU/WANTS/SEE"
broker = "192.168.0.xxx"
portNo = 1883
import paho.mqtt.client as mqtt
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, rc):
print("Connected with result code "+str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
# client.subscribe("$SYS/broker/messages/#")
client.subscribe(topic)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker, portNo, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

で、これを実行して、途中でネットワークを切って見ました。

端末上ではこんな感じ。

In case of transmission line failure..
1
2
3
4
5
6
1419926642, 1767, 1433, 2513, 0
Connected with result code 0
1419926762, 1767, 1434, 2514, 0
1419926821, 1767, 1432, 2513, 0
:

最初のサブスクライブを確認してから、接続を60秒以上切断し、元に戻します。

データの最初のフィールドはタイムスタンプなので、120秒のデータ欠損があることがわかります。
その後経路が回復した時点で、再接続されていることがわかります。

スクリプトではclient.connect(broker, portNo, 60)として60秒以上交信しないとpingでMQTT接続をキープするように設定しています。この実験では、60秒以上コネクションを切っていますのでpingが発行され、その答えは帰ってこない状況に有ります。さらにブローカはmosquittoをデフォルトで動かしているので、ブローカがMQTTのコネクションを切ることはありません。(persistent_client_expirationを設定していない)

スクリプトを実行した時のパケットの様子を見てみると。。。。

  • TCP でのネゴシエーションがあったあと、MQTTでの接続リクエストがなされます。

  • 続いてMQTTの接続許可がブローカから来ます。
    そしてサブスクライブ要求としてトピックの指定をブローカに送ります。

  • この返事として、TCPのACKがあり、サブスクライブの準備が整います。指定したトピックにパブリッシュがあると、サブスクライブされるようになります。

そして、スクリプト開始から66.9秒後に最初のデータがきました。その後、接続を切ります。

183秒のときに接続を再開。

このとき起こったことは。。。。

  • まず、TCPでの接続のために、クライアントからブローカに向かってSYN,ACK、SYNが取り交わされます。

  • その後、最初の接続と同じように、MQTTのコネクションが最初の接続と同じIDで開始され、次にサブスクライブ要求が発行されています。

  • サブスクライブ要求後、サブスクライブ確認のメッセージがブローカから帰ってきます。

  • その3秒後、ブローカからFINパケットとともにデータが送られてきて、クライアントがRSTパケットを返してTCP接続が切られます。

    • このFIN-RST応答は、そのシーケンス番号から、回線を切る前のコネクションの異常終了のためのやり取りだという事がわかりました。ですので、このパケットに乗っていたデータは以前のコネクションのデータだということで破棄されているはずです。スクリプトの出力を見るとこのデータは出力されていませんので、これは正しい理解だという事にします。
  • そのあと、最新のデータが配信されてきました。このデータのタイムスタンプは1419926762で、スクリプトの出力からこのタイムスタンプのデータの受信は確認出来ました。

以上が、回線を切断された時のこのスクリプトの動作です。

ここで疑問は、「なぜ、コネクション要求がクライアントから出されるのか」という点です。
スクリプトでは、接続がロストした時の内容は記述していません。ということは、ライブラリ側の判断で再接続がなされているという事になります。

で、よくマニュアルを読むと、loop_forever()ファンクションがこれを管理しているようです。

###Network loop

These functions are the driving force behind the client. If they are not called, incoming network data will not be processed and outgoing network data may not be sent in a timely fashion.

これらのファンクションはクライアントの裏で駆動力となるものです。これらが呼ばれなければ入力されたデータは処理されず、出力されるべきデータは時間通りに送られないでしょう。

###loop_forever()

This is a blocking form of the network loop and will not return until the client calls disconnect(). It automatically handles reconnecting.

これはネットワークループのブロッキングフォームで(私には意味不明)、クライアントがdisconnect()関数を呼び出すまでは帰って来ません。自動的に再接続を管理します。

ということなので、(予想では)loop_forever()が再接続を発行して、その結果、スクリプトの処理で再度サブスクライブ要求が出される。ということなのでしょう。

今回、断線後の再接続では切断前と同じIDを使っていましたので、MQTTプロトコル的には再度サブスクライブ要求を出さずとも、以前に指定したトピックをサブスクライブできたはずです。
ただ、一体何が起こっているのか判断がつかない状況では、再接続後に再度サブスクライブ要求を出すというのは妥当な処理なのかなとも思います。

ここでのもうひとつの疑問は、「回線が切れた、と判断する基準は何?」ということです。

これについて、connect()のkeepaliveだけ切断する状況と、充分短い時間(ほんの数秒)切断する状況で確認してみたところ、短い時間では再接続が起こらなかったことから、keepaliveの時間以上接続が切れていると(pingに応答がないと)切断されたと判断して「再接続」を行うようです。

マニュアル上では

###keepalive

maximum period in seconds allowed between communications with the broker. If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker

ブローカとの最大許されるコミュニケーション間の時間(秒)。他のメッセージが交換されなければ、この数値に基づいてクライアントがpingメッセージをブローカに送るレートをコントロールします。

ということで、pingに応答がなければ再接続のトリガになるとは書かれていません。