# -*- coding: utf-8 -*- """Radiotap""" from __future__ import print_function from __future__ import absolute_import from . import dpkt from . import ieee80211 from .compat import compat_ord # Ref: http://www.radiotap.org # Fields Ref: http://www.radiotap.org/defined-fields/all # Present flags _TSFT_SHIFT = 0 _FLAGS_SHIFT = 1 _RATE_SHIFT = 2 _CHANNEL_SHIFT = 3 _FHSS_SHIFT = 4 _ANT_SIG_SHIFT = 5 _ANT_NOISE_SHIFT = 6 _LOCK_QUAL_SHIFT = 7 _TX_ATTN_SHIFT = 8 _DB_TX_ATTN_SHIFT = 9 _DBM_TX_POWER_SHIFT = 10 _ANTENNA_SHIFT = 11 _DB_ANT_SIG_SHIFT = 12 _DB_ANT_NOISE_SHIFT = 13 _RX_FLAGS_SHIFT = 14 _CHANNELPLUS_SHIFT = 18 _EXT_SHIFT = 31 # Flags elements _FLAGS_SIZE = 2 _CFP_FLAG_SHIFT = 0 _PREAMBLE_SHIFT = 1 _WEP_SHIFT = 2 _FRAG_SHIFT = 3 _FCS_SHIFT = 4 _DATA_PAD_SHIFT = 5 _BAD_FCS_SHIFT = 6 _SHORT_GI_SHIFT = 7 # Channel type _CHAN_TYPE_SIZE = 4 _CHANNEL_TYPE_SHIFT = 4 _CCK_SHIFT = 5 _OFDM_SHIFT = 6 _TWO_GHZ_SHIFT = 7 _FIVE_GHZ_SHIFT = 8 _PASSIVE_SHIFT = 9 _DYN_CCK_OFDM_SHIFT = 10 _GFSK_SHIFT = 11 _GSM_SHIFT = 12 _STATIC_TURBO_SHIFT = 13 _HALF_RATE_SHIFT = 14 _QUARTER_RATE_SHIFT = 15 # Flags offsets and masks _FCS_MASK = 0x10 class Radiotap(dpkt.Packet): """Radiotap. TODO: Longer class information.... Attributes: __hdr__: Header fields of Radiotap. TODO. """ __hdr__ = ( ('version', 'B', 0), ('pad', 'B', 0), ('length', 'H', 0), ) __byte_order__ = '<' def _is_present(self, bit): index = bit // 8 mask = 1 << (bit % 8) return 1 if self.present_flags[index] & mask else 0 def _set_bit(self, bit, val): # present_flags is a bytearray, this gets the element index = bit // 8 # mask retrieves every bit except our one mask = ~(1 << (bit % 8) & 0xff) # retrieve all of the bits, then or in the val at the appropriate place # as the mask does not return the value at `bit`, if `val` is zero, the bit remains zero self.present_flags[index] = (self.present_flags[index] & mask) | (val << bit % 8) @property def tsft_present(self): return self._is_present(_TSFT_SHIFT) @tsft_present.setter def tsft_present(self, val): self._set_bit(_TSFT_SHIFT, val) @property def flags_present(self): return self._is_present(_FLAGS_SHIFT) @flags_present.setter def flags_present(self, val): self._set_bit(_FLAGS_SHIFT, val) @property def rate_present(self): return self._is_present(_RATE_SHIFT) @rate_present.setter def rate_present(self, val): self._set_bit(_RATE_SHIFT, val) @property def channel_present(self): return self._is_present(_CHANNEL_SHIFT) @channel_present.setter def channel_present(self, val): self._set_bit(_CHANNEL_SHIFT, val) @property def fhss_present(self): return self._is_present(_FHSS_SHIFT) @fhss_present.setter def fhss_present(self, val): self._set_bit(_FHSS_SHIFT, val) @property def ant_sig_present(self): return self._is_present(_ANT_SIG_SHIFT) @ant_sig_present.setter def ant_sig_present(self, val): self._set_bit(_ANT_SIG_SHIFT, val) @property def ant_noise_present(self): return self._is_present(_ANT_NOISE_SHIFT) @ant_noise_present.setter def ant_noise_present(self, val): self._set_bit(_ANT_NOISE_SHIFT, val) @property def lock_qual_present(self): return self._is_present(_LOCK_QUAL_SHIFT) @lock_qual_present.setter def lock_qual_present(self, val): self._set_bit(_LOCK_QUAL_SHIFT, val) @property def tx_attn_present(self): return self._is_present(_TX_ATTN_SHIFT) @tx_attn_present.setter def tx_attn_present(self, val): self._set_bit(_TX_ATTN_SHIFT, val) @property def db_tx_attn_present(self): return self._is_present(_DB_TX_ATTN_SHIFT) @db_tx_attn_present.setter def db_tx_attn_present(self, val): self._set_bit(_DB_TX_ATTN_SHIFT, val) @property def dbm_tx_power_present(self): return self._is_present(_DBM_TX_POWER_SHIFT) @dbm_tx_power_present.setter def dbm_tx_power_present(self, val): self._set_bit(_DBM_TX_POWER_SHIFT, val) @property def ant_present(self): return self._is_present(_ANTENNA_SHIFT) @ant_present.setter def ant_present(self, val): self._set_bit(_ANTENNA_SHIFT, val) @property def db_ant_sig_present(self): return self._is_present(_DB_ANT_SIG_SHIFT) @db_ant_sig_present.setter def db_ant_sig_present(self, val): self._set_bit(_DB_ANT_SIG_SHIFT, val) @property def db_ant_noise_present(self): return self._is_present(_DB_ANT_NOISE_SHIFT) @db_ant_noise_present.setter def db_ant_noise_present(self, val): self._set_bit(_DB_ANT_NOISE_SHIFT, val) @property def rx_flags_present(self): return self._is_present(_RX_FLAGS_SHIFT) @rx_flags_present.setter def rx_flags_present(self, val): self._set_bit(_RX_FLAGS_SHIFT, val) @property def chanplus_present(self): return self._is_present(_CHANNELPLUS_SHIFT) @chanplus_present.setter def chanplus_present(self, val): self._set_bit(_CHANNELPLUS_SHIFT, val) def unpack(self, buf): dpkt.Packet.unpack(self, buf) self.data = buf[self.length:] self.fields = [] buf = buf[self.__hdr_len__:] self.present_flags = bytearray(buf[:4]) buf = buf[4:] ext_bit = _EXT_SHIFT while self._is_present(ext_bit): self.present_flags += bytearray(buf[:4]) buf = buf[4:] ext_bit += 32 # decode each field into self. (eg. self.tsft) as well as append it self.fields list field_decoder = [ ('tsft', self.tsft_present, self.TSFT), ('flags', self.flags_present, self.Flags), ('rate', self.rate_present, self.Rate), ('channel', self.channel_present, self.Channel), ('fhss', self.fhss_present, self.FHSS), ('ant_sig', self.ant_sig_present, self.AntennaSignal), ('ant_noise', self.ant_noise_present, self.AntennaNoise), ('lock_qual', self.lock_qual_present, self.LockQuality), ('tx_attn', self.tx_attn_present, self.TxAttenuation), ('db_tx_attn', self.db_tx_attn_present, self.DbTxAttenuation), ('dbm_tx_power', self.dbm_tx_power_present, self.DbmTxPower), ('ant', self.ant_present, self.Antenna), ('db_ant_sig', self.db_ant_sig_present, self.DbAntennaSignal), ('db_ant_noise', self.db_ant_noise_present, self.DbAntennaNoise), ('rx_flags', self.rx_flags_present, self.RxFlags), ('chanplus', self.chanplus_present, self.ChannelPlus) ] offset = self.__hdr_len__ + len(self.present_flags) for name, present_bit, parser in field_decoder: if present_bit: ali = parser.__alignment__ if ali > 1 and offset % ali: padding = ali - offset % ali buf = buf[padding:] offset += padding field = parser(buf) field.data = b'' setattr(self, name, field) self.fields.append(field) buf = buf[len(field):] offset += len(field) if len(self.data) > 0: if self.flags_present and self.flags.fcs: self.data = ieee80211.IEEE80211(self.data, fcs=self.flags.fcs) else: self.data = ieee80211.IEEE80211(self.data) class RadiotapField(dpkt.Packet): __alignment__ = 1 __byte_order__ = '<' class Antenna(RadiotapField): __hdr__ = ( ('index', 'B', 0), ) class AntennaNoise(RadiotapField): __hdr__ = ( ('db', 'b', 0), ) class AntennaSignal(RadiotapField): __hdr__ = ( ('db', 'b', 0), ) class Channel(RadiotapField): __alignment__ = 2 __hdr__ = ( ('freq', 'H', 0), ('flags', 'H', 0), ) class FHSS(RadiotapField): __hdr__ = ( ('set', 'B', 0), ('pattern', 'B', 0), ) class Flags(RadiotapField): __hdr__ = ( ('val', 'B', 0), ) @property def fcs(self): return (self.val & _FCS_MASK) >> _FCS_SHIFT @fcs.setter def fcs(self, v): self.val = (v << _FCS_SHIFT) | (v & ~_FCS_MASK) class LockQuality(RadiotapField): __alignment__ = 2 __hdr__ = ( ('val', 'H', 0), ) class RxFlags(RadiotapField): __alignment__ = 2 __hdr__ = ( ('val', 'H', 0), ) class Rate(RadiotapField): __hdr__ = ( ('val', 'B', 0), ) class TSFT(RadiotapField): __alignment__ = 8 __hdr__ = ( ('usecs', 'Q', 0), ) class TxAttenuation(RadiotapField): __alignment__ = 2 __hdr__ = ( ('val', 'H', 0), ) class DbTxAttenuation(RadiotapField): __alignment__ = 2 __hdr__ = ( ('db', 'H', 0), ) class DbAntennaNoise(RadiotapField): __hdr__ = ( ('db', 'B', 0), ) class DbAntennaSignal(RadiotapField): __hdr__ = ( ('db', 'B', 0), ) class DbmTxPower(RadiotapField): __hdr__ = ( ('dbm', 'B', 0), ) class ChannelPlus(RadiotapField): __alignment__ = 4 __hdr__ = ( ('flags', 'I', 0), ('freq', 'H', 0), ('channel', 'B', 0), ('maxpower', 'B', 0), ) def test_radiotap_1(): s = b'\x00\x00\x00\x18\x6e\x48\x00\x00\x00\x02\x6c\x09\xa0\x00\xa8\x81\x02\x00\x00\x00\x00\x00\x00\x00' rad = Radiotap(s) assert(rad.version == 0) assert(rad.present_flags == b'\x6e\x48\x00\x00') assert(rad.tsft_present == 0) assert(rad.flags_present == 1) assert(rad.rate_present == 1) assert(rad.channel_present == 1) assert(rad.fhss_present == 0) assert(rad.ant_sig_present == 1) assert(rad.ant_noise_present == 1) assert(rad.lock_qual_present == 0) assert(rad.db_tx_attn_present == 0) assert(rad.dbm_tx_power_present == 0) assert(rad.ant_present == 1) assert(rad.db_ant_sig_present == 0) assert(rad.db_ant_noise_present == 0) assert(rad.rx_flags_present == 1) assert(rad.channel.freq == 0x096c) assert(rad.channel.flags == 0xa0) assert(len(rad.fields) == 7) def test_radiotap_2(): s = (b'\x00\x00\x30\x00\x2f\x40\x00\xa0\x20\x08\x00\xa0\x20\x08\x00\xa0\x20\x08\x00\x00\x00\x00' b'\x00\x00\x08\x84\xbd\xac\x28\x00\x00\x00\x10\x02\x85\x09\xa0\x00\xa5\x00\x00\x00\xa1\x00' b'\x9f\x01\xa1\x02') rad = Radiotap(s) assert(rad.version == 0) assert(rad.present_flags == b'\x2f\x40\x00\xa0\x20\x08\x00\xa0\x20\x08\x00\xa0\x20\x08\x00\x00') assert(rad.tsft_present) assert(rad.flags_present) assert(rad.rate_present) assert(rad.channel_present) assert(not rad.fhss_present) assert(rad.ant_sig_present) assert(not rad.ant_noise_present) assert(not rad.lock_qual_present) assert(not rad.db_tx_attn_present) assert(not rad.dbm_tx_power_present) assert(not rad.ant_present) assert(not rad.db_ant_sig_present) assert(not rad.db_ant_noise_present) assert(rad.rx_flags_present) assert(rad.channel.freq == 2437) assert(rad.channel.flags == 0x00a0) assert(len(rad.fields) == 6) assert(rad.flags_present) assert(rad.flags.fcs) def test_fcs(): s = b'\x00\x00\x1a\x00\x2f\x48\x00\x00\x34\x8f\x71\x09\x00\x00\x00\x00\x10\x0c\x85\x09\xc0\x00\xcc\x01\x00\x00' rt = Radiotap(s) assert(rt.flags_present == 1) assert(rt.flags.fcs == 1) def test_radiotap_3(): # xchannel aka channel plus field s = ( b'\x00\x00\x20\x00\x67\x08\x04\x00\x84\x84\x66\x25\x00\x00\x00\x00\x22\x0c\xd6\xa0\x01\x00\x00\x00\x40' b'\x01\x00\x00\x3c\x14\x24\x11\x08\x02\x00\x00\xff\xff\xff\xff\xff\xff\x06\x03\x7f\x07\xa0\x16\x00\x19' b'\xe3\xd3\x53\x52\x00\x8e\xaa\xaa\x03\x00\x00\x00\x08\x06\x00\x01\x08\x00\x06\x04\x00\x01\x00\x19\xe3' b'\xd3\x53\x52\xa9\xfe\xf7\x00\x00\x00\x00\x00\x00\x00\x4f\x67\x32\x38' ) rt = Radiotap(s) assert rt.ant_noise.db == -96 assert rt.ant_sig.db == -42 assert rt.ant.index == 1 assert rt.chanplus_present assert rt.chanplus.flags == 0x140 assert rt.chanplus.freq == 5180 assert rt.chanplus.channel == 36 assert rt.chanplus.maxpower == 17 assert len(rt.fields) == 7 assert repr(rt.data).startswith('IEEE80211') def test_radiotap_properties(): from binascii import unhexlify buf = unhexlify( '00' '00' '0018' '0000000000000000000000000000000000000000' ) radiotap = Radiotap(buf) property_keys = [ 'tsft', 'flags', 'rate', 'channel', 'fhss', 'ant_sig', 'ant_noise', 'lock_qual', 'tx_attn', 'db_tx_attn', 'dbm_tx_power', 'ant', 'db_ant_sig', 'db_ant_noise', 'rx_flags', 'chanplus' ] for prop in [key + '_present' for key in property_keys]: print(prop) assert hasattr(radiotap, prop) assert getattr(radiotap, prop) == 0 setattr(radiotap, prop, 1) assert getattr(radiotap, prop) == 1 setattr(radiotap, prop, 0) assert getattr(radiotap, prop) == 0 def test_radiotap_unpack_fcs(): from binascii import unhexlify buf = unhexlify( '00' # version '00' # pad '1800' # length '6e48000011026c09a000a8810200000000000000' 'd40000000012f0b61ca4ffffffff' ) radiotap = Radiotap(buf) assert radiotap.data.fcs_present == 1 def test_flags(): flags = Radiotap.Flags(b'\x00') assert flags.fcs == 0 flags.fcs = 1 assert flags.fcs == 1