Skip to content

gRPC Connection

TongSIM Lite uses gRPC to connect Python to Unreal. The SDK wraps grpc.aio and auto-instantiates all service stubs for you.

In most code you don’t need to build channels manually—use WorldContext.conn and call the service helpers (for example UnaryAPI / CaptureAPI).


Key building blocks

Component Location What it does
GrpcConnection src/tongsim/connection/grpc/core.py Creates the gRPC channel and instantiates all stubs
Stub discovery src/tongsim/connection/grpc/utils.py Auto-finds *_pb2_grpc.py stubs via reflection
Safe wrappers src/tongsim/connection/grpc/utils.py Error-handling decorators for async RPC calls
SDK↔Proto conversion src/tongsim/connection/grpc/utils.py Converts Vector3/Transform to protobuf messages

Message size

The Python channel is configured with a 100MB send/receive limit in GrpcConnection to support image and voxel payloads.


API References

tongsim.connection.grpc.core.GrpcConnection

Lazily instantiate gRPC stubs and provide unified access plus teardown.

Source code in src/tongsim/connection/grpc/core.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class GrpcConnection:
    """
    Lazily instantiate gRPC stubs and provide unified access plus teardown.
    """

    def __init__(self, endpoint: str = "localhost:5726"):
        self._endpoint = endpoint
        self._channel: grpc.aio.Channel | None = grpc.aio.insecure_channel(
            self._endpoint,
            options=[
                ("grpc.max_send_message_length", 100 * 1024 * 1024),
                ("grpc.max_receive_message_length", 100 * 1024 * 1024),
            ],
        )
        self._stubs: dict[type[object], object] = {}
        self._initialize()

    def _initialize(self):
        """Load and instantiate all gRPC stubs from the API protocol package."""
        for _service_name, stub_cls in iter_all_grpc_stubs():
            try:
                _logger.debug(f"GrpcConnection instantiate stub: {_service_name}")
                self._stubs[stub_cls] = stub_cls(self._channel)
            except Exception as e:
                raise RuntimeError(
                    f"GrpcConnection failed to instantiate stub: {_service_name}. {e}"
                ) from e

    def __enter__(self):
        raise RuntimeError("GrpcConnection must be used with 'async'")

    def __exit__(self, exc_type, exc_val, exc_tb):
        raise RuntimeError("GrpcConnection must be used with 'async'")

    def get_stub(self, stub_cls: type[T]) -> T:
        """
        Retrieve the stub instance for the requested service.

        Args:
            stub_cls: Stub class generated by ``*_pb2_grpc.py`` (for example
                ``ExampleServiceStub``).

        Returns:
            T: Stub instance typed to ``stub_cls``.
        """
        if stub_cls not in self._stubs:
            raise ValueError(f"[GrpcConnection] Stub {stub_cls.__name__} not found.")
        return self._stubs[stub_cls]

    def __del__(self):
        if self._channel:
            _logger.error("GrpcConnection was not properly closed.")

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.aclose()

    async def aclose(self):
        """Close the gRPC channel and release all cached stubs."""
        if self._channel:
            await self._channel.close()
            self._channel = None
            _logger.debug(f"[GrpcConnection {self._endpoint}] closed channel")
        self._stubs.clear()

get_stub

get_stub(stub_cls: type[T]) -> T

Retrieve the stub instance for the requested service.

Parameters:

Name Type Description Default
stub_cls type[T]

Stub class generated by *_pb2_grpc.py (for example ExampleServiceStub).

required

Returns:

Name Type Description
T T

Stub instance typed to stub_cls.

Source code in src/tongsim/connection/grpc/core.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def get_stub(self, stub_cls: type[T]) -> T:
    """
    Retrieve the stub instance for the requested service.

    Args:
        stub_cls: Stub class generated by ``*_pb2_grpc.py`` (for example
            ``ExampleServiceStub``).

    Returns:
        T: Stub instance typed to ``stub_cls``.
    """
    if stub_cls not in self._stubs:
        raise ValueError(f"[GrpcConnection] Stub {stub_cls.__name__} not found.")
    return self._stubs[stub_cls]

aclose async

aclose()

Close the gRPC channel and release all cached stubs.

Source code in src/tongsim/connection/grpc/core.py
84
85
86
87
88
89
90
async def aclose(self):
    """Close the gRPC channel and release all cached stubs."""
    if self._channel:
        await self._channel.close()
        self._channel = None
        _logger.debug(f"[GrpcConnection {self._endpoint}] closed channel")
    self._stubs.clear()

tongsim.connection.grpc.utils.iter_all_grpc_stubs

iter_all_grpc_stubs() -> (
    Generator[tuple[str, type], None, None]
)

Iterate through all gRPC service stubs defined in the protocol package.

Yields:

Type Description
tuple[str, type]

tuple[str, type]: Stub class name and the class itself.

Source code in src/tongsim/connection/grpc/utils.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def iter_all_grpc_stubs() -> Generator[tuple[str, type], None, None]:
    """
    Iterate through all gRPC service stubs defined in the protocol package.

    Yields:
        tuple[str, type]: Stub class name and the class itself.
    """
    pkg = importlib.import_module(_PACKAGE)
    for _, modname, ispkg in pkgutil.walk_packages(pkg.__path__, prefix=_PACKAGE + "."):
        if not ispkg and modname.endswith("_pb2_grpc"):
            grpc_module = importlib.import_module(modname)
            for name, obj in inspect.getmembers(grpc_module, inspect.isclass):
                if name.endswith("Stub"):
                    yield name, obj

tongsim.connection.grpc.utils.iter_all_proto_messages

iter_all_proto_messages() -> (
    Generator[tuple[str, type[Message]], None, None]
)

Iterate over all protobuf Message types defined in _PACKAGE.

Yields:

Type Description
tuple[str, type[Message]]

tuple[str, type[ProtoMessage]]: Fully qualified name and class object.

Source code in src/tongsim/connection/grpc/utils.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def iter_all_proto_messages() -> Generator[tuple[str, type[ProtoMessage]], None, None]:
    """
    Iterate over all protobuf ``Message`` types defined in ``_PACKAGE``.

    Yields:
        tuple[str, type[ProtoMessage]]: Fully qualified name and class object.
    """
    pkg = importlib.import_module(_PACKAGE)
    for _, modname, ispkg in pkgutil.walk_packages(pkg.__path__, prefix=_PACKAGE + "."):
        if not ispkg and not modname.endswith("_pb2_grpc") and modname.endswith("_pb2"):
            proto_module = importlib.import_module(modname)
            for _, obj in inspect.getmembers(proto_module):
                if inspect.isclass(obj) and issubclass(obj, ProtoMessage):
                    yield obj.DESCRIPTOR.full_name, obj

tongsim.connection.grpc.utils.safe_async_rpc

safe_async_rpc(
    default: T | None = None, raise_on_error: bool = False
) -> Callable[
    [Callable[P, Awaitable[T]]],
    Callable[P, Awaitable[T]],
]

Decorator that wraps async RPC invocations with safety guards.

Parameters:

Name Type Description Default
default T | None

Value (or awaitable factory) returned when an exception occurs.

None
raise_on_error bool

When True, re-raise the captured exception instead of suppressing it.

False

Usage::

@safe_async_rpc(default={}, raise_on_error=False)
async def my_method(...):
    ...
Source code in src/tongsim/connection/grpc/utils.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def safe_async_rpc[T](
    default: T | None = None, raise_on_error: bool = False
) -> Callable[[Callable[P, Awaitable[T]]], Callable[P, Awaitable[T]]]:
    """
    Decorator that wraps async RPC invocations with safety guards.

    Args:
        default: Value (or awaitable factory) returned when an exception occurs.
        raise_on_error: When ``True``, re-raise the captured exception instead of
            suppressing it.

    Usage::

        @safe_async_rpc(default={}, raise_on_error=False)
        async def my_method(...):
            ...
    """

    def decorator(func: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
        @functools.wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            try:
                _logger.debug(f"gRPC async call {func.__name__}")
                return await func(*args, **kwargs)
            except Exception:
                _logger.error(f"gRPC async call {func.__name__} failed", exc_info=True)
                if raise_on_error:
                    raise
            if callable(default) and inspect.iscoroutinefunction(default):
                return await default()
            return default

        return cast(Callable[P, Awaitable[T]], wrapper)

    return decorator

tongsim.connection.grpc.utils.safe_unary_stream

safe_unary_stream(
    raise_on_error: bool = False,
) -> Callable[
    [Callable[P, AsyncIterator[T]]],
    Callable[P, AsyncIterator[T]],
]

Decorator that guards async unary-stream RPC generators.

Parameters:

Name Type Description Default
raise_on_error bool

Re-raise exceptions instead of silently stopping iteration.

False
Source code in src/tongsim/connection/grpc/utils.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def safe_unary_stream(
    raise_on_error: bool = False,
) -> Callable[[Callable[P, AsyncIterator[T]]], Callable[P, AsyncIterator[T]]]:
    """
    Decorator that guards async unary-stream RPC generators.

    Args:
        raise_on_error: Re-raise exceptions instead of silently stopping iteration.
    """

    def decorator(func: Callable[P, AsyncIterator[T]]) -> Callable[P, AsyncIterator[T]]:
        @functools.wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]:
            _logger.debug(f"gRPC Unary-Stream  {func.__name__} starting.")

            try:
                async for item in func(*args, **kwargs):
                    yield item
            except Exception as e:
                _logger.error(
                    f"gRPC Unary-Stream Error in {func.__name__}: {e}", exc_info=True
                )
                if raise_on_error:
                    raise
                return  # Stop async iteration.

            _logger.debug(f"gRPC Unary-Stream {func.__name__} completed.")

        return cast(Callable[P, AsyncIterator[T]], wrapper)

    return decorator

tongsim.connection.grpc.utils.sdk_to_proto

sdk_to_proto(obj: Any) -> Message
Source code in src/tongsim/connection/grpc/utils.py
164
165
166
167
168
def sdk_to_proto(obj: Any) -> ProtoMessage:
    handler = _sdk_to_proto_dispatch.get(type(obj))
    if handler is None:
        raise TypeError(f"Unsupported SDK type: {type(obj)}")
    return handler(obj)

tongsim.connection.grpc.utils.proto_to_sdk

proto_to_sdk(message: Message) -> Any
Source code in src/tongsim/connection/grpc/utils.py
195
196
197
198
199
def proto_to_sdk(message: ProtoMessage) -> Any:
    handler = _proto_to_sdk_dispatch.get(type(message))
    if handler is None:
        raise TypeError(f"Unsupported Proto message: {type(message)}")
    return handler(message)