Skip to content

Core Control

This page documents TongSIM Lite’s baseline control API for interacting with gameplay actors: querying state, spawning/destroying actors, navigation helpers, physics traces, and UE console commands.

Naming note

In the protocol, these methods are implemented by DemoRLService (see protobuf/tongsim_lite_protobuf/demo_rl.proto). Despite the proto name, the API is used as the general-purpose control surface in TongSIM Lite.

Key Functions

  • query_info: Fetch a state snapshot for every tracked actor in a single call.
  • reset_level: Reload the current level to its initial state (map travel).
  • get_actor_state: Retrieve an actor's position, orientation vectors, and tag metadata by GUID.
  • get_actor_transform / set_actor_transform: Read or update an actor's world transform.
  • spawn_actor / destroy_actor: Create or remove actors in the current world.
  • simple_move_towards: Move an actor toward a world-space target at a constant speed using the simple mover helper.
  • query_navigation_path: Ask the UE navigation system for a path between two world locations.
  • navigate_to_location: Move a character using UE NavMesh navigation.
  • pick_up_object / drop_object: Task-oriented interaction helpers (level support required).
  • exec_console_command: Execute arbitrary UE console commands on the server.
  • single_line_trace_by_object / multi_line_trace_by_object: Perform physics traces and gather hit information.

API References

tongsim.connection.grpc.unary_api.UnaryAPI.query_info async staticmethod

query_info(conn: GrpcConnection) -> list[dict]

Fetch state snapshots for every actor in the current Demo RL scene.

Returns:

Type Description
list[dict]

list[dict]: One entry per actor with GUID, name, class path, location vectors, basis vectors, bounding box and tag metadata.

Source code in src/tongsim/connection/grpc/unary_api.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
@staticmethod
@safe_async_rpc(default=[])
async def query_info(conn: GrpcConnection) -> list[dict]:
    """
    Query the Demo RL service for the state of every actor it reports.

    Issues a single ``QueryState`` RPC with a 2 second timeout and converts
    each returned actor state into a plain dictionary.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.

    Returns:
        list[dict]: One entry per actor with GUID, name, class path,
            location vectors, basis vectors, bounding box and tag metadata.
    """
    service = conn.get_stub(DemoRLServiceStub)
    snapshot: DemoRLState = await service.QueryState(Empty(), timeout=2.0)
    return [_actor_state_to_dict(entry) for entry in snapshot.actor_states]

tongsim.connection.grpc.unary_api.UnaryAPI.reset_level async staticmethod

reset_level(
    conn: GrpcConnection, timeout: float = 60.0
) -> bool

Reset the active Demo RL level.

Source code in src/tongsim/connection/grpc/unary_api.py
184
185
186
187
188
189
190
@staticmethod
@safe_async_rpc(default=False)
async def reset_level(conn: GrpcConnection, timeout: float = 60.0) -> bool:
    """
    Ask the Demo RL service to reset the active level.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        timeout (float): RPC timeout in seconds.

    Returns:
        bool: True once the ``ResetLevel`` RPC has completed.
    """
    service = conn.get_stub(DemoRLServiceStub)
    request = Empty()
    await service.ResetLevel(request, timeout=timeout)
    return True

tongsim.connection.grpc.unary_api.UnaryAPI.get_actor_state async staticmethod

get_actor_state(
    conn: GrpcConnection, actor_id: str
) -> dict | None

Fetch the state of a single actor by identifier.

Returns:

Type Description
dict | None

dict | None: Actor metadata dictionary, or None on failure.

Source code in src/tongsim/connection/grpc/unary_api.py
246
247
248
249
250
251
252
253
254
255
256
257
258
@staticmethod
@safe_async_rpc(default=None)
async def get_actor_state(conn: GrpcConnection, actor_id: str) -> dict | None:
    """
    Look up one actor by identifier and return its state as a dictionary.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        actor_id (str): Identifier of the actor to query.

    Returns:
        dict | None: Actor metadata dictionary, or ``None`` on failure.
    """
    service = conn.get_stub(DemoRLServiceStub)
    object_id = _to_object_id(actor_id)
    reply: GetActorStateResponse = await service.GetActorState(
        GetActorStateRequest(actor_id=object_id),
        timeout=2.0,
    )
    return _actor_state_to_dict(reply.actor_state)

tongsim.connection.grpc.unary_api.UnaryAPI.get_actor_transform async staticmethod

get_actor_transform(
    conn: GrpcConnection, actor_id: str
) -> Transform

Retrieve an actor's world transform.

Returns:

Type Description
Transform | None

Transform | None: World transform, or None on failure.

Source code in src/tongsim/connection/grpc/unary_api.py
260
261
262
263
264
265
266
267
268
269
270
271
272
@staticmethod
@safe_async_rpc(default=None)
async def get_actor_transform(
    conn: GrpcConnection, actor_id: str
) -> Transform | None:
    """
    Retrieve an actor's world transform.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        actor_id (str): Identifier of the actor to query.

    Returns:
        Transform | None: World transform, or ``None`` on failure.
    """
    # The return annotation includes ``None`` to match the decorator's
    # ``default=None`` and the documented failure result.
    stub = conn.get_stub(DemoRLServiceStub)
    req = GetActorTransformRequest(actor_id=_to_object_id(actor_id))
    resp: GetActorTransformResponse = await stub.GetActorTransform(req, timeout=2.0)
    return proto_to_sdk(resp.transform)

tongsim.connection.grpc.unary_api.UnaryAPI.set_actor_transform async staticmethod

set_actor_transform(
    conn: GrpcConnection,
    actor_id: bytes | str | dict,
    transform: Transform,
) -> bool

Teleport an actor to the supplied world transform (TeleportPhysics).

Returns:

Name Type Description
bool bool

True on success.

Source code in src/tongsim/connection/grpc/unary_api.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
@staticmethod
@safe_async_rpc(default=False)
async def set_actor_transform(
    conn: GrpcConnection, actor_id: bytes | str | dict, transform: Transform
) -> bool:
    """
    Move an actor directly to the given world transform (TeleportPhysics).

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        actor_id (bytes | str | dict): Identifier of the actor to move.
        transform (Transform): Target world transform.

    Returns:
        bool: True on success.
    """
    service = conn.get_stub(DemoRLServiceStub)
    object_id = _to_object_id(actor_id)
    proto_transform = sdk_to_proto(transform)
    request = SetActorTransformRequest(actor_id=object_id, transform=proto_transform)
    await service.SetActorTransform(request, timeout=2.0)
    return True

tongsim.connection.grpc.unary_api.UnaryAPI.spawn_actor async staticmethod

spawn_actor(
    conn: GrpcConnection,
    blueprint: str,
    transform: Transform,
    name: str | None = None,
    tags: list[str] | None = None,
    timeout: float = 5.0,
) -> dict | None

Spawn an actor in the Demo RL scene and return its identity information.

Returns:

Type Description
dict | None

dict | None: Dictionary with id, name and class_path.

Source code in src/tongsim/connection/grpc/unary_api.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
@staticmethod
@safe_async_rpc(default=None)
async def spawn_actor(
    conn: GrpcConnection,
    blueprint: str,
    transform: Transform,
    name: str | None = None,
    tags: list[str] | None = None,
    timeout: float = 5.0,
) -> dict | None:
    """
    Create an actor in the Demo RL scene and report its identity.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        blueprint (str): Blueprint value sent in the spawn request.
        transform (Transform): World transform for the new actor.
        name (str | None): Optional actor name; only sent when non-empty.
        tags (list[str] | None): Optional tags; only sent when non-empty.
        timeout (float): RPC timeout in seconds.

    Returns:
        dict | None: Dictionary with ``id``, ``name`` and ``class_path``.
    """
    service = conn.get_stub(DemoRLServiceStub)
    request = SpawnActorRequest(
        blueprint=blueprint, transform=sdk_to_proto(transform)
    )
    # Optional fields stay unset unless the caller supplied a truthy value.
    if name:
        request.name = name
    if tags:
        request.tags.extend(tags)

    reply: SpawnActorResponse = await service.SpawnActor(request, timeout=timeout)
    spawned = reply.actor
    identity = {
        "id": _fguid_bytes_to_str(spawned.id.guid),
        "name": spawned.name,
        "class_path": spawned.class_path,
    }
    return identity

tongsim.connection.grpc.unary_api.UnaryAPI.destroy_actor async staticmethod

destroy_actor(
    conn: GrpcConnection, actor_id: bytes | str | dict
) -> bool

Destroy an actor in the Demo RL scene.

Returns:

Name Type Description
bool bool

True on success.

Source code in src/tongsim/connection/grpc/unary_api.py
744
745
746
747
748
749
750
751
752
753
754
755
756
@staticmethod
@safe_async_rpc(default=False)
async def destroy_actor(conn: GrpcConnection, actor_id: bytes | str | dict) -> bool:
    """
    Remove an actor from the Demo RL scene.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        actor_id (bytes | str | dict): Identifier of the actor to destroy.

    Returns:
        bool: True on success.
    """
    service = conn.get_stub(DemoRLServiceStub)
    object_id = _to_object_id(actor_id)
    await service.DestroyActor(DestroyActorRequest(actor_id=object_id), timeout=2.0)
    return True

tongsim.connection.grpc.unary_api.UnaryAPI.simple_move_towards async staticmethod

simple_move_towards(
    conn: GrpcConnection,
    target_location: Vector3,
    actor_id: bytes | str | dict,
    orientation_mode: RLDemoOrientationMode = ORIENTATION_KEEP_CURRENT,
    given_forward: Vector3 | None = None,
    timeout: float = 3600.0,
    speed_uu_per_sec: float = 300.0,
    tolerance_uu: float = 5.0,
) -> tuple[dict | None, dict | None]

Move an actor toward the given world-space target using the simple mover.

Parameters:

Name Type Description Default
target_location Vector3

Destination in world coordinates.

required
actor_id bytes | str | dict

Actor identifier (FGuid bytes or GUID string).

required
orientation_mode RLDemoOrientationMode

Orientation strategy applied during movement.

ORIENTATION_KEEP_CURRENT
given_forward Vector3 | None

Forward vector used when orientation_mode is ORIENTATION_GIVEN.

None
timeout float

RPC timeout in seconds.

3600.0
speed_uu_per_sec float

Movement speed in Unreal units per second.

300.0
tolerance_uu float

Distance threshold treated as arrival.

5.0

Returns:

Type Description
tuple[Vector3 | None, dict | None]

tuple[Vector3 | None, dict | None]: Current location and optional hit metadata with hit_actor when blocked.

Source code in src/tongsim/connection/grpc/unary_api.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
@staticmethod
@safe_async_rpc(default=(None, None))
async def simple_move_towards(
    conn: GrpcConnection,
    target_location: Vector3,
    actor_id: bytes | str | dict,
    orientation_mode: RLDemoOrientationMode = RLDemoOrientationMode.ORIENTATION_KEEP_CURRENT,
    given_forward: Vector3 | None = None,
    timeout: float = 3600.0,
    speed_uu_per_sec: float = 300.0,
    tolerance_uu: float = 5.0,
) -> tuple[Vector3 | None, dict | None]:
    """
    Move an actor toward the given world-space target using the simple mover.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        target_location (Vector3): Destination in world coordinates.
        actor_id (bytes | str | dict): Actor identifier (FGuid bytes or GUID string).
        orientation_mode (RLDemoOrientationMode): Orientation strategy applied during movement.
        given_forward (Vector3 | None): Forward vector used when orientation_mode is
            ORIENTATION_GIVEN. It is ignored for other modes, and also when it is ``None``.
        timeout (float): RPC timeout in seconds.
        speed_uu_per_sec (float): Movement speed in Unreal units per second.
        tolerance_uu (float): Distance threshold treated as arrival.

    Returns:
        tuple[Vector3 | None, dict | None]: Current location and optional hit metadata with
            ``hit_actor`` when blocked. The decorator default is ``(None, None)``.
    """
    req = SimpleMoveTowardsRequest(
        actor_id=_to_object_id(actor_id),
        target_location=sdk_to_proto(target_location),
        orientation_mode=orientation_mode,
        speed_uu_per_sec=float(speed_uu_per_sec),
        tolerance_uu=float(tolerance_uu),
    )

    # The explicit forward vector is only attached for ORIENTATION_GIVEN;
    # without a vector the field is left unset on the request.
    if (
        orientation_mode == RLDemoOrientationMode.ORIENTATION_GIVEN
        and given_forward is not None
    ):
        req.given_orientation.CopyFrom(sdk_to_proto(given_forward))

    stub = conn.get_stub(DemoRLServiceStub)
    resp: SimpleMoveTowardsResponse = await stub.SimpleMoveTowards(
        req, timeout=timeout
    )

    current_location = proto_to_sdk(resp.current_location)

    # Hit metadata is reported only when the response carries a hit_result.
    hit_result = None
    if resp.HasField("hit_result"):
        hit_result = {"hit_actor": resp.hit_result.hit_actor}

    return current_location, hit_result

tongsim.connection.grpc.unary_api.UnaryAPI.query_navigation_path async staticmethod

query_navigation_path(
    conn: GrpcConnection,
    start: Vector3,
    end: Vector3,
    allow_partial: bool = True,
    require_navigable_end_location: bool = False,
    cost_limit: float | None = None,
    timeout: float = 2.0,
) -> dict | None

Compute a navigation path between two world positions using the UE navigation system.

Parameters:

Name Type Description Default
start Vector3

Starting world location.

required
end Vector3

Target world location.

required
allow_partial bool

Allow returning partial paths when a full path is unavailable.

True
require_navigable_end_location bool

Enforce the end point to lie on the navmesh.

False
cost_limit float | None

Optional cost threshold; values <= 0 disable it.

None
timeout float

RPC timeout in seconds.

2.0

Returns:

Name Type Description
dict dict | None

Path data including points (list[Vector3]), is_partial (bool), path_cost (float) and path_length (float).

Source code in src/tongsim/connection/grpc/unary_api.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
@staticmethod
@safe_async_rpc(default=None)
async def query_navigation_path(
    conn: GrpcConnection,
    start: Vector3,
    end: Vector3,
    allow_partial: bool = True,
    require_navigable_end_location: bool = False,
    cost_limit: float | None = None,
    timeout: float = 2.0,
) -> dict | None:
    """
    Ask the UE navigation system for a path between two world positions.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        start (Vector3): Starting world location.
        end (Vector3): Target world location.
        allow_partial (bool): Allow returning partial paths when a full path is unavailable.
        require_navigable_end_location (bool): Enforce the end point to lie on the navmesh.
        cost_limit (float | None): Optional cost threshold; it is only sent when greater than 0.
        timeout (float): RPC timeout in seconds.

    Returns:
        dict: Path data including ``points`` (list[Vector3]), ``is_partial`` (bool),
            ``path_cost`` (float) and ``path_length`` (float).
    """
    service = conn.get_stub(DemoRLServiceStub)
    request = QueryNavigationPathRequest(
        start=sdk_to_proto(start),
        end=sdk_to_proto(end),
        allow_partial=allow_partial,
        require_navigable_end_location=require_navigable_end_location,
    )
    if cost_limit is not None and cost_limit > 0:
        request.cost_limit = float(cost_limit)

    reply: QueryNavigationPathResponse = await service.QueryNavigationPath(
        request, timeout=timeout
    )

    waypoints = [proto_to_sdk(point) for point in reply.path_points]
    partial = bool(reply.is_partial)
    # Cost and length fall back to 0.0 when the response has no such attribute.
    cost = float(getattr(reply, "path_cost", 0.0))
    length = float(getattr(reply, "path_length", 0.0))
    return {
        "points": waypoints,
        "is_partial": partial,
        "path_cost": cost,
        "path_length": length,
    }

tongsim.connection.grpc.unary_api.UnaryAPI.navigate_to_location async staticmethod

navigate_to_location(
    conn: GrpcConnection,
    actor_id: bytes | str | dict,
    target_location: Vector3,
    accept_radius: float,
    allow_partial: bool = True,
    speed_uu_per_sec: float | None = None,
    timeout: float = 3600.0,
) -> dict | None

Navigate a Character to a location using UE NavMesh (server-side async Reactor).

Returns:

Name Type Description
dict dict | None

success, message, final_location and is_partial.

Source code in src/tongsim/connection/grpc/unary_api.py
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
@staticmethod
@safe_async_rpc(default=None)
async def navigate_to_location(
    conn: GrpcConnection,
    actor_id: bytes | str | dict,
    target_location: Vector3,
    accept_radius: float,
    allow_partial: bool = True,
    speed_uu_per_sec: float | None = None,
    timeout: float = 3600.0,
) -> dict | None:
    """
    Drive a Character to a location via UE NavMesh (server-side async Reactor).

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        actor_id (bytes | str | dict): Identifier of the actor to navigate.
        target_location (Vector3): Destination sent in the request.
        accept_radius (float): Acceptance radius forwarded to the server.
        allow_partial (bool): Forwarded ``allow_partial`` flag.
        speed_uu_per_sec (float | None): Optional speed; only sent when not ``None``.
        timeout (float): RPC timeout in seconds.

    Returns:
        dict: ``success``, ``message``, ``final_location`` and ``is_partial``.
    """
    service = conn.get_stub(DemoRLServiceStub)
    request = NavigateToLocationRequest(
        actor_id=_to_object_id(actor_id),
        target_location=sdk_to_proto(target_location),
        accept_radius=float(accept_radius),
        allow_partial=bool(allow_partial),
    )
    if speed_uu_per_sec is not None:
        request.speed_uu_per_sec = float(speed_uu_per_sec)

    reply: NavigateToLocationResponse = await service.NavigateToLocation(
        request, timeout=timeout
    )

    succeeded = bool(reply.success)
    message = str(reply.message)
    final_location = proto_to_sdk(reply.final_location)
    partial = bool(reply.is_partial)
    return {
        "success": succeeded,
        "message": message,
        "final_location": final_location,
        "is_partial": partial,
    }

tongsim.connection.grpc.unary_api.UnaryAPI.pick_up_object async staticmethod

pick_up_object(
    conn: GrpcConnection,
    actor_id: bytes | str | dict,
    target_object_id: bytes | str | dict,
    target_object_location: Vector3 | None = None,
    hand: RLDemoHandType = HAND_RIGHT,
    timeout: float = 5.0,
) -> dict

Request the UE server to pick up the specified target actor.

Source code in src/tongsim/connection/grpc/unary_api.py
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
@staticmethod
@safe_async_rpc(default={"success": False, "message": ""})
async def pick_up_object(
    conn: GrpcConnection,
    actor_id: bytes | str | dict,
    target_object_id: bytes | str | dict,
    target_object_location: Vector3 | None = None,
    hand: RLDemoHandType = RLDemoHandType.HAND_RIGHT,
    timeout: float = 5.0,
) -> dict:
    """
    Ask the UE server to have an actor pick up the specified target actor.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        actor_id (bytes | str | dict): Identifier of the acting actor.
        target_object_id (bytes | str | dict): Identifier of the object to pick up.
        target_object_location (Vector3 | None): Optional location; only attached
            to the request when not ``None``.
        hand (RLDemoHandType): Hand selector, sent as its integer value.
        timeout (float): RPC timeout in seconds.

    Returns:
        dict: ``success`` (bool) and ``message`` (str) from the response.
    """
    service = conn.get_stub(DemoRLServiceStub)
    request = PickUpObjectRequest(
        actor_id=_to_object_id(actor_id),
        hand=int(hand),
        target_object_id=_to_object_id(target_object_id),
    )
    if target_object_location is not None:
        location_proto = sdk_to_proto(target_object_location)
        request.target_object_location.CopyFrom(location_proto)

    reply: PickUpObjectResponse = await service.PickUpObject(request, timeout=timeout)
    succeeded = bool(reply.success)
    message = str(reply.message)
    return {"success": succeeded, "message": message}

tongsim.connection.grpc.unary_api.UnaryAPI.drop_object async staticmethod

drop_object(
    conn: GrpcConnection,
    actor_id: bytes | str | dict,
    target_drop_location: Vector3,
    hand: RLDemoHandType = HAND_RIGHT,
    enable_physics: bool = False,
    timeout: float = 5.0,
) -> dict

Request the UE server to drop an object (placeholder Reactor endpoint).

Source code in src/tongsim/connection/grpc/unary_api.py
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
@staticmethod
@safe_async_rpc(default={"success": False, "message": ""})
async def drop_object(
    conn: GrpcConnection,
    actor_id: bytes | str | dict,
    target_drop_location: Vector3,
    hand: RLDemoHandType = RLDemoHandType.HAND_RIGHT,
    enable_physics: bool = False,
    timeout: float = 5.0,
) -> dict:
    """
    Ask the UE server to drop an object (placeholder Reactor endpoint).

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        actor_id (bytes | str | dict): Identifier of the acting actor.
        target_drop_location (Vector3): Drop location sent in the request.
        hand (RLDemoHandType): Hand selector, sent as its integer value.
        enable_physics (bool): Forwarded ``enable_physics`` flag.
        timeout (float): RPC timeout in seconds.

    Returns:
        dict: ``success`` (bool) and ``message`` (str) from the response.
    """
    service = conn.get_stub(DemoRLServiceStub)
    request = DropObjectRequest(
        actor_id=_to_object_id(actor_id),
        hand=int(hand),
        target_drop_location=sdk_to_proto(target_drop_location),
        enable_physics=bool(enable_physics),
    )
    reply: DropObjectResponse = await service.DropObject(request, timeout=timeout)
    succeeded = bool(reply.success)
    message = str(reply.message)
    return {"success": succeeded, "message": message}

tongsim.connection.grpc.unary_api.UnaryAPI.exec_console_command async staticmethod

exec_console_command(
    conn: GrpcConnection,
    command: str,
    write_to_log: bool = True,
    timeout: float = 2.0,
) -> bool

Execute a UE console command such as stat fps or r.Streaming.PoolSize 4000.

Returns:

Name Type Description
bool bool

True when the command was accepted (console output text is unavailable).

Source code in src/tongsim/connection/grpc/unary_api.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
@staticmethod
@safe_async_rpc(default=False)
async def exec_console_command(
    conn: GrpcConnection,
    command: str,
    write_to_log: bool = True,
    timeout: float = 2.0,
) -> bool:
    """
    Run a UE console command such as ``stat fps`` or ``r.Streaming.PoolSize 4000``.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        command (str): Console command text sent to the server.
        write_to_log (bool): Forwarded ``write_to_log`` flag.
        timeout (float): RPC timeout in seconds.

    Returns:
        bool: True when the command was accepted (console output text is unavailable).
    """
    service = conn.get_stub(DemoRLServiceStub)
    request = ExecConsoleCommandRequest(command=command, write_to_log=write_to_log)
    reply: ExecConsoleCommandResponse = await service.ExecConsoleCommand(
        request, timeout=timeout
    )
    accepted = bool(reply.success)
    return accepted

tongsim.connection.grpc.unary_api.UnaryAPI.single_line_trace_by_object async staticmethod

single_line_trace_by_object(
    conn: GrpcConnection,
    jobs: list[dict],
    timeout: float = 5.0,
) -> list[dict]

Run batch SingleLineTraceByObject requests and return hit summaries.

Parameters:

Name Type Description Default
jobs list[dict]

Each job describes start/end vectors, collision object types, optional trace_complex flag and actors_to_ignore collection.

required
timeout float

RPC timeout in seconds.

5.0

Returns:

Type Description
list[dict]

list[dict]: Per-job results including job_index, blocking_hit, distance, impact_point and optional actor_state.

Source code in src/tongsim/connection/grpc/unary_api.py
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
@staticmethod
@safe_async_rpc(default=[])
async def single_line_trace_by_object(
    conn: GrpcConnection,
    jobs: list[dict],
    timeout: float = 5.0,
) -> list[dict]:
    """
    Run batch SingleLineTraceByObject requests and return hit summaries.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        jobs (list[dict]): Each job describes ``start``/``end`` vectors (required keys),
            collision ``object_types``, optional ``trace_complex`` flag and
            ``actors_to_ignore`` collection. ``object_types`` and ``actors_to_ignore``
            may be omitted or ``None``; both are then treated as empty.
        timeout (float): RPC timeout in seconds.

    Returns:
        list[dict]: Per-job results including ``job_index``, ``blocking_hit``, ``distance``,
            ``impact_point`` and optional ``actor_state``.
    """
    req = BatchSingleLineTraceByObjectRequest()
    for j in jobs:
        job = req.jobs.add()
        job.start.CopyFrom(sdk_to_proto(j["start"]))
        job.end.CopyFrom(sdk_to_proto(j["end"]))
        # ``or []`` tolerates an explicit None, matching actors_to_ignore below.
        job.object_types.extend([int(ot) for ot in j.get("object_types") or []])
        # Only set trace_complex when the caller supplied a non-None value.
        if j.get("trace_complex") is not None:
            job.trace_complex = bool(j["trace_complex"])
        for ig in j.get("actors_to_ignore") or []:
            job.actors_to_ignore.add().CopyFrom(_to_object_id(ig))

    stub = conn.get_stub(DemoRLServiceStub)
    resp = await stub.BatchSingleLineTraceByObject(req, timeout=timeout)

    out: list[dict] = []
    for r in resp.results:
        item = {
            "job_index": int(r.job_index),
            "blocking_hit": bool(r.blocking_hit),
            "distance": float(r.distance),
            "impact_point": proto_to_sdk(r.impact_point),
        }
        # actor_state is included only when the result carries one.
        if r.HasField("actor_state"):
            item["actor_state"] = _actor_state_to_dict(r.actor_state)
        out.append(item)
    return out

tongsim.connection.grpc.unary_api.UnaryAPI.multi_line_trace_by_object async staticmethod

multi_line_trace_by_object(
    conn: GrpcConnection,
    jobs: list[dict],
    timeout: float = 5.0,
    *,
    enable_debug_draw: bool = False
) -> list[dict]

Run batch MultiLineTraceByObject requests and collect ordered hit lists.

Parameters:

Name Type Description Default
jobs list[dict]

Each job describes start/end vectors, collision object types, optional trace_complex flag and actors_to_ignore collection.

required
timeout float

RPC timeout in seconds.

5.0
enable_debug_draw bool

Whether to render debug lines in UE.

False

Returns:

Type Description
list[dict]

list[dict]: Per-job results with job_index and hits entries containing distance, impact_point, impact_normal and optional actor_state.

Source code in src/tongsim/connection/grpc/unary_api.py
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
@staticmethod
@safe_async_rpc(default=[])
async def multi_line_trace_by_object(
    conn: GrpcConnection,
    jobs: list[dict],
    timeout: float = 5.0,
    *,
    enable_debug_draw: bool = False,
) -> list[dict]:
    """
    Run batch MultiLineTraceByObject requests and collect ordered hit lists.

    Args:
        conn (GrpcConnection): Connection used to obtain the service stub.
        jobs (list[dict]): Each job describes ``start``/``end`` vectors (required keys),
            collision ``object_types``, optional ``trace_complex`` flag and
            ``actors_to_ignore`` collection. ``object_types`` and ``actors_to_ignore``
            may be omitted or ``None``; both are then treated as empty.
        timeout (float): RPC timeout in seconds.
        enable_debug_draw (bool): Whether to render debug lines in UE.

    Returns:
        list[dict]: Per-job results with ``job_index`` and ``hits`` entries containing
            ``distance``, ``impact_point``, ``impact_normal`` and optional ``actor_state``.
    """
    req = BatchMultiLineTraceByObjectRequest()
    req.enable_debug_draw = bool(enable_debug_draw)
    for j in jobs:
        job = req.jobs.add()
        job.start.CopyFrom(sdk_to_proto(j["start"]))
        job.end.CopyFrom(sdk_to_proto(j["end"]))
        # ``or []`` tolerates an explicit None, matching actors_to_ignore below.
        job.object_types.extend([int(ot) for ot in j.get("object_types") or []])
        # Only set trace_complex when the caller supplied a non-None value.
        if j.get("trace_complex") is not None:
            job.trace_complex = bool(j["trace_complex"])
        for ig in j.get("actors_to_ignore") or []:
            job.actors_to_ignore.add().CopyFrom(_to_object_id(ig))

    stub = conn.get_stub(DemoRLServiceStub)
    resp = await stub.BatchMultiLineTraceByObject(req, timeout=timeout)

    out: list[dict] = []
    for r in resp.results:
        hits: list[dict] = []
        for h in r.hits:
            hit = {
                "distance": float(h.distance),
                "impact_point": proto_to_sdk(h.impact_point),
                "impact_normal": proto_to_sdk(h.impact_normal),
            }
            # The hasattr guard is kept from the original so hit messages
            # without an actor_state attribute are still accepted.
            if hasattr(h, "actor_state") and h.HasField("actor_state"):
                hit["actor_state"] = _actor_state_to_dict(h.actor_state)
            hits.append(hit)
        out.append({"job_index": int(r.job_index), "hits": hits})
    return out