Ray
Ray的介绍
Hello,Ray
import ray
import time
# Start Ray.
ray.init()
@ray.remote
def f(x):
time.sleep(1)
return x
# Start 4 tasks in parallel.
result_ids = []
for i in range(4):
result_ids.append(f.remote(i))
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_ids) # [0, 1, 2, 3]以上代码演示了Ray如何将一个普通函数变成一个可以并行运行在多核或多机上的远程任务。Ray的工作流程不是线性的,分为任务分发和结果收集两个阶段:
ray.init():这个命令是启动Ray的引擎,初始化了Ray的运行时,包括任务调度器和本地工作进程。@ray.remote:这是代码的核心,这个装饰器告诉 Ray:“不要像普通函数一样执行f(x)。当这个函数被调用时,把它当作一个远程任务,发送给 Ray 的工作进程去执行。
1.任务分发:
for i in range(4): 循环执行了 4 次。但这里的调用是 f.remote(i),而不是 f(i)
f.remote()是一个非阻塞的调用。当你的程序执行到这一行时,它会立刻把f(i)这个任务扔给 Ray,然后继续往下执行,它不会等待任务完成f.remote()不会返回最终结果,它会返回一个未来对象(Future Object),也就是result_id。这个对象只是一个占位符,代表着某个任务的最终结果
因此,for循环几乎是瞬间完成,程序在不到1ms的时间里,把4个任务全部扔给了Ray,然后Ray在后台并行地执行。
2.结果收集:
results = ray.get(result_ids) 这是 f.remote() 调用的另一半。ray.get() 是一个阻塞的调用
它告诉你的程序:“等等。停在这里,直到所有
result_ids对应的任务都执行完成,并且结果都可用时,你才能继续往下走。因为 Ray 在后台并行地运行了这 4 个任务,每个任务需要 1 秒钟。在拥有足够核心的情况下,它们会同时开始和结束。所以,
ray.get()只会等待大约 1 秒钟,而不是 4 秒钟。
License:
CC BY 4.0