# $Id: mrt.py 29 2007-01-26 02:29:07Z jon.oberheide $ # -*- coding: utf-8 -*- """Multi-threaded Routing Toolkit.""" from __future__ import absolute_import from . import dpkt from . import bgp # Multi-threaded Routing Toolkit # http://www.ietf.org/internet-drafts/draft-ietf-grow-mrt-03.txt # MRT Types NULL = 0 START = 1 DIE = 2 I_AM_DEAD = 3 PEER_DOWN = 4 BGP = 5 # Deprecated by BGP4MP RIP = 6 IDRP = 7 RIPNG = 8 BGP4PLUS = 9 # Deprecated by BGP4MP BGP4PLUS_01 = 10 # Deprecated by BGP4MP OSPF = 11 TABLE_DUMP = 12 BGP4MP = 16 BGP4MP_ET = 17 ISIS = 32 ISIS_ET = 33 OSPF_ET = 64 # BGP4MP Subtypes BGP4MP_STATE_CHANGE = 0 BGP4MP_MESSAGE = 1 BGP4MP_ENTRY = 2 BGP4MP_SNAPSHOT = 3 BGP4MP_MESSAGE_32BIT_AS = 4 # Address Family Types AFI_IPv4 = 1 AFI_IPv6 = 2 class MRTHeader(dpkt.Packet): __hdr__ = ( ('ts', 'I', 0), ('type', 'H', 0), ('subtype', 'H', 0), ('len', 'I', 0) ) class TableDump(dpkt.Packet): __hdr__ = ( ('view', 'H', 0), ('seq', 'H', 0), ('prefix', 'I', 0), ('prefix_len', 'B', 0), ('status', 'B', 1), ('originated_ts', 'I', 0), ('peer_ip', 'I', 0), ('peer_as', 'H', 0), ('attr_len', 'H', 0) ) def unpack(self, buf): dpkt.Packet.unpack(self, buf) plen = self.attr_len l_ = [] while plen > 0: attr = bgp.BGP.Update.Attribute(self.data) self.data = self.data[len(attr):] plen -= len(attr) l_.append(attr) self.attributes = l_ class BGP4MPMessage(dpkt.Packet): __hdr__ = ( ('src_as', 'H', 0), ('dst_as', 'H', 0), ('intf', 'H', 0), ('family', 'H', AFI_IPv4), ('src_ip', 'I', 0), ('dst_ip', 'I', 0) ) class BGP4MPMessage_32(dpkt.Packet): __hdr__ = ( ('src_as', 'I', 0), ('dst_as', 'I', 0), ('intf', 'H', 0), ('family', 'H', AFI_IPv4), ('src_ip', 'I', 0), ('dst_ip', 'I', 0) ) def test_tabledump(): from binascii import unhexlify buf_tabledump = unhexlify( '0001' # view '0002' # seq '00000003' # prefix '04' # prefix_len '05' # status '00000006' # originated_ts '00000007' # peer_ip '0008' # peer_as '0002' # attr_len ) buf_attrs = unhexlify( '01' # flags '01' # type (ORIGIN) '01' # length '02' # Origin.type (INCOMPLETE) ) buf = buf_tabledump + buf_attrs table_dump = TableDump(buf) assert table_dump.view == 1 assert table_dump.seq == 2 assert table_dump.prefix == 3 assert table_dump.prefix_len == 4 assert table_dump.status == 5 assert table_dump.originated_ts == 6 assert table_dump.peer_ip == 7 assert table_dump.peer_as == 8 assert table_dump.attr_len == 2 assert len(table_dump.attributes) == 1 attr = table_dump.attributes[0] assert isinstance(attr, bgp.BGP.Update.Attribute) assert isinstance(attr.data, bgp.BGP.Update.Attribute.Origin)