Skip to content

Runtime

This section introduces the core runtime building blocks that every TongSim Python session relies on.

  • TongSim exposes a synchronous, user-friendly facade that bootstraps WorldContext and offers high-level helpers.
  • WorldContext owns the dedicated AsyncLoop, gRPC connections, and the overall lifecycle management for a running session.
  • AsyncLoop wraps an asyncio event loop inside a background thread so SDK code can drive asynchronous calls safely from synchronous workflows.
  • Runtime helpers also include basic logging setup and version reporting.

References

TongSim

tongsim.tongsim.TongSim

High-level SDK entry point for controlling a connected TongSim UE instance. All methods expose synchronous, blocking interfaces for scripts or synchronous applications.

Source code in src/tongsim/tongsim.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class TongSim:
    """
    High-level SDK entry point for controlling a connected TongSim UE instance.
    All methods expose synchronous, blocking interfaces for scripts or
    synchronous applications.
    """

    def __init__(self, grpc_endpoint: str = "127.0.0.1:5726"):
        """
        Create a TongSim runtime binding.

        Args:
            grpc_endpoint (str): gRPC endpoint of the UE server, for example
                "localhost:5726".
        """
        self._context: Final[WorldContext] = WorldContext(grpc_endpoint)
        self._utils: Final[UtilFuncs] = UtilFuncs(self._context)

    @property
    def utils(self) -> UtilFuncs:
        """
        Return helper utilities that wrap frequently used runtime operations.

        Returns:
            UtilFuncs: Helper wrapper exposing convenience functions.
        """
        return self._utils

    @property
    def context(self) -> WorldContext:
        """
        Access the runtime context that manages connections, event loop and
        task dispatch.

        Returns:
            WorldContext: Context object owning the async loop and gRPC
                resources.
        """
        return self._context

    def close(self):
        """Shut down the current runtime and release all managed resources."""
        self._context.release()

    def __enter__(self):
        """Support ``with`` statements."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Support ``with`` statements."""
        self.close()

utils property

utils: UtilFuncs

Return helper utilities that wrap frequently used runtime operations.

Returns:

Name Type Description
UtilFuncs UtilFuncs

Helper wrapper exposing convenience functions.

context property

context: WorldContext

Access the runtime context that manages connections, event loop and task dispatch.

Returns:

Name Type Description
WorldContext WorldContext

Context object owning the async loop and gRPC resources.

close

close()

Shut down the current runtime and release all managed resources.

Source code in src/tongsim/tongsim.py
56
57
58
def close(self):
    """Shut down the current runtime and release all managed resources."""
    self._context.release()

WorldContext

tongsim.core.world_context.WorldContext

Aggregate runtime resources for a TongSim session.

Responsibilities: - Manage the dedicated AsyncLoop. - Hold the gRPC connection (GrpcConnection and LegacyGrpcStreamClient).

Notes
  • All owned resources are closed automatically during teardown.
Source code in src/tongsim/core/world_context.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class WorldContext:
    """
    Aggregate runtime resources for a TongSim session.

    Responsibilities:
    - Manage the dedicated AsyncLoop.
    - Hold the gRPC connection (GrpcConnection and LegacyGrpcStreamClient).

    Notes:
        - All owned resources are closed automatically during teardown.
    """

    def __init__(self, grpc_endpoint: str):
        self._uuid: Final[uuid.UUID] = uuid.uuid4()
        self._loop: Final[AsyncLoop] = AsyncLoop(name=f"world-main-loop-{self._uuid}")
        self._loop.start()

        self._conn: Final[GrpcConnection]

        # Ensure stubs are initialised on the AsyncLoop so gRPC sees the same loop.
        self.sync_run(self._async_init_grpc(grpc_endpoint))

        _logger.debug(f"[WorldContext {self._uuid}] started.")
        self._is_shutdown: bool = False

    # TODO: classmethod
    async def _async_init_grpc(self, grpc_endpoint: str):
        self._conn = GrpcConnection(grpc_endpoint)

    @property
    def uuid(self) -> str:
        """Short identifier (first eight characters) for this world instance."""
        return str(self._uuid)[:8]

    @property
    def loop(self) -> AsyncLoop:
        """Primary AsyncLoop used for background scheduling."""
        return self._loop

    @property
    def conn(self) -> GrpcConnection:
        """Underlying gRPC connection."""
        return self._conn

    def sync_run(self, coro: Awaitable, timeout: float | None = None) -> Any:
        """
        Execute an async coroutine on the loop and wait for it synchronously.

        Args:
            coro (Awaitable): Coroutine to run.
            timeout (float | None): Optional timeout in seconds. Raises TimeoutError
                if exceeded.

        Returns:
            Any: Result returned by the coroutine.
        """
        if threading.current_thread() is self._loop.thread:
            raise RuntimeError(
                f"Cannot call `sync_run` from the same thread as AsyncLoop [{self._loop.name}] - this would cause a deadlock."
            )

        return self._loop.spawn(
            coro, name=f"[World-Context {self.uuid} sync task]"
        ).result(timeout=timeout)

    def async_task(self, coro: Awaitable[Any], name: str) -> Future[Any]:
        """Schedule a coroutine on the loop without waiting for completion."""
        return self._loop.spawn(coro, name=name)

    def release(self):
        """
        Release all managed resources:
        - cancel outstanding tasks
        - close the gRPC connection
        - stop the event loop
        """
        if self._is_shutdown:
            return
        self._is_shutdown = True

        _logger.debug(f"[WorldContext {self._uuid}] releasing...")

        try:
            self._loop.cancel_tasks(timeout=1.0)
            self._loop.spawn(
                self._conn.aclose(),
                name=f"WorldContext {self.uuid} release gRPC connection.",
            ).result(timeout=1.0)
        except Exception as e:
            _logger.warning(
                f"[WorldContext {self._uuid}] failed to release cleanly: {e}"
            )

        self._loop.stop()
        _logger.debug(f"[WorldContext {self._uuid}] release complete.")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

    def __del__(self):
        _logger.debug(f"[WorldContext {self._uuid}] gc.")
        self.release()

uuid property

uuid: str

Short identifier (first eight characters) for this world instance.

loop property

loop: AsyncLoop

Primary AsyncLoop used for background scheduling.

conn property

Underlying gRPC connection.

sync_run

sync_run(
    coro: Awaitable, timeout: float | None = None
) -> Any

Execute an async coroutine on the loop and wait for it synchronously.

Parameters:

Name Type Description Default
coro Awaitable

Coroutine to run.

required
timeout float | None

Optional timeout in seconds. Raises TimeoutError if exceeded.

None

Returns:

Name Type Description
Any Any

Result returned by the coroutine.

Source code in src/tongsim/core/world_context.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def sync_run(self, coro: Awaitable, timeout: float | None = None) -> Any:
    """
    Execute an async coroutine on the loop and wait for it synchronously.

    Args:
        coro (Awaitable): Coroutine to run.
        timeout (float | None): Optional timeout in seconds. Raises TimeoutError
            if exceeded.

    Returns:
        Any: Result returned by the coroutine.
    """
    if threading.current_thread() is self._loop.thread:
        raise RuntimeError(
            f"Cannot call `sync_run` from the same thread as AsyncLoop [{self._loop.name}] - this would cause a deadlock."
        )

    return self._loop.spawn(
        coro, name=f"[World-Context {self.uuid} sync task]"
    ).result(timeout=timeout)

async_task

async_task(coro: Awaitable[Any], name: str) -> Future[Any]

Schedule a coroutine on the loop without waiting for completion.

Source code in src/tongsim/core/world_context.py
88
89
90
def async_task(self, coro: Awaitable[Any], name: str) -> Future[Any]:
    """Schedule a coroutine on the loop without waiting for completion."""
    return self._loop.spawn(coro, name=name)

release

release()

Release all managed resources: - cancel outstanding tasks - close the gRPC connection - stop the event loop

Source code in src/tongsim/core/world_context.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def release(self):
    """
    Release all managed resources:
    - cancel outstanding tasks
    - close the gRPC connection
    - stop the event loop
    """
    if self._is_shutdown:
        return
    self._is_shutdown = True

    _logger.debug(f"[WorldContext {self._uuid}] releasing...")

    try:
        self._loop.cancel_tasks(timeout=1.0)
        self._loop.spawn(
            self._conn.aclose(),
            name=f"WorldContext {self.uuid} release gRPC connection.",
        ).result(timeout=1.0)
    except Exception as e:
        _logger.warning(
            f"[WorldContext {self._uuid}] failed to release cleanly: {e}"
        )

    self._loop.stop()
    _logger.debug(f"[WorldContext {self._uuid}] release complete.")

AsyncLoop

tongsim.core.async_loop.AsyncLoop

Wrap an asyncio event loop that lives on a dedicated background thread and exposes a TaskGroup for scheduling.

Features: - Persistent background thread hosting the event loop. - Managed asyncio.TaskGroup for business coroutines.

Source code in src/tongsim/core/async_loop.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class AsyncLoop:
    """
    Wrap an asyncio event loop that lives on a dedicated background thread and
    exposes a TaskGroup for scheduling.

    Features:
    - Persistent background thread hosting the event loop.
    - Managed asyncio.TaskGroup for business coroutines.
    """

    def __init__(self, name: str = "AsyncLoop") -> None:
        """
        Initialise the AsyncLoop wrapper.

        Args:
            name: Identifier used for the loop thread and logging.
        """
        self._name = name
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._group_ready = threading.Event()
        self._main_task: asyncio.Task[Any] | None = None
        self._task_group: asyncio.TaskGroup | None = None
        self._business_tasks: set[asyncio.Task[Any]] = (
            set()
        )  # Track tasks spawned for application work.

    @property
    def thread(self) -> threading.Thread:
        return self._thread

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        return self._loop

    @property
    def name(self) -> str:
        return self._name

    def start(self, timeout: float = 1.0) -> None:
        """
        Launch the background thread and event loop.

        Args:
            timeout: Maximum time to wait for the loop and TaskGroup to become ready.

        Raises:
            RuntimeError: If the loop fails to start within the timeout.
        """
        if self.is_running():
            raise RuntimeError(f"[AsyncLoop {self._name}] already running.")

        self._thread = threading.Thread(target=self._run, name=self._name, daemon=True)
        self._thread.start()

        if not self._group_ready.wait(timeout):
            raise RuntimeError(f"[AsyncLoop {self._name}] timeout starting event loop.")

        _logger.debug(f"[AsyncLoop {self._name}] started.")

    def _run(self) -> None:
        """Background thread body: create and drive the event loop."""
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)  # Bind loop to the current thread.
        self._main_task = self._loop.create_task(self._main(), name="__main_task__")
        try:
            self._loop.run_forever()
        finally:
            self._loop.close()
            _logger.debug(f"[AsyncLoop {self._name}] loop closed.")

    async def _main(self) -> None:
        """Main coroutine: keep the TaskGroup alive until shutdown."""
        try:
            async with asyncio.TaskGroup() as tg:
                self._task_group = tg
                self._group_ready.set()
                await asyncio.Future()  # Keep running; cancellation ends the loop.
        except asyncio.CancelledError:
            _logger.debug(
                f"[AsyncLoop {self._name}] main task cancelled; shutting down TaskGroup."
            )
        finally:
            assert self._loop is not None
            self._loop.call_soon_threadsafe(self._loop.stop)

    def spawn(self, coro: Awaitable[Any], name: str = "") -> Future[Any]:
        """
        Submit a coroutine to the TaskGroup.

        Args:
            coro: Coroutine object to execute.
            name: Optional name for logging.

        Returns:
            Future: A concurrent.futures.Future mirroring coroutine completion or
                raising the underlying exception.
        """
        if not (self._loop and self._task_group):
            raise RuntimeError(f"[AsyncLoop {self._name}] not started.")

        outer: Future[Any] = Future()

        def _schedule() -> None:
            task: asyncio.Task[Any] = self._task_group.create_task(coro, name=name)
            self._business_tasks.add(task)

            def _on_done(t: asyncio.Task[Any]) -> None:
                self._business_tasks.discard(t)
                if t.cancelled():
                    outer.cancel()
                else:
                    exc = t.exception()
                    if exc:
                        _logger.exception(
                            f"[AsyncLoop {self._name}] Task {name!r} raised: {exc}"
                        )
                        outer.set_exception(exc)
                        # Bubble the exception so the TaskGroup cancels outstanding work.
                        assert self._main_task is not None
                        self._main_task.cancel()
                    else:
                        outer.set_result(t.result())

            task.add_done_callback(_on_done)

        self._loop.call_soon_threadsafe(_schedule)
        return outer

    def cancel_tasks(self, timeout: float) -> None:
        """
        Cancel all application tasks that were spawned via ``spawn``.

        Args:
            timeout: Maximum time to wait for cancellation to finish.
        """
        if not self.is_running():
            return

        future = asyncio.run_coroutine_threadsafe(self._cancel_tasks_seq(), self._loop)
        try:
            future.result(timeout)
        except FutureTimeoutError:
            _logger.warning(f"[AsyncLoop {self._name}] cancel_tasks timeout.")

    async def _cancel_tasks_seq(self) -> None:
        """Internal helper: cancel tracked business tasks on the loop thread."""
        _logger.debug(
            f"[AsyncLoop {self._name}] cancelling {len(self._business_tasks)} business task(s)."
        )
        if not self._business_tasks:
            return

        tasks = list(self._business_tasks)
        for task in tasks:
            task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)
        self._business_tasks.clear()

    def stop(self, timeout: float = 5.0) -> None:
        """
        Gracefully stop the AsyncLoop:
        cancel business tasks -> cancel main task -> stop loop -> join thread.

        Args:
            timeout: Maximum time to wait for shutdown.
        """
        if not self.is_running():
            return

        # Cancelling the main TaskGroup will cascade to outstanding tasks.
        assert self._main_task is not None and self._loop is not None
        self._loop.call_soon_threadsafe(self._main_task.cancel)

        self._thread.join(timeout)
        if self._thread.is_alive():
            _logger.warning(f"AsyncLoop '{self._name}' did not exit cleanly.")
        self._thread = None

    def is_running(self) -> bool:
        """
        Check whether the loop thread is alive.

        Returns:
            bool: ``True`` when the loop is running, otherwise ``False``.
        """
        return bool(self._thread and self._thread.is_alive())

    def log_task_list(self) -> None:
        """Log all tasks currently known to the loop for diagnostics."""
        if not (self._loop and self._task_group):
            return
        task_list = asyncio.all_tasks(self._loop)
        _logger.warning(f"[AsyncLoop {self._name}] {len(task_list)} active task(s):")
        for task in task_list:
            state = (
                "cancelled"
                if task.cancelled()
                else "done"
                if task.done()
                else "pending"
            )
            detail = ""
            if task.done() and (exc := task.exception()):
                detail = f"  exception: {type(exc).__name__}: {exc}"
            coro = task.get_coro()
            _logger.warning(
                f"  - {task.get_name()} [{state}]{detail} | coro={coro.__name__ if hasattr(coro, '__name__') else coro}"
            )

    def __del__(self) -> None:
        """Ensure resources are released when the loop object is garbage collected."""
        _logger.debug(f"[AsyncLoop {self._name}] __del__ called, attempting cleanup.")
        with contextlib.suppress(Exception):
            self.stop()

start

start(timeout: float = 1.0) -> None

Launch the background thread and event loop.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for the loop and TaskGroup to become ready.

1.0

Raises:

Type Description
RuntimeError

If the loop fails to start within the timeout.

Source code in src/tongsim/core/async_loop.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def start(self, timeout: float = 1.0) -> None:
    """
    Launch the background thread and event loop.

    Args:
        timeout: Maximum time to wait for the loop and TaskGroup to become ready.

    Raises:
        RuntimeError: If the loop fails to start within the timeout.
    """
    if self.is_running():
        raise RuntimeError(f"[AsyncLoop {self._name}] already running.")

    self._thread = threading.Thread(target=self._run, name=self._name, daemon=True)
    self._thread.start()

    if not self._group_ready.wait(timeout):
        raise RuntimeError(f"[AsyncLoop {self._name}] timeout starting event loop.")

    _logger.debug(f"[AsyncLoop {self._name}] started.")

spawn

spawn(coro: Awaitable[Any], name: str = '') -> Future[Any]

Submit a coroutine to the TaskGroup.

Parameters:

Name Type Description Default
coro Awaitable[Any]

Coroutine object to execute.

required
name str

Optional name for logging.

''

Returns:

Name Type Description
Future Future[Any]

A concurrent.futures.Future mirroring coroutine completion or raising the underlying exception.

Source code in src/tongsim/core/async_loop.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def spawn(self, coro: Awaitable[Any], name: str = "") -> Future[Any]:
    """
    Submit a coroutine to the TaskGroup.

    Args:
        coro: Coroutine object to execute.
        name: Optional name for logging.

    Returns:
        Future: A concurrent.futures.Future mirroring coroutine completion or
            raising the underlying exception.
    """
    if not (self._loop and self._task_group):
        raise RuntimeError(f"[AsyncLoop {self._name}] not started.")

    outer: Future[Any] = Future()

    def _schedule() -> None:
        task: asyncio.Task[Any] = self._task_group.create_task(coro, name=name)
        self._business_tasks.add(task)

        def _on_done(t: asyncio.Task[Any]) -> None:
            self._business_tasks.discard(t)
            if t.cancelled():
                outer.cancel()
            else:
                exc = t.exception()
                if exc:
                    _logger.exception(
                        f"[AsyncLoop {self._name}] Task {name!r} raised: {exc}"
                    )
                    outer.set_exception(exc)
                    # Bubble the exception so the TaskGroup cancels outstanding work.
                    assert self._main_task is not None
                    self._main_task.cancel()
                else:
                    outer.set_result(t.result())

        task.add_done_callback(_on_done)

    self._loop.call_soon_threadsafe(_schedule)
    return outer

cancel_tasks

cancel_tasks(timeout: float) -> None

Cancel all application tasks that were spawned via spawn.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for cancellation to finish.

required
Source code in src/tongsim/core/async_loop.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def cancel_tasks(self, timeout: float) -> None:
    """
    Cancel all application tasks that were spawned via ``spawn``.

    Args:
        timeout: Maximum time to wait for cancellation to finish.
    """
    if not self.is_running():
        return

    future = asyncio.run_coroutine_threadsafe(self._cancel_tasks_seq(), self._loop)
    try:
        future.result(timeout)
    except FutureTimeoutError:
        _logger.warning(f"[AsyncLoop {self._name}] cancel_tasks timeout.")

stop

stop(timeout: float = 5.0) -> None

Gracefully stop the AsyncLoop: cancel business tasks -> cancel main task -> stop loop -> join thread.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for shutdown.

5.0
Source code in src/tongsim/core/async_loop.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def stop(self, timeout: float = 5.0) -> None:
    """
    Gracefully stop the AsyncLoop:
    cancel business tasks -> cancel main task -> stop loop -> join thread.

    Args:
        timeout: Maximum time to wait for shutdown.
    """
    if not self.is_running():
        return

    # Cancelling the main TaskGroup will cascade to outstanding tasks.
    assert self._main_task is not None and self._loop is not None
    self._loop.call_soon_threadsafe(self._main_task.cancel)

    self._thread.join(timeout)
    if self._thread.is_alive():
        _logger.warning(f"AsyncLoop '{self._name}' did not exit cleanly.")
    self._thread = None

is_running

is_running() -> bool

Check whether the loop thread is alive.

Returns:

Name Type Description
bool bool

True when the loop is running, otherwise False.

Source code in src/tongsim/core/async_loop.py
209
210
211
212
213
214
215
216
def is_running(self) -> bool:
    """
    Check whether the loop thread is alive.

    Returns:
        bool: ``True`` when the loop is running, otherwise ``False``.
    """
    return bool(self._thread and self._thread.is_alive())

log_task_list

log_task_list() -> None

Log all tasks currently known to the loop for diagnostics.

Source code in src/tongsim/core/async_loop.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def log_task_list(self) -> None:
    """Log all tasks currently known to the loop for diagnostics."""
    if not (self._loop and self._task_group):
        return
    task_list = asyncio.all_tasks(self._loop)
    _logger.warning(f"[AsyncLoop {self._name}] {len(task_list)} active task(s):")
    for task in task_list:
        state = (
            "cancelled"
            if task.cancelled()
            else "done"
            if task.done()
            else "pending"
        )
        detail = ""
        if task.done() and (exc := task.exception()):
            detail = f"  exception: {type(exc).__name__}: {exc}"
        coro = task.get_coro()
        _logger.warning(
            f"  - {task.get_name()} [{state}]{detail} | coro={coro.__name__ if hasattr(coro, '__name__') else coro}"
        )

Logging

tongsim.logger.initialize_logger

initialize_logger(
    level: int = INFO,
    log_to_file: bool = False,
    log_dir: str = "logs",
)

Configure the default log level and file output options. Call once at program entry.

:param level: Default log level (e.g. logging.INFO). :param log_to_file: Whether to write logs to a file. :param log_dir: Directory for log files (default: logs/).

Source code in src/tongsim/logger.py
102
103
104
105
106
107
108
109
110
111
112
def initialize_logger(
    level: int = logging.INFO, log_to_file: bool = False, log_dir: str = "logs"
):
    """
    Configure the default log level and file output options. Call once at program entry.

    :param level: Default log level (e.g. `logging.INFO`).
    :param log_to_file: Whether to write logs to a file.
    :param log_dir: Directory for log files (default: `logs/`).
    """
    _logger_manager.configure(level, log_to_file, log_dir)

tongsim.logger.set_log_level

set_log_level(module: str, level: int)

Set the log level for a given module.

:param module: Module name. :param level: Log level, e.g. logging.DEBUG or logging.ERROR.

Source code in src/tongsim/logger.py
124
125
126
127
128
129
130
131
def set_log_level(module: str, level: int):
    """
    Set the log level for a given module.

    :param module: Module name.
    :param level: Log level, e.g. `logging.DEBUG` or `logging.ERROR`.
    """
    _logger_manager.set_module_level(module, level)

tongsim.logger.get_logger

get_logger(module: str) -> Logger

Get a module logger. Prefix format: [TongSim_Lite][<module>] <message>.

:param module: Module name.

Source code in src/tongsim/logger.py
115
116
117
118
119
120
121
def get_logger(module: str) -> logging.Logger:
    """
    Get a module logger. Prefix format: `[TongSim_Lite][<module>] <message>`.

    :param module: Module name.
    """
    return _logger_manager.get_logger(module)

Version

tongsim.version.get_version_info

get_version_info() -> str

Get version/runtime information for TongSIM Lite.

Returns:

Name Type Description
str str

A multi-line formatted string describing the current version state.

Source code in src/tongsim/version.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def get_version_info() -> str:
    """
    Get version/runtime information for TongSIM Lite.

    Returns:
        str: A multi-line formatted string describing the current version state.
    """

    info = {
        "tongsim_lite version": VERSION,
        "python version": sys.version.replace("\n", " "),
        "platform": platform.platform(),
    }

    return "\n".join(f"{k:<20}: {v}" for k, v in info.items())