# -*- coding: utf-8 -*- """ATA over Ethernet Protocol.""" from __future__ import absolute_import import struct from . import dpkt from .compat import iteritems class AOE(dpkt.Packet): """ATA over Ethernet Protocol. See more about the AOE on https://en.wikipedia.org/wiki/ATA_over_Ethernet Attributes: __hdr__: Header fields of AOE. data: Message data. """ __hdr__ = ( ('ver_fl', 'B', 0x10), ('err', 'B', 0), ('maj', 'H', 0), ('min', 'B', 0), ('cmd', 'B', 0), ('tag', 'I', 0), ) _cmdsw = {} @property def ver(self): return self.ver_fl >> 4 @ver.setter def ver(self, ver): self.ver_fl = (ver << 4) | (self.ver_fl & 0xf) @property def fl(self): return self.ver_fl & 0xf @fl.setter def fl(self, fl): self.ver_fl = (self.ver_fl & 0xf0) | fl @classmethod def set_cmd(cls, cmd, pktclass): cls._cmdsw[cmd] = pktclass @classmethod def get_cmd(cls, cmd): return cls._cmdsw[cmd] def unpack(self, buf): dpkt.Packet.unpack(self, buf) try: self.data = self._cmdsw[self.cmd](self.data) setattr(self, self.data.__class__.__name__.lower(), self.data) except (KeyError, struct.error, dpkt.UnpackError): pass AOE_CMD_ATA = 0 AOE_CMD_CFG = 1 AOE_FLAG_RSP = 1 << 3 def _load_cmds(): prefix = 'AOE_CMD_' g = globals() for k, v in iteritems(g): if k.startswith(prefix): name = 'aoe' + k[len(prefix):].lower() try: mod = __import__(name, g, level=1) AOE.set_cmd(v, getattr(mod, name.upper())) except (ImportError, AttributeError): continue def _mod_init(): """Post-initialization called when all dpkt modules are fully loaded""" if not AOE._cmdsw: _load_cmds() def test_creation(): aoe = AOE() # hdr fields assert aoe.ver_fl == 0x10 assert aoe.err == 0 assert aoe.maj == 0 assert aoe.min == 0 assert aoe.cmd == 0 assert aoe.tag == 0 assert bytes(aoe) == b'\x10' + b'\x00'*9 def test_properties(): aoe = AOE() # propery getters assert aoe.ver == 1 assert aoe.fl == 0 # property setters aoe.ver = 2 assert aoe.ver == 2 assert aoe.ver_fl == 0x20 aoe.fl = 12 assert aoe.fl == 12 assert aoe.ver_fl == 0x2C def test_unpack(): from binascii import unhexlify buf = unhexlify( '1000000000' '00' # cmd: AOE_CMD_ATA '00000000' # tag ) aoe = AOE(buf) # AOE_CMD_ATA speficied, but no data supplied assert aoe.data == b'' buf = unhexlify( '1000000000' '00' # cmd: AOE_CMD_ATA '00000000' # tag # AOEDATA specification '030a6b190000000045000028941f0000e30699b4232b2400de8e8442abd100500035e1' '2920d9000000229bf0e204656b' ) aoe = AOE(buf) assert aoe.aoeata == aoe.data def test_cmds(): import dpkt assert AOE.get_cmd(AOE_CMD_ATA) == dpkt.aoeata.AOEATA assert AOE.get_cmd(AOE_CMD_CFG) == dpkt.aoecfg.AOECFG def test_cmd_loading(): # this test checks that failing to load a module isn't catastrophic standard_cmds = AOE._cmdsw # delete the existing code->module mappings AOE._cmdsw = {} assert not AOE._cmdsw # create a new global constant pointing to a module which doesn't exist globals()['AOE_CMD_FAIL'] = "FAIL" _mod_init() # check that the same modules were loaded, ignoring the fail assert AOE._cmdsw == standard_cmds