# $Id: 80211.py 53 2008-12-18 01:22:57Z jon.oberheide $
# -*- coding: utf-8 -*-
"""IEEE 802.11.

Module-level constants for the 802.11 dissector: frame type and subtype
identifiers, the mask/shift pairs used to pick fields out of the 16-bit
frame-control word, information-element IDs, and block-ack helpers.
"""
from __future__ import print_function
from __future__ import absolute_import

import struct

from . import dpkt
from .compat import ntole

# ---------------------------------------------------------------------------
# Frame types (value of the frame-control "type" field)
# ---------------------------------------------------------------------------
MGMT_TYPE = 0
CTL_TYPE = 1
DATA_TYPE = 2

# ---------------------------------------------------------------------------
# Frame sub-types, prefixed by the frame type they belong to:
#   M_* management, C_* control, D_* data
# ---------------------------------------------------------------------------
M_ASSOC_REQ = 0
M_ASSOC_RESP = 1
M_REASSOC_REQ = 2
M_REASSOC_RESP = 3
M_PROBE_REQ = 4
M_PROBE_RESP = 5
M_BEACON = 8
M_ATIM = 9
M_DISASSOC = 10
M_AUTH = 11
M_DEAUTH = 12
M_ACTION = 13

C_BLOCK_ACK_REQ = 8
C_BLOCK_ACK = 9
C_PS_POLL = 10
C_RTS = 11
C_CTS = 12
C_ACK = 13
C_CF_END = 14
C_CF_END_ACK = 15

D_DATA = 0
D_DATA_CF_ACK = 1
D_DATA_CF_POLL = 2
D_DATA_CF_ACK_POLL = 3
D_NULL = 4
D_CF_ACK = 5
D_CF_POLL = 6
D_CF_ACK_POLL = 7
D_QOS_DATA = 8
D_QOS_CF_ACK = 9
D_QOS_CF_POLL = 10
D_QOS_CF_ACK_POLL = 11
D_QOS_NULL = 12
D_QOS_CF_POLL_EMPTY = 14

# Keys of the data-frame parser table (d_dsData in IEEE80211.unpack).
# NOTE(review): the values look like a decimal packing of the two DS bits
# (to_ds * 10 + from_ds) -- confirm against the lookup in unpack().
TO_DS_FLAG = 10
FROM_DS_FLAG = 1
INTER_DS_FLAG = 11

# ---------------------------------------------------------------------------
# Frame-control bit layout.
#
# Each field is described by a shift and a mask; the mask is the field width
# moved into place by the shift.  With these values the single-bit flags
# occupy the low byte of framectl and version/type/subtype the high byte.
# ---------------------------------------------------------------------------
_VERSION_SHIFT = 8
_VERSION_MASK = 0x3 << _VERSION_SHIFT        # 0x0300
_TYPE_SHIFT = 10
_TYPE_MASK = 0x3 << _TYPE_SHIFT              # 0x0c00
_SUBTYPE_SHIFT = 12
_SUBTYPE_MASK = 0xf << _SUBTYPE_SHIFT        # 0xf000

_TO_DS_SHIFT = 0
_TO_DS_MASK = 0x1 << _TO_DS_SHIFT            # 0x0001
_FROM_DS_SHIFT = 1
_FROM_DS_MASK = 0x1 << _FROM_DS_SHIFT        # 0x0002
_MORE_FRAG_SHIFT = 2
_MORE_FRAG_MASK = 0x1 << _MORE_FRAG_SHIFT    # 0x0004
_RETRY_SHIFT = 3
_RETRY_MASK = 0x1 << _RETRY_SHIFT            # 0x0008
_PWR_MGT_SHIFT = 4
_PWR_MGT_MASK = 0x1 << _PWR_MGT_SHIFT        # 0x0010
_MORE_DATA_SHIFT = 5
_MORE_DATA_MASK = 0x1 << _MORE_DATA_SHIFT    # 0x0020
_WEP_SHIFT = 6
_WEP_MASK = 0x1 << _WEP_SHIFT                # 0x0040
_ORDER_SHIFT = 7
_ORDER_MASK = 0x1 << _ORDER_SHIFT            # 0x0080

# ---------------------------------------------------------------------------
# Information element (IE) identifiers
# ---------------------------------------------------------------------------
IE_SSID = 0
IE_RATES = 1
IE_FH = 2
IE_DS = 3
IE_CF = 4
IE_TIM = 5
IE_IBSS = 6
IE_HT_CAPA = 45
IE_ESR = 50
IE_HT_INFO = 61

# Size in bytes of the trailing frame check sequence.
FCS_LENGTH = 4

# Management subtypes listed as carrying a capability field.
FRAMES_WITH_CAPABILITY = [
    M_BEACON,
    M_ASSOC_RESP,
    M_ASSOC_REQ,
    M_REASSOC_REQ,
]

# ---------------------------------------------------------------------------
# Block Ack control field layout (same shift/mask scheme as frame control)
# ---------------------------------------------------------------------------
_ACK_POLICY_SHIFT = 0
_ACK_POLICY_MASK = 0x1 << _ACK_POLICY_SHIFT  # 0x0001
_MULTI_TID_SHIFT = 1
_MULTI_TID_MASK = 0x1 << _MULTI_TID_SHIFT    # 0x0002
_COMPRESSED_SHIFT = 2
_COMPRESSED_MASK = 0x1 << _COMPRESSED_SHIFT  # 0x0004
_TID_SHIFT = 12
_TID_MASK = 0xf << _TID_SHIFT                # 0xf000

# Block-ack bitmap sizes in bytes (compressed vs. uncompressed form).
_COMPRESSED_BMP_LENGTH = 8
_BMP_LENGTH = 128

# Action frame categories
BLOCK_ACK = 3

# Block ack category action codes
BLOCK_ACK_CODE_REQUEST = 0
BLOCK_ACK_CODE_RESPONSE = 1
BLOCK_ACK_CODE_DELBA = 2 class IEEE80211(dpkt.Packet): """IEEE 802.11. TODO: Longer class information.... Attributes: __hdr__: Header fields of IEEE802.11. TODO. """ __hdr__ = ( ('framectl', 'H', 0), ('duration', 'H', 0) ) # The standard really defines the entire MAC protocol as little-endian on the wire, # however there is broken logic in the rest of the module preventing this from working right now # __byte_order__ = '<' @property def version(self): return (self.framectl & _VERSION_MASK) >> _VERSION_SHIFT @version.setter def version(self, val): self.framectl = (val << _VERSION_SHIFT) | (self.framectl & ~_VERSION_MASK) @property def type(self): return (self.framectl & _TYPE_MASK) >> _TYPE_SHIFT @type.setter def type(self, val): self.framectl = (val << _TYPE_SHIFT) | (self.framectl & ~_TYPE_MASK) @property def subtype(self): return (self.framectl & _SUBTYPE_MASK) >> _SUBTYPE_SHIFT @subtype.setter def subtype(self, val): self.framectl = (val << _SUBTYPE_SHIFT) | (self.framectl & ~_SUBTYPE_MASK) @property def to_ds(self): return (self.framectl & _TO_DS_MASK) >> _TO_DS_SHIFT @to_ds.setter def to_ds(self, val): self.framectl = (val << _TO_DS_SHIFT) | (self.framectl & ~_TO_DS_MASK) @property def from_ds(self): return (self.framectl & _FROM_DS_MASK) >> _FROM_DS_SHIFT @from_ds.setter def from_ds(self, val): self.framectl = (val << _FROM_DS_SHIFT) | (self.framectl & ~_FROM_DS_MASK) @property def more_frag(self): return (self.framectl & _MORE_FRAG_MASK) >> _MORE_FRAG_SHIFT @more_frag.setter def more_frag(self, val): self.framectl = (val << _MORE_FRAG_SHIFT) | (self.framectl & ~_MORE_FRAG_MASK) @property def retry(self): return (self.framectl & _RETRY_MASK) >> _RETRY_SHIFT @retry.setter def retry(self, val): self.framectl = (val << _RETRY_SHIFT) | (self.framectl & ~_RETRY_MASK) @property def pwr_mgt(self): return (self.framectl & _PWR_MGT_MASK) >> _PWR_MGT_SHIFT @pwr_mgt.setter def pwr_mgt(self, val): self.framectl = (val << _PWR_MGT_SHIFT) | (self.framectl & ~_PWR_MGT_MASK) 
@property def more_data(self): return (self.framectl & _MORE_DATA_MASK) >> _MORE_DATA_SHIFT @more_data.setter def more_data(self, val): self.framectl = (val << _MORE_DATA_SHIFT) | (self.framectl & ~_MORE_DATA_MASK) @property def wep(self): return (self.framectl & _WEP_MASK) >> _WEP_SHIFT @wep.setter def wep(self, val): self.framectl = (val << _WEP_SHIFT) | (self.framectl & ~_WEP_MASK) @property def order(self): return (self.framectl & _ORDER_MASK) >> _ORDER_SHIFT @order.setter def order(self, val): self.framectl = (val << _ORDER_SHIFT) | (self.framectl & ~_ORDER_MASK) def unpack_ies(self, buf): self.ies = [] ie_decoder = { IE_SSID: ('ssid', self.IE), IE_RATES: ('rate', self.IE), IE_FH: ('fh', self.FH), IE_DS: ('ds', self.DS), IE_CF: ('cf', self.CF), IE_TIM: ('tim', self.TIM), IE_IBSS: ('ibss', self.IBSS), IE_HT_CAPA: ('ht_capa', self.IE), IE_ESR: ('esr', self.IE), IE_HT_INFO: ('ht_info', self.IE) } # each IE starts with an ID and a length while len(buf) > FCS_LENGTH: ie_id = struct.unpack('B', buf[:1])[0] try: parser = ie_decoder[ie_id][1] name = ie_decoder[ie_id][0] except KeyError: parser = self.IE name = 'ie_' + str(ie_id) ie = parser(buf) ie.data = buf[2:2 + ie.len] setattr(self, name, ie) self.ies.append(ie) buf = buf[2 + ie.len:] class Capability(object): def __init__(self, field): self.ess = field & 1 self.ibss = (field >> 1) & 1 self.cf_poll = (field >> 2) & 1 self.cf_poll_req = (field >> 3) & 1 self.privacy = (field >> 4) & 1 self.short_preamble = (field >> 5) & 1 self.pbcc = (field >> 6) & 1 self.hopping = (field >> 7) & 1 self.spec_mgmt = (field >> 8) & 1 self.qos = (field >> 9) & 1 self.short_slot = (field >> 10) & 1 self.apsd = (field >> 11) & 1 self.dsss = (field >> 13) & 1 self.delayed_blk_ack = (field >> 14) & 1 self.imm_blk_ack = (field >> 15) & 1 def __init__(self, *args, **kwargs): if kwargs and 'fcs' in kwargs: self.fcs_present = kwargs.pop('fcs') else: self.fcs_present = False super(IEEE80211, self).__init__(*args, **kwargs) def unpack(self, 
buf): dpkt.Packet.unpack(self, buf) self.data = buf[self.__hdr_len__:] m_decoder = { M_BEACON: ('beacon', self.Beacon), M_ASSOC_REQ: ('assoc_req', self.Assoc_Req), M_ASSOC_RESP: ('assoc_resp', self.Assoc_Resp), M_DISASSOC: ('diassoc', self.Disassoc), M_REASSOC_REQ: ('reassoc_req', self.Reassoc_Req), M_REASSOC_RESP: ('reassoc_resp', self.Assoc_Resp), M_AUTH: ('auth', self.Auth), M_PROBE_RESP: ('probe_resp', self.Beacon), M_DEAUTH: ('deauth', self.Deauth), M_ACTION: ('action', self.Action) } c_decoder = { C_RTS: ('rts', self.RTS), C_CTS: ('cts', self.CTS), C_ACK: ('ack', self.ACK), C_BLOCK_ACK_REQ: ('bar', self.BlockAckReq), C_BLOCK_ACK: ('back', self.BlockAck), C_CF_END: ('cf_end', self.CFEnd), } d_dsData = { 0: self.Data, FROM_DS_FLAG: self.DataFromDS, TO_DS_FLAG: self.DataToDS, INTER_DS_FLAG: self.DataInterDS } # For now decode everything with DATA. Haven't checked about other QoS # additions d_decoder = { # modified the decoder to consider the ToDS and FromDS flags # Omitting the 11 case for now D_DATA: ('data_frame', d_dsData), D_NULL: ('data_frame', d_dsData), D_QOS_DATA: ('data_frame', d_dsData), D_QOS_NULL: ('data_frame', d_dsData) } decoder = { MGMT_TYPE: m_decoder, CTL_TYPE: c_decoder, DATA_TYPE: d_decoder } # Strip off the FCS field if self.fcs_present: self.fcs = struct.unpack('> _COMPRESSED_SHIFT @compressed.setter def compressed(self, val): self.ctl = (val << _COMPRESSED_SHIFT) | (self.ctl & ~_COMPRESSED_MASK) @property def ack_policy(self): return (self.ctl & _ACK_POLICY_MASK) >> _ACK_POLICY_SHIFT @ack_policy.setter def ack_policy(self, val): self.ctl = (val << _ACK_POLICY_SHIFT) | (self.ctl & ~_ACK_POLICY_MASK) @property def multi_tid(self): return (self.ctl & _MULTI_TID_MASK) >> _MULTI_TID_SHIFT @multi_tid.setter def multi_tid(self, val): self.ctl = (val << _MULTI_TID_SHIFT) | (self.ctl & ~_MULTI_TID_MASK) @property def tid(self): return (self.ctl & _TID_MASK) >> _TID_SHIFT @tid.setter def tid(self, val): self.ctl = (val << _TID_SHIFT) | (self.ctl & 
~_TID_MASK) def unpack(self, buf): dpkt.Packet.unpack(self, buf) self.data = buf[self.__hdr_len__:] self.ctl = ntole(self.ctl) if self.compressed: self.bmp = struct.unpack('8s', self.data[0:_COMPRESSED_BMP_LENGTH])[0] else: self.bmp = struct.unpack('128s', self.data[0:_BMP_LENGTH])[0] self.data = self.data[len(self.__hdr__) + len(self.bmp):] class RTS(dpkt.Packet): __hdr__ = ( ('dst', '6s', '\x00' * 6), ('src', '6s', '\x00' * 6) ) class CTS(dpkt.Packet): __hdr__ = ( ('dst', '6s', '\x00' * 6), ) class ACK(dpkt.Packet): __hdr__ = ( ('dst', '6s', '\x00' * 6), ) class CFEnd(dpkt.Packet): __hdr__ = ( ('dst', '6s', '\x00' * 6), ('src', '6s', '\x00' * 6), ) class MGMT_Frame(dpkt.Packet): __hdr__ = ( ('dst', '6s', '\x00' * 6), ('src', '6s', '\x00' * 6), ('bssid', '6s', '\x00' * 6), ('frag_seq', 'H', 0) ) class Beacon(dpkt.Packet): __hdr__ = ( ('timestamp', 'Q', 0), ('interval', 'H', 0), ('capability', 'H', 0) ) class Disassoc(dpkt.Packet): __hdr__ = ( ('reason', 'H', 0), ) class Assoc_Req(dpkt.Packet): __hdr__ = ( ('capability', 'H', 0), ('interval', 'H', 0) ) class Assoc_Resp(dpkt.Packet): __hdr__ = ( ('capability', 'H', 0), ('status', 'H', 0), ('aid', 'H', 0) ) class Reassoc_Req(dpkt.Packet): __hdr__ = ( ('capability', 'H', 0), ('interval', 'H', 0), ('current_ap', '6s', '\x00' * 6) ) # This obviously doesn't support any of AUTH frames that use encryption class Auth(dpkt.Packet): __hdr__ = ( ('algorithm', 'H', 0), ('auth_seq', 'H', 0), ) class Deauth(dpkt.Packet): __hdr__ = ( ('reason', 'H', 0), ) class Action(dpkt.Packet): __hdr__ = ( ('category', 'B', 0), ('code', 'B', 0), ) def unpack(self, buf): dpkt.Packet.unpack(self, buf) action_parser = { BLOCK_ACK: { BLOCK_ACK_CODE_REQUEST: ('block_ack_request', IEEE80211.BlockAckActionRequest), BLOCK_ACK_CODE_RESPONSE: ('block_ack_response', IEEE80211.BlockAckActionResponse), BLOCK_ACK_CODE_DELBA: ('block_ack_delba', IEEE80211.BlockAckActionDelba), }, } try: decoder = action_parser[self.category][self.code][1] field_name = 
action_parser[self.category][self.code][0] except KeyError: raise dpkt.UnpackError("KeyError: category=%s code=%s" % (self.category, self.code)) field = decoder(self.data) setattr(self, field_name, field) self.data = field.data class BlockAckActionRequest(dpkt.Packet): __hdr__ = ( ('dialog', 'B', 0), ('parameters', 'H', 0), ('timeout', 'H', 0), ('starting_seq', 'H', 0), ) class BlockAckActionResponse(dpkt.Packet): __hdr__ = ( ('dialog', 'B', 0), ('status_code', 'H', 0), ('parameters', 'H', 0), ('timeout', 'H', 0), ) class BlockAckActionDelba(dpkt.Packet): __byte_order__ = '<' __hdr__ = ( ('delba_param_set', 'H', 0), ('reason_code', 'H', 0), # ('gcr_group_addr', '8s', '\x00' * 8), # Standard says it must be there, but it isn't? ) class Data(dpkt.Packet): __hdr__ = ( ('dst', '6s', '\x00' * 6), ('src', '6s', '\x00' * 6), ('bssid', '6s', '\x00' * 6), ('frag_seq', 'H', 0) ) class DataFromDS(dpkt.Packet): __hdr__ = ( ('dst', '6s', '\x00' * 6), ('bssid', '6s', '\x00' * 6), ('src', '6s', '\x00' * 6), ('frag_seq', 'H', 0) ) class DataToDS(dpkt.Packet): __hdr__ = ( ('bssid', '6s', '\x00' * 6), ('src', '6s', '\x00' * 6), ('dst', '6s', '\x00' * 6), ('frag_seq', 'H', 0) ) class DataInterDS(dpkt.Packet): __hdr__ = ( ('dst', '6s', '\x00' * 6), ('src', '6s', '\x00' * 6), ('da', '6s', '\x00' * 6), ('frag_seq', 'H', 0), ('sa', '6s', '\x00' * 6) ) class QoS_Data(dpkt.Packet): __hdr__ = ( ('control', 'H', 0), ) class IE(dpkt.Packet): __hdr__ = ( ('id', 'B', 0), ('len', 'B', 0) ) def unpack(self, buf): dpkt.Packet.unpack(self, buf) self.info = buf[2:self.len + 2] class FH(dpkt.Packet): __hdr__ = ( ('id', 'B', 0), ('len', 'B', 0), ('tu', 'H', 0), ('hopset', 'B', 0), ('hoppattern', 'B', 0), ('hopindex', 'B', 0) ) class DS(dpkt.Packet): __hdr__ = ( ('id', 'B', 0), ('len', 'B', 0), ('ch', 'B', 0) ) class CF(dpkt.Packet): __hdr__ = ( ('id', 'B', 0), ('len', 'B', 0), ('count', 'B', 0), ('period', 'B', 0), ('max', 'H', 0), ('dur', 'H', 0) ) class TIM(dpkt.Packet): __hdr__ = ( ('id', 'B', 0), 
('len', 'B', 0), ('count', 'B', 0), ('period', 'B', 0), ('ctrl', 'H', 0) ) def unpack(self, buf): dpkt.Packet.unpack(self, buf) self.bitmap = buf[5:self.len + 2] class IBSS(dpkt.Packet): __hdr__ = ( ('id', 'B', 0), ('len', 'B', 0), ('atim', 'H', 0) ) def test_802211_ack(): s = b'\xd4\x00\x00\x00\x00\x12\xf0\xb6\x1c\xa4\xff\xff\xff\xff' ieee = IEEE80211(s, fcs=True) assert ieee.version == 0 assert ieee.type == CTL_TYPE assert ieee.subtype == C_ACK assert ieee.to_ds == 0 assert ieee.from_ds == 0 assert ieee.pwr_mgt == 0 assert ieee.more_data == 0 assert ieee.wep == 0 assert ieee.order == 0 assert ieee.ack.dst == b'\x00\x12\xf0\xb6\x1c\xa4' fcs = struct.unpack('