# $Id: rpc.py 23 2006-11-08 15:45:33Z dugsong $ # -*- coding: utf-8 -*- """Remote Procedure Call.""" from __future__ import absolute_import import struct from . import dpkt # RPC.dir CALL = 0 REPLY = 1 # RPC.Auth.flavor AUTH_NONE = AUTH_NULL = 0 AUTH_UNIX = 1 AUTH_SHORT = 2 AUTH_DES = 3 # RPC.Reply.stat MSG_ACCEPTED = 0 MSG_DENIED = 1 # RPC.Reply.Accept.stat SUCCESS = 0 PROG_UNAVAIL = 1 PROG_MISMATCH = 2 PROC_UNAVAIL = 3 GARBAGE_ARGS = 4 SYSTEM_ERR = 5 # RPC.Reply.Reject.stat RPC_MISMATCH = 0 AUTH_ERROR = 1 class RPC(dpkt.Packet): """Remote Procedure Call. RFC 5531: https://tools.ietf.org/html/rfc5531 TODO: Longer class information.... Attributes: __hdr__: Header fields of RPC. TODO. """ __hdr__ = ( ('xid', 'I', 0), ('dir', 'I', CALL) ) class Auth(dpkt.Packet): __hdr__ = (('flavor', 'I', AUTH_NONE), ) def unpack(self, buf): dpkt.Packet.unpack(self, buf) n = struct.unpack('>I', self.data[:4])[0] self.data = self.data[4:4 + n] def __len__(self): return 8 + len(self.data) def __bytes__(self): return self.pack_hdr() + struct.pack('>I', len(self.data)) + bytes(self.data) class Call(dpkt.Packet): __hdr__ = ( ('rpcvers', 'I', 2), ('prog', 'I', 0), ('vers', 'I', 0), ('proc', 'I', 0) ) def unpack(self, buf): dpkt.Packet.unpack(self, buf) self.cred = RPC.Auth(self.data) self.verf = RPC.Auth(self.data[len(self.cred):]) self.data = self.data[len(self.cred) + len(self.verf):] def __len__(self): return len(bytes(self)) # XXX def __bytes__(self): return dpkt.Packet.__bytes__(self) + \ bytes(getattr(self, 'cred', RPC.Auth())) + \ bytes(getattr(self, 'verf', RPC.Auth())) + \ bytes(self.data) class Reply(dpkt.Packet): __hdr__ = (('stat', 'I', MSG_ACCEPTED), ) class Accept(dpkt.Packet): __hdr__ = (('stat', 'I', SUCCESS), ) def unpack(self, buf): self.verf = RPC.Auth(buf) buf = buf[len(self.verf):] self.stat = struct.unpack('>I', buf[:4])[0] if self.stat == SUCCESS: self.data = buf[4:] elif self.stat == PROG_MISMATCH: self.low, self.high = struct.unpack('>II', buf[4:12]) self.data = buf[12:] def __len__(self): if self.stat == PROG_MISMATCH: n = 8 else: n = 0 return len(self.verf) + 4 + n + len(self.data) def __bytes__(self): if self.stat == PROG_MISMATCH: return bytes(self.verf) + \ struct.pack('>III', self.stat, self.low, self.high) + self.data return bytes(self.verf) + dpkt.Packet.__bytes__(self) class Reject(dpkt.Packet): __hdr__ = (('stat', 'I', AUTH_ERROR), ) def unpack(self, buf): dpkt.Packet.unpack(self, buf) if self.stat == RPC_MISMATCH: self.low, self.high = struct.unpack('>II', self.data[:8]) self.data = self.data[8:] elif self.stat == AUTH_ERROR: self.why = struct.unpack('>I', self.data[:4])[0] self.data = self.data[4:] def __len__(self): if self.stat == RPC_MISMATCH: n = 8 elif self.stat == AUTH_ERROR: n = 4 else: n = 0 return 4 + n + len(self.data) def __bytes__(self): if self.stat == RPC_MISMATCH: return struct.pack('>III', self.stat, self.low, self.high) + self.data elif self.stat == AUTH_ERROR: return struct.pack('>II', self.stat, self.why) + self.data return dpkt.Packet.__bytes__(self) def unpack(self, buf): dpkt.Packet.unpack(self, buf) if self.stat == MSG_ACCEPTED: self.data = self.accept = self.Accept(self.data) elif self.stat == MSG_DENIED: self.data = self.reject = self.Reject(self.data) def unpack(self, buf): dpkt.Packet.unpack(self, buf) if self.dir == CALL: self.data = self.call = self.Call(self.data) elif self.dir == REPLY: self.data = self.reply = self.Reply(self.data) def unpack_xdrlist(cls, buf): l_ = [] while buf: if buf.startswith(b'\x00\x00\x00\x01'): p = cls(buf[4:]) l_.append(p) buf = p.data elif buf.startswith(b'\x00\x00\x00\x00'): break else: raise dpkt.UnpackError('invalid XDR list') return l_ def pack_xdrlist(*args): return b'\x00\x00\x00\x01'.join(map(bytes, args)) + b'\x00\x00\x00\x00' def test_auth(): from binascii import unhexlify auth1 = RPC.Auth() assert auth1.flavor == AUTH_NONE buf = unhexlify('0000000000000000') assert bytes(auth1) == buf auth2 = RPC.Auth(buf) assert auth2.flavor == AUTH_NONE assert len(auth2) == 8 def test_call(): from binascii import unhexlify call1 = RPC.Call() assert call1.rpcvers == 2 assert call1.prog == 0 assert call1.vers == 0 assert call1.proc == 0 buf = unhexlify( '0000000200000000000000000000000000000000000000000000000000000000' ) assert bytes(call1) == buf call2 = RPC.Call(buf) assert call2.rpcvers == 2 assert call2.prog == 0 assert call2.vers == 0 assert call2.proc == 0 assert len(call2) == 32 assert bytes(call2) == buf def test_reply(): from binascii import unhexlify reply1 = RPC.Reply() assert reply1.stat == MSG_ACCEPTED assert bytes(reply1) == b'\00' * 4 buf_accepted = unhexlify( '00000000' # MSG_ACCEPTED '0000000000000000' # Auth '00000000' # SUCCESS '0000000000000000' # Auth ) reply_accepted = RPC.Reply(buf_accepted) assert reply_accepted.stat == MSG_ACCEPTED assert bytes(reply_accepted) == buf_accepted assert len(reply_accepted) == 24 buf_denied = unhexlify( '00000001' # MSG_DENIED '00000000' # RPC_MISMATCH '00000000' # low 'FFFFFFFF' # high '0000000000000000' # Auth ) reply_denied = RPC.Reply(buf_denied) assert reply_denied.stat == MSG_DENIED assert bytes(reply_denied) == buf_denied assert len(reply_denied) == 24 def test_accept(): from binascii import unhexlify accept1 = RPC.Reply.Accept() assert accept1.stat == SUCCESS buf_success = unhexlify( '0000000000000000' # Auth '00000000' # SUCCESS '0000000000000000' # Auth ) accept_success = RPC.Reply.Accept(buf_success) assert accept_success.stat == SUCCESS assert len(accept_success) == 20 assert bytes(accept_success) == buf_success buf_prog_mismatch = unhexlify( '0000000000000000' # Auth '00000002' # PROG_MISMATCH '0000000000000000' # Auth ) accept_prog_mismatch = RPC.Reply.Accept(buf_prog_mismatch) assert accept_prog_mismatch.stat == PROG_MISMATCH assert len(accept_prog_mismatch) == 20 assert bytes(accept_prog_mismatch) == buf_prog_mismatch def test_reject(): from binascii import unhexlify reject1 = RPC.Reply.Reject() assert reject1.stat == AUTH_ERROR buf_rpc_mismatch = unhexlify( '00000000' # RPC_MISMATCH '00000000' # low 'FFFFFFFF' # high '0000000000000000' # Auth ) reject2 = RPC.Reply.Reject(buf_rpc_mismatch) assert bytes(reject2) == buf_rpc_mismatch assert reject2.low == 0 assert reject2.high == 0xffffffff assert len(reject2) == 20 buf_auth_error = unhexlify( '00000001' # AUTH_ERROR '00000000' # low 'FFFFFFFF' # high '0000000000000000' # Auth ) reject3 = RPC.Reply.Reject(buf_auth_error) assert bytes(reject3) == buf_auth_error assert len(reject3) == 20 buf_other = unhexlify( '00000002' # NOT IMPLEMENTED '00000000' # low 'FFFFFFFF' # high '0000000000000000' # Auth ) reject4 = RPC.Reply.Reject(buf_other) assert bytes(reject4) == buf_other assert len(reject4) == 20 def test_rpc(): from binascii import unhexlify rpc = RPC() assert rpc.xid == 0 assert rpc.dir == CALL buf_call = unhexlify( '00000000' # xid '00000000' # CALL '0000000200000000000000000000000000000000000000000000000000000000' ) rpc_call = RPC(buf_call) assert bytes(rpc_call) == buf_call buf_reply = unhexlify( '00000000' # xid '00000001' # REPLY '00000000' # MSG_ACCEPTED '0000000000000000' # Auth '00000000' # SUCCESS '0000000000000000' # Auth ) rpc_reply = RPC(buf_reply) assert bytes(rpc_reply) == buf_reply