openYuanrong进阶指南——使用有状态函数作为全局信号站
openYuanrong 官网官网gitcode仓库仓库使用有状态函数作为全局信号站在分布式系统中由于不同的任务可能运行在不同的物理节点上Python 原生的 asyncio.Event 无法跨进程工作。通过 openYuanrong 有状态函数我们可以创建一个全局共享的信号站让成百上千个分布式任务同时“监听”某一个事件的发生。核心原理有状态函数作为中心节点维护一个真正的 asyncio.Event 对象。方法订阅其他任务调用有状态函数的 wait() 方法该方法会 await 内部的事件。广播触发当某个任务调用有状态函数的 set() 时所有正在 await 的任务都会被同时唤醒。场景同步启动Barrier Synchronization比如在分布式训练中确保所有节点都加载完模型后再同时开始训练。配置更新当配置发生变化时通过 Event 通知所有运行中的有状态函数重新加载配置。依赖触发任务 B 必须等任务 A 的某个中间步骤完成后才能继续但任务 A 并没有结束无法用yr.get()结果作为触发点。使用示例importyrimportasyncioimporttime# 1. 定义分布式事件中心yr.instanceclassSharedEvent:def__init__(self):# 核心使用 asyncio.Event 驱动异步非阻塞等待self.eventasyncio.Event()asyncdefwait(self):分布式任务在此挂起等待print(信号中心收到一个等待请求...)awaitself.event.wait()returnSIGNAL_RECEIVEDasyncdefset(self):触发事件所有等待的任务将同步启动self.event.set()print(信号中心广播信号已发出)asyncdefclear(self):重置信号以便进行下一轮同步self.event.clear()# 2. 定义分布式 Workeryr.invokedefheavy_worker(worker_id,event_actor):print(fWorker{worker_id}: 正在准备基础环境如加载模型...)time.sleep(1)# 模拟准备工作print(fWorker{worker_id}: 准备就绪阻塞等待全局启动信号...)# 这里会通过网络向 Actor 发起异步等待yr.get(event_actor.wait.invoke())print(fWorker{worker_id}: 收到信号开始执行并行任务)returnfWorker{worker_id}Success# --- 3. 执行流程 ---yr.init()# 创建全局唯一的信号中心event_centerSharedEvent.invoke()# 启动 5 个分布在集群各处的 Workerworker_idsrange(5)futures[heavy_worker.invoke(i,event_center)foriinworker_ids]print(\n--- 主程序等待 3 秒后统一放行 ---)time.sleep(3)# 触发信号唤醒所有阻塞在 wait() 处的 Workerevent_center.set.invoke()# 查看执行结果resultsyr.get(futures)print(\n所有任务执行完毕:,results)yr.finalize()

相关新闻