/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /*! * \file rpc_module.cc * \brief RPC runtime module. */ #include #include #include #include #include #include #if defined(_M_X64) || defined(__x86_64__) #include #endif #include "rpc_endpoint.h" #include "rpc_session.h" namespace tvm { namespace runtime { // deleter of RPC remote array static void RemoteNDArrayDeleter(Object* obj) { auto* ptr = static_cast(obj); RemoteSpace* space = static_cast(ptr->dl_tensor.data); if (ptr->manager_ctx != nullptr) { space->sess->FreeHandle(ptr->manager_ctx, kTVMNDArrayHandle); } delete space; delete ptr; } /*! * \brief Build a local NDArray with remote backing storage. * \param sess the RPCSession which owns the given handle. * \param handle A pointer valid on the remote end which should form the `data` field of the * underlying DLTensor. * \param template_tensor An empty DLTensor whose shape and dtype fields are used to fill the newly * created array. Needed because it's difficult to pass a shape vector as a PackedFunc arg. * \param dev Remote device used with this tensor. Must have non-zero RPCSessMask. * \param remote_ndarray_handle The handle returned by RPC server to identify the NDArray. */ NDArray NDArrayFromRemoteOpaqueHandle(std::shared_ptr sess, void* handle, DLTensor* template_tensor, Device dev, void* remote_ndarray_handle) { ICHECK_EQ(sess->table_index(), GetRPCSessionIndex(dev)) << "The Device given does not belong to the given session"; RemoteSpace* space = new RemoteSpace(); space->sess = sess; space->data = handle; std::vector shape_vec{template_tensor->shape, template_tensor->shape + template_tensor->ndim}; NDArray::Container* data = new NDArray::Container(static_cast(space), std::move(shape_vec), template_tensor->dtype, dev); data->manager_ctx = remote_ndarray_handle; data->SetDeleter(RemoteNDArrayDeleter); return NDArray(GetObjectPtr(data)); } /*! * \brief A wrapped remote function as a PackedFunc. */ class RPCWrappedFunc : public Object { public: RPCWrappedFunc(void* handle, std::shared_ptr sess) : handle_(handle), sess_(sess) {} void operator()(TVMArgs args, TVMRetValue* rv) const { std::vector values(args.values, args.values + args.size()); std::vector type_codes(args.type_codes, args.type_codes + args.size()); std::vector> temp_dltensors; // scan and check whether we need rewrite these arguments // to their remote variant. for (int i = 0; i < args.size(); ++i) { if (args[i].IsObjectRef()) { String str = args[i]; type_codes[i] = kTVMStr; values[i].v_str = str.c_str(); continue; } int tcode = type_codes[i]; switch (tcode) { case kTVMDLTensorHandle: case kTVMNDArrayHandle: { // Pass NDArray as DLTensor, NDArray and DLTensor // are compatible to each other, just need to change the index. type_codes[i] = kTVMDLTensorHandle; // translate to a remote view of DLTensor auto dptr = std::make_unique(*static_cast(values[i].v_handle)); dptr->device = RemoveSessMask(dptr->device); dptr->data = static_cast(dptr->data)->data; values[i].v_handle = dptr.get(); temp_dltensors.emplace_back(std::move(dptr)); break; } case kDLDevice: { values[i].v_device = RemoveSessMask(values[i].v_device); break; } case kTVMPackedFuncHandle: case kTVMModuleHandle: { values[i].v_handle = UnwrapRemoteValueToHandle(TVMArgValue(values[i], tcode)); break; } } } auto set_return = [this, rv](TVMArgs args) { this->WrapRemoteReturnToValue(args, rv); }; sess_->CallFunc(handle_, values.data(), type_codes.data(), args.size(), set_return); } ~RPCWrappedFunc() { try { sess_->FreeHandle(handle_, kTVMPackedFuncHandle); } catch (const Error& e) { // fault tolerance to remote close } } private: // remote function handle void* handle_{nullptr}; // pointer to the session. std::shared_ptr sess_; // unwrap a remote value to the underlying handle. void* UnwrapRemoteValueToHandle(const TVMArgValue& arg) const; // wrap a remote return via Set void WrapRemoteReturnToValue(TVMArgs args, TVMRetValue* rv) const; // remove a remote session mask Device RemoveSessMask(Device dev) const { ICHECK(IsRPCSessionDevice(dev)) << "Can not pass in local device"; ICHECK_EQ(GetRPCSessionIndex(dev), sess_->table_index()) << "Can not pass in device with a different remote session"; return RemoveRPCSessionMask(dev); } }; // RPC that represents a remote module session. class RPCModuleNode final : public ModuleNode { public: RPCModuleNode(void* module_handle, std::shared_ptr sess) : module_handle_(module_handle), sess_(sess) {} ~RPCModuleNode() { if (module_handle_ != nullptr) { try { sess_->FreeHandle(module_handle_, kTVMModuleHandle); } catch (const Error& e) { // fault tolerance to remote close } module_handle_ = nullptr; } } const char* type_key() const final { return "rpc"; } PackedFunc GetFunction(const std::string& name, const ObjectPtr& sptr_to_self) final { if (module_handle_ == nullptr) { return WrapRemoteFunc(sess_->GetFunction(name)); } else { InitRemoteFunc(&remote_mod_get_function_, "tvm.rpc.server.ModuleGetFunction"); return remote_mod_get_function_(GetRef(this), name, false); } } std::string GetSource(const std::string& format) final { LOG(FATAL) << "GetSource for rpc Module is not supported"; return ""; } PackedFunc GetTimeEvaluator(const std::string& name, Device dev, int number, int repeat, int min_repeat_ms, const std::string& f_preproc_name) { InitRemoteFunc(&remote_get_time_evaluator_, "runtime.RPCTimeEvaluator"); // Remove session mask because we pass dev by parts. ICHECK_EQ(GetRPCSessionIndex(dev), sess_->table_index()) << "ValueError: Need to pass the matched remote device to RPCModule.GetTimeEvaluator"; dev = RemoveRPCSessionMask(dev); if (module_handle_ != nullptr) { return remote_get_time_evaluator_(GetRef(this), name, static_cast(dev.device_type), dev.device_id, number, repeat, min_repeat_ms, f_preproc_name); } else { return remote_get_time_evaluator_(Optional(nullptr), name, static_cast(dev.device_type), dev.device_id, number, repeat, min_repeat_ms, f_preproc_name); } } Module LoadModule(std::string name) { InitRemoteFunc(&remote_load_module_, "tvm.rpc.server.load_module"); return remote_load_module_(name); } void ImportModule(Module other) { InitRemoteFunc(&remote_import_module_, "tvm.rpc.server.ImportModule"); remote_import_module_(GetRef(this), other); } const std::shared_ptr& sess() { return sess_; } void* module_handle() const { return module_handle_; } private: template void InitRemoteFunc(FType* func, const std::string& name) { if (*func != nullptr) return; RPCSession::PackedFuncHandle handle = sess_->GetFunction(name); ICHECK(handle != nullptr) << "Cannot found remote function " << name; *func = WrapRemoteFunc(handle); } PackedFunc WrapRemoteFunc(RPCSession::PackedFuncHandle handle) { if (handle == nullptr) return PackedFunc(); auto wf = std::make_shared(handle, sess_); return PackedFunc([wf](TVMArgs args, TVMRetValue* rv) { return wf->operator()(args, rv); }); } // The module handle void* module_handle_{nullptr}; // The local channel std::shared_ptr sess_; // remote function to get time evaluator TypedPackedFunc, std::string, int, int, int, int, int, std::string)> remote_get_time_evaluator_; // remote function getter for modules. TypedPackedFunc remote_mod_get_function_; // remote function getter for load module TypedPackedFunc remote_load_module_; // remote function getter for load module TypedPackedFunc remote_import_module_; }; void* RPCWrappedFunc::UnwrapRemoteValueToHandle(const TVMArgValue& arg) const { if (arg.type_code() == kTVMModuleHandle) { Module mod = arg; std::string tkey = mod->type_key(); ICHECK_EQ(tkey, "rpc") << "ValueError: Cannot pass a non-RPC module to remote"; auto* rmod = static_cast(mod.operator->()); ICHECK(rmod->sess() == sess_) << "ValueError: Cannot pass in module into a different remote session"; return rmod->module_handle(); } else { LOG(FATAL) << "ValueError: Cannot pass type " << runtime::ArgTypeCode2Str(arg.type_code()) << " as an argument to the remote"; return nullptr; } } void RPCWrappedFunc::WrapRemoteReturnToValue(TVMArgs args, TVMRetValue* rv) const { int tcode = args[0]; if (tcode == kTVMNullptr) return; if (tcode == kTVMPackedFuncHandle) { ICHECK_EQ(args.size(), 2); void* handle = args[1]; auto wf = std::make_shared(handle, sess_); *rv = PackedFunc([wf](TVMArgs args, TVMRetValue* rv) { return wf->operator()(args, rv); }); } else if (tcode == kTVMModuleHandle) { ICHECK_EQ(args.size(), 2); void* handle = args[1]; auto n = make_object(handle, sess_); *rv = Module(n); } else if (tcode == kTVMDLTensorHandle || tcode == kTVMNDArrayHandle) { ICHECK_EQ(args.size(), 3); DLTensor* tensor = args[1]; void* nd_handle = args[2]; *rv = NDArrayFromRemoteOpaqueHandle(sess_, tensor->data, tensor, AddRPCSessionMask(tensor->device, sess_->table_index()), nd_handle); } else { ICHECK_EQ(args.size(), 2); *rv = args[1]; } } Module CreateRPCSessionModule(std::shared_ptr sess) { auto n = make_object(nullptr, sess); RPCSession::InsertToSessionTable(sess); return Module(n); } std::shared_ptr RPCModuleGetSession(Module mod) { std::string tkey = mod->type_key(); ICHECK_EQ(tkey, "rpc") << "ValueError: Cannot pass a non-RPC module to remote"; auto* rmod = static_cast(mod.operator->()); return rmod->sess(); } /*! * \brief Flush the cache. * \param addr The address of data we want to flush * \param len The length of data */ /* * When we are in the tuning of TVM, we will make TVM occupy * the cache fully and doesn't flush it during iteration. * This has problems then in e2e testing, since arrays that * we assume exist in cache (ie. weights) are evicted during e2e runs, * which leads to lower performance. */ inline void CPUCacheFlushImpl(const char* addr, unsigned int len) { #if (defined(_M_X64) || defined(__x86_64__) || defined(__aarch64__)) #if defined(__aarch64__) size_t ctr_el0 = 0; asm volatile("mrs %0, ctr_el0" : "=r"(ctr_el0)); const size_t cache_line = 4 << ((ctr_el0 >> 16) & 15); #else const size_t cache_line = 64; #endif if (addr == nullptr || len <= 0) { return; } for (uintptr_t uptr = (uintptr_t)addr & ~(cache_line - 1); uptr < (uintptr_t)addr + len; uptr += cache_line) { #if defined(__aarch64__) asm volatile("dc civac, %0\n\t" : : "r"(reinterpret_cast(uptr)) : "memory"); #else _mm_clflush(reinterpret_cast(uptr)); #endif } #if defined(__aarch64__) asm volatile("dmb ishst" : : : "memory"); #endif #endif } inline void CPUCacheFlush(int begin_index, const TVMArgs& args) { for (int i = begin_index; i < args.size(); i++) { CPUCacheFlushImpl(static_cast((args[i].operator DLTensor*()->data)), GetDataSize(*(args[i].operator DLTensor*()))); } } PackedFunc WrapTimeEvaluator(PackedFunc pf, Device dev, int number, int repeat, int min_repeat_ms, PackedFunc f_preproc) { ICHECK(pf != nullptr); if (static_cast(dev.device_type) == static_cast(kDLMicroDev)) { auto get_micro_time_evaluator = runtime::Registry::Get("micro._GetMicroTimeEvaluator"); ICHECK(get_micro_time_evaluator != nullptr) << "micro backend not enabled"; return (*get_micro_time_evaluator)(pf, dev, number, repeat); } auto ftimer = [pf, dev, number, repeat, min_repeat_ms, f_preproc](TVMArgs args, TVMRetValue* rv) mutable { TVMRetValue temp; std::ostringstream os; // skip first time call, to activate lazy compilation components. pf.CallPacked(args, &temp); DeviceAPI::Get(dev)->StreamSync(dev, nullptr); for (int i = 0; i < repeat; ++i) { if (f_preproc != nullptr) { f_preproc.CallPacked(args, &temp); } double duration_ms = 0.0; do { if (duration_ms > 0.0) { number = static_cast(std::max((min_repeat_ms / (duration_ms / number) + 1), number * 1.618)); // 1.618 is chosen by random } Timer t = Timer::Start(dev); // start timing for (int i = 0; i < number; ++i) { pf.CallPacked(args, &temp); } t->Stop(); int64_t t_nanos = t->SyncAndGetElapsedNanos(); duration_ms = t_nanos / 1e6; } while (duration_ms < min_repeat_ms); double speed = duration_ms / 1e3 / number; os.write(reinterpret_cast(&speed), sizeof(speed)); } std::string blob = os.str(); TVMByteArray arr; arr.size = blob.length(); arr.data = blob.data(); // return the time. *rv = arr; }; return PackedFunc(ftimer); } TVM_REGISTER_GLOBAL("runtime.RPCTimeEvaluator") .set_body_typed([](Optional opt_mod, std::string name, int device_type, int device_id, int number, int repeat, int min_repeat_ms, std::string f_preproc_name) { Device dev; dev.device_type = static_cast(device_type); dev.device_id = device_id; if (opt_mod.defined()) { Module m = opt_mod.value(); std::string tkey = m->type_key(); if (tkey == "rpc") { return static_cast(m.operator->()) ->GetTimeEvaluator(name, dev, number, repeat, min_repeat_ms, f_preproc_name); } else { PackedFunc f_preproc; if (!f_preproc_name.empty()) { auto* pf_preproc = runtime::Registry::Get(f_preproc_name); ICHECK(pf_preproc != nullptr) << "Cannot find " << f_preproc_name << " in the global function"; f_preproc = *pf_preproc; } PackedFunc pf = m.GetFunction(name, false); CHECK(pf != nullptr) << "Cannot find " << name << " in the global registry"; return WrapTimeEvaluator(pf, dev, number, repeat, min_repeat_ms, f_preproc); } } else { auto* pf = runtime::Registry::Get(name); ICHECK(pf != nullptr) << "Cannot find " << name << " in the global function"; PackedFunc f_preproc; if (!f_preproc_name.empty()) { auto* pf_preproc = runtime::Registry::Get(f_preproc_name); ICHECK(pf_preproc != nullptr) << "Cannot find " << f_preproc_name << " in the global function"; f_preproc = *pf_preproc; } return WrapTimeEvaluator(*pf, dev, number, repeat, min_repeat_ms, f_preproc); } }); TVM_REGISTER_GLOBAL("cache_flush_cpu_non_first_arg").set_body([](TVMArgs args, TVMRetValue* rv) { CPUCacheFlush(1, args); }); // server function registration. TVM_REGISTER_GLOBAL("tvm.rpc.server.ImportModule").set_body_typed([](Module parent, Module child) { parent->Import(child); }); TVM_REGISTER_GLOBAL("tvm.rpc.server.ModuleGetFunction") .set_body_typed([](Module parent, std::string name, bool query_imports) { return parent->GetFunction(name, query_imports); }); // functions to access an RPC module. TVM_REGISTER_GLOBAL("rpc.LoadRemoteModule").set_body_typed([](Module sess, std::string name) { std::string tkey = sess->type_key(); ICHECK_EQ(tkey, "rpc"); return static_cast(sess.operator->())->LoadModule(name); }); TVM_REGISTER_GLOBAL("rpc.ImportRemoteModule").set_body_typed([](Module parent, Module child) { std::string tkey = parent->type_key(); ICHECK_EQ(tkey, "rpc"); static_cast(parent.operator->())->ImportModule(child); }); TVM_REGISTER_GLOBAL("rpc.SessTableIndex").set_body([](TVMArgs args, TVMRetValue* rv) { Module m = args[0]; std::string tkey = m->type_key(); ICHECK_EQ(tkey, "rpc"); *rv = static_cast(m.operator->())->sess()->table_index(); }); TVM_REGISTER_GLOBAL("tvm.rpc.NDArrayFromRemoteOpaqueHandle") .set_body_typed([](Module mod, void* remote_array, DLTensor* template_tensor, Device dev, void* ndarray_handle) -> NDArray { return NDArrayFromRemoteOpaqueHandle(RPCModuleGetSession(mod), remote_array, template_tensor, dev, ndarray_handle); }); } // namespace runtime } // namespace tvm