跳转至

gRPC 连接(Connection)

TongSIM Lite 使用 gRPC 连接 Python 与 Unreal。SDK 基于 grpc.aio,并会自动实例化所有服务的 Stub。

多数情况下你不需要手动创建 channel:直接使用 WorldContext.conn,并调用上层 API(例如 UnaryAPI / CaptureAPI)即可。


核心组件

| 组件 | 位置 | 职责 |
| --- | --- | --- |
| GrpcConnection | src/tongsim/connection/grpc/core.py | 创建 gRPC channel 并初始化所有 stubs |
| Stub 自动发现 | src/tongsim/connection/grpc/utils.py | 通过反射遍历 `*_pb2_grpc.py` 自动加载 Stub |
| 安全调用封装 | src/tongsim/connection/grpc/utils.py | 对异步 RPC 调用做异常兜底 |
| SDK↔Proto 转换 | src/tongsim/connection/grpc/utils.py | Vector3/Transform 与 protobuf 的互转 |

消息大小

Python 端在 GrpcConnection 中将发送与接收的单条消息大小上限均配置为 100 MiB(100 × 1024 × 1024 字节),用于支持图像与体素等大 payload。


API References

tongsim.connection.grpc.core.GrpcConnection

Eagerly instantiate all gRPC stubs at construction time and provide unified access plus teardown.

Source code in src/tongsim/connection/grpc/core.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class GrpcConnection:
    """
    Own one ``grpc.aio`` channel and one cached stub instance per service.

    Every stub class yielded by ``iter_all_grpc_stubs()`` is instantiated
    eagerly in the constructor and cached by its class. Use the instance with
    ``async with`` or call ``aclose()`` explicitly; the synchronous ``with``
    protocol is rejected with ``RuntimeError``.
    """

    def __init__(self, endpoint: str = "localhost:5726"):
        """
        Open an insecure channel to ``endpoint`` and instantiate all stubs.

        Args:
            endpoint: Target passed to ``grpc.aio.insecure_channel``.

        Raises:
            RuntimeError: If any stub class cannot be instantiated.
        """
        self._endpoint = endpoint
        # 100 MiB in each direction so large payloads (images, voxels) fit in
        # a single message.
        max_message_bytes = 100 * 1024 * 1024
        self._channel: grpc.aio.Channel | None = grpc.aio.insecure_channel(
            self._endpoint,
            options=[
                ("grpc.max_send_message_length", max_message_bytes),
                ("grpc.max_receive_message_length", max_message_bytes),
            ],
        )
        # Maps stub class -> stub instance bound to ``self._channel``.
        self._stubs: dict[type[object], object] = {}
        self._initialize()

    def _initialize(self):
        """
        Load and instantiate all gRPC stubs from the API protocol package.

        Raises:
            RuntimeError: If a stub constructor raises; the original exception
                is chained as the cause.
        """
        for _service_name, stub_cls in iter_all_grpc_stubs():
            try:
                _logger.debug(f"GrpcConnection instantiate stub: {_service_name}")
                self._stubs[stub_cls] = stub_cls(self._channel)
            except Exception as e:
                raise RuntimeError(
                    f"GrpcConnection failed to instantiate stub: {_service_name}. {e}"
                ) from e

    def __enter__(self):
        """Reject synchronous ``with``; only ``async with`` is supported."""
        raise RuntimeError("GrpcConnection must be used with 'async'")

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Reject synchronous ``with``; only ``async with`` is supported."""
        raise RuntimeError("GrpcConnection must be used with 'async'")

    def get_stub(self, stub_cls: type[T]) -> T:
        """
        Retrieve the stub instance for the requested service.

        Args:
            stub_cls: Stub class generated by ``*_pb2_grpc.py`` (for example
                ``ExampleServiceStub``).

        Returns:
            T: Stub instance typed to ``stub_cls``.

        Raises:
            ValueError: If no stub is cached for ``stub_cls`` (this includes
                any call made after ``aclose()`` has cleared the cache).
        """
        if stub_cls not in self._stubs:
            raise ValueError(f"[GrpcConnection] Stub {stub_cls.__name__} not found.")
        return self._stubs[stub_cls]

    def __del__(self):
        # The finalizer also runs for instances whose ``__init__`` raised
        # before ``_channel`` was assigned (or that were created via
        # ``__new__`` only), so the attribute may be missing. Reading it
        # directly would raise AttributeError inside the finalizer.
        if getattr(self, "_channel", None):
            _logger.error("GrpcConnection was not properly closed.")

    async def __aenter__(self):
        """Enter the async context; the channel is already open."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the connection when leaving the ``async with`` block."""
        await self.aclose()

    async def aclose(self):
        """
        Close the gRPC channel and release all cached stubs.

        Calling it again after the channel is closed only clears the
        (already empty) stub cache.
        """
        if self._channel:
            await self._channel.close()
            self._channel = None
            _logger.debug(f"[GrpcConnection {self._endpoint}] closed channel")
        self._stubs.clear()

get_stub

get_stub(stub_cls: type[T]) -> T

Retrieve the stub instance for the requested service.

Parameters:

Name Type Description Default
stub_cls type[T]

Stub class generated by *_pb2_grpc.py (for example ExampleServiceStub).

required

Returns:

Name Type Description
T T

Stub instance typed to stub_cls.

Source code in src/tongsim/connection/grpc/core.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def get_stub(self, stub_cls: type[T]) -> T:
    """
    Look up the cached stub instance for a service.

    Args:
        stub_cls: Stub class generated by ``*_pb2_grpc.py`` (for example
            ``ExampleServiceStub``).

    Returns:
        T: The stub instance registered under ``stub_cls``.

    Raises:
        ValueError: If ``stub_cls`` has no cached instance.
    """
    registry = self._stubs
    if stub_cls in registry:
        return registry[stub_cls]
    raise ValueError(f"[GrpcConnection] Stub {stub_cls.__name__} not found.")

aclose async

aclose()

Close the gRPC channel and release all cached stubs.

Source code in src/tongsim/connection/grpc/core.py
84
85
86
87
88
89
90
async def aclose(self):
    """Shut down the gRPC channel (if still open) and drop every cached stub."""
    channel = self._channel
    if channel:
        await channel.close()
        # Mark the connection as closed so a second call skips the shutdown.
        self._channel = None
        _logger.debug(f"[GrpcConnection {self._endpoint}] closed channel")
    self._stubs.clear()

tongsim.connection.grpc.utils.iter_all_grpc_stubs

iter_all_grpc_stubs() -> (
    Generator[tuple[str, type], None, None]
)

Iterate through all gRPC service stubs defined in the protocol package.

Yields:

Type Description
tuple[str, type]

tuple[str, type]: Stub class name and the class itself.

Source code in src/tongsim/connection/grpc/utils.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def iter_all_grpc_stubs() -> Generator[tuple[str, type], None, None]:
    """
    Lazily walk the protocol package and yield every gRPC stub class.

    Only non-package modules whose name ends with ``_pb2_grpc`` are imported;
    from each, classes whose name ends with ``Stub`` are yielded.

    Yields:
        tuple[str, type]: Stub class name and the class itself.
    """
    root_pkg = importlib.import_module(_PACKAGE)
    for module_info in pkgutil.walk_packages(
        root_pkg.__path__, prefix=_PACKAGE + "."
    ):
        # Skip sub-packages and anything that is not a generated service module.
        if module_info.ispkg or not module_info.name.endswith("_pb2_grpc"):
            continue
        service_module = importlib.import_module(module_info.name)
        yield from (
            (cls_name, cls)
            for cls_name, cls in inspect.getmembers(service_module, inspect.isclass)
            if cls_name.endswith("Stub")
        )

tongsim.connection.grpc.utils.iter_all_proto_messages

iter_all_proto_messages() -> (
    Generator[tuple[str, type[Message]], None, None]
)

Iterate over all protobuf Message types defined in _PACKAGE.

Yields:

Type Description
tuple[str, type[Message]]

tuple[str, type[ProtoMessage]]: Fully qualified name and class object.

Source code in src/tongsim/connection/grpc/utils.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def iter_all_proto_messages() -> Generator[tuple[str, type[ProtoMessage]], None, None]:
    """
    Lazily walk ``_PACKAGE`` and yield every protobuf ``Message`` class.

    Only non-package modules whose name ends with ``_pb2`` are imported.

    Yields:
        tuple[str, type[ProtoMessage]]: Fully qualified name and class object.
    """
    root_pkg = importlib.import_module(_PACKAGE)
    for module_info in pkgutil.walk_packages(
        root_pkg.__path__, prefix=_PACKAGE + "."
    ):
        # A name ending in "_pb2" can never end in "_pb2_grpc", so this single
        # suffix test already excludes the generated service modules.
        if module_info.ispkg or not module_info.name.endswith("_pb2"):
            continue
        message_module = importlib.import_module(module_info.name)
        for _, candidate in inspect.getmembers(message_module, inspect.isclass):
            if issubclass(candidate, ProtoMessage):
                yield candidate.DESCRIPTOR.full_name, candidate

tongsim.connection.grpc.utils.safe_async_rpc

safe_async_rpc(
    default: T | None = None, raise_on_error: bool = False
) -> Callable[
    [Callable[P, Awaitable[T]]],
    Callable[P, Awaitable[T]],
]

Decorator that wraps async RPC invocations with safety guards.

Parameters:

Name Type Description Default
default T | None

Value (or awaitable factory) returned when an exception occurs.

None
raise_on_error bool

When True, re-raise the captured exception instead of suppressing it.

False

Usage::

@safe_async_rpc(default={}, raise_on_error=False)
async def my_method(...):
    ...
Source code in src/tongsim/connection/grpc/utils.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def safe_async_rpc[T](
    default: T | None = None, raise_on_error: bool = False
) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]:
    """
    Decorator that wraps async RPC invocations with safety guards.

    Args:
        default: Value (or awaitable factory) returned when an exception occurs.
        raise_on_error: When ``True``, re-raise the captured exception instead of
            suppressing it.

    Usage::

        @safe_async_rpc(default={}, raise_on_error=False)
        async def my_method(...):
            ...
    """

    def decorator(func: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
        @functools.wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            try:
                _logger.debug(f"gRPC async call {func.__name__}")
                return await func(*args, **kwargs)
            except Exception:
                _logger.error(f"gRPC async call {func.__name__} failed", exc_info=True)
                if raise_on_error:
                    raise
            if callable(default) and inspect.iscoroutinefunction(default):
                return await default()
            return default

        return cast(Callable[P, Awaitable[T]], wrapper)

    return decorator

tongsim.connection.grpc.utils.safe_unary_stream

safe_unary_stream(
    raise_on_error: bool = False,
) -> Callable[
    [Callable[P, AsyncIterator[T]]],
    Callable[P, AsyncIterator[T]],
]

Decorator that guards async unary-stream RPC generators.

Parameters:

Name Type Description Default
raise_on_error bool

Re-raise exceptions instead of silently stopping iteration.

False
Source code in src/tongsim/connection/grpc/utils.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def safe_unary_stream(
    raise_on_error: bool = False,
) -> Callable[[Callable[P, AsyncIterator[T]]], Callable[P, AsyncIterator[T]]]:
    """
    Build a decorator that logs failures of an async unary-stream generator.

    Args:
        raise_on_error: When ``True``, the exception is logged and re-raised;
            otherwise the stream simply ends after the error is logged.
    """

    def decorator(func: Callable[P, AsyncIterator[T]]) -> Callable[P, AsyncIterator[T]]:
        @functools.wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]:
            _logger.debug(f"gRPC Unary-Stream  {func.__name__} starting.")

            try:
                async for message in func(*args, **kwargs):
                    yield message
            except Exception as error:
                _logger.error(
                    f"gRPC Unary-Stream Error in {func.__name__}: {error}",
                    exc_info=True,
                )
                if raise_on_error:
                    raise
                # Otherwise fall off the end, which stops async iteration.
            else:
                _logger.debug(f"gRPC Unary-Stream {func.__name__} completed.")

        return cast(Callable[P, AsyncIterator[T]], wrapper)

    return decorator

tongsim.connection.grpc.utils.sdk_to_proto

sdk_to_proto(obj: Any) -> Message
Source code in src/tongsim/connection/grpc/utils.py
164
165
166
167
168
def sdk_to_proto(obj: Any) -> ProtoMessage:
    """
    Convert an SDK value to its protobuf message via the dispatch table.

    The converter is looked up by the exact ``type(obj)``.

    Raises:
        TypeError: If no converter is registered for ``type(obj)``.
    """
    if (convert := _sdk_to_proto_dispatch.get(type(obj))) is not None:
        return convert(obj)
    raise TypeError(f"Unsupported SDK type: {type(obj)}")

tongsim.connection.grpc.utils.proto_to_sdk

proto_to_sdk(message: Message) -> Any
Source code in src/tongsim/connection/grpc/utils.py
195
196
197
198
199
def proto_to_sdk(message: ProtoMessage) -> Any:
    """
    Convert a protobuf message to its SDK value via the dispatch table.

    The converter is looked up by the exact ``type(message)``.

    Raises:
        TypeError: If no converter is registered for ``type(message)``.
    """
    if (convert := _proto_to_sdk_dispatch.get(type(message))) is not None:
        return convert(message)
    raise TypeError(f"Unsupported Proto message: {type(message)}")