123 lines
3.8 KiB
Python
123 lines
3.8 KiB
Python
"""
|
|
*******************************************************************
|
|
Copyright (c) 2013, 2017 IBM Corp.
|
|
|
|
All rights reserved. This program and the accompanying materials
|
|
are made available under the terms of the Eclipse Public License v1.0
|
|
and Eclipse Distribution License v1.0 which accompany this distribution.
|
|
|
|
The Eclipse Public License is available at
|
|
http://www.eclipse.org/legal/epl-v10.html
|
|
and the Eclipse Distribution License is available at
|
|
http://www.eclipse.org/org/documents/edl-v10.php.
|
|
|
|
Contributors:
|
|
Ian Craggs - initial implementation and/or documentation
|
|
*******************************************************************
|
|
"""
|
|
from __future__ import print_function
|
|
|
|
# Trace MQTT traffic
|
|
import MQTTV3112 as MQTTV3
|
|
|
|
import socket, sys, select, traceback, datetime, os
|
|
import SocketServer as socketserver
|
|
|
|
logging = True
|
|
myWindow = None
|
|
|
|
|
|
def timestamp():
|
|
now = datetime.datetime.now()
|
|
return now.strftime('%Y%m%d %H%M%S')+str(float("."+str(now.microsecond)))[1:]
|
|
|
|
|
|
class MyHandler(socketserver.StreamRequestHandler):
|
|
|
|
def handle(self):
|
|
if not hasattr(self, "ids"):
|
|
self.ids = {}
|
|
if not hasattr(self, "versions"):
|
|
self.versions = {}
|
|
inbuf = True
|
|
i = o = e = None
|
|
try:
|
|
clients = self.request
|
|
brokers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
brokers.connect((brokerhost, brokerport))
|
|
terminated = False
|
|
while inbuf != None and not terminated:
|
|
(i, o, e) = select.select([clients, brokers], [], [])
|
|
for s in i:
|
|
if s == clients:
|
|
inbuf = MQTTV3.getPacket(clients) # get one packet
|
|
if inbuf == None:
|
|
break
|
|
try:
|
|
packet = MQTTV3.unpackPacket(inbuf)
|
|
if packet.fh.MessageType == MQTTV3.PUBLISH and \
|
|
packet.topicName == "MQTTSAS topic" and \
|
|
packet.data == b"TERMINATE":
|
|
print("Terminating client", self.ids[id(clients)])
|
|
brokers.close()
|
|
clients.close()
|
|
terminated = True
|
|
break
|
|
elif packet.fh.MessageType == MQTTV3.CONNECT:
|
|
self.ids[id(clients)] = packet.ClientIdentifier
|
|
self.versions[id(clients)] = 3
|
|
print(timestamp() , "C to S", self.ids[id(clients)], repr(packet))
|
|
#print([hex(b) for b in inbuf])
|
|
#print(inbuf)
|
|
except:
|
|
traceback.print_exc()
|
|
brokers.send(inbuf) # pass it on
|
|
elif s == brokers:
|
|
inbuf = MQTTV3.getPacket(brokers) # get one packet
|
|
if inbuf == None:
|
|
break
|
|
try:
|
|
print(timestamp(), "S to C", self.ids[id(clients)], repr(MQTTV3.unpackPacket(inbuf)))
|
|
except:
|
|
traceback.print_exc()
|
|
clients.send(inbuf)
|
|
print(timestamp()+" client "+self.ids[id(clients)]+" connection closing")
|
|
except:
|
|
print(repr((i, o, e)), repr(inbuf))
|
|
traceback.print_exc()
|
|
if id(clients) in self.ids.keys():
|
|
del self.ids[id(clients)]
|
|
elif id(clients) in self.versions.keys():
|
|
del self.versions[id(clients)]
|
|
|
|
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
|
pass
|
|
|
|
def run():
|
|
global brokerhost, brokerport
|
|
myhost = '127.0.0.1'
|
|
if len(sys.argv) > 1:
|
|
brokerhost = sys.argv[1]
|
|
else:
|
|
brokerhost = '127.0.0.1'
|
|
|
|
if len(sys.argv) > 2:
|
|
brokerport = int(sys.argv[2])
|
|
else:
|
|
brokerport = 1883
|
|
|
|
if len(sys.argv) > 3:
|
|
myport = int(sys.argv[3])
|
|
else:
|
|
if brokerhost == myhost:
|
|
myport = brokerport + 1
|
|
else:
|
|
myport = 1883
|
|
|
|
print("Listening on port", str(myport)+", broker on port", brokerport)
|
|
s = ThreadingTCPServer(("127.0.0.1", myport), MyHandler)
|
|
s.serve_forever()
|
|
|
|
if __name__ == "__main__":
|
|
run()
|