# coding: utf-8
"""Key value store interface of MXNet for parameter synchronization."""
from __future__ import absolute_import

import ctypes
import pickle
from .ndarray import NDArray
from .base import _LIB
from .base import check_call, c_array, c_str, string_types, mx_uint, py_str
from .base import NDArrayHandle, KVStoreHandle
from . import optimizer as opt


def _ctype_key_value(keys, vals):
    """Converts a key (or a sequence of keys) and the corresponding values
    into ctypes arrays suitable for the C API. For internal use only."""
    if isinstance(keys, (tuple, list)):
        assert len(keys) == len(vals)
        c_keys = []
        c_vals = []
        for key, val in zip(keys, vals):
            c_key_i, c_val_i = _ctype_key_value(key, val)
            c_keys += c_key_i
            c_vals += c_val_i
        return (c_array(ctypes.c_char_p, c_keys),
                c_array(NDArrayHandle, c_vals))
    names = []
    keys = str(keys)
    if isinstance(vals, NDArray):
        # a single key with a single value
        names.append(c_str(keys))
        return (c_array(ctypes.c_char_p, names),
                c_array(NDArrayHandle, [vals.handle]))
    else:
        # a single key with multiple values, e.g. one NDArray per device
        for value in vals:
            assert isinstance(value, NDArray)
        return (c_array(ctypes.c_char_p, [c_str(keys)] * len(vals)),
                c_array(NDArrayHandle, [value.handle for value in vals]))


def _updater_wrapper(updater):
    """A wrapper for the user-defined handle."""
    def updater_handle(key, lhs_handle, rhs_handle, _):
        """ctypes callback: rebuild NDArrays from raw handles."""
        lhs = NDArray(NDArrayHandle(lhs_handle))
        rhs = NDArray(NDArrayHandle(rhs_handle))
        updater(key, lhs, rhs)
    return updater_handle


class KVStore(object):
    """A key-value store for synchronization of values, over multiple devices."""

    def __init__(self, handle):
        """Initializes a new KVStore.

        Parameters
        ----------
        handle : KVStoreHandle
            `KVStore` handle of C API.
        """
        assert isinstance(handle, KVStoreHandle)
        self.handle = handle
        self._updater = None
        self._updater_func = None

    def __del__(self):
        check_call(_LIB.MXKVStoreFree(self.handle))

    def init(self, key, value):
        """Initializes a single or a sequence of key-value pairs into the store.

        For each key, one must `init` it before calling `push` or `pull`.
        When multiple workers invoke `init` for the same key, only the value
        supplied by the worker with rank `0` is used. This function returns
        after data has been initialized successfully.

        Parameters
        ----------
        key : str or sequence of str
            The keys.
        value : NDArray or sequence of NDArray
            Values corresponding to the keys.

        Examples
        --------
        >>> # init a single key-value pair
        >>> shape = (2,3)
        >>> kv = mx.kv.create('local')
        >>> kv.init('3', mx.nd.ones(shape)*2)
        >>> a = mx.nd.zeros(shape)
        >>> kv.pull('3', out=a)
        >>> print a.asnumpy()
        [[ 2.  2.  2.]
        [ 2.  2.  2.]]

        >>> # init a list of key-value pairs
        >>> keys = ['5', '7', '9']
        >>> kv.init(keys, [mx.nd.ones(shape)]*len(keys))
        """
        ckeys, cvals = _ctype_key_value(key, value)
        check_call(_LIB.MXKVStoreInitEx(
            self.handle, mx_uint(len(ckeys)), ckeys, cvals))

    def push(self, key, value, priority=0):
        """Pushes a single or a sequence of key-value pairs into the store.

        This function returns immediately after adding an operator to the engine.
        The actual operation is executed asynchronously after all previous `push`
        and `pull` calls for the same input key(s) are finished.
        There is no synchronization between workers. One can use ``_barrier()``
        to sync all workers.

        Parameters
        ----------
        key : str or list of str
            Keys.
        value : NDArray or list of NDArray or list of list of NDArray
            Values corresponding to the keys.
        priority : int, optional
            The priority of the push operation.
            Higher priority push operations are likely to be executed before
            other push actions.

        Examples
        --------
        >>> # push a single key-value pair
        >>> kv.push('3', mx.nd.ones(shape)*8)
        >>> kv.pull('3', out=a) # pull out the value
        >>> print a.asnumpy()
        [[ 8.  8.  8.]
        [ 8.  8.  8.]]

        >>> # aggregate the value and then push
        >>> gpus = [mx.gpu(i) for i in range(4)]
        >>> b = [mx.nd.ones(shape, gpu) for gpu in gpus]
        >>> kv.push('3', b)
        >>> kv.pull('3', out=a)
        >>> print a.asnumpy()
        [[ 4.  4.  4.]
        [ 4.  4.  4.]]

        >>> # push a list of keys.
        >>> # single device
        >>> kv.push(keys, [mx.nd.ones(shape)]*len(keys))
        >>> b = [mx.nd.zeros(shape)]*len(keys)
        >>> kv.pull(keys, out=b)
        >>> print b[1].asnumpy()
        [[ 1.  1.  1.]
        [ 1.  1.  1.]]

        >>> # multiple devices:
        >>> b = [[mx.nd.ones(shape, gpu) for gpu in gpus]] * len(keys)
        >>> kv.push(keys, b)
        >>> kv.pull(keys, out=b)
        >>> print b[1][1].asnumpy()
        [[ 4.  4.  4.]
        [ 4.  4.  4.]]
        """
        ckeys, cvals = _ctype_key_value(key, value)
        check_call(_LIB.MXKVStorePushEx(
            self.handle, mx_uint(len(ckeys)), ckeys, cvals,
            ctypes.c_int(priority)))

    def pull(self, key, out=None, priority=0):
        """Pulls a single value or a sequence of values from the store.

        This function returns immediately after adding an operator to the engine.
        Subsequent attempts to read from the `out` variable will be blocked until
        the pull operation completes.

        `pull` is executed asynchronously after all previous `push` and `pull`
        calls for the same input key(s) are finished.

        The returned values are guaranteed to be the latest values in the store.

        Parameters
        ----------
        key : str or list of str
            Keys.
        out : NDArray or list of NDArray or list of list of NDArray
            Values corresponding to the keys.
        priority : int, optional
            The priority of the pull operation.
            Higher priority pull operations are likely to be executed before
            other pull actions.

        Examples
        --------
        >>> # pull a single key-value pair
        >>> a = mx.nd.zeros(shape)
        >>> kv.pull('3', out=a)
        >>> print a.asnumpy()
        [[ 2.  2.  2.]
        [ 2.  2.  2.]]

        >>> # pull into multiple devices
        >>> b = [mx.nd.ones(shape, gpu) for gpu in gpus]
        >>> kv.pull('3', out=b)
        >>> print b[1].asnumpy()
        [[ 2.  2.  2.]
        [ 2.  2.  2.]]

        >>> # pull a list of key-value pairs.
        >>> # On single device
        >>> keys = ['5', '7', '9']
        >>> b = [mx.nd.zeros(shape)]*len(keys)
        >>> kv.pull(keys, out=b)
        >>> print b[1].asnumpy()
        [[ 2.  2.  2.]
        [ 2.  2.  2.]]
        >>> # On multiple devices
        >>> b = [[mx.nd.ones(shape, gpu) for gpu in gpus]] * len(keys)
        >>> kv.pull(keys, out=b)
        >>> print b[1][1].asnumpy()
        [[ 2.  2.  2.]
        [ 2.  2.  2.]]
        """
        assert out is not None
        ckeys, cvals = _ctype_key_value(key, out)
        check_call(_LIB.MXKVStorePullEx(
            self.handle, mx_uint(len(ckeys)), ckeys, cvals,
            ctypes.c_int(priority)))

    def set_optimizer(self, optimizer):
        """Registers an optimizer with the kvstore.

        When using a single machine, this function updates the local optimizer.
        If using multiple machines and this operation is invoked from a worker
        node, it will serialize the optimizer with pickle and send it to all
        servers. The function returns after all servers have been updated.

        Parameters
        ----------
        optimizer : Optimizer
            The new optimizer for the store

        Examples
        --------
        >>> kv = mx.kv.create()
        >>> shape = (2, 2)
        >>> weight = mx.nd.zeros(shape)
        >>> kv.init(3, weight)
        >>> # set the optimizer for kvstore as the default SGD optimizer
        >>> kv.set_optimizer(mx.optimizer.SGD())
        >>> grad = mx.nd.ones(shape)
        >>> kv.push(3, grad)
        >>> kv.pull(3, out = weight)
        >>> # weight is updated via gradient descent
        >>> weight.asnumpy()
        array([[-0.01, -0.01],
               [-0.01, -0.01]], dtype=float32)
        """
        is_worker = ctypes.c_int()
        check_call(_LIB.MXKVStoreIsWorkerNode(ctypes.byref(is_worker)))

        if 'dist' in self.type and is_worker.value:
            # serialize the optimizer and send it to all server nodes
            optim_str = pickle.dumps(optimizer, 0)
            self._send_command_to_servers(0, optim_str)
        else:
            self._set_updater(opt.get_updater(optimizer))

    @property
    def type(self):
        """Returns the type of this kvstore.

        Returns
        -------
        type : str
            the string type
        """
        kv_type = ctypes.c_char_p()
        check_call(_LIB.MXKVStoreGetType(self.handle, ctypes.byref(kv_type)))
        return py_str(kv_type.value)

    @property
    def rank(self):
        """Returns the rank of this worker node.

        Returns
        -------
        rank : int
            The rank of this node, which is in range [0, num_workers())
        """
        rank = ctypes.c_int()
        check_call(_LIB.MXKVStoreGetRank(self.handle, ctypes.byref(rank)))
        return rank.value

    @property
    def num_workers(self):
        """Returns the number of worker nodes.

        Returns
        -------
        size : int
            The number of worker nodes.
        """
        size = ctypes.c_int()
        check_call(_LIB.MXKVStoreGetGroupSize(self.handle, ctypes.byref(size)))
        return size.value

    def save_optimizer_states(self, fname):
        """Saves the optimizer (updater) state to a file.

        This is often used when checkpointing the model during training.

        Parameters
        ----------
        fname : str
            Path to the output states file.
        """
        assert self._updater is not None, "Cannot save states for distributed training"
        with open(fname, 'wb') as fout:
            fout.write(self._updater.get_states())

    def load_optimizer_states(self, fname):
        """Loads the optimizer (updater) state from the file.

        Parameters
        ----------
        fname : str
            Path to input states file.
        """
        assert self._updater is not None, "Cannot load states for distributed training"
        self._updater.set_states(open(fname, 'rb').read())

    def _set_updater(self, updater):
        """Sets a push updater into the store.

        This function only changes the local store. When running on multiple
        machines one must use `set_optimizer`.

        Parameters
        ----------
        updater : function
            The updater function.

        Examples
        --------
        >>> def update(key, input, stored):
        ...     print "update on key: %d" % key
        ...     stored += input * 2
        >>> kv._set_updater(update)
        >>> kv.pull('3', out=a)
        >>> print a.asnumpy()
        [[ 4.  4.  4.]
        [ 4.  4.  4.]]
        >>> kv.push('3', mx.nd.ones(shape))
        update on key: 3
        >>> kv.pull('3', out=a)
        >>> print a.asnumpy()
        [[ 6.  6.  6.]
        [ 6.  6.  6.]]
        """
        self._updater = updater
        # keep a reference to the ctypes callback so it is not garbage collected
        _updater_proto = ctypes.CFUNCTYPE(
            None, ctypes.c_int, NDArrayHandle, NDArrayHandle, ctypes.c_void_p)
        self._updater_func = _updater_proto(_updater_wrapper(updater))
        check_call(_LIB.MXKVStoreSetUpdater(self.handle, self._updater_func, None))

    def _barrier(self):
        """Invokes global barrier among all worker nodes.

        For example, assume there are `n` machines. We would like machine `0`
        to first `init` the values and then have all the workers `pull` the
        initialized value. Before pulling, we can invoke `_barrier()` to
        guarantee that the initialization is finished.
        """
        check_call(_LIB.MXKVStoreBarrier(self.handle))

    def _send_command_to_servers(self, head, body):
        """Sends a command to all server nodes.

        Sending a command to a server node will cause that server node to
        invoke ``KVStoreServer.controller`` to execute the command.

        This function returns after the command has been executed on all
        server nodes.

        Parameters
        ----------
        head : int
            the head of the command.
        body : str
            the body of the command.
        """
        check_call(_LIB.MXKVStoreSendCommmandToServers(
            self.handle, mx_uint(head), c_str(body)))


def create(name='local'):
    """Creates a new KVStore.

    For single machine training, there are two commonly used types:

    ``local``: Copies all gradients to CPU memory and updates weights there.

    ``device``: Aggregates gradients and updates weights on GPUs. With this
    setting, the KVStore also attempts to use GPU peer-to-peer communication,
    potentially accelerating the communication.

    For distributed training, KVStore also supports a number of types:

    ``dist_sync``: Behaves similarly to ``local`` but with one major difference.
    With ``dist_sync``, batch-size now means the batch size used on each machine.
    So if there are ``n`` machines and we use batch size ``b``, then
    ``dist_sync`` behaves like ``local`` with batch size ``n * b``.

    ``dist_device_sync``: Identical to ``dist_sync`` with the difference similar
    to ``device`` vs ``local``.

    ``dist_async``: Performs asynchronous updates. The weights are updated
    whenever gradients are received from any machine. No two updates happen
    on the same weight at the same time. However, the order is not guaranteed.

    Parameters
    ----------
    name : {'local', 'device', 'dist_sync', 'dist_device_sync', 'dist_async'}
        The type of KVStore.

    Returns
    -------
    kv : KVStore
        The created KVStore.
    """
    if not isinstance(name, string_types):
        raise TypeError('name must be a string')
    handle = KVStoreHandle()
    check_call(_LIB.MXKVStoreCreate(c_str(name), ctypes.byref(handle)))
    return KVStore(handle)
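

# ----------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original module):
# exercises create/init/push/pull plus a custom updater on a 'local' store,
# mirroring the doctest examples above. Assumes the mxnet package is installed
# and importable as `mx`.
# ----------------------------------------------------------------------------
if __name__ == '__main__':
    import mxnet as mx

    shape = (2, 3)
    kv = mx.kv.create('local')           # single-machine store
    kv.init('3', mx.nd.ones(shape) * 2)  # a key must be initialized before push/pull

    def update(key, recv, stored):
        """Custom updater: accumulate twice the received value into the store."""
        stored += recv * 2

    kv._set_updater(update)

    kv.push('3', mx.nd.ones(shape))      # triggers `update` on the stored value
    out = mx.nd.zeros(shape)
    kv.pull('3', out=out)                # reads of `out` block until the pull completes
    print(out.asnumpy())                 # expect all entries to equal 4.0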