# $Id: gre.py 75 2010-08-03 14:42:19Z jon.oberheide $ # -*- coding: utf-8 -*- """Generic Routing Encapsulation.""" from __future__ import absolute_import import struct import codecs from . import dpkt from . import ethernet from .compat import compat_izip GRE_CP = 0x8000 # Checksum Present GRE_RP = 0x4000 # Routing Present GRE_KP = 0x2000 # Key Present GRE_SP = 0x1000 # Sequence Present GRE_SS = 0x0800 # Strict Source Route GRE_AP = 0x0080 # Acknowledgment Present GRE_opt_fields = ( (GRE_CP | GRE_RP, 'sum', 'H'), (GRE_CP | GRE_RP, 'off', 'H'), (GRE_KP, 'key', 'I'), (GRE_SP, 'seq', 'I'), (GRE_AP, 'ack', 'I') ) class GRE(dpkt.Packet): """Generic Routing Encapsulation. TODO: Longer class information.... Attributes: __hdr__: Header fields of GRE. TODO. """ __hdr__ = ( ('flags', 'H', 0), ('p', 'H', 0x0800), # ETH_TYPE_IP ) sre = () @property def v(self): return self.flags & 0x7 @v.setter def v(self, v): self.flags = (self.flags & ~0x7) | (v & 0x7) @property def recur(self): return (self.flags >> 5) & 0x7 @recur.setter def recur(self, v): self.flags = (self.flags & ~0xe0) | ((v & 0x7) << 5) class SRE(dpkt.Packet): __hdr__ = [ ('family', 'H', 0), ('off', 'B', 0), ('len', 'B', 0) ] def unpack(self, buf): dpkt.Packet.unpack(self, buf) self.data = self.data[:self.len] def opt_fields_fmts(self): if self.v == 0: fields, fmts = [], [] opt_fields = GRE_opt_fields else: fields, fmts = ['len', 'callid'], ['H', 'H'] opt_fields = GRE_opt_fields[-2:] for flags, field, fmt in opt_fields: if self.flags & flags: fields.append(field) fmts.append(fmt) return fields, fmts def unpack(self, buf): dpkt.Packet.unpack(self, buf) fields, fmts = self.opt_fields_fmts() if fields: fmt = ''.join(fmts) fmtlen = struct.calcsize(fmt) vals = struct.unpack("!" + fmt, self.data[:fmtlen]) self.data = self.data[fmtlen:] self.__dict__.update(dict(compat_izip(fields, vals))) if self.flags & GRE_RP: l_ = [] while True: sre = self.SRE(self.data) self.data = self.data[len(sre):] l_.append(sre) if not sre.len: break self.sre = l_ try: self.data = ethernet.Ethernet._typesw[self.p](self.data) setattr(self, self.data.__class__.__name__.lower(), self.data) except (KeyError, dpkt.UnpackError): # data alrady set pass def __len__(self): opt_fmtlen = struct.calcsize(''.join(self.opt_fields_fmts()[1])) return self.__hdr_len__ + opt_fmtlen + sum(map(len, self.sre)) + len(self.data) def __bytes__(self): fields, fmts = self.opt_fields_fmts() if fields: vals = [] for f in fields: vals.append(getattr(self, f)) opt_s = struct.pack('!' + ''.join(fmts), *vals) else: opt_s = b'' return self.pack_hdr() + opt_s + b''.join(map(bytes, self.sre)) + bytes(self.data) def test_gre_v1(): # Runs all the test associated with this class/file s = codecs.decode("3081880a0067178000068fb100083a76", 'hex') + b"A" * 103 g = GRE(s) assert g.v == 1 assert g.p == 0x880a assert g.seq == 430001 assert g.ack == 539254 assert g.callid == 6016 assert g.len == 103 assert g.data == b"A" * 103 assert len(g) == len(s) s = codecs.decode("3001880a00b2001100083ab8", 'hex') + b"A" * 178 g = GRE(s) assert g.v == 1 assert g.p == 0x880a assert g.seq == 539320 assert g.callid == 17 assert g.len == 178 assert g.data == b"A" * 178 assert len(g) == len(s) def test_gre_len(): from binascii import unhexlify gre = GRE() assert len(gre) == 4 buf = unhexlify("3081880a0067178000068fb100083a76") + b"\x41" * 103 gre = GRE(buf) assert bytes(gre) == buf assert len(gre) == len(buf) def test_gre_accessors(): gre = GRE() for attr in ['v', 'recur']: print(attr) assert hasattr(gre, attr) assert getattr(gre, attr) == 0 setattr(gre, attr, 1) assert getattr(gre, attr) == 1 def test_sre_creation(): from binascii import unhexlify buf = unhexlify( '0000' # family '00' # off '02' # len 'ffff' ) sre = GRE.SRE(buf) assert sre.data == b'\xff\xff' assert len(sre) == 6 assert bytes(sre) == buf def test_gre_nested_sre(): from binascii import unhexlify buf = unhexlify( '4000' # flags (GRE_RP) '0800' # p (ETH_TYPE_IP) '0001' # sum '0002' # off # SRE entry '0003' # family '04' # off '02' # len 'ffff' # SRE entry (no len => last element) '0006' # family '00' # off '00' # len ) gre = GRE(buf) assert hasattr(gre, 'sre') assert isinstance(gre.sre, list) assert len(gre.sre) == 2 assert len(gre) == len(buf) assert bytes(gre) == buf assert gre.data == b'' def test_gre_next_layer(): from binascii import unhexlify from . import ipx buf = unhexlify( '0000' # flags (NONE) '8137' # p (ETH_TYPE_IPX) # IPX packet '0000' # sum '0001' # len '02' # tc '03' # pt '0102030405060708090a0b0c' # dst 'c0b0a0908070605040302010' # src ) gre = GRE(buf) assert hasattr(gre, 'ipx') assert isinstance(gre.data, ipx.IPX) assert gre.data.tc == 2 assert gre.data.src == unhexlify('c0b0a0908070605040302010') assert gre.data.dst == unhexlify('0102030405060708090a0b0c') assert len(gre) == len(buf) assert bytes(gre) == buf