作者|郑建华

更新|赵露阳


通过这篇笔记,希望能初步了解 OneFlow 在 Eager 模式下对设备的管理方式、设备执行计算的过程以及如何充分利用设备计算能力。
这里的设备主要指类似 CUDA 这样的并行计算加速设备。

1

设备、流相关类型及关系

框架通过流(Stream)向设备(Device)提交计算任务。一个 Stream 是一个命令序列,可以类比 CUDA Stream(https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#streams),或者 CPU Thread 的指令序列。同一个 Stream 中的命令按顺序执行;不同 Stream 之间的命令有依赖关系时,需要同步。不同的任务,比如 kernel 计算、host2device、device2host 等都有自己独立的 Stream,可以并发执行,从而在 Eager 模式下尽可能充分利用设备的异步并发执行能力。
OneFlow 中Device和Stream相关的部分类结构如下所示:

Device相关类型

oneflow::Device

oneflow::Device是用于表示设备的基础类型,例如:构建tensor时 flow.tensor(shape, device="cuda:1")就会在内部构造出这个基础的Device类型,其中设备编号为1、设备类型为CUDA。
oneflow/core/framework/device.h:
classDevicefinal {public: ...private: Device(conststd::string& type, int64_t device_id); Maybe<void> Init();conststd::string type_; DeviceType enum_type_;constint64_t device_id_;constsize_t hash_value_;std::shared_ptr<MemoryCase> mem_case_;};
oneflow::Device中最重要的两个成员变量分别是用于表示设备类型的DeviceType用于表示设备编号的device_id_

DeviceType

DeviceType是一个枚举类,不同的值代表不同的计算设备类型,其定义位于 oneflow/core/common/device_type.proto
enumDeviceType{ kInvalidDevice = 0; // 无效设备 kCPU = 1; // cpu设备 kCUDA = 2; // cuda设备 kMockDevice = 3; // pseudo device for test.}
目前在oneflow master分支中,主要有kCPU表示cpu设备;kCUDA表示nvidia cuda设备;在其他多设备支持的分支中,这里还增加了更多的设备类型。

oneflow::ep::Device

oneflow::Device是oneflow中对设备的基础封装类型,而oneflow::ep::Device则是一个抽象类,属于oneflow的ep模块(execution provider),是对设备行为的封装,ep模块为多硬件设备提供了更高层次的抽象,方便oneflow支持和兼容多硬件设备提供了更高的灵活性和可拓展性
oneflow::ep::Device不仅提供了表示设备类型的device_type()方法、表示设备编号的device_index()方法,还提供了创建/销毁ep::Stream、创建/销毁Event、在设备上申请/释放内存的各种方法。
oneflow/core/ep/include/device.h
classDevice {public: OF_DISALLOW_COPY_AND_MOVE(Device); Device() = default;virtual ~Device() = default;virtualvoidSetAsActiveDevice()= 0;virtual DeviceType device_type()const= 0;virtual size_t device_index()const= 0;virtual DeviceManager* device_manager()const= 0;virtual Stream* CreateStream()= 0;virtualvoidDestroyStream(Stream* stream)= 0;virtual Event* CreateEvent();virtualvoidDestroyEvent(Event* event);virtualvoidCreateEvents(Event** events, size_t count)= 0;virtualvoidDestroyEvents(Event** events, size_t count)= 0;virtual Maybe<void> Alloc(const AllocationOptions& options, void** ptr, size_t size) = 0;virtualvoidFree(const AllocationOptions& options, void* ptr)= 0;virtual Maybe<void> AllocPinned(const AllocationOptions& options, void** ptr, size_t size) = 0;virtualvoidFreePinned(const AllocationOptions& options, void* ptr)= 0;virtualboolIsStreamOrderedMemoryAllocationSupported()const;};
oneflow::ep::Device有如下子类实现:

Stream相关类型

oneflow::Stream和cuda device以及stream的关系类似,oneflow中也存在类似的基础Stream类型。

oneflow/core/framework/stream.h
classStreamfinal { ....private: Stream(Symbol<Device> device, StreamType stream_type, size_t thread_uid);static Maybe<Symbol<Stream>> RawNew(Symbol<Device> device, StreamType stream_type,size_t thread_uid); Maybe<void> Init(size_t unique_stream_id); Symbol<Device> device_; StreamType stream_type_;size_t thread_uid_;size_t unique_stream_id_;};
可以看见Stream类中的成员变量:
  • device_  表示该Stream对象将在何种设备上执行
  • streamtype_  表示该Stream的类型,是用于计算的compute stream还是用于数据搬运的host2device、device2host stream等
  • threaduid_ 表示负责启动该Stream的线程id
  • unique_streamid_ 表示这个stream自身的unique id

StreamType

DeviceType分为kCpu和kCuda类似,Stream也有各种类型之分,具体如下:
oneflow/core/common/stream_type.h
enumclassStreamType { kInvalid = 0, // 无效 kCompute, // kernel计算流 kHost2Device, // 数据搬运(host -> device)流 kDevice2Host, // 数据搬运(device -> host)流 kCcl, // 集合通信流 kBarrier, // 线程屏障流 kCriticalSection,// 临界区流 kLazyJobLauncher,// job启动流(lazy mode) kPinnedCompute // pinned memory kernel计算流};

oneflow::ep::Stream

oneflow中的ep模块提供了一个更高层次的对Stream的抽象类,除了可以获取设备的device()、获取设备类型的device_type()方法外,还提供了一系列虚方法如:
  • 同步Sync()
  • 执行Event事件RecordEvent()
oneflow/core/ep/include/stream.h
classStream {public: OF_DISALLOW_COPY_AND_MOVE(Stream); Stream() = default;virtual ~Stream() = default;virtual DeviceType device_type()const= 0;virtual Device* device()const= 0;virtual Maybe<void> Sync() = 0;virtualvoidRecordEvent(Event* event)= 0;virtual Maybe<void> GetAsyncError() { return Maybe<void>::Ok(); }virtual Maybe<void> AllocAsync(void** ptr, size_t size) { UNIMPLEMENTED_THEN_RETURN(); }virtual Maybe<void> FreeAsync(void* ptr) { UNIMPLEMENTED_THEN_RETURN(); }template<typename T> Maybe<void> AllocAsync(T** ptr, size_t size) {return AllocAsync(reinterpret_cast<void**>(ptr), size); }virtual Maybe<void> OnExecutionContextSetup() { return Maybe<void>::Ok(); }virtual Maybe<void> OnExecutionContextTeardown() { return Maybe<void>::Ok(); }template<typename T>T* As(){returnstatic_cast<T*>(this); }};
oneflow::ep::Stream有如下子类实现:

oneflow::vm::Stream

oneflow vm(virtual machine)中的oneflow::vm::Stream类型,用于vm内部维护stream极其依赖关系、StreamPolicy、调度线程等。
oneflow/core/vm/stream.h
classStreamfinal :public intrusive::Base {public: ...private: ...// fields ThreadCtx* thread_ctx_; Symbol<Device> device_; StreamType stream_type_;std::shared_ptr<StreamPolicy> stream_policy_;bool on_scheduler_thread_;std::unique_ptr<char, std::function<void(char*)>> small_pinned_mem_ptr_; ...};

StreamPolicy

StreamPolicy是oneflow vm中独有的概念,提供了一系列虚方法如:
  • stream() 获取oneflow::ep::Stream指针
  • mut_allocator() 获取vm::Allocator指针(用于tensor内存管理)
  • device_type() 获取device设备类型
除此之外,提供了一系列vm相关的指令状态初始化、查询、删除等方法。
oneflow/core/vm/stream_policy.h
classStreamPolicy {public:virtual ~StreamPolicy() = default;virtual ep::Stream* stream()= 0;virtual vm::Allocator* mut_allocator()= 0;virtual DeviceType device_type()const= 0;virtualvoidInitInstructionStatus(const Stream& stream, InstructionStatusBuffer* status_buffer)const = 0;virtualvoidDeleteInstructionStatus(const Stream& stream, InstructionStatusBuffer* status_buffer)const = 0;virtualboolQueryInstructionStatusLaunched(const Stream& stream, const InstructionStatusBuffer& status_buffer)const = 0;virtualboolQueryInstructionStatusDone(const Stream& stream,const InstructionStatusBuffer& status_buffer)const = 0;virtualboolOnSchedulerThread(StreamType stream_type)const;virtualboolSupportingTransportInstructions()const= 0;voidRunIf(Instruction* instruction)const;protected: StreamPolicy() = default;private:virtualvoidRun(Instruction* instruction)const= 0;};
StreamPolicy有如下子类实现:

2

Eager Local模式下的Device和Stream推导

下面,梳理一下普通的eager模式(eager local mode)下,算子执行全过程中device和stream相关的推导流程。

2.1 推导Device

首先,对于一个算子(op)来说,要为其设置一个默认的device用于实际计算,这一步在:
Symbol<Device> default_device = JUST(GetDefaultDevice(inputs, ctx))
这里GetDefaultDevice的逻辑是:
  • 1.如果inputs tensor非空,则根据第一个input tensor的device来设置default的device
  • 2.如果inputs tensor为空,则优先从OpExprInterpContext中获取device,若OpExprInterpContext中未设置,则会通过
    Device::New("cpu")
    ;默认给一个cpu device
值得说明的是,在1.种情况时,如果input tensor创建时指定了device为cuda设备,则这里推导出的default device同样为相同的cuda device;如果未显示指定,则默认还是cpu device。

2.2 推导Stream

oneflow::Stream的推导主要在:
  • JUST(user_op_expr.mut_local_tensor_infer_cache()->GetOrInfer(infer_args))
    );
    Symbol<Stream> stream = JUST(InferDeviceAndStream(...));

InferDeviceAndStream
中,Stream推导的逻辑是会根据user_op_expr是否定义了device_and_stream_infer_fn而有所区别
  • (少数情况)如果该op定义了推导函数,则调用此推导函数来推导Stream,例如 tensor.cuda()法,inputs 在 CPU 上, outputs 在 CUDA,二者的设备类型不同。这时就不会默认推导而是利用 op 注册的推导函数获取 oneflow::Stream(
    例如 CopyOp::InferDeviceAndStream
  • (多数情况)否则会通过
    stream = JUST(GetDefaultStreamByDevice(default_device))
    ;来推导。
GetDefaultStreamByDevice的具体实现:
Maybe<Symbol<Stream>> RawGetDefaultStreamByDevice(Symbol<Device> device) { return Stream::New(device, StreamType::kCompute);}
可以看见,根据传入的deviceStreamType::kCompute,new了一个oneflow::Stream

2.3 InstructionsBuilder::Call和vm::Stream推导

在上述device和stream推导完成后,会通过InstructionsBuilder调用Call方法
JUST(PhysicalRun([&](InstructionsBuilder* builder) -> Maybe<void> { return builder->Call(kernel, std::move(input_eager_blob_objects), std::move(output_eager_blob_objects), ctx, result->stream());}));
Call方法中会通过
  • JUST(SoftSyncStream(output_eager_blob_objects, stream));
  • JUST(SoftSyncStream(input_eager_blob_objects, stream));
  • auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));
完成outputs inputs tensor的流同步(SoftSyncStream)过程以及vm::Stream的推导,然后通过构造OpCallInstructionPolicy指令派发至vm执行。
SoftSyncStream的同步这里省略,具体过程见第4节。

2.3.1 构造 ThreadCtx 对象,启动执行指令的线程

ThreadCtx 对象指针保存在 VirtualMachine 的 HashMap中。每个 DeviceType(CPU或CUDA)对应一个 ThreadCtx 对象;临界区和 LazyJob有自己的 ThreadCtx 对象
首次访问 HashMap时得到的是零值(空指针),需要调用 CreateThreadCtx创建对象。实际通过虚拟机指令创建对象,ThreadCtx 对象保存在 VirtualMachineEngine::thread_ctx_list_ 中
ThreadCtx 对象构造后,会创建一个 worker 线程、执行 WorkerLoop 方法,并添加到 worker_threads_。所以 worker_threads_ 是与 ThreadCtx 对象一一对应的。
这个线程负责其所归属的指令的执行:
  • WorkerLoop 在收到通知后,会调用 ThreadCtx::TryReceiveAndRun处理指令。
  • 在这个函数中,将 ThreadCtx 的指令挪到临时列表、通过 StreamPolicy 执行每个指令
  • ThreadCtx 的指令,是 VirtualMachineEngine 在 DispatchInstruction 时添加进去的。
ThreadCtx创建完成后,将持有vm::Stream对象oneflow::vm::Streamoneflow::Stream的数量是一一对应的,vm::Stream 按照<DeviceType, StreamRole>分组存储在对应的 ThreadCtx 中
vm::Stream的推导流程细节如下:
  • auto* vm_stream = JUST(Singleton<VirtualMachine>::Get()->GetVmStream(stream));
    • VirtualMachine::GetVmStream()
    • Maybe<vm::Stream*>      VirtualMachine::CreateStream(Symbol<Stream> stream)
    • Stream::__Init__(ThreadCtx* thread_ctx, Symbol<Device> device, StreamType stream_type...)
    • stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device));

2.4 执行OpCall指令和ep::Stream推导

有几个场景会创建(获取) ep::Stream 对象。比如 kernel 执行时。
OpCall指令在构造时,指令策略类型是 OpCallInstructionPolicy。虚拟机在 DispatchInstruction 时,无论哪个分支,后续都会调用 EpStreamType::Run,最终通过
  • EpStreamPolicyBase::Run()
    • instruction->Compute()
    • OpCallInstructionPolicy::Compute()
    • OpCallInstructionUtil::Compute()
    • OpCallInstructionUtil::OpKernelCompute()
    • op_call_instruction_policy->mut_opkernel()->Compute()执行 kernel 的 Compute 方法
例如 GpuL2NormalizeKernel::Compute,最终在其kernel的Compute方法中,会通过ctx->stream()创建(获取)ep::Stream 对象,launch kernel 执行计算。

2.4.1 获取/创建ep::Stream

下面,我们重点看一下OpCall指令实际执行时,调用的OpCallInstructionUtil::Compute()方法:
static inline Maybe<void> Compute(OpCallInstructionPolicy* op_call_instruction_policy, Instruction* instruction) { Allocator* allocator = instruction->mut_stream()->mut_stream_policy()->mut_allocator(); JUST(AllocateOutputBlobsMemory(op_call_instruction_policy, allocator, instruction));if (unlikely(op_call_instruction_policy->need_temp_storage())) { JUST(TryAllocateTempStorage(op_call_instruction_policy, allocator)); } ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream(); user_op::OpKernelState* state = nullptr; user_op::OpKernelCache* cache = nullptr;if (op_call_instruction_policy->user_opkernel()->has_state_or_cache()) { TryInitOpKernelStateAndCache(op_call_instruction_policy, stream, &state, &cache); } OpKernelCompute(op_call_instruction_policy, stream, state, cache);if (unlikely(op_call_instruction_policy->need_temp_storage())) { DeallocateTempStorage(op_call_instruction_policy, allocator); }return Maybe<void>::Ok(); }
其中会通过ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();完成ep::Stream的推导,之后在OpKernelCompute()方法中实际完成op/kernel的执行。
ep::Stream* stream = instruction->mut_stream()->mut_stream_policy()->stream();
  • ep::Stream* stream() override { return GetOrCreateEpStream(); }
    • GetOrCreateEpStream()
    • ep_stream_ = GetOrCreateEpDevice()->CreateStream();
这里->stream()会调用ep_stream_policy_base.h中的:
ep::Stream* stream() override { return GetOrCreateEpStream(); }
这是一个private方法:
private: ep::Stream* GetOrCreateEpStream()const{if (unlikely(ep_stream_ == nullptr)) { ep_stream_ = GetOrCreateEpDevice()->CreateStream(); CHECK(ep_stream_ != nullptr); }return ep_stream_; }
可以看到,如果成员变量ep_stream_非空,则直接返回;否则,通过 ep_stream_ = GetOrCreateEpDevice()->CreateStream(); 来创建创建ep::Stream

2.4.2 获取/创建ep::Device

而这里的GetOrCreateEpDevice方法如下:
ep::Device* GetOrCreateEpDevice() const {if (unlikely(ep_device_ == nullptr)) { ep_device_ = Singleton<ep::DeviceManagerRegistry>::Get()->GetDevice(device_->enum_type(), device_->device_id()); CHECK(ep_device_); }return ep_device_.get(); }
根据oneflow::Device中拿到的device id和device type,去全局单例的ep::DeviceManagerRegistry中取出对应的oneflow::ep::Device
oneflow::vm::StreamPolicy和oneflow::vm::EpStreamPolicy推导
  • stream_policy_ = CHECK_JUST(CreateStreamPolicy::Visit(stream_type, device));
    • std::shared_ptr<vm::StreamPolicy>(new vm::EpStreamPolicy(device));

3

Eager Global模式下的Device和Stream推导

eager global模式下,device信息隐藏在placement中,placement不仅包括了device type信息还包括其tensor具体分布在哪些ranks上的信息,placement 在 C++ 中的对应类型是 ParallelDesc
所以device以及stream的部分推导过程和eager local模式下有所区别,但OpCall指令执行;device、vm::Stream和ep::Stream的推导过程都和eager local模式下是类似的。

3.1 推导Device

3.1.1 placement 的 parallel_id

oneflow中的placement表示tensor存放的设备集群(device group),如:
p = flow.placement(type="cuda", ranks=[0, 1, 2, 3])
表示tensor分布于1台机器上,cuda device 0、1、2、3四个设备上;
p = flow.placement(type="cuda", ranks=[[0, 1], [2, 3]])
则表示tensor分布于2台机器上,host1的device0、1以及host2的device2、3。
在 oneflow 的分布式环境下,各个 host 上需要有相同数量的device,每个进程使用一个device。这样根据环境变量 RANK 可以得出 machine_id,LOCAL_RANK 就是进程在 制定host 上的 rank序号。
如果 input tensor 的 placement 与当前进程无关,可以省掉很多不必要的计算。通过 placement 的 parallel_id 可以判断计算任务是否与当前进程相关。
placement 在 C++ 中的对应类型是 ParallelDesc,其中并没有 parallel_id 字段,这个信息隐含在其它字段中。
ParallelDesc 在构造时会调用 ClearUp 函数,从中可以看到
  • ParallelDesc::parallel_id2machine_id_ 是 placement 分布的 machine。
  • ParallelDesc::parallel_id2device_id_ 是 placement 分布的 device_id。
  • parallel_id 是上述 2 个数组的索引,一个 parallel_id 对应一个 machine_id:device_id 组合。这样,根据parallel_id可以查到对应的 machine_id 和 device_id。
  • 反过来,根据 machine_id:device_id 也可以从 machine_id2device_id2parallel_id_ 查到 parallel_id。

3.1.2 eager 模式下根据 parallel_id 忽略无关计算任务

在 eager 分布时场景处理计算任务时,会调用GetTensorDevice4CurrentProcessCtx推导得到输出tensor的device,以及获取当前进程的 machine_id、device_id 在 placement 中的 parallel_id 值。

如果当前进程与该 placement 无关,parallel_id 就是空,后续处理时就可以忽略一些计算:
  • EagerGlobalTensorImpl::New 中只需要用 functional::Empty 构造一个 shape 为 0 的空的 tensor。
  • GetBoxingOutput计算时,如果parallel_id为空则表示当前rank进程无效,无需计算直接返回。
  • Interpret 可以不给 vm 提交指令、提前返回

3.2 推导Stream

ConsistentTensorInferCache 中推导 SBP Signature 时,也会同时推导出当前的 tensor 计算任务、在当前进程所用的device。推导时,会先确认所有 inputs 的 placement 是一致的,都分布在相同的device上。如前所述,如果计算任务与当前进程无关,会提前返回;而一个进程只使用一个device。
这里和eager local模式下stream的推导类似,通过JUST(InferDeviceAndStream(user_op_expr, infer_args))推导出oneflow::Stream对象,StreamRole 是 kCompute。区别在于eager global模式下

3.2.1 unique_stream_id

unique_stream_id 表示 oneflow::Stream 对象的创建次序。
所有的 oneflow::Stream 对象都保存在全局的 StreamMgr::stream2unique_stream_id_ 中unique_stream_id2stream_symbol_ 可看作是引用类型的副本,unique_stream_id 就是 Stream 对象在这个数组中的索引。与 parallel_id 不同,unique_stream_id 是 Stream 对象在进程内的唯一标识。
并不是每次都需要加锁访问 StreamMgroneflow::Stream 包含的都是描述性信息,其引用是以 ThreadLocal 的方式存储的,可以提升后续读取的效率。虚拟机在执行指令时,也会用 unique_stream_id 进行逻辑判断。

4

Eager模式下的Stream同步——SoftSyncStream

设想以下场景:将 CPU 下的 tensor 拷贝到 CUDA 设备,然后在 CUDA 上再进行 tensor add 的计算。这涉及到两个流,一个是 Host2Device,一个是 CUDA Compute。这两个流的计算任务是并发执行的。需要有同步措施,才能保证拷贝完再执行 add 计算。
Eager 模式下,在 InstructionsBuilder::Call 中构造指令时,对 SoftSyncStream的调用会在必要时向指令列表插入同步指令。
SoftSyncStream 中,几个重要概念:
  • tensor在oneflow内存中的实际承载者是 eager_blob_object
  • last_used_stream 表示一个tensor(blob)上一次使用到的stream,可能是compute stream、h2d stream、d2h stream、集合通信ccl stream等
  • 如果 last_used_stream 与当前计算执行的流 stream 相同,则可以忽略,因为相同stream间天然顺序执行所以无需同步,否则就需要进行后续的同步处理
SoftSyncStream代码如下:
Maybe<void> InstructionsBuilder::SoftSyncStream(const vm::EagerBlobObjectList& eager_blob_objects, Symbol<Stream> stream) { JUST(ForEachEagerBlobObjectsNeedingSoftSync( eager_blob_objects, stream, [&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> {return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream); }));for (constauto& eager_blob_object : eager_blob_objects) { eager_blob_object->set_last_used_stream(stream); }return Maybe<void>::Ok();}
主体逻辑是,会在ForEachEagerBlobObjectsNeedingSoftSync方法中遍历每一个tensor对象(eager blob object),对于每一个需要同步的blob运用lambda方法并最终调用SoftSyncStreamBetween完成stream间的同步。
这里,我们看一下ForEachEagerBlobObjectsNeedingSoftSync的逻辑:
首先if/else的主体业务逻辑是类似的,主要区别在于,当blob的size <= kOpArgsReservedSize时(默认为4)会使用small vector来存放LocalDepObject变量,效率会更快一些(否则会走到else分支,主体逻辑类似,这里就不看了)。
  • const auto& opt_last_used_stream = eager_blob_object->last_used_stream()
    ;
  • if (unlikely(!opt_last_used_stream.has_value())) { continue; }
这两句是查询该tensor(blob)上一次被使用时用到的stream——last_used_stream,如果为空,则直接continue跳过,因为如果此tensor之前并未被任何stream使用,则无需进行stream间的同步操作,因为在当前stream上不会有关于该tensor的其他依赖关系;
if (last_used_stream != stream) { small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize> dep_objects{ intrusive::shared_ptr<LocalDepObject>( JUST(eager_blob_object->compute_local_dep_object()))}; JUST(DoEach(last_used_stream, std::move(dep_objects))); }
如果last_used_stream!=stream则表示需要在两个stream间进行同步,则会应用传入的lambda函数DoEach进行处理,在这里lambda函数即:
[&](Symbol<Stream> last_used_stream, auto&& dep_objects) -> Maybe<void> { return SoftSyncStreamBetween(std::move(dep_objects), last_used_stream, stream); }));
既实际调用的是SoftSyncStreamBetween来完成实际的stream间同步,这里主要有3个变量:
  • dep_objects
    存储了tensor间的依赖关系
  • last_used_stream
    则是该tensor上一次使用的stream
  • stream
    该tensor当前使用的stream
SoftSyncStreamBetween的代码如下:
Maybe<void> InstructionsBuilder::SoftSyncStreamBetween( small_vector<intrusive::shared_ptr<LocalDepObject>, kOpArgsReservedSize>&& dependences, Symbol<Stream> from_stream, Symbol<Stream> to_stream) { CHECK(from_stream != to_stream) << "synchronization is unnecessary";if (SupportingStreamWait(from_stream, to_stream)) { JUST(StreamWait(std::move(dependences), from_stream, to_stream)); } else { JUST(RecordEvent(std::move(dependences), from_stream)); }return Maybe<void>::Ok();}
SoftSyncStreamBetween的主要逻辑如下:
  • 先额外做了一次check,检测如果待同步的两个stream相同,则check会报错并提示"synchronization is unnecessary"
  • 通过SupportingStreamWait判断from 和 to stream间是否支持stream wait,是则调用StreamWait方法;否则,直接调用RecordEvent方法
  • SupportingStreamWait的主要逻辑是,通过stream的device、以及StreamType的Visit方法来判断。简单来说,如果from 和 to stream之间是不同的device(譬如cpu stream <-> cuda stream之间的同步),或者from stream的device为cpu,则SupportingStreamWait一定是false;如果是相同的,则继续通过其他判断条件进行判断。

SupportingStreamWait为True

SupportingStreamWait为True时,即from to stream同为Cuda Stream间的同步情况,在这种情况下会走到StreamWait的函数,该函数最终会派发一个StreamWaitEventInstructionPolicy的指令给vm执行,StreamWaitEventInstructionPolicy的执行逻辑主要是两个cuda event:
  • cudaEventRecord
  • cudaStreamWaitEvent
  • 对于from_stream来说,插入一个
    cudaEventRecord
    ,用于标志from stream是否完成该stream上的event事件;
  • 对于to_stream来说,插入一个
    cudaStreamWaitEvent
    等待from stream上的事件完成后,再继续执行to_stream。

SupportingStreamWait为False

SupportingStreamWait为False时,会直接调用JUST(RecordEvent(std::move(dependences), from_stream)); 其内部实现会从对象池中获取可复用的cuda event对象并执行event。
这里有个细节,由于cuda event的创建和销毁都会引发cuda kernel的launch由异步转同步,所以基于对象池的cuda event可以避免这个开销。
实际上最终调用的还是cudaEventRecordcudaEventRecord本身只是起到一个“占位符”的作用,并不能起到(保证该stream上其他kernel全部执行完)的作用,真正能保证stream同步作用的是oneflow vm(vitual machine)控制下的指令间依赖关系/执行顺序。

5

CPU 下的并行计算

CpuStream 只有一个线程。CPU kernel 应该是通过 OpenMP 或者 Intel OneApi 等实现并行计算加速。

参考资料

1.https://github.com/Oneflow-Inc/oneflow/tree/845595e2c0abc3d384ff047e188295afdc41faaa

其他人都在看

试用OneFlow: github.com/Oneflow-Inc/oneflow/


继续阅读
阅读原文