MQTTV311.py 30 KB


  1. """
  2. *******************************************************************
  3. Copyright (c) 2013, 2018 IBM Corp.
  4. All rights reserved. This program and the accompanying materials
  5. are made available under the terms of the Eclipse Public License v2.0
  6. and Eclipse Distribution License v1.0 which accompany this distribution.
  7. The Eclipse Public License is available at
  8. https://www.eclipse.org/legal/epl-2.0/
  9. and the Eclipse Distribution License is available at
  10. http://www.eclipse.org/org/documents/edl-v10.php.
  11. Contributors:
  12. Ian Craggs - initial implementation and/or documentation
  13. *******************************************************************
  14. """
  15. """
  16. Assertions are used to validate incoming data, but are omitted from outgoing packets. This is
  17. so that the tests that use this package can send invalid data for error testing.
  18. """
  19. import logging
  20. logger = logging.getLogger("mqttsas")
  21. # Low-level protocol interface
  22. class MQTTException(Exception):
  23. pass
  24. # Message types
  25. CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, \
  26. PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, \
  27. PINGREQ, PINGRESP, DISCONNECT = range(1, 15)
  28. packetNames = [ "reserved", \
  29. "Connect", "Connack", "Publish", "Puback", "Pubrec", "Pubrel", \
  30. "Pubcomp", "Subscribe", "Suback", "Unsubscribe", "Unsuback", \
  31. "Pingreq", "Pingresp", "Disconnect"]
  32. classNames = [ "reserved", \
  33. "Connects", "Connacks", "Publishes", "Pubacks", "Pubrecs", "Pubrels", \
  34. "Pubcomps", "Subscribes", "Subacks", "Unsubscribes", "Unsubacks", \
  35. "Pingreqs", "Pingresps", "Disconnects"]
  36. def MessageType(byte):
  37. if byte != None:
  38. rc = byte[0] >> 4
  39. else:
  40. rc = None
  41. return rc
  42. def getPacket(aSocket):
  43. "receive the next packet"
  44. buf = aSocket.recv(1) # get the first byte fixed header
  45. if buf == b"":
  46. return None
  47. if str(aSocket).find("[closed]") != -1:
  48. closed = True
  49. else:
  50. closed = False
  51. if closed:
  52. return None
  53. # now get the remaining length
  54. multiplier = 1
  55. remlength = 0
  56. while 1:
  57. next = aSocket.recv(1)
  58. while len(next) == 0:
  59. next = aSocket.recv(1)
  60. buf += next
  61. digit = buf[-1]
  62. remlength += (digit & 127) * multiplier
  63. if digit & 128 == 0:
  64. break
  65. multiplier *= 128
  66. # receive the remaining length if there is any
  67. rest = bytes([])
  68. if remlength > 0:
  69. while len(rest) < remlength:
  70. rest += aSocket.recv(remlength-len(rest))
  71. assert len(rest) == remlength
  72. return buf + rest
  73. class FixedHeaders:
  74. def __init__(self, aMessageType):
  75. self.MessageType = aMessageType
  76. self.DUP = False
  77. self.QoS = 0
  78. self.RETAIN = False
  79. self.remainingLength = 0
  80. def __eq__(self, fh):
  81. return self.MessageType == fh.MessageType and \
  82. self.DUP == fh.DUP and \
  83. self.QoS == fh.QoS and \
  84. self.RETAIN == fh.RETAIN # and \
  85. # self.remainingLength == fh.remainingLength
  86. def __str__(self):
  87. "return printable stresentation of our data"
  88. return classNames[self.MessageType]+'(DUP='+str(self.DUP)+ \
  89. ", QoS="+str(self.QoS)+", Retain="+str(self.RETAIN)
  90. def pack(self, length):
  91. "pack data into string buffer ready for transmission down socket"
  92. buffer = bytes([(self.MessageType << 4) | (self.DUP << 3) |\
  93. (self.QoS << 1) | self.RETAIN])
  94. self.remainingLength = length
  95. buffer += self.encode(length)
  96. return buffer
  97. def encode(self, x):
  98. assert 0 <= x <= 268435455
  99. buffer = b''
  100. while 1:
  101. digit = x % 128
  102. x //= 128
  103. if x > 0:
  104. digit |= 0x80
  105. buffer += bytes([digit])
  106. if x == 0:
  107. break
  108. return buffer
  109. def unpack(self, buffer):
  110. "unpack data from string buffer into separate fields"
  111. b0 = buffer[0]
  112. self.MessageType = b0 >> 4
  113. self.DUP = ((b0 >> 3) & 0x01) == 1
  114. self.QoS = (b0 >> 1) & 0x03
  115. self.RETAIN = (b0 & 0x01) == 1
  116. (self.remainingLength, bytes) = self.decode(buffer[1:])
  117. return bytes + 1 # length of fixed header
  118. def decode(self, buffer):
  119. multiplier = 1
  120. value = 0
  121. bytes = 0
  122. while 1:
  123. bytes += 1
  124. digit = buffer[0]
  125. buffer = buffer[1:]
  126. value += (digit & 127) * multiplier
  127. if digit & 128 == 0:
  128. break
  129. multiplier *= 128
  130. return (value, bytes)
  131. def writeInt16(length):
  132. return bytes([length // 256, length % 256])
  133. def readInt16(buf):
  134. return buf[0]*256 + buf[1]
  135. def writeUTF(data):
  136. # data could be a string, or bytes. If string, encode into bytes with utf-8
  137. return writeInt16(len(data)) + (data if type(data) == type(b"") else bytes(data, "utf-8"))
  138. def readUTF(buffer, maxlen):
  139. if maxlen >= 2:
  140. length = readInt16(buffer)
  141. else:
  142. raise MQTTException("Not enough data to read string length")
  143. maxlen -= 2
  144. if length > maxlen:
  145. raise MQTTException("Length delimited string too long")
  146. buf = buffer[2:2+length].decode("utf-8")
  147. logger.info("[MQTT-4.7.3-2] topic names and filters not include null")
  148. zz = buf.find("\x00") # look for null in the UTF string
  149. if zz != -1:
  150. raise MQTTException("[MQTT-1.5.3-2] Null found in UTF data "+buf)
  151. for c in range (0xD800, 0xDFFF):
  152. zz = buf.find(chr(c)) # look for D800-DFFF in the UTF string
  153. if zz != -1:
  154. raise MQTTException("[MQTT-1.5.3-1] D800-DFFF found in UTF data "+buf)
  155. if buf.find("\uFEFF") != -1:
  156. logger.info("[MQTT-1.5.3-3] U+FEFF in UTF string")
  157. return buf
  158. def writeBytes(buffer):
  159. return writeInt16(len(buffer)) + buffer
  160. def readBytes(buffer):
  161. length = readInt16(buffer)
  162. return buffer[2:2+length]
  163. class Packets:
  164. def pack(self):
  165. buffer = self.fh.pack(0)
  166. return buffer
  167. def __str__(self):
  168. return str(self.fh)
  169. def __eq__(self, packet):
  170. return self.fh == packet.fh if packet else False
  171. class Connects(Packets):
  172. def __init__(self, buffer = None):
  173. self.fh = FixedHeaders(CONNECT)
  174. # variable header
  175. self.ProtocolName = "MQTT"
  176. self.ProtocolVersion = 4
  177. self.CleanSession = True
  178. self.WillFlag = False
  179. self.WillQoS = 0
  180. self.WillRETAIN = 0
  181. self.KeepAliveTimer = 30
  182. self.usernameFlag = False
  183. self.passwordFlag = False
  184. # Payload
  185. self.ClientIdentifier = "" # UTF-8
  186. self.WillTopic = None # UTF-8
  187. self.WillMessage = None # binary
  188. self.username = None # UTF-8
  189. self.password = None # binary
  190. if buffer != None:
  191. self.unpack(buffer)
  192. def pack(self):
  193. connectFlags = bytes([(self.CleanSession << 1) | (self.WillFlag << 2) | \
  194. (self.WillQoS << 3) | (self.WillRETAIN << 5) | \
  195. (self.usernameFlag << 6) | (self.passwordFlag << 7)])
  196. buffer = writeUTF(self.ProtocolName) + bytes([self.ProtocolVersion]) + \
  197. connectFlags + writeInt16(self.KeepAliveTimer)
  198. buffer += writeUTF(self.ClientIdentifier)
  199. if self.WillFlag:
  200. buffer += writeUTF(self.WillTopic)
  201. buffer += writeBytes(self.WillMessage)
  202. if self.usernameFlag:
  203. buffer += writeUTF(self.username)
  204. if self.passwordFlag:
  205. buffer += writeBytes(self.password)
  206. buffer = self.fh.pack(len(buffer)) + buffer
  207. return buffer
  208. def unpack(self, buffer):
  209. assert len(buffer) >= 2
  210. assert MessageType(buffer) == CONNECT
  211. try:
  212. fhlen = self.fh.unpack(buffer)
  213. packlen = fhlen + self.fh.remainingLength
  214. assert len(buffer) >= packlen, "buffer length %d packet length %d" % (len(buffer), packlen)
  215. curlen = fhlen # points to after header + remaining length
  216. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  217. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS was not 0, was %d" % self.fh.QoS
  218. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  219. self.ProtocolName = readUTF(buffer[curlen:], packlen - curlen)
  220. curlen += len(self.ProtocolName) + 2
  221. assert self.ProtocolName == "MQTT", "Wrong protocol name %s" % self.ProtocolName
  222. self.ProtocolVersion = buffer[curlen]
  223. curlen += 1
  224. connectFlags = buffer[curlen]
  225. assert (connectFlags & 0x01) == 0, "[MQTT-3.1.2-3] reserved connect flag must be 0"
  226. self.CleanSession = ((connectFlags >> 1) & 0x01) == 1
  227. self.WillFlag = ((connectFlags >> 2) & 0x01) == 1
  228. self.WillQoS = (connectFlags >> 3) & 0x03
  229. self.WillRETAIN = (connectFlags >> 5) & 0x01
  230. self.passwordFlag = ((connectFlags >> 6) & 0x01) == 1
  231. self.usernameFlag = ((connectFlags >> 7) & 0x01) == 1
  232. curlen +=1
  233. if self.WillFlag:
  234. assert self.WillQoS in [0, 1, 2], "[MQTT-3.1.2-14] will qos must not be 3"
  235. else:
  236. assert self.WillQoS == 0, "[MQTT-3.1.2-13] will qos must be 0, if will flag is false"
  237. assert self.WillRETAIN == False, "[MQTT-3.1.2-14] will retain must be false, if will flag is false"
  238. self.KeepAliveTimer = readInt16(buffer[curlen:])
  239. curlen += 2
  240. logger.info("[MQTT-3.1.3-3] Clientid must be present, and first field")
  241. logger.info("[MQTT-3.1.3-4] Clientid must be Unicode, and between 0 and 65535 bytes long")
  242. self.ClientIdentifier = readUTF(buffer[curlen:], packlen - curlen)
  243. curlen += len(self.ClientIdentifier) + 2
  244. if self.WillFlag:
  245. self.WillTopic = readUTF(buffer[curlen:], packlen - curlen)
  246. curlen += len(self.WillTopic) + 2
  247. self.WillMessage = readBytes(buffer[curlen:])
  248. curlen += len(self.WillMessage) + 2
  249. logger.info("[[MQTT-3.1.2-9] will topic and will message fields must be present")
  250. else:
  251. self.WillTopic = self.WillMessage = None
  252. if self.usernameFlag:
  253. assert len(buffer) > curlen+2, "Buffer too short to read username length"
  254. self.username = readUTF(buffer[curlen:], packlen - curlen)
  255. curlen += len(self.username) + 2
  256. logger.info("[MQTT-3.1.2-19] username must be in payload if user name flag is 1")
  257. else:
  258. logger.info("[MQTT-3.1.2-18] username must not be in payload if user name flag is 0")
  259. assert self.passwordFlag == False, "[MQTT-3.1.2-22] password flag must be 0 if username flag is 0"
  260. if self.passwordFlag:
  261. assert len(buffer) > curlen+2, "Buffer too short to read password length"
  262. self.password = readBytes(buffer[curlen:])
  263. curlen += len(self.password) + 2
  264. logger.info("[MQTT-3.1.2-21] password must be in payload if password flag is 0")
  265. else:
  266. logger.info("[MQTT-3.1.2-20] password must not be in payload if password flag is 0")
  267. if self.WillFlag and self.usernameFlag and self.passwordFlag:
  268. logger.info("[MQTT-3.1.3-1] clientid, will topic, will message, username and password all present")
  269. assert curlen == packlen, "Packet is wrong length curlen %d != packlen %d"
  270. except:
  271. logger.exception("[MQTT-3.1.4-1] server must validate connect packet and close connection without connack if it does not conform")
  272. raise
  273. def __str__(self):
  274. buf = str(self.fh)+", ProtocolName="+str(self.ProtocolName)+", ProtocolVersion=" +\
  275. str(self.ProtocolVersion)+", CleanSession="+str(self.CleanSession) +\
  276. ", WillFlag="+str(self.WillFlag)+", KeepAliveTimer=" +\
  277. str(self.KeepAliveTimer)+", ClientId="+str(self.ClientIdentifier) +\
  278. ", usernameFlag="+str(self.usernameFlag)+", passwordFlag="+str(self.passwordFlag)
  279. if self.WillFlag:
  280. buf += ", WillQoS=" + str(self.WillQoS) +\
  281. ", WillRETAIN=" + str(self.WillRETAIN) +\
  282. ", WillTopic='"+ self.WillTopic +\
  283. "', WillMessage='"+str(self.WillMessage)+"'"
  284. if self.username:
  285. buf += ", username="+self.username
  286. if self.password:
  287. buf += ", password="+str(self.password)
  288. return buf+")"
  289. def __eq__(self, packet):
  290. rc = Packets.__eq__(self, packet) and \
  291. self.ProtocolName == packet.ProtocolName and \
  292. self.ProtocolVersion == packet.ProtocolVersion and \
  293. self.CleanSession == packet.CleanSession and \
  294. self.WillFlag == packet.WillFlag and \
  295. self.KeepAliveTimer == packet.KeepAliveTimer and \
  296. self.ClientIdentifier == packet.ClientIdentifier and \
  297. self.WillFlag == packet.WillFlag
  298. if rc and self.WillFlag:
  299. rc = self.WillQoS == packet.WillQoS and \
  300. self.WillRETAIN == packet.WillRETAIN and \
  301. self.WillTopic == packet.WillTopic and \
  302. self.WillMessage == packet.WillMessage
  303. return rc
  304. class Connacks(Packets):
  305. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, ReturnCode=0):
  306. self.fh = FixedHeaders(CONNACK)
  307. self.fh.DUP = DUP
  308. self.fh.QoS = QoS
  309. self.fh.Retain = Retain
  310. self.flags = 0
  311. self.returnCode = ReturnCode
  312. if buffer != None:
  313. self.unpack(buffer)
  314. def pack(self):
  315. buffer = bytes([self.flags, self.returnCode])
  316. buffer = self.fh.pack(len(buffer)) + buffer
  317. return buffer
  318. def unpack(self, buffer):
  319. assert len(buffer) >= 4
  320. assert MessageType(buffer) == CONNACK
  321. self.fh.unpack(buffer)
  322. assert self.fh.remainingLength == 2, "Connack packet is wrong length %d" % self.fh.remainingLength
  323. assert buffer[2] in [0, 1], "Connect Acknowledge Flags"
  324. self.returnCode = buffer[3]
  325. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  326. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  327. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  328. def __str__(self):
  329. return str(self.fh)+", Session present="+str((self.flags & 0x01) == 1)+", ReturnCode="+str(self.returnCode)+")"
  330. def __eq__(self, packet):
  331. return Packets.__eq__(self, packet) and \
  332. self.returnCode == packet.returnCode
  333. class Disconnects(Packets):
  334. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
  335. self.fh = FixedHeaders(DISCONNECT)
  336. self.fh.DUP = DUP
  337. self.fh.QoS = QoS
  338. self.fh.Retain = Retain
  339. if buffer != None:
  340. self.unpack(buffer)
  341. def unpack(self, buffer):
  342. assert len(buffer) >= 2
  343. assert MessageType(buffer) == DISCONNECT
  344. self.fh.unpack(buffer)
  345. assert self.fh.remainingLength == 0, "Disconnect packet is wrong length %d" % self.fh.remainingLength
  346. logger.info("[MQTT-3.14.1-1] disconnect reserved bits must be 0")
  347. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  348. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  349. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  350. def __str__(self):
  351. return str(self.fh)+")"
  352. class Publishes(Packets):
  353. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, TopicName="", Payload=b""):
  354. self.fh = FixedHeaders(PUBLISH)
  355. self.fh.DUP = DUP
  356. self.fh.QoS = QoS
  357. self.fh.Retain = Retain
  358. # variable header
  359. self.topicName = TopicName
  360. self.messageIdentifier = MsgId
  361. # payload
  362. self.data = Payload
  363. if buffer != None:
  364. self.unpack(buffer)
  365. def pack(self):
  366. buffer = writeUTF(self.topicName)
  367. if self.fh.QoS != 0:
  368. buffer += writeInt16(self.messageIdentifier)
  369. buffer += self.data
  370. buffer = self.fh.pack(len(buffer)) + buffer
  371. return buffer
  372. def unpack(self, buffer):
  373. assert len(buffer) >= 2
  374. assert MessageType(buffer) == PUBLISH
  375. fhlen = self.fh.unpack(buffer)
  376. assert self.fh.QoS in [0, 1, 2], "QoS in Publish must be 0, 1, or 2"
  377. packlen = fhlen + self.fh.remainingLength
  378. assert len(buffer) >= packlen
  379. curlen = fhlen
  380. try:
  381. self.topicName = readUTF(buffer[fhlen:], packlen - curlen)
  382. except UnicodeDecodeError:
  383. logger.info("[MQTT-3.3.2-1] topic name in publish must be utf-8")
  384. raise
  385. curlen += len(self.topicName) + 2
  386. if self.fh.QoS != 0:
  387. self.messageIdentifier = readInt16(buffer[curlen:])
  388. logger.info("[MQTT-2.3.1-1] packet indentifier must be in publish if QoS is 1 or 2")
  389. curlen += 2
  390. assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  391. else:
  392. logger.info("[MQTT-2.3.1-5] no packet indentifier in publish if QoS is 0")
  393. self.messageIdentifier = 0
  394. self.data = buffer[curlen:fhlen + self.fh.remainingLength]
  395. if self.fh.QoS == 0:
  396. assert self.fh.DUP == False, "[MQTT-2.1.2-4]"
  397. return fhlen + self.fh.remainingLength
  398. def __str__(self):
  399. rc = str(self.fh)
  400. if self.fh.QoS != 0:
  401. rc += ", MsgId="+str(self.messageIdentifier)
  402. rc += ", TopicName="+str(self.topicName)+", Payload="+str(self.data)+")"
  403. return rc
  404. def __eq__(self, packet):
  405. rc = Packets.__eq__(self, packet) and \
  406. self.topicName == packet.topicName and \
  407. self.data == packet.data
  408. if rc and self.fh.QoS != 0:
  409. rc = self.messageIdentifier == packet.messageIdentifier
  410. return rc
  411. class Pubacks(Packets):
  412. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
  413. self.fh = FixedHeaders(PUBACK)
  414. self.fh.DUP = DUP
  415. self.fh.QoS = QoS
  416. self.fh.Retain = Retain
  417. # variable header
  418. self.messageIdentifier = MsgId
  419. if buffer != None:
  420. self.unpack(buffer)
  421. def pack(self):
  422. buffer = writeInt16(self.messageIdentifier)
  423. buffer = self.fh.pack(len(buffer)) + buffer
  424. return buffer
  425. def unpack(self, buffer):
  426. assert len(buffer) >= 2
  427. assert MessageType(buffer) == PUBACK
  428. fhlen = self.fh.unpack(buffer)
  429. assert self.fh.remainingLength == 2, "Puback packet is wrong length %d" % self.fh.remainingLength
  430. assert len(buffer) >= fhlen + self.fh.remainingLength
  431. self.messageIdentifier = readInt16(buffer[fhlen:])
  432. assert self.fh.DUP == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
  433. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
  434. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
  435. return fhlen + 2
  436. def __str__(self):
  437. return str(self.fh)+", MsgId "+str(self.messageIdentifier)
  438. def __eq__(self, packet):
  439. return Packets.__eq__(self, packet) and \
  440. self.messageIdentifier == packet.messageIdentifier
  441. class Pubrecs(Packets):
  442. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
  443. self.fh = FixedHeaders(PUBREC)
  444. self.fh.DUP = DUP
  445. self.fh.QoS = QoS
  446. self.fh.Retain = Retain
  447. # variable header
  448. self.messageIdentifier = MsgId
  449. if buffer != None:
  450. self.unpack(buffer)
  451. def pack(self):
  452. buffer = writeInt16(self.messageIdentifier)
  453. buffer = self.fh.pack(len(buffer)) + buffer
  454. return buffer
  455. def unpack(self, buffer):
  456. assert len(buffer) >= 2
  457. assert MessageType(buffer) == PUBREC
  458. fhlen = self.fh.unpack(buffer)
  459. assert self.fh.remainingLength == 2, "Pubrec packet is wrong length %d" % self.fh.remainingLength
  460. assert len(buffer) >= fhlen + self.fh.remainingLength
  461. self.messageIdentifier = readInt16(buffer[fhlen:])
  462. assert self.fh.DUP == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
  463. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
  464. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
  465. return fhlen + 2
  466. def __str__(self):
  467. return str(self.fh)+", MsgId="+str(self.messageIdentifier)+")"
  468. def __eq__(self, packet):
  469. return Packets.__eq__(self, packet) and \
  470. self.messageIdentifier == packet.messageIdentifier
  471. class Pubrels(Packets):
  472. def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0):
  473. self.fh = FixedHeaders(PUBREL)
  474. self.fh.DUP = DUP
  475. self.fh.QoS = QoS
  476. self.fh.Retain = Retain
  477. # variable header
  478. self.messageIdentifier = MsgId
  479. if buffer != None:
  480. self.unpack(buffer)
  481. def pack(self):
  482. buffer = writeInt16(self.messageIdentifier)
  483. buffer = self.fh.pack(len(buffer)) + buffer
  484. return buffer
  485. def unpack(self, buffer):
  486. assert len(buffer) >= 2
  487. assert MessageType(buffer) == PUBREL
  488. fhlen = self.fh.unpack(buffer)
  489. assert self.fh.remainingLength == 2, "Pubrel packet is wrong length %d" % self.fh.remainingLength
  490. assert len(buffer) >= fhlen + self.fh.remainingLength
  491. self.messageIdentifier = readInt16(buffer[fhlen:])
  492. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in PUBREL"
  493. assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS should be 1 in PUBREL"
  494. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN should be False in PUBREL"
  495. logger.info("[MQTT-3.6.1-1] bits in fixed header for pubrel are ok")
  496. return fhlen + 2
  497. def __str__(self):
  498. return str(self.fh)+", MsgId="+str(self.messageIdentifier)+")"
  499. def __eq__(self, packet):
  500. return Packets.__eq__(self, packet) and \
  501. self.messageIdentifier == packet.messageIdentifier
  502. class Pubcomps(Packets):
  503. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
  504. self.fh = FixedHeaders(PUBCOMP)
  505. self.fh.DUP = DUP
  506. self.fh.QoS = QoS
  507. self.fh.Retain = Retain
  508. # variable header
  509. self.messageIdentifier = MsgId
  510. if buffer != None:
  511. self.unpack(buffer)
  512. def pack(self):
  513. buffer = writeInt16(self.messageIdentifier)
  514. buffer = self.fh.pack(len(buffer)) + buffer
  515. return buffer
  516. def unpack(self, buffer):
  517. assert len(buffer) >= 2
  518. assert MessageType(buffer) == PUBCOMP
  519. fhlen = self.fh.unpack(buffer)
  520. assert len(buffer) >= fhlen + self.fh.remainingLength
  521. assert self.fh.remainingLength == 2, "Pubcomp packet is wrong length %d" % self.fh.remainingLength
  522. self.messageIdentifier = readInt16(buffer[fhlen:])
  523. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in Pubcomp"
  524. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in Pubcomp"
  525. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in Pubcomp"
  526. return fhlen + 2
  527. def __str__(self):
  528. return str(self.fh)+", MsgId="+str(self.messageIdentifier)+")"
  529. def __eq__(self, packet):
  530. return Packets.__eq__(self, packet) and \
  531. self.messageIdentifier == packet.messageIdentifier
  532. class Subscribes(Packets):
  533. def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]):
  534. self.fh = FixedHeaders(SUBSCRIBE)
  535. self.fh.DUP = DUP
  536. self.fh.QoS = QoS
  537. self.fh.Retain = Retain
  538. # variable header
  539. self.messageIdentifier = MsgId
  540. # payload - list of topic, qos pairs
  541. self.data = Data[:]
  542. if buffer != None:
  543. self.unpack(buffer)
  544. def pack(self):
  545. buffer = writeInt16(self.messageIdentifier)
  546. for d in self.data:
  547. buffer += writeUTF(d[0]) + bytes([d[1]])
  548. buffer = self.fh.pack(len(buffer)) + buffer
  549. return buffer
  550. def unpack(self, buffer):
  551. assert len(buffer) >= 2
  552. assert MessageType(buffer) == SUBSCRIBE
  553. fhlen = self.fh.unpack(buffer)
  554. assert len(buffer) >= fhlen + self.fh.remainingLength
  555. logger.info("[MQTT-2.3.1-1] packet indentifier must be in subscribe")
  556. self.messageIdentifier = readInt16(buffer[fhlen:])
  557. assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  558. leftlen = self.fh.remainingLength - 2
  559. self.data = []
  560. while leftlen > 0:
  561. topic = readUTF(buffer[-leftlen:], leftlen)
  562. leftlen -= len(topic) + 2
  563. qos = buffer[-leftlen]
  564. assert qos in [0, 1, 2], "[MQTT-3-8.3-2] reserved bits must be zero"
  565. leftlen -= 1
  566. self.data.append((topic, qos))
  567. assert len(self.data) > 0, "[MQTT-3.8.3-1] at least one topic, qos pair must be in subscribe"
  568. assert leftlen == 0
  569. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP must be false in subscribe"
  570. assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS must be 1 in subscribe"
  571. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN must be false in subscribe"
  572. return fhlen + self.fh.remainingLength
  573. def __str__(self):
  574. return str(self.fh)+", MsgId="+str(self.messageIdentifier)+\
  575. ", Data="+str(self.data)+")"
  576. def __eq__(self, packet):
  577. return Packets.__eq__(self, packet) and \
  578. self.messageIdentifier == packet.messageIdentifier and \
  579. self.data == packet.data
  580. class Subacks(Packets):
  581. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, Data=[]):
  582. self.fh = FixedHeaders(SUBACK)
  583. self.fh.DUP = DUP
  584. self.fh.QoS = QoS
  585. self.fh.Retain = Retain
  586. # variable header
  587. self.messageIdentifier = MsgId
  588. # payload - list of qos
  589. self.data = Data[:]
  590. if buffer != None:
  591. self.unpack(buffer)
  592. def pack(self):
  593. buffer = writeInt16(self.messageIdentifier)
  594. for d in self.data:
  595. buffer += bytes([d])
  596. buffer = self.fh.pack(len(buffer)) + buffer
  597. return buffer
  598. def unpack(self, buffer):
  599. assert len(buffer) >= 2
  600. assert MessageType(buffer) == SUBACK
  601. fhlen = self.fh.unpack(buffer)
  602. assert len(buffer) >= fhlen + self.fh.remainingLength
  603. self.messageIdentifier = readInt16(buffer[fhlen:])
  604. leftlen = self.fh.remainingLength - 2
  605. self.data = []
  606. while leftlen > 0:
  607. qos = buffer[-leftlen]
  608. assert qos in [0, 1, 2, 0x80], "[MQTT-3.9.3-2] return code in QoS must be 0, 1, 2 or 0x80"
  609. leftlen -= 1
  610. self.data.append(qos)
  611. assert leftlen == 0
  612. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be false in suback"
  613. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in suback"
  614. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in suback"
  615. return fhlen + self.fh.remainingLength
  616. def __str__(self):
  617. return str(self.fh)+", MsgId="+str(self.messageIdentifier)+\
  618. ", Data="+str(self.data)+")"
  619. def __eq__(self, packet):
  620. return Packets.__eq__(self, packet) and \
  621. self.messageIdentifier == packet.messageIdentifier and \
  622. self.data == packet.data
  623. class Unsubscribes(Packets):
  624. def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]):
  625. self.fh = FixedHeaders(UNSUBSCRIBE)
  626. self.fh.DUP = DUP
  627. self.fh.QoS = QoS
  628. self.fh.Retain = Retain
  629. # variable header
  630. self.messageIdentifier = MsgId
  631. # payload - list of topics
  632. self.data = Data[:]
  633. if buffer != None:
  634. self.unpack(buffer)
  635. def pack(self):
  636. buffer = writeInt16(self.messageIdentifier)
  637. for d in self.data:
  638. buffer += writeUTF(d)
  639. buffer = self.fh.pack(len(buffer)) + buffer
  640. return buffer
  641. def unpack(self, buffer):
  642. assert len(buffer) >= 2
  643. assert MessageType(buffer) == UNSUBSCRIBE
  644. fhlen = self.fh.unpack(buffer)
  645. assert len(buffer) >= fhlen + self.fh.remainingLength
  646. logger.info("[MQTT-2.3.1-1] packet indentifier must be in unsubscribe")
  647. self.messageIdentifier = readInt16(buffer[fhlen:])
  648. assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  649. leftlen = self.fh.remainingLength - 2
  650. self.data = []
  651. while leftlen > 0:
  652. topic = readUTF(buffer[-leftlen:], leftlen)
  653. leftlen -= len(topic) + 2
  654. self.data.append(topic)
  655. assert leftlen == 0
  656. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  657. assert self.fh.QoS == 1, "[MQTT-2.1.2-1]"
  658. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  659. logger.info("[MQTT-3-10.1-1] fixed header bits are 0,0,1,0")
  660. return fhlen + self.fh.remainingLength
  661. def __str__(self):
  662. return str(self.fh)+", MsgId="+str(self.messageIdentifier)+\
  663. ", Data="+str(self.data)+")"
  664. def __eq__(self, packet):
  665. return Packets.__eq__(self, packet) and \
  666. self.messageIdentifier == packet.messageIdentifier and \
  667. self.data == packet.data
  668. class Unsubacks(Packets):
  669. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
  670. self.fh = FixedHeaders(UNSUBACK)
  671. self.fh.DUP = DUP
  672. self.fh.QoS = QoS
  673. self.fh.Retain = Retain
  674. # variable header
  675. self.messageIdentifier = MsgId
  676. if buffer != None:
  677. self.unpack(buffer)
  678. def pack(self):
  679. buffer = writeInt16(self.messageIdentifier)
  680. buffer = self.fh.pack(len(buffer)) + buffer
  681. return buffer
  682. def unpack(self, buffer):
  683. assert len(buffer) >= 2
  684. assert MessageType(buffer) == UNSUBACK
  685. fhlen = self.fh.unpack(buffer)
  686. assert len(buffer) >= fhlen + self.fh.remainingLength
  687. self.messageIdentifier = readInt16(buffer[fhlen:])
  688. assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  689. self.messageIdentifier = readInt16(buffer[fhlen:])
  690. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  691. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  692. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  693. return fhlen + self.fh.remainingLength
  694. def __str__(self):
  695. return str(self.fh)+", MsgId="+str(self.messageIdentifier)+")"
  696. def __eq__(self, packet):
  697. return Packets.__eq__(self, packet) and \
  698. self.messageIdentifier == packet.messageIdentifier
  699. class Pingreqs(Packets):
  700. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
  701. self.fh = FixedHeaders(PINGREQ)
  702. self.fh.DUP = DUP
  703. self.fh.QoS = QoS
  704. self.fh.Retain = Retain
  705. if buffer != None:
  706. self.unpack(buffer)
  707. def unpack(self, buffer):
  708. assert len(buffer) >= 2
  709. assert MessageType(buffer) == PINGREQ
  710. fhlen = self.fh.unpack(buffer)
  711. assert self.fh.remainingLength == 0
  712. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  713. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  714. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  715. return fhlen
  716. def __str__(self):
  717. return str(self.fh)+")"
  718. class Pingresps(Packets):
  719. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
  720. self.fh = FixedHeaders(PINGRESP)
  721. self.fh.DUP = DUP
  722. self.fh.QoS = QoS
  723. self.fh.Retain = Retain
  724. if buffer != None:
  725. self.unpack(buffer)
  726. def unpack(self, buffer):
  727. assert len(buffer) >= 2
  728. assert MessageType(buffer) == PINGRESP
  729. fhlen = self.fh.unpack(buffer)
  730. assert self.fh.remainingLength == 0
  731. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  732. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  733. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  734. return fhlen
  735. def __str__(self):
  736. return str(self.fh)+")"
  737. classes = [None, Connects, Connacks, Publishes, Pubacks, Pubrecs,
  738. Pubrels, Pubcomps, Subscribes, Subacks, Unsubscribes,
  739. Unsubacks, Pingreqs, Pingresps, Disconnects]
  740. def unpackPacket(buffer):
  741. if MessageType(buffer) != None:
  742. packet = classes[MessageType(buffer)]()
  743. packet.unpack(buffer)
  744. else:
  745. packet = None
  746. return packet
  747. if __name__ == "__main__":
  748. fh = FixedHeaders(CONNECT)
  749. tests = [0, 56, 127, 128, 8888, 16383, 16384, 65535, 2097151, 2097152,
  750. 20555666, 268435454, 268435455]
  751. for x in tests:
  752. try:
  753. assert x == fh.decode(fh.encode(x))[0]
  754. except AssertionError:
  755. print("Test failed for x =", x, fh.decode(fh.encode(x)))
  756. try:
  757. fh.decode(fh.encode(268435456))
  758. print("Error")
  759. except AssertionError:
  760. pass
  761. for packet in classes[1:]:
  762. before = str(packet())
  763. after = str(unpackPacket(packet().pack()))
  764. try:
  765. assert before == after
  766. except:
  767. print("before:", before, "\nafter:", after)
  768. print("End")