# coding: utf-8
"""Key value store interface of MXNet for parameter synchronization."""
from __future__ import absolute_import

from array import array
import ctypes
import pickle
from .ndarray import NDArray
from .ndarray import _ndarray_cls
from .base import _LIB, c_str_array, c_handle_array, c_array, c_array_buf, c_str
from .base import check_call, string_types, mx_uint, py_str
from .base import NDArrayHandle, KVStoreHandle
from . import optimizer as opt
from .profiler import set_kvstore_handle


def _ctype_key_value(keys, vals):
    """Returns ctype arrays for the key-value args, and whether string keys are used.
    For internal use only.
    """
    if isinstance(keys, (tuple, list)):
        assert(len(keys) == len(vals))
        c_keys = []
        c_vals = []
        use_str_keys = None
        for key, val in zip(keys, vals):
            c_key_i, c_val_i, str_keys_i = _ctype_key_value(key, val)
            c_keys += c_key_i
            c_vals += c_val_i
            use_str_keys = str_keys_i if use_str_keys is None else use_str_keys
            assert(use_str_keys == str_keys_i), "inconsistent types of keys detected."
        c_keys_arr = c_array(ctypes.c_char_p, c_keys) if use_str_keys \
                     else c_array(ctypes.c_int, c_keys)
        c_vals_arr = c_array(ctypes.c_void_p, c_vals)
        return (c_keys_arr, c_vals_arr, use_str_keys)

    assert(isinstance(keys, (int,) + string_types)), \
        "unexpected type for keys: " + str(type(keys))
    use_str_keys = isinstance(keys, string_types)
    if isinstance(vals, NDArray):
        c_keys = c_str_array([keys]) if use_str_keys \
                 else c_array_buf(ctypes.c_int, array('i', [keys]))
        return (c_keys, c_handle_array([vals]), use_str_keys)
    else:
        for value in vals:
            assert(isinstance(value, NDArray))
        c_keys = c_str_array([keys] * len(vals)) if use_str_keys \
                 else c_array_buf(ctypes.c_int, array('i', [keys] * len(vals)))
        return (c_keys, c_handle_array(vals), use_str_keys)


def _ctype_dict(param_dict):
    """Returns ctype arrays for keys and values (converted to strings) in a dictionary."""
    assert(isinstance(param_dict, dict)), \
        "unexpected type for param_dict: " + str(type(param_dict))
    c_keys = c_array(ctypes.c_char_p, [c_str(k) for k in param_dict.keys()])
    c_vals = c_array(ctypes.c_char_p, [c_str(str(v)) for v in param_dict.values()])
    return (c_keys, c_vals)


def _updater_wrapper(updater):
    """A wrapper for the user-defined handle."""
    def updater_handle(key, lhs_handle, rhs_handle, _):
        """ctypes function"""
        lhs = _ndarray_cls(NDArrayHandle(lhs_handle))
        rhs = _ndarray_cls(NDArrayHandle(rhs_handle))
        updater(key, lhs, rhs)
    return updater_handle


def _get_kvstore_server_command_type(command):
    """Returns the numeric command type understood by the kvstore server."""
    command_types = {'kController': 0,
                     'kSetMultiPrecision': 1,
                     'kStopServer': 2,
                     'kSyncMode': 3,
                     'kSetGradientCompression': 4,
                     'kSetProfilerParams': 5}
    assert (command in command_types), "Unknown command type to send to server"
    return command_types[command]
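# A minimal sketch of how the helpers above dispatch on key type (illustrative
# only; `mx` stands for an imported mxnet package). String keys make callers
# pick the string-keyed C API variants, int keys the plain ones:
#   ckeys, cvals, use_str = _ctype_key_value('w0', mx.nd.zeros((2, 2)))
#   # use_str is True  -> e.g. _LIB.MXKVStoreInitEx is used
#   ckeys, cvals, use_str = _ctype_key_value(0, mx.nd.zeros((2, 2)))
#   # use_str is False -> e.g. _LIB.MXKVStoreInit is used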
class KVStore(object):
    """A key-value store for synchronization of values, over multiple devices."""
    def __init__(self, handle):
        """Initializes a new KVStore.

        Parameters
        ----------
        handle : KVStoreHandle
            `KVStore` handle of C API.
        """
        assert isinstance(handle, KVStoreHandle)
        self.handle = handle
        self._updater = None
        self._updater_func = None
        self._str_updater_func = None

    def __del__(self):
        check_call(_LIB.MXKVStoreFree(self.handle))

    def init(self, key, value):
        """ Initializes a single or a sequence of key-value pairs into the store.

        For each key, one must `init` it before calling `push` or `pull`.
        When multiple workers invoke `init` for the same key, only
        the value supplied by worker with rank `0` is used. This function returns
        after data has been initialized successfully.

        Parameters
        ----------
        key : str, int, or sequence of str or int
            The keys.
        value : NDArray, RowSparseNDArray or sequence of NDArray or RowSparseNDArray
            Values corresponding to the keys.

        Examples
        --------
        >>> # init a single key-value pair
        >>> shape = (2,3)
        >>> kv = mx.kv.create('local')
        >>> kv.init('3', mx.nd.ones(shape)*2)
        >>> a = mx.nd.zeros(shape)
        >>> kv.pull('3', out=a)
        >>> print a.asnumpy()
        [[ 2.  2.  2.]
        [ 2.  2.  2.]]

        >>> # init a list of key-value pairs
        >>> keys = ['5', '7', '9']
        >>> kv.init(keys, [mx.nd.ones(shape)]*len(keys))

        >>> # init a row_sparse value
        >>> kv.init('4', mx.nd.ones(shape).tostype('row_sparse'))
        >>> b = mx.nd.sparse.zeros('row_sparse', shape)
        >>> kv.row_sparse_pull('4', row_ids=mx.nd.array([0, 1]), out=b)
        >>> print b
        """
        ckeys, cvals, use_str_keys = _ctype_key_value(key, value)
        if use_str_keys:
            check_call(_LIB.MXKVStoreInitEx(self.handle, mx_uint(len(ckeys)), ckeys, cvals))
        else:
            check_call(_LIB.MXKVStoreInit(self.handle, mx_uint(len(ckeys)), ckeys, cvals))

    def push(self, key, value, priority=0):
        """ Pushes a single or a sequence of key-value pairs into the store.

        This function returns immediately after adding an operator to the engine.
        The actual operation is executed asynchronously. If there are consecutive
        pushes to the same key, there is no guarantee on the serialization of pushes.
        The execution of a push does not guarantee that all previous pushes are
        finished. There is no synchronization between workers.
        One can use ``_barrier()`` to sync all workers.

        Parameters
        ----------
        key : str, int, or sequence of str or int
            Keys.

        value : NDArray, RowSparseNDArray, list of NDArray or RowSparseNDArray,
                or list of list of NDArray or RowSparseNDArray
            Values corresponding to the keys.

        priority : int, optional
            The priority of the push operation.
            Higher priority push operations are likely to be executed before
            other push actions.

        Examples
        --------
        >>> # push a single key-value pair
        >>> kv.push('3', mx.nd.ones(shape)*8)
        >>> kv.pull('3', out=a) # pull out the value
        >>> print a.asnumpy()
        [[ 8.  8.  8.]
        [ 8.  8.  8.]]

        >>> # aggregate the value and the push
        >>> gpus = [mx.gpu(i) for i in range(4)]
        >>> b = [mx.nd.ones(shape, gpu) for gpu in gpus]
        >>> kv.push('3', b)
        >>> kv.pull('3', out=a)
        >>> print a.asnumpy()
        [[ 4.  4.  4.]
        [ 4.  4.  4.]]

        >>> # push a list of keys.
        >>> # single device
        >>> keys = ['4', '5', '6']
        >>> kv.push(keys, [mx.nd.ones(shape)]*len(keys))
        >>> b = [mx.nd.zeros(shape)]*len(keys)
        >>> kv.pull(keys, out=b)
        >>> print b[1].asnumpy()
        [[ 1.  1.  1.]
        [ 1.  1.  1.]]

        >>> # multiple devices:
        >>> keys = ['7', '8', '9']
        >>> b = [[mx.nd.ones(shape, gpu) for gpu in gpus]] * len(keys)
        >>> kv.push(keys, b)
        >>> kv.pull(keys, out=b)
        >>> print b[1][1].asnumpy()
        [[ 4.  4.  4.]
        [ 4.  4.  4.]]

        >>> # push a row_sparse value
        >>> b = mx.nd.sparse.zeros('row_sparse', shape)
        >>> kv.init('10', mx.nd.sparse.zeros('row_sparse', shape))
        >>> kv.push('10', mx.nd.ones(shape).tostype('row_sparse'))
        >>> # pull out the value
        >>> kv.row_sparse_pull('10', row_ids=mx.nd.array([0, 1]), out=b)
        >>> print b
        """
        ckeys, cvals, use_str_keys = _ctype_key_value(key, value)
        if use_str_keys:
            check_call(_LIB.MXKVStorePushEx(
                self.handle, mx_uint(len(ckeys)), ckeys, cvals, ctypes.c_int(priority)))
        else:
            check_call(_LIB.MXKVStorePush(
                self.handle, mx_uint(len(ckeys)), ckeys, cvals, ctypes.c_int(priority)))

    def pull(self, key, out=None, priority=0, ignore_sparse=True):
        """ Pulls a single value or a sequence of values from the store.

        This function returns immediately after adding an operator to the engine.
        Subsequent attempts to read from the `out` variable will be blocked until the
        pull operation completes.

        `pull` is executed asynchronously after all previous `pull` calls and only
        the last `push` call for the same input key(s) are finished.

        The returned values are guaranteed to be the latest values in the store.

        pull with `RowSparseNDArray` is not supported for dist kvstore.
        Please use ``row_sparse_pull`` instead.

        Parameters
        ----------
        key : str, int, or sequence of str or int
            Keys.

        out: NDArray or list of NDArray or list of list of NDArray
            Values corresponding to the keys.

        priority : int, optional
            The priority of the pull operation.
            Higher priority pull operations are likely to be executed before
            other pull actions.

        ignore_sparse: bool, optional, default True
            Whether to ignore sparse arrays in the request.

        Examples
        --------
        >>> # pull a single key-value pair
        >>> a = mx.nd.zeros(shape)
        >>> kv.pull('3', out=a)
        >>> print a.asnumpy()
        [[ 2.  2.  2.]
        [ 2.  2.  2.]]

        >>> # pull into multiple devices
        >>> b = [mx.nd.ones(shape, gpu) for gpu in gpus]
        >>> kv.pull('3', out=b)
        >>> print b[1].asnumpy()
        [[ 2.  2.  2.]
        [ 2.  2.  2.]]

        >>> # pull a list of key-value pairs.
        >>> # On single device
        >>> keys = ['5', '7', '9']
        >>> b = [mx.nd.zeros(shape)]*len(keys)
        >>> kv.pull(keys, out=b)
        >>> print b[1].asnumpy()
        [[ 2.  2.  2.]
        [ 2.  2.  2.]]
        >>> # On multiple devices
        >>> keys = ['6', '8', '10']
        >>> b = [[mx.nd.ones(shape, gpu) for gpu in gpus]] * len(keys)
        >>> kv.pull(keys, out=b)
        >>> print b[1][1].asnumpy()
        [[ 2.  2.  2.]
        [ 2.  2.  2.]]
        """
        assert(out is not None)
        ckeys, cvals, use_str_keys = _ctype_key_value(key, out)
        if use_str_keys:
            check_call(_LIB.MXKVStorePullWithSparseEx(self.handle, mx_uint(len(ckeys)),
                                                      ckeys, cvals, ctypes.c_int(priority),
                                                      ctypes.c_bool(ignore_sparse)))
        else:
            check_call(_LIB.MXKVStorePullWithSparse(self.handle, mx_uint(len(ckeys)),
                                                    ckeys, cvals, ctypes.c_int(priority),
                                                    ctypes.c_bool(ignore_sparse)))
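    # Note: push and pull only enqueue work on the engine. A minimal sketch of
    # observing completion (NDArray.wait_to_read is a standard MXNet call;
    # `kv`, `shape`, and `a` are as in the doctests above):
    #   kv.push('3', mx.nd.ones(shape))
    #   kv.pull('3', out=a)
    #   a.wait_to_read()  # blocks until the enqueued pull has written `a`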
    def row_sparse_pull(self, key, out=None, priority=0, row_ids=None):
        """ Pulls a single RowSparseNDArray value or a sequence of RowSparseNDArray values \
        from the store with specified row_ids. When there is only one row_id, KVStoreRowSparsePull \
        is invoked just once and the result is broadcast to all the rest of outputs.

        `row_sparse_pull` is executed asynchronously after all previous
        `pull`/`row_sparse_pull` calls and the last `push` call for the
        same input key(s) are finished. The returned values are guaranteed to be the latest
        values in the store.

        Parameters
        ----------
        key : str, int, or sequence of str or int
            Keys.

        out: RowSparseNDArray or list of RowSparseNDArray or list of list of RowSparseNDArray
            Values corresponding to the keys. The stype is expected to be row_sparse

        priority : int, optional
            The priority of the pull operation.
            Higher priority pull operations are likely to be executed before
            other pull actions.

        row_ids : NDArray or list of NDArray
            The row_ids for which to pull for each value. Each row_id is an 1-D NDArray \
            whose values don't have to be unique nor sorted.

        Examples
        --------
        >>> shape = (3, 3)
        >>> kv.init('3', mx.nd.ones(shape).tostype('row_sparse'))
        >>> a = mx.nd.sparse.zeros('row_sparse', shape)
        >>> row_ids = mx.nd.array([0, 2], dtype='int64')
        >>> kv.row_sparse_pull('3', out=a, row_ids=row_ids)
        >>> print a.asnumpy()
        [[ 1.  1.  1.]
        [ 0.  0.  0.]
        [ 1.  1.  1.]]
        >>> duplicate_row_ids = mx.nd.array([2, 2], dtype='int64')
        >>> kv.row_sparse_pull('3', out=a, row_ids=duplicate_row_ids)
        >>> print a.asnumpy()
        [[ 0.  0.  0.]
        [ 0.  0.  0.]
        [ 1.  1.  1.]]
        >>> unsorted_row_ids = mx.nd.array([1, 0], dtype='int64')
        >>> kv.row_sparse_pull('3', out=a, row_ids=unsorted_row_ids)
        >>> print a.asnumpy()
        [[ 1.  1.  1.]
        [ 1.  1.  1.]
        [ 0.  0.  0.]]
        """
        assert(out is not None)
        assert(row_ids is not None)
        if isinstance(row_ids, NDArray):
            row_ids = [row_ids]
        assert(isinstance(row_ids, list)), \
            "row_ids should be NDArray or list of NDArray"
        first_out = out
        # whether a single row_id is used for all outputs
        single_rowid = False
        if len(row_ids) == 1 and isinstance(out, list):
            single_rowid = True
            first_out = [out[0]]
        ckeys, cvals, use_str_keys = _ctype_key_value(key, first_out)
        _, crow_ids, _ = _ctype_key_value(key, row_ids)
        assert(len(crow_ids) == len(cvals)), \
            "the number of row_ids doesn't match the number of values"
        if use_str_keys:
            check_call(_LIB.MXKVStorePullRowSparseEx(
                self.handle, mx_uint(len(ckeys)), ckeys, cvals, crow_ids,
                ctypes.c_int(priority)))
        else:
            check_call(_LIB.MXKVStorePullRowSparse(
                self.handle, mx_uint(len(ckeys)), ckeys, cvals, crow_ids,
                ctypes.c_int(priority)))
        # the result can be copied to the other outputs without another
        # row_sparse_pull since the indices are the same
        if single_rowid:
            for out_i in out[1:]:
                out[0].copyto(out_i)

    def set_gradient_compression(self, compression_params):
        """ Specifies type of low-bit quantization for gradient compression \
        and additional arguments depending on the type of compression being used.

        2bit Gradient Compression takes a positive float `threshold`.
        The technique works by thresholding values such that positive values in the
        gradient above threshold will be set to threshold. Negative values whose absolute
        values are higher than threshold, will be set to the negative of threshold.
        Values whose absolute values are less than threshold will be set to 0.
        By doing so, each value in the gradient is in one of three states. 2bits are
        used to represent these states, and every 16 float values in the original
        gradient can be represented using one float. This compressed representation
        can reduce communication costs. The difference between these thresholded values and
        original values is stored at the sender's end as residual and added to the
        gradient in the next iteration.

        When kvstore is 'local', gradient compression is used to reduce communication
        between multiple devices (gpus). Gradient is quantized on each GPU which
        computed the gradients, then sent to the GPU which merges the gradients. This
        receiving GPU dequantizes the gradients and merges them. Note that this
        increases memory usage on each GPU because of the residual array stored.

        When kvstore is 'dist', gradient compression is used to reduce communication
        from worker to server. Gradient is quantized on each worker which
        computed the gradients, then sent to the server which dequantizes
        this data and merges the gradients from each worker. Note that this
        increases CPU memory usage on each worker because of the residual array stored.
        Only worker to server communication is compressed in this setting.
        If each machine has multiple GPUs, currently this GPU to GPU or GPU to CPU communication
        is not compressed. Server to worker communication (in the case of pull)
        is also not compressed.

        To use 2bit compression, we need to specify `type` as `2bit`.
        Only specifying `type` would use default value for the threshold.
        To completely specify the arguments for 2bit compression, we would need to pass
        a dictionary which includes `threshold` like:
        {'type': '2bit', 'threshold': 0.5}

        Parameters
        ----------
        compression_params : dict
            A dictionary specifying the type and parameters for gradient compression.
            The key `type` in this dictionary is a
            required string argument and specifies the type of gradient compression.
            Currently `type` can be only `2bit`
            Other keys in this dictionary are optional and specific to the type
            of gradient compression.
        """
        if ('device' in self.type) or ('dist' in self.type):
            ckeys, cvals = _ctype_dict(compression_params)
            check_call(_LIB.MXKVStoreSetGradientCompression(
                self.handle, mx_uint(len(compression_params)), ckeys, cvals))
        else:
            raise Exception('Gradient compression is not supported for this type of kvstore')
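    # A minimal usage sketch for 2bit compression, following the docstring
    # above (the threshold value is illustrative):
    #   kv = mx.kv.create('device')  # or any 'dist' kvstore
    #   kv.set_gradient_compression({'type': '2bit', 'threshold': 0.5})
    #   # subsequent push traffic for dense gradients is now quantized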
    def set_optimizer(self, optimizer):
        """ Registers an optimizer with the kvstore.

        When using a single machine, this function updates the local optimizer.
        If using multiple machines and this operation is invoked from a worker node,
        it will serialize the optimizer with pickle and send it to all servers.
        The function returns after all servers have been updated.

        Parameters
        ----------
        optimizer : Optimizer
            The new optimizer for the store

        Examples
        --------

        >>> kv = mx.kv.create()
        >>> shape = (2, 2)
        >>> weight = mx.nd.zeros(shape)
        >>> kv.init(3, weight)
        >>> # set the optimizer for kvstore as the default SGD optimizer
        >>> kv.set_optimizer(mx.optimizer.SGD())
        >>> grad = mx.nd.ones(shape)
        >>> kv.push(3, grad)
        >>> kv.pull(3, out = weight)
        >>> # weight is updated via gradient descent
        >>> weight.asnumpy()
        array([[-0.01, -0.01],
               [-0.01, -0.01]], dtype=float32)
        """
        is_worker = ctypes.c_int()
        check_call(_LIB.MXKVStoreIsWorkerNode(ctypes.byref(is_worker)))

        # for distributed training, send the serialized optimizer to the servers
        if 'dist' in self.type and is_worker.value:
            try:
                # use ASCII protocol 0, might be slower, but not a big deal
                optim_str = py_str(pickle.dumps(optimizer, 0))
            except:
                raise
            cmd = _get_kvstore_server_command_type('kController')
            self._send_command_to_servers(cmd, optim_str)
            if optimizer.multi_precision:
                cmd = _get_kvstore_server_command_type('kSetMultiPrecision')
                self._send_command_to_servers(cmd, '')
        else:
            self._set_updater(opt.get_updater(optimizer))
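    # A sketch of the distributed path described above (assumes a 'dist_sync'
    # job launched with the usual MXNet launcher):
    #   kv = mx.kv.create('dist_sync')
    #   kv.set_optimizer(mx.optimizer.SGD(learning_rate=0.1))
    #   # the optimizer is pickled and sent to every server; updates then run
    #   # server-side, so self._updater stays None on this worker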
    @property
    def type(self):
        """ Returns the type of this kvstore.

        Returns
        -------
        type : str
            the string type
        """
        kv_type = ctypes.c_char_p()
        check_call(_LIB.MXKVStoreGetType(self.handle, ctypes.byref(kv_type)))
        return py_str(kv_type.value)

    @property
    def rank(self):
        """ Returns the rank of this worker node.

        Returns
        -------
        rank : int
            The rank of this node, which is in range [0, num_workers())
        """
        rank = ctypes.c_int()
        check_call(_LIB.MXKVStoreGetRank(self.handle, ctypes.byref(rank)))
        return rank.value

    @property
    def num_workers(self):
        """Returns the number of worker nodes.

        Returns
        -------
        size : int
            The number of worker nodes.
        """
        size = ctypes.c_int()
        check_call(_LIB.MXKVStoreGetGroupSize(self.handle, ctypes.byref(size)))
        return size.value

    def save_optimizer_states(self, fname, dump_optimizer=False):
        """Saves the optimizer (updater) state to a file. This is often used when checkpointing
        the model during training.

        Parameters
        ----------
        fname : str
            Path to the output states file.
        dump_optimizer : bool, default False
            Whether to also save the optimizer itself. This would also save optimizer
            information such as learning rate and weight decay schedules.
        """
        assert self._updater is not None, "Cannot save states for distributed training"
        with open(fname, 'wb') as fout:
            fout.write(self._updater.get_states(dump_optimizer))

    def load_optimizer_states(self, fname):
        """Loads the optimizer (updater) state from the file.

        Parameters
        ----------
        fname : str
            Path to input states file.
        """
        assert self._updater is not None, "Cannot load states for distributed training"
        self._updater.set_states(open(fname, 'rb').read())
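    # A minimal checkpointing sketch using the two methods above (file name is
    # illustrative; only valid for local kvstores, where _updater is set):
    #   kv.set_optimizer(mx.optimizer.SGD())
    #   kv.save_optimizer_states('model-0010.states')
    #   # ... later, to resume training:
    #   kv.load_optimizer_states('model-0010.states')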
    def _set_updater(self, updater):
        """Sets a push updater into the store.

        This function only changes the local store. When running on multiple machines one must
        use `set_optimizer`.

        Parameters
        ----------
        updater : function
            The updater function.

        Examples
        --------
        >>> def update(key, input, stored):
        ...     print "update on key: %d" % key
        ...     stored += input * 2
        >>> kv._set_updater(update)
        >>> kv.pull('3', out=a)
        >>> print a.asnumpy()
        [[ 4.  4.  4.]
        [ 4.  4.  4.]]
        >>> kv.push('3', mx.nd.ones(shape))
        update on key: 3
        >>> kv.pull('3', out=a)
        >>> print a.asnumpy()
        [[ 6.  6.  6.]
        [ 6.  6.  6.]]
        """
        self._updater = updater
        # set updater with int keys
        _updater_proto = ctypes.CFUNCTYPE(
            None, ctypes.c_int, NDArrayHandle, NDArrayHandle, ctypes.c_void_p)
        self._updater_func = _updater_proto(_updater_wrapper(updater))
        # set updater with str keys
        _str_updater_proto = ctypes.CFUNCTYPE(
            None, ctypes.c_char_p, NDArrayHandle, NDArrayHandle, ctypes.c_void_p)
        self._str_updater_func = _str_updater_proto(_updater_wrapper(updater))
        check_call(_LIB.MXKVStoreSetUpdaterEx(self.handle, self._updater_func,
                                              self._str_updater_func, None))

    def _barrier(self):
        """Invokes global barrier among all worker nodes.

        For example, assume there are `n` machines. We would like machine `0` to first
        `init` the values and then have all the workers `pull` the initialized value.
        Before pulling, we can invoke `_barrier()` to guarantee that the
        initialization is finished.
        """
        check_call(_LIB.MXKVStoreBarrier(self.handle))

    def _send_command_to_servers(self, head, body):
        """Sends a command to all server nodes.

        Sending a command to a server node will cause that server node to invoke
        ``KVStoreServer.controller`` to execute the command.

        This function returns after the command has been executed on all server
        nodes.

        Parameters
        ----------
        head : int
            the head of the command.
        body : str
            the body of the command.
        """
        check_call(_LIB.MXKVStoreSendCommmandToServers(
            self.handle, mx_uint(head), c_str(body)))


def create(name='local'):
    """Creates a new KVStore.

    For single machine training, there are two commonly used types:

    ``local``: Copies all gradients to CPU memory and updates weights there.

    ``device``: Aggregates gradients and updates weights on GPUs. With this setting,
    the KVStore also attempts to use GPU peer-to-peer communication,
    potentially accelerating the communication.

    For distributed training, KVStore also supports a number of types:

    ``dist_sync``: Behaves similarly to ``local`` but with one major difference.
    With ``dist_sync``, batch-size now means the batch size used on each machine.
    So if there are ``n`` machines and we use batch size ``b``,
    then ``dist_sync`` behaves like ``local`` with batch size ``n * b``.

    ``dist_device_sync``: Identical to ``dist_sync`` with the difference similar
    to ``device`` vs ``local``.

    ``dist_async``: Performs asynchronous updates.
    The weights are updated whenever gradients are received from any machine.
    No two updates happen on the same weight at the same time. However, the order is not
    guaranteed.

    Parameters
    ----------
    name : {'local', 'device', 'nccl', 'dist_sync', 'dist_device_sync', 'dist_async'}
        The type of KVStore.
    Returns
    -------
    kv : KVStore
        The created KVStore.
    """
    if not isinstance(name, string_types):
        raise TypeError('name must be a string')
    handle = KVStoreHandle()
    check_call(_LIB.MXKVStoreCreate(c_str(name),
                                    ctypes.byref(handle)))
    kv = KVStore(handle)
    set_kvstore_handle(kv.handle)
    return kv
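# A minimal end-to-end sketch of this module's API (shapes and keys are
# illustrative), following the doctests above:
#   import mxnet as mx
#   kv = mx.kv.create('local')
#   kv.init('w', mx.nd.ones((2, 2)))
#   kv.push('w', mx.nd.ones((2, 2)))
#   out = mx.nd.zeros((2, 2))
#   kv.pull('w', out=out)
#   print(kv.type, kv.rank, kv.num_workers)  # 'local', 0, 1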