Capture
Capture API 提供基于 Snapshot 的 RGB/Depth 采集能力。
- 协议:
protobuf/tongsim_lite_protobuf/capture.proto
- SDK 封装:
tongsim.connection.grpc.capture_api.CaptureAPI
Key Functions
list_cameras:列出当前会话中创建的采集相机。
create_camera:生成采集相机 actor,并应用参数。
set_camera_pose / attach_camera:移动相机或挂到父 actor。
update_camera_params:更新相机参数(相机正在采集时调用会失败)。
capture_snapshot:采集单帧(color/depth 可选)。
get_status:查询采集状态。
destroy_camera:销毁相机并清理资源。
API References
tongsim.connection.grpc.capture_api.CaptureAPI.list_cameras
async
staticmethod
Source code in src/tongsim/connection/grpc/capture_api.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@staticmethod
@safe_async_rpc()
async def list_cameras(conn: GrpcConnection) -> list[dict[str, Any]]:
    """List the capture cameras reported by the capture service.

    Sends an empty ``ListCaptureCamerasRequest`` and turns every descriptor
    in the response into a plain dictionary.

    Args:
        conn: Connection used to obtain the ``CaptureServiceStub``.

    Returns:
        One dict per camera with the keys ``"camera"`` (a nested dict holding
        ``id``, ``name`` and ``class_path``), ``"params"`` and ``"status"``.
        ``params`` and ``status`` are passed through exactly as found on the
        descriptor, without conversion.
    """
    service = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    reply = await service.ListCaptureCameras(
        capture_pb2.ListCaptureCamerasRequest()
    )
    return [
        {
            "camera": {
                "id": entry.camera.id.guid,
                "name": entry.camera.name,
                "class_path": entry.camera.class_path,
            },
            "params": entry.params,
            "status": entry.status,
        }
        for entry in reply.cameras
    ]
|
tongsim.connection.grpc.capture_api.CaptureAPI.create_camera
async
staticmethod
create_camera(
conn: GrpcConnection,
*,
transform: Transform,
params: dict[str, Any],
capture_name: str | None = None,
attach_parent: bytes | None = None,
attach_socket: str = "",
keep_world: bool = True
) -> bytes | None
Source code in src/tongsim/connection/grpc/capture_api.py
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@staticmethod
@safe_async_rpc(default=None)
async def create_camera(
    conn: GrpcConnection,
    *,
    transform: Transform,
    params: dict[str, Any],
    capture_name: str | None = None,
    attach_parent: bytes | None = None,
    attach_socket: str = "",
    keep_world: bool = True,
) -> bytes | None:
    """Create a capture camera and return its GUID.

    Args:
        conn: Connection used to obtain the ``CaptureServiceStub``.
        transform: World transform for the new camera; converted with
            ``_transform_to_proto``.
        params: Camera parameters; converted with ``_dict_to_params``.
        capture_name: Optional name; only written to the request when truthy.
        attach_parent: GUID of a parent to attach to. When falsy, none of the
            attachment fields are written to the request.
        attach_socket: Socket name on the parent; used only with
            ``attach_parent``.
        keep_world: Forwarded as ``keep_world``; used only with
            ``attach_parent``.

    Returns:
        The GUID of the created camera taken from the response. The
        ``safe_async_rpc(default=None)`` decorator presumably yields ``None``
        when the call fails -- confirm against the decorator.
    """
    service = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    request = capture_pb2.CreateCaptureCameraRequest()

    if capture_name:
        request.capture_name = capture_name

    request.world_transform.CopyFrom(_transform_to_proto(transform))
    request.params.CopyFrom(_dict_to_params(params))

    # Attachment options are only meaningful together with a parent.
    if attach_parent:
        request.attach_parent.guid = attach_parent
        request.attach_socket = attach_socket
        request.keep_world = keep_world

    reply = await service.CreateCaptureCamera(request)
    return reply.camera.id.guid
|
tongsim.connection.grpc.capture_api.CaptureAPI.destroy_camera
async
staticmethod
destroy_camera(
conn: GrpcConnection,
camera_id: bytes,
force_stop: bool = True,
) -> bool
Source code in src/tongsim/connection/grpc/capture_api.py
122
123
124
125
126
127
128
129
130
131
132
133
134
@staticmethod
@safe_async_rpc(default=False)
async def destroy_camera(
    conn: GrpcConnection,
    camera_id: bytes,
    force_stop: bool = True,
) -> bool:
    """Destroy a capture camera.

    Args:
        conn: Connection used to obtain the ``CaptureServiceStub``.
        camera_id: GUID of the camera to destroy.
        force_stop: Forwarded as ``force_stop_capture`` on the request.

    Returns:
        ``True`` once the RPC has returned. The
        ``safe_async_rpc(default=False)`` decorator presumably yields
        ``False`` when the call fails -- confirm against the decorator.
    """
    service = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    target = object_pb2.ObjectId(guid=camera_id)
    request = capture_pb2.DestroyCaptureCameraRequest(camera_id=target)
    request.force_stop_capture = force_stop
    await service.DestroyCaptureCamera(request)
    return True
|
tongsim.connection.grpc.capture_api.CaptureAPI.set_camera_pose
async
staticmethod
Source code in src/tongsim/connection/grpc/capture_api.py
137
138
139
140
141
142
143
144
145
146
147
@staticmethod
@safe_async_rpc(default=False)
async def set_camera_pose(
    conn: GrpcConnection, camera_id: bytes, transform: Transform
) -> bool:
    """Set the world transform of a capture camera.

    Args:
        conn: Connection used to obtain the ``CaptureServiceStub``.
        camera_id: GUID of the camera to move.
        transform: New world transform; converted with
            ``_transform_to_proto``.

    Returns:
        ``True`` once the RPC has returned. The
        ``safe_async_rpc(default=False)`` decorator presumably yields
        ``False`` when the call fails -- confirm against the decorator.
    """
    service = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    target = object_pb2.ObjectId(guid=camera_id)
    pose = _transform_to_proto(transform)
    request = capture_pb2.SetCaptureCameraPoseRequest(
        camera_id=target,
        world_transform=pose,
    )
    await service.SetCaptureCameraPose(request)
    return True
|
tongsim.connection.grpc.capture_api.CaptureAPI.update_camera_params
async
staticmethod
update_camera_params(
conn: GrpcConnection,
camera_id: bytes,
params: dict[str, Any],
) -> bool
Source code in src/tongsim/connection/grpc/capture_api.py
150
151
152
153
154
155
156
157
158
159
160
@staticmethod
@safe_async_rpc(default=False)
async def update_camera_params(
    conn: GrpcConnection, camera_id: bytes, params: dict[str, Any]
) -> bool:
    """Send new parameters for an existing capture camera.

    Args:
        conn: Connection used to obtain the ``CaptureServiceStub``.
        camera_id: GUID of the camera to update.
        params: New camera parameters; converted with ``_dict_to_params``.

    Returns:
        ``True`` once the RPC has returned. The
        ``safe_async_rpc(default=False)`` decorator presumably yields
        ``False`` when the call fails -- confirm against the decorator.
    """
    service = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    target = object_pb2.ObjectId(guid=camera_id)
    converted = _dict_to_params(params)
    request = capture_pb2.UpdateCaptureCameraParamsRequest(
        camera_id=target,
        params=converted,
    )
    await service.UpdateCaptureCameraParams(request)
    return True
|
tongsim.connection.grpc.capture_api.CaptureAPI.attach_camera
async
staticmethod
attach_camera(
conn: GrpcConnection,
camera_id: bytes,
parent_id: bytes,
socket_name: str = "",
keep_world: bool = True,
) -> bool
Source code in src/tongsim/connection/grpc/capture_api.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
@staticmethod
@safe_async_rpc(default=False)
async def attach_camera(
    conn: GrpcConnection,
    camera_id: bytes,
    parent_id: bytes,
    socket_name: str = "",
    keep_world: bool = True,
) -> bool:
    """Attach a capture camera to a parent actor.

    Args:
        conn: Connection used to obtain the ``CaptureServiceStub``.
        camera_id: GUID of the camera to attach.
        parent_id: GUID of the parent actor.
        socket_name: Forwarded as ``socket_name`` on the request.
        keep_world: Forwarded as ``keep_world`` on the request.

    Returns:
        ``True`` once the RPC has returned. The
        ``safe_async_rpc(default=False)`` decorator presumably yields
        ``False`` when the call fails -- confirm against the decorator.
    """
    service = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    camera_ref = object_pb2.ObjectId(guid=camera_id)
    parent_ref = object_pb2.ObjectId(guid=parent_id)
    request = capture_pb2.AttachCaptureCameraRequest(
        camera_id=camera_ref,
        parent_actor_id=parent_ref,
        socket_name=socket_name,
        keep_world=keep_world,
    )
    await service.AttachCaptureCamera(request)
    return True
|
tongsim.connection.grpc.capture_api.CaptureAPI.capture_snapshot
async
staticmethod
capture_snapshot(
conn: GrpcConnection,
camera_id: bytes,
*,
include_color: bool = True,
include_depth: bool = True,
timeout_seconds: float = 0.5
) -> dict[str, Any] | None
Source code in src/tongsim/connection/grpc/capture_api.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@staticmethod
@safe_async_rpc(default=None)
async def capture_snapshot(
    conn: GrpcConnection,
    camera_id: bytes,
    *,
    include_color: bool = True,
    include_depth: bool = True,
    timeout_seconds: float = 0.5,
) -> dict[str, Any] | None:
    """Capture a single frame from a capture camera.

    Args:
        conn: Connection used to obtain the ``CaptureServiceStub``.
        camera_id: GUID of the camera to capture from.
        include_color: Forwarded as ``include_color`` on the request.
        include_depth: Forwarded as ``include_depth`` on the request.
        timeout_seconds: Forwarded as ``timeout_seconds`` on the request.

    Returns:
        The response converted by ``_frame_to_dict``. The
        ``safe_async_rpc(default=None)`` decorator presumably yields ``None``
        when the call fails -- confirm against the decorator.
    """
    service = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    target = object_pb2.ObjectId(guid=camera_id)
    request = capture_pb2.CaptureSnapshotRequest(
        camera_id=target,
        include_color=include_color,
        include_depth=include_depth,
        timeout_seconds=timeout_seconds,
    )
    return _frame_to_dict(await service.CaptureSnapshot(request))
|
tongsim.connection.grpc.capture_api.CaptureAPI.get_status
async
staticmethod
get_status(
conn: GrpcConnection, camera_id: bytes
) -> dict[str, Any] | None
Source code in src/tongsim/connection/grpc/capture_api.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
@staticmethod
@safe_async_rpc(default=None)
async def get_status(
    conn: GrpcConnection, camera_id: bytes
) -> dict[str, Any] | None:
    """Query the capture status of a camera.

    Args:
        conn: Connection used to obtain the ``CaptureServiceStub``.
        camera_id: GUID of the camera to query.

    Returns:
        A dict with the keys ``capturing``, ``queue_count``,
        ``compressed_queue_count``, ``width``, ``height``, ``fov_degrees``
        and ``depth_mode``, each copied from the same-named field of the
        response status. The ``safe_async_rpc(default=None)`` decorator
        presumably yields ``None`` when the call fails -- confirm against
        the decorator.
    """
    service = conn.get_stub(capture_pb2_grpc.CaptureServiceStub)
    target = object_pb2.ObjectId(guid=camera_id)
    request = capture_pb2.GetCaptureStatusRequest(camera_id=target)
    reply = await service.GetCaptureStatus(request)

    status = reply.status
    # Dict keys are identical to the status field names they are read from.
    exported_fields = (
        "capturing",
        "queue_count",
        "compressed_queue_count",
        "width",
        "height",
        "fov_degrees",
        "depth_mode",
    )
    return {field: getattr(status, field) for field in exported_fields}
|