import logging
import uuid
from urllib.parse import quote

import requests

from ..spec import AbstractBufferedFile, AbstractFileSystem
from ..utils import infer_storage_options

logger = logging.getLogger("webhdfs")


class WebHDFS(AbstractFileSystem):
    """
    Interface to HDFS over HTTP

    Three auth mechanisms are supported:

    insecure: no auth is done, and the user is assumed to be whoever they
        say they are (parameter `user`), or a predefined value such as
        "dr.who" if not given
    spnego: when kerberos authentication is enabled, auth is negotiated by
        requests_kerberos https://github.com/requests/requests-kerberos .
        This establishes a session based on existing kinit login and/or
        specified principal/password; parameters are passed with
        ``kerb_kwargs``
    token: uses an existing Hadoop delegation token from another secured
        service. Indeed, this client can also generate such tokens when
        not insecure. Note that tokens expire, but can be renewed (by a
        previously specified user) and may allow for proxying.
    """

    tempdir = "/tmp"
    protocol = "webhdfs", "webHDFS"

    def __init__(
        self,
        host,
        port=50070,
        kerberos=False,
        token=None,
        user=None,
        proxy_to=None,
        kerb_kwargs=None,
        data_proxy=None,
        **kwargs
    ):
        """
        Parameters
        ----------
        host: str
            Name-node address
        port: int
            Port for webHDFS
        kerberos: bool
            Whether to authenticate with kerberos for this connection
        token: str or None
            If given, use this token on every call to authenticate. A user
            and user-proxy may be encoded in the token and should not be
            also given
        user: str or None
            If given, assert the user name to connect with
        proxy_to: str or None
            If given, the user has the authority to proxy, and this value
            is the user in whose name actions are taken
        kerb_kwargs: dict
            Any extra arguments for HTTPKerberosAuth, see
            https://github.com/requests/requests-kerberos/blob/master/requests_kerberos/kerberos_.py
        data_proxy: dict, callable or None
            If given, map data-node addresses. This can be necessary if the
            HDFS cluster is behind a proxy, running on Docker or otherwise
            has a mismatch between the host-names given by the name-node and
            the address by which to refer to them from the client. If a
            dict, maps host names `host->data_proxy[host]`; if a callable,
            full URLs are passed, and function must conform to
            `url->data_proxy(url)`.
        kwargs
        """
        if self._cached:
            return
        super().__init__(**kwargs)
        self.url = "http://{host}:{port}/webhdfs/v1".format(host=host, port=port)
        self.kerb = kerberos
        self.kerb_kwargs = kerb_kwargs or {}
        self.pars = {}
        self.proxy = data_proxy or {}
        if token is not None:
            if user is not None or proxy_to is not None:
                raise ValueError(
                    "If passing a delegation token, must not set "
                    "user or proxy_to, as these are encoded in the token"
                )
            self.pars["delegation"] = token
        if user is not None:
            self.pars["user.name"] = user
        if proxy_to is not None:
            self.pars["doas"] = proxy_to
        if kerberos and user is not None:
            raise ValueError(
                "If using Kerberos auth, do not specify the "
                "user, this is handled by kinit."
            )
        self._connect()

    def _connect(self):
        self.session = requests.Session()
        if self.kerb:
            from requests_kerberos import HTTPKerberosAuth

            self.session.auth = HTTPKerberosAuth(**self.kerb_kwargs)

    def _call(self, op, method="get", path=None, data=None, redirect=True, **kwargs):
        url = self.url + quote(path or "")
        args = kwargs.copy()
        args.update(self.pars)
        args["op"] = op.upper()
        logger.debug(url, method, args)
        out = self.session.request(
            method=method.upper(),
            url=url,
            params=args,
            data=data,
            allow_redirects=redirect,
        )
        if out.status_code == 404:
            raise FileNotFoundError(path)
        if out.status_code == 403:
            raise PermissionError(path or "")
        if out.status_code == 401:
            raise PermissionError
        out.raise_for_status()
        return out

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        replication=None,
        permissions=None,
        **kwargs
    ):
        """
        Parameters
        ----------
        path: str
            File location
        mode: str
            'rb', 'wb', etc.
        block_size: int
            Client buffer size for read-ahead or write buffer
        autocommit: bool
            If False, writes to temporary file that only gets put in final
            location upon commit
        replication: int
            Number of copies of file on the cluster, write mode only
        permissions: str or int
            posix permissions, write mode only
        kwargs

        Returns
        -------
        WebHDFile instance
        """
        block_size = block_size or self.blocksize
        return WebHDFile(
            self,
            path,
            mode=mode,
            block_size=block_size,
            tempdir=self.tempdir,
            autocommit=autocommit,
            replication=replication,
            permissions=permissions,
        )

    @staticmethod
    def _process_info(info):
        info["type"] = info["type"].lower()
        info["size"] = info["length"]
        return info

    @classmethod
    def _strip_protocol(cls, path):
        return infer_storage_options(path)["path"]

    @staticmethod
    def _get_kwargs_from_urls(urlpath):
        out = infer_storage_options(urlpath)
        out.pop("path", None)
        out.pop("protocol", None)
        if "username" in out:
            out["user"] = out.pop("username")
        return out

    def info(self, path):
        out = self._call("GETFILESTATUS", path=path)
        info = out.json()["FileStatus"]
        info["name"] = path
        return self._process_info(info)

    def ls(self, path, detail=False):
        out = self._call("LISTSTATUS", path=path)
        infos = out.json()["FileStatuses"]["FileStatus"]
        for info in infos:
            self._process_info(info)
            info["name"] = path.rstrip("/") + "/" + info["pathSuffix"]

        if detail:
            return sorted(infos, key=lambda i: i["name"])
        else:
            return sorted(info["name"] for info in infos)

    def content_summary(self, path):
        """Total numbers of files, directories and bytes under path"""
        out = self._call("GETCONTENTSUMMARY", path=path)
        return out.json()["ContentSummary"]

    def ukey(self, path):
        """Checksum info of file, giving method and result"""
        out = self._call("GETFILECHECKSUM", path=path, redirect=False)
        location = self._apply_proxy(out.headers["Location"])
        out2 = self.session.get(location)
        out2.raise_for_status()
        return out2.json()["FileChecksum"]

    def home_directory(self):
        """Get user's home directory"""
        out = self._call("GETHOMEDIRECTORY")
        return out.json()["Path"]

    def get_delegation_token(self, renewer=None):
        """Retrieve token which can give the same authority to other users

        Parameters
        ----------
        renewer: str or None
            User who may use this token; if None, will be current user
        """
        if renewer:
            out = self._call("GETDELEGATIONTOKEN", renewer=renewer)
        else:
            out = self._call("GETDELEGATIONTOKEN")
        t = out.json()["Token"]
        if t is None:
            raise ValueError("No token available for this user/security context")
        return t["urlString"]

    def renew_delegation_token(self, token):
        """Make token live longer. Returns new expiry time"""
        out = self._call("RENEWDELEGATIONTOKEN", method="put", token=token)
        return out.json()["long"]

    def cancel_delegation_token(self, token):
        """Stop the token from being useful"""
        self._call("CANCELDELEGATIONTOKEN", method="put", token=token)

    def chmod(self, path, mod):
        """Set the permission at path

        Parameters
        ----------
        path: str
            location to set (file or directory)
        mod: str or int
            posix representation of permission, give as oct string, e.g,
            '777' or 0o777
        """
        self._call("SETPERMISSION", method="put", path=path, permission=mod)

    def chown(self, path, owner=None, group=None):
        """Change owning user and/or group"""
        kwargs = {}
        if owner is not None:
            kwargs["owner"] = owner
        if group is not None:
            kwargs["group"] = group
        self._call("SETOWNER", method="put", path=path, **kwargs)

    def set_replication(self, path, replication):
        """
        Set file replication factor

        Parameters
        ----------
        path: str
            File location (not for directories)
        replication: int
            Number of copies of file on the cluster. Should be smaller than
            number of data nodes; normally 3 on most systems.
        """
        self._call("SETREPLICATION", path=path, method="put", replication=replication)

    def mkdir(self, path, **kwargs):
        self._call("MKDIRS", method="put", path=path)

    def makedirs(self, path, exist_ok=False):
        if exist_ok is False and self.exists(path):
            raise FileExistsError(path)
        self.mkdir(path)

    def mv(self, path1, path2, **kwargs):
        self._call("RENAME", method="put", path=path1, destination=path2)

    def rm(self, path, recursive=False, **kwargs):
        self._call(
            "DELETE",
            method="delete",
            path=path,
            recursive="true" if recursive else "false",
        )

    def _apply_proxy(self, location):
        if self.proxy and callable(self.proxy):
            location = self.proxy(location)
        elif self.proxy:
            # as a dict
            for k, v in self.proxy.items():
                location = location.replace(k, v, 1)
        return location


class WebHDFile(AbstractBufferedFile):
    """A file living in HDFS over webHDFS"""

    def __init__(self, fs, path, **kwargs):
        super().__init__(fs, path, **kwargs)
        kwargs = kwargs.copy()
        if kwargs.get("permissions", None) is None:
            kwargs.pop("permissions", None)
        if kwargs.get("replication", None) is None:
            kwargs.pop("replication", None)
        self.permissions = kwargs.pop("permissions", 511)
        tempdir = kwargs.pop("tempdir")
        if kwargs.pop("autocommit", False) is False:
            self.target = self.path
            self.path = "/".join([tempdir, str(uuid.uuid4())])

    def _upload_chunk(self, final=False):
        """Write one part of a multi-block file upload

        Parameters
        ==========
        final: bool
            This is the last block, so should complete file, if
            self.autocommit is True.
        """
        out = self.fs.session.post(self.location, data=self.buffer.getvalue())
        out.raise_for_status()
        return True

    def _initiate_upload(self):
        """Create remote file/upload"""
        if "a" in self.mode:
            op, method = "APPEND", "POST"
        else:
            op, method = "CREATE", "PUT"
            if self.fs.exists(self.path):
                # no "truncate" or "create empty"
                self.fs.rm(self.path)
        out = self.fs._call(op, method, self.path, redirect=False, **self.kwargs)
        location = self.fs._apply_proxy(out.headers["Location"])
        if "w" in self.mode:
            # create empty file to append to
            out2 = self.fs.session.put(location)
            out2.raise_for_status()
        self.location = location.replace("CREATE", "APPEND")

    def _fetch_range(self, start, end):
        out = self.fs._call(
            "OPEN", path=self.path, offset=start, length=end - start, redirect=False
        )
        out.raise_for_status()
        location = out.headers["Location"]
        out2 = self.fs.session.get(self.fs._apply_proxy(location))
        return out2.content

    def commit(self):
        self.fs.mv(self.path, self.target)

    def discard(self):
        self.fs.rm(self.path)
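A standalone sketch of the ``data_proxy`` remapping described in the class docstring: the name-node's redirect points at a data-node host that the client may not be able to resolve, so the URL is rewritten before it is followed. The helper name ``apply_proxy`` and the host names are illustrative only, not part of fsspec's API; the dict branch mirrors the host-for-host substitution performed by ``WebHDFS._apply_proxy``.

```python
def apply_proxy(location, proxy):
    """Rewrite a data-node redirect URL.

    ``proxy`` is either a callable mapping full URL -> URL, or a dict
    mapping a data-node host name to the address reachable by the client.
    """
    if callable(proxy):
        return proxy(location)
    for host, replacement in (proxy or {}).items():
        # replace only the first occurrence, i.e. the host part of the URL
        location = location.replace(host, replacement, 1)
    return location


# Dict form: remap a Docker-internal host name to a published port
print(apply_proxy(
    "http://datanode1:50075/webhdfs/v1/tmp/f?op=OPEN",
    {"datanode1": "localhost"},
))
# -> http://localhost:50075/webhdfs/v1/tmp/f?op=OPEN
```

A callable is passed the whole URL, which also allows rewriting the port or scheme, not just the host.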