Skip to content

🧠 WorldContext

tongsim.core.async_loop

core.async_loop

本模块定义了 AsyncLoop 类,每个 AsyncLoop 对象封装一个独立线程中的 asyncio 事件循环, 用于将异步任务封装在单个线程内运行。

核心: 1、将一个虚拟环境输入(基于Connection) 相关的所有协程派发到同一个线程(AsyncLoop 维护)上来降低并发安全的问题 (注意: 在一个Loop内的协程 仍然可能存在并发安全问题) 2、性能出现瓶颈时,考虑把计算密集性的处理逻辑 offload 到专用的计算线程池来解决。

AsyncLoop

AsyncLoop 管理一个独立线程中的 asyncio 事件循环和永久 TaskGroup。

特性: - 独立后台线程,常驻 EventLoop - 基于 asyncio.TaskGroup 管理所有业务任务

Source code in src\tongsim\core\async_loop.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class AsyncLoop:
    """
    AsyncLoop 管理一个独立线程中的 asyncio 事件循环和永久 TaskGroup。

    特性:
    - 独立后台线程,常驻 EventLoop
    - 基于 asyncio.TaskGroup 管理所有业务任务
    """

    def __init__(self, name: str = "AsyncLoop") -> None:
        """
        初始化 AsyncLoop。

        Args:
            name: 线程和日志标识名,便于调试多个 Loop 实例。
        """
        self._name = name
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._group_ready = threading.Event()
        self._main_task: asyncio.Task[Any] | None = None
        self._task_group: asyncio.TaskGroup | None = None
        self._business_tasks: set[asyncio.Task[Any]] = (
            set()
        )  # 记录业务 spawn 出来的 task

    @property
    def thread(self) -> threading.Thread:
        return self._thread

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        return self._loop

    @property
    def name(self) -> str:
        return self._name

    def start(self, timeout: float = 1.0) -> None:
        """
        启动 AsyncLoop 后台线程和事件循环。

        Args:
            timeout: 等待 loop 和 task group 启动的最长秒数。
        Raises:
            RuntimeError: 如果 loop 未能在指定时间内启动。
        """
        if self.is_running():
            raise RuntimeError(f"[AsyncLoop {self._name}] already running.")

        self._thread = threading.Thread(target=self._run, name=self._name, daemon=True)
        self._thread.start()

        if not self._group_ready.wait(timeout):
            raise RuntimeError(f"[AsyncLoop {self._name}] timeout starting event loop.")

        _logger.debug(f"[AsyncLoop {self._name}] started.")

    def _run(self) -> None:
        """后台线程入口: 初始化事件循环和主任务。"""
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)  # 将事件循环绑定到当前线程
        self._main_task = self._loop.create_task(self._main(), name="__main_task__")
        try:
            self._loop.run_forever()
        finally:
            self._loop.close()
            _logger.debug(f"[AsyncLoop {self._name}] loop closed.")

    async def _main(self) -> None:
        """
        主协程,在 TaskGroup 上下文中挂起,直至被取消。
        """
        try:
            async with asyncio.TaskGroup() as tg:
                self._task_group = tg
                self._group_ready.set()
                await asyncio.Future()  # 永久挂起,靠 cancel 推出
        except asyncio.CancelledError:
            _logger.debug(
                f"[AsyncLoop {self._name}] main task cancelled; shutting down TaskGroup."
            )
        finally:
            assert self._loop is not None
            self._loop.call_soon_threadsafe(self._loop.stop)

    def spawn(self, coro: Awaitable[Any], name: str = "") -> Future[Any]:
        """
        在 TaskGroup 中提交一个新的异步任务。

        Args:
            coro: 待执行的 coroutine。
            name: 可选,任务名称,用于日志追踪。

        Returns:
            Future,可通过 .result(timeout) 获取 coroutine 返回值或异常。
        """
        if not (self._loop and self._task_group):
            raise RuntimeError(f"[AsyncLoop {self._name}] not started.")

        outer: Future[Any] = Future()

        def _schedule() -> None:
            task: asyncio.Task[Any] = self._task_group.create_task(coro, name=name)
            self._business_tasks.add(task)

            def _on_done(t: asyncio.Task[Any]) -> None:
                self._business_tasks.discard(t)
                if t.cancelled():
                    outer.cancel()
                else:
                    exc = t.exception()
                    if exc:
                        _logger.exception(
                            f"[AsyncLoop {self._name}] Task {name!r} raised: {exc}"
                        )
                        outer.set_exception(exc)
                        # 业务异常直接取消整个主 TaskGroup
                        assert self._main_task is not None
                        self._main_task.cancel()
                    else:
                        outer.set_result(t.result())

            task.add_done_callback(_on_done)

        self._loop.call_soon_threadsafe(_schedule)
        return outer

    def cancel_tasks(self, timeout: float) -> None:
        """
        取消所有业务协程任务(spawn 出来的 task,不包括主协程)。

        Args:
            timeout: 最长等待取消完成的秒数。
        """
        if not self.is_running():
            return

        future = asyncio.run_coroutine_threadsafe(self._cancel_tasks_seq(), self._loop)
        try:
            future.result(timeout)
        except FutureTimeoutError:
            _logger.warning(f"[AsyncLoop {self._name}] cancel_tasks timeout.")

    async def _cancel_tasks_seq(self) -> None:
        """
        在 loop 线程中取消所有已 spawn 的业务任务。
        """
        _logger.debug(
            f"[AsyncLoop {self._name}] cancelling {len(self._business_tasks)} business task(s)."
        )
        if not self._business_tasks:
            return

        tasks = list(self._business_tasks)
        for task in tasks:
            task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)
        self._business_tasks.clear()

    def stop(self, timeout: float = 5.0) -> None:
        """
        完全停止 AsyncLoop:
        取消所有业务任务 -> 取消主 task -> 停止 loop -> join 线程。

        Args:
            timeout: 等待整个关闭过程的最长秒数。
        """
        if not self.is_running():
            return

        # 取消主协程,TaskGroup 会自动取消所有子任务
        assert self._main_task is not None and self._loop is not None
        self._loop.call_soon_threadsafe(self._main_task.cancel)

        self._thread.join(timeout)
        if self._thread.is_alive():
            _logger.warning(f"AsyncLoop '{self._name}' did not exit cleanly.")
        self._thread = None

    def is_running(self) -> bool:
        """
        判断 AsyncLoop 是否仍在运行。

        Returns:
            True if loop thread alive, else False.
        """
        return bool(self._thread and self._thread.is_alive())

    def log_task_list(self) -> None:
        """
        打印当前 loop 内所有未完成的任务信息,便于调试。
        """
        if not (self._loop and self._task_group):
            return
        task_list = asyncio.all_tasks(self._loop)
        _logger.warning(f"[AsyncLoop {self._name}] {len(task_list)} active task(s):")
        for task in task_list:
            state = (
                "cancelled"
                if task.cancelled()
                else "done" if task.done() else "pending"
            )
            detail = ""
            if task.done() and (exc := task.exception()):
                detail = f"  exception: {type(exc).__name__}: {exc}"
            coro = task.get_coro()
            _logger.warning(
                f"  - {task.get_name()} [{state}]{detail} | coro={coro.__name__ if hasattr(coro, '__name__') else coro}"
            )

    def __del__(self) -> None:
        """析构时自动释放资源。"""
        _logger.debug(f"[AsyncLoop {self._name}] __del__ called, attempting cleanup.")
        with contextlib.suppress(Exception):
            self.stop()

start

start(timeout: float = 1.0) -> None

启动 AsyncLoop 后台线程和事件循环。

Parameters:

Name Type Description Default
timeout float

等待 loop 和 task group 启动的最长秒数。

1.0

Raises: RuntimeError: 如果 loop 未能在指定时间内启动。

Source code in src\tongsim\core\async_loop.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def start(self, timeout: float = 1.0) -> None:
    """
    启动 AsyncLoop 后台线程和事件循环。

    Args:
        timeout: 等待 loop 和 task group 启动的最长秒数。
    Raises:
        RuntimeError: 如果 loop 未能在指定时间内启动。
    """
    if self.is_running():
        raise RuntimeError(f"[AsyncLoop {self._name}] already running.")

    self._thread = threading.Thread(target=self._run, name=self._name, daemon=True)
    self._thread.start()

    if not self._group_ready.wait(timeout):
        raise RuntimeError(f"[AsyncLoop {self._name}] timeout starting event loop.")

    _logger.debug(f"[AsyncLoop {self._name}] started.")

spawn

spawn(coro: Awaitable[Any], name: str = '') -> Future[Any]

在 TaskGroup 中提交一个新的异步任务。

Parameters:

Name Type Description Default
coro Awaitable[Any]

待执行的 coroutine。

required
name str

可选,任务名称,用于日志追踪。

''

Returns:

Type Description
Future[Any]

Future,可通过 .result(timeout) 获取 coroutine 返回值或异常。

Source code in src\tongsim\core\async_loop.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def spawn(self, coro: Awaitable[Any], name: str = "") -> Future[Any]:
    """
    在 TaskGroup 中提交一个新的异步任务。

    Args:
        coro: 待执行的 coroutine。
        name: 可选,任务名称,用于日志追踪。

    Returns:
        Future,可通过 .result(timeout) 获取 coroutine 返回值或异常。
    """
    if not (self._loop and self._task_group):
        raise RuntimeError(f"[AsyncLoop {self._name}] not started.")

    outer: Future[Any] = Future()

    def _schedule() -> None:
        task: asyncio.Task[Any] = self._task_group.create_task(coro, name=name)
        self._business_tasks.add(task)

        def _on_done(t: asyncio.Task[Any]) -> None:
            self._business_tasks.discard(t)
            if t.cancelled():
                outer.cancel()
            else:
                exc = t.exception()
                if exc:
                    _logger.exception(
                        f"[AsyncLoop {self._name}] Task {name!r} raised: {exc}"
                    )
                    outer.set_exception(exc)
                    # 业务异常直接取消整个主 TaskGroup
                    assert self._main_task is not None
                    self._main_task.cancel()
                else:
                    outer.set_result(t.result())

        task.add_done_callback(_on_done)

    self._loop.call_soon_threadsafe(_schedule)
    return outer

cancel_tasks

cancel_tasks(timeout: float) -> None

取消所有业务协程任务(spawn 出来的 task,不包括主协程)。

Parameters:

Name Type Description Default
timeout float

最长等待取消完成的秒数。

required
Source code in src\tongsim\core\async_loop.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def cancel_tasks(self, timeout: float) -> None:
    """
    取消所有业务协程任务(spawn 出来的 task,不包括主协程)。

    Args:
        timeout: 最长等待取消完成的秒数。
    """
    if not self.is_running():
        return

    future = asyncio.run_coroutine_threadsafe(self._cancel_tasks_seq(), self._loop)
    try:
        future.result(timeout)
    except FutureTimeoutError:
        _logger.warning(f"[AsyncLoop {self._name}] cancel_tasks timeout.")

stop

stop(timeout: float = 5.0) -> None

完全停止 AsyncLoop: 取消所有业务任务 -> 取消主 task -> 停止 loop -> join 线程。

Parameters:

Name Type Description Default
timeout float

等待整个关闭过程的最长秒数。

5.0
Source code in src\tongsim\core\async_loop.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def stop(self, timeout: float = 5.0) -> None:
    """
    完全停止 AsyncLoop:
    取消所有业务任务 -> 取消主 task -> 停止 loop -> join 线程。

    Args:
        timeout: 等待整个关闭过程的最长秒数。
    """
    if not self.is_running():
        return

    # 取消主协程,TaskGroup 会自动取消所有子任务
    assert self._main_task is not None and self._loop is not None
    self._loop.call_soon_threadsafe(self._main_task.cancel)

    self._thread.join(timeout)
    if self._thread.is_alive():
        _logger.warning(f"AsyncLoop '{self._name}' did not exit cleanly.")
    self._thread = None

is_running

is_running() -> bool

判断 AsyncLoop 是否仍在运行。

Returns:

Type Description
bool

True if loop thread alive, else False.

Source code in src\tongsim\core\async_loop.py
206
207
208
209
210
211
212
213
def is_running(self) -> bool:
    """
    判断 AsyncLoop 是否仍在运行。

    Returns:
        True if loop thread alive, else False.
    """
    return bool(self._thread and self._thread.is_alive())

log_task_list

log_task_list() -> None

打印当前 loop 内所有未完成的任务信息,便于调试。

Source code in src\tongsim\core\async_loop.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def log_task_list(self) -> None:
    """
    打印当前 loop 内所有未完成的任务信息,便于调试。
    """
    if not (self._loop and self._task_group):
        return
    task_list = asyncio.all_tasks(self._loop)
    _logger.warning(f"[AsyncLoop {self._name}] {len(task_list)} active task(s):")
    for task in task_list:
        state = (
            "cancelled"
            if task.cancelled()
            else "done" if task.done() else "pending"
        )
        detail = ""
        if task.done() and (exc := task.exception()):
            detail = f"  exception: {type(exc).__name__}: {exc}"
        coro = task.get_coro()
        _logger.warning(
            f"  - {task.get_name()} [{state}]{detail} | coro={coro.__name__ if hasattr(coro, '__name__') else coro}"
        )

tongsim.core.world_context

core.world_context

定义 WorldContext: 管理与单个 TongSim 实例绑定的底层资源, 包括异步事件循环、gRPC 通信等。

WorldContext

WorldContext 集中管理 TongSim 运行时上下文资源

统一管理: - 异步事件主循环(AsyncLoop) - gRPC 连接(GrpcConnection、LegacyGrpcStreamClient)

注意
  • 析构时自动关闭所有资源。
Source code in src\tongsim\core\world_context.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class WorldContext:
    """
    WorldContext 集中管理 TongSim 运行时上下文资源

    统一管理:
    - 异步事件主循环(AsyncLoop)
    - gRPC 连接(GrpcConnection、LegacyGrpcStreamClient)

    注意:
        - 析构时自动关闭所有资源。
    """

    def __init__(self, grpc_endpoint: str, legacy_grpc_endpoint: str):
        self._uuid: Final[uuid.UUID] = uuid.uuid4()
        self._loop: Final[AsyncLoop] = AsyncLoop(name=f"world-main-loop-{self._uuid}")
        self._loop.start()

        self._conn: Final[GrpcConnection]
        self._conn_legacy: Final[GrpcLegacyConnection]
        self._legacy_stream_client: Final[LegacyStreamClient]

        # gRPC 会检查 task 的 loop 一致性, 此处保证 gRPC stub 的初始化都在 AsyncLoop 下:
        self.sync_run(self._async_init_grpc(grpc_endpoint, legacy_grpc_endpoint))

        _logger.debug(f"[WorldContext {self._uuid}] started.")
        self._is_shutdown: bool = False

    # TODO: classmethod
    async def _async_init_grpc(self, grpc_endpoint: str, legacy_grpc_endpoint: str):
        self._conn = GrpcConnection(grpc_endpoint)
        self._conn_legacy = GrpcLegacyConnection(legacy_grpc_endpoint)
        self._legacy_stream_client = LegacyStreamClient(self._conn_legacy, self._loop)
        await self._legacy_stream_client.start()

    @property
    def uuid(self) -> str:
        """当前 World 实例的唯一标识符的前八位字符"""
        return str(self._uuid)[:8]

    @property
    def loop(self) -> AsyncLoop:
        """主事件循环"""
        return self._loop

    @property
    def conn(self) -> GrpcConnection:
        """gRPC 连接"""
        return self._conn

    @property
    def conn_legacy(self) -> GrpcLegacyConnection:
        """弃用的 gRPC 连接"""
        return self._conn_legacy

    @property
    def legacy_stream_client(self) -> LegacyStreamClient:
        """弃用的 gRPC 双向流客户端"""
        # if self._legacy_stream_client is None:
        #     with self._legacy_stream_client_lock:
        #         if self._legacy_stream_client is None:
        #             self._legacy_stream_client = LegacyStreamClient(
        #                 self._conn_legacy, self._loop
        #             )
        #             self.sync_run(self._legacy_stream_client.start())
        return self._legacy_stream_client

    def sync_run(self, coro: Awaitable, timeout: float | None = None) -> Any:
        """
        在事件循环中同步运行异步任务,并阻塞直到任务完成。

        Args:
            coro (Awaitable): 要执行的异步协程。
            timeout (float | None): 可选的超时时间(秒),超过此时间将抛出 TimeoutError。

        Returns:
            Any: 协程的返回结果。
        """
        if threading.current_thread() is self._loop.thread:
            raise RuntimeError(
                f"Cannot call `sync_run` from the same thread as AsyncLoop [{self._loop.name}] — this would cause a deadlock."
            )

        return self._loop.spawn(
            coro, name=f"[World-Context {self.uuid} sync task]"
        ).result(timeout=timeout)

    def async_task(self, coro: Awaitable[Any], name: str) -> Future[Any]:
        """
        启动一个异步任务
        """
        return self._loop.spawn(coro, name=name)

    def release(self):
        """
        释放所有资源,包括:
        - 停止任务组
        - 关闭 gRPC
        - 停止事件循环
        """
        if self._is_shutdown:
            return
        self._is_shutdown = True

        _logger.debug(f"[WorldContext {self._uuid}] releasing...")

        try:
            self._loop.cancel_tasks(timeout=1.0)
            self._loop.spawn(
                self._conn_legacy.aclose(),
                name=f"WorldContext {self.uuid} release legacy gRPC connection.",
            ).result(timeout=1.0)
            self._loop.spawn(
                self._conn.aclose(),
                name=f"WorldContext {self.uuid} release gRPC connection.",
            ).result(timeout=1.0)
        except Exception as e:
            _logger.warning(
                f"[WorldContext {self._uuid}] failed to release cleanly: {e}"
            )

        self._loop.stop()
        _logger.debug(f"[WorldContext {self._uuid}] release complete.")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

    def __del__(self):
        _logger.debug(f"[WorldContext {self._uuid}] gc.")
        self.release()

uuid property

uuid: str

当前 World 实例的唯一标识符的前八位字符

loop property

loop: AsyncLoop

主事件循环

conn property

conn: GrpcConnection

gRPC 连接

conn_legacy property

conn_legacy: GrpcLegacyConnection

弃用的 gRPC 连接

legacy_stream_client property

legacy_stream_client: LegacyStreamClient

弃用的 gRPC 双向流客户端

sync_run

sync_run(
    coro: Awaitable, timeout: float | None = None
) -> Any

在事件循环中同步运行异步任务,并阻塞直到任务完成。

Parameters:

Name Type Description Default
coro Awaitable

要执行的异步协程。

required
timeout float | None

可选的超时时间(秒),超过此时间将抛出 TimeoutError。

None

Returns:

Name Type Description
Any Any

协程的返回结果。

Source code in src\tongsim\core\world_context.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def sync_run(self, coro: Awaitable, timeout: float | None = None) -> Any:
    """
    在事件循环中同步运行异步任务,并阻塞直到任务完成。

    Args:
        coro (Awaitable): 要执行的异步协程。
        timeout (float | None): 可选的超时时间(秒),超过此时间将抛出 TimeoutError。

    Returns:
        Any: 协程的返回结果。
    """
    if threading.current_thread() is self._loop.thread:
        raise RuntimeError(
            f"Cannot call `sync_run` from the same thread as AsyncLoop [{self._loop.name}] — this would cause a deadlock."
        )

    return self._loop.spawn(
        coro, name=f"[World-Context {self.uuid} sync task]"
    ).result(timeout=timeout)

async_task

async_task(coro: Awaitable[Any], name: str) -> Future[Any]

启动一个异步任务

Source code in src\tongsim\core\world_context.py
112
113
114
115
116
def async_task(self, coro: Awaitable[Any], name: str) -> Future[Any]:
    """
    启动一个异步任务
    """
    return self._loop.spawn(coro, name=name)

release

release()

释放所有资源,包括: - 停止任务组 - 关闭 gRPC - 停止事件循环

Source code in src\tongsim\core\world_context.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def release(self):
    """
    释放所有资源,包括:
    - 停止任务组
    - 关闭 gRPC
    - 停止事件循环
    """
    if self._is_shutdown:
        return
    self._is_shutdown = True

    _logger.debug(f"[WorldContext {self._uuid}] releasing...")

    try:
        self._loop.cancel_tasks(timeout=1.0)
        self._loop.spawn(
            self._conn_legacy.aclose(),
            name=f"WorldContext {self.uuid} release legacy gRPC connection.",
        ).result(timeout=1.0)
        self._loop.spawn(
            self._conn.aclose(),
            name=f"WorldContext {self.uuid} release gRPC connection.",
        ).result(timeout=1.0)
    except Exception as e:
        _logger.warning(
            f"[WorldContext {self._uuid}] failed to release cleanly: {e}"
        )

    self._loop.stop()
    _logger.debug(f"[WorldContext {self._uuid}] release complete.")