Skip to content

Capture

The Capture API provides snapshot-based RGB/Depth capture from Unreal.

  • Protocol: protobuf/tongsim_lite_protobuf/capture.proto
  • SDK wrapper: tongsim.connection.grpc.capture_api.CaptureAPI

Usage guide

See TongSim Capture for end-to-end examples and decoding notes.


Key Functions

  • list_cameras: List capture cameras created in the current session.
  • create_camera: Spawn a capture camera actor and apply capture parameters.
  • set_camera_pose / attach_camera: Move the camera or attach it to a parent actor.
  • update_camera_params: Update parameters (fails if the camera is capturing).
  • capture_snapshot: Capture a single frame (color/depth optional).
  • get_status: Query capture status.
  • destroy_camera: Cleanup a camera.

API References

tongsim.connection.grpc.capture_api.CaptureAPI.list_cameras async staticmethod

list_cameras(conn: GrpcConnection) -> list[dict[str, Any]]
Source code in src/tongsim/connection/grpc/capture_api.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@staticmethod
@safe_async_rpc()
async def list_cameras(conn: GrpcConnection) -> list[dict[str, Any]]:
    stub = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    resp = await stub.ListCaptureCameras(capture_pb2.ListCaptureCamerasRequest())
    out: list[dict[str, Any]] = []
    for desc in resp.cameras:
        item = {
            "camera": {
                "id": desc.camera.id.guid,
                "name": desc.camera.name,
                "class_path": desc.camera.class_path,
            },
            "params": desc.params,
            "status": desc.status,
        }
        out.append(item)
    return out

tongsim.connection.grpc.capture_api.CaptureAPI.create_camera async staticmethod

create_camera(
    conn: GrpcConnection,
    *,
    transform: Transform,
    params: dict[str, Any],
    capture_name: str | None = None,
    attach_parent: bytes | None = None,
    attach_socket: str = "",
    keep_world: bool = True
) -> bytes | None
Source code in src/tongsim/connection/grpc/capture_api.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@staticmethod
@safe_async_rpc(default=None)
async def create_camera(
    conn: GrpcConnection,
    *,
    transform: Transform,
    params: dict[str, Any],
    capture_name: str | None = None,
    attach_parent: bytes | None = None,
    attach_socket: str = "",
    keep_world: bool = True,
) -> bytes | None:
    stub = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    req = capture_pb2.CreateCaptureCameraRequest()
    if capture_name:
        req.capture_name = capture_name
    req.world_transform.CopyFrom(_transform_to_proto(transform))
    req.params.CopyFrom(_dict_to_params(params))
    if attach_parent:
        req.attach_parent.guid = attach_parent
    req.attach_socket = attach_socket
    req.keep_world = keep_world
    resp = await stub.CreateCaptureCamera(req)
    return resp.camera.id.guid

tongsim.connection.grpc.capture_api.CaptureAPI.destroy_camera async staticmethod

destroy_camera(
    conn: GrpcConnection,
    camera_id: bytes,
    force_stop: bool = True,
) -> bool
Source code in src/tongsim/connection/grpc/capture_api.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
@staticmethod
@safe_async_rpc(default=False)
async def destroy_camera(
    conn: GrpcConnection,
    camera_id: bytes,
    force_stop: bool = True,
) -> bool:
    stub = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    req = capture_pb2.DestroyCaptureCameraRequest(
        camera_id=object_pb2.ObjectId(guid=camera_id)
    )
    req.force_stop_capture = force_stop
    await stub.DestroyCaptureCamera(req)
    return True

tongsim.connection.grpc.capture_api.CaptureAPI.set_camera_pose async staticmethod

set_camera_pose(
    conn: GrpcConnection,
    camera_id: bytes,
    transform: Transform,
) -> bool
Source code in src/tongsim/connection/grpc/capture_api.py
137
138
139
140
141
142
143
144
145
146
147
148
@staticmethod
@safe_async_rpc(default=False)
async def set_camera_pose(
    conn: GrpcConnection, camera_id: bytes, transform: Transform
) -> bool:
    stub = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    req = capture_pb2.SetCaptureCameraPoseRequest(
        camera_id=object_pb2.ObjectId(guid=camera_id),
        world_transform=_transform_to_proto(transform),
    )
    await stub.SetCaptureCameraPose(req)
    return True

tongsim.connection.grpc.capture_api.CaptureAPI.update_camera_params async staticmethod

update_camera_params(
    conn: GrpcConnection,
    camera_id: bytes,
    params: dict[str, Any],
) -> bool
Source code in src/tongsim/connection/grpc/capture_api.py
150
151
152
153
154
155
156
157
158
159
160
161
@staticmethod
@safe_async_rpc(default=False)
async def update_camera_params(
    conn: GrpcConnection, camera_id: bytes, params: dict[str, Any]
) -> bool:
    stub = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    req = capture_pb2.UpdateCaptureCameraParamsRequest(
        camera_id=object_pb2.ObjectId(guid=camera_id),
        params=_dict_to_params(params),
    )
    await stub.UpdateCaptureCameraParams(req)
    return True

tongsim.connection.grpc.capture_api.CaptureAPI.attach_camera async staticmethod

attach_camera(
    conn: GrpcConnection,
    camera_id: bytes,
    parent_id: bytes,
    socket_name: str = "",
    keep_world: bool = True,
) -> bool
Source code in src/tongsim/connection/grpc/capture_api.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
@staticmethod
@safe_async_rpc(default=False)
async def attach_camera(
    conn: GrpcConnection,
    camera_id: bytes,
    parent_id: bytes,
    socket_name: str = "",
    keep_world: bool = True,
) -> bool:
    stub = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    req = capture_pb2.AttachCaptureCameraRequest(
        camera_id=object_pb2.ObjectId(guid=camera_id),
        parent_actor_id=object_pb2.ObjectId(guid=parent_id),
        socket_name=socket_name,
        keep_world=keep_world,
    )
    await stub.AttachCaptureCamera(req)
    return True

tongsim.connection.grpc.capture_api.CaptureAPI.capture_snapshot async staticmethod

capture_snapshot(
    conn: GrpcConnection,
    camera_id: bytes,
    *,
    include_color: bool = True,
    include_depth: bool = True,
    timeout_seconds: float = 0.5
) -> dict[str, Any] | None
Source code in src/tongsim/connection/grpc/capture_api.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
@staticmethod
@safe_async_rpc(default=None)
async def capture_snapshot(
    conn: GrpcConnection,
    camera_id: bytes,
    *,
    include_color: bool = True,
    include_depth: bool = True,
    timeout_seconds: float = 0.5,
) -> dict[str, Any] | None:
    stub = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    req = capture_pb2.CaptureSnapshotRequest(
        camera_id=object_pb2.ObjectId(guid=camera_id),
        include_color=include_color,
        include_depth=include_depth,
        timeout_seconds=timeout_seconds,
    )
    resp = await stub.CaptureSnapshot(req)
    return _frame_to_dict(resp)

tongsim.connection.grpc.capture_api.CaptureAPI.get_status async staticmethod

get_status(
    conn: GrpcConnection, camera_id: bytes
) -> dict[str, Any] | None
Source code in src/tongsim/connection/grpc/capture_api.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@staticmethod
@safe_async_rpc(default=None)
async def get_status(
    conn: GrpcConnection, camera_id: bytes
) -> dict[str, Any] | None:
    stub = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    req = capture_pb2.GetCaptureStatusRequest(
        camera_id=object_pb2.ObjectId(guid=camera_id)
    )
    resp = await stub.GetCaptureStatus(req)
    return {
        "capturing": resp.status.capturing,
        "queue_count": resp.status.queue_count,
        "compressed_queue_count": resp.status.compressed_queue_count,
        "width": resp.status.width,
        "height": resp.status.height,
        "fov_degrees": resp.status.fov_degrees,
        "depth_mode": resp.status.depth_mode,
    }