Skip to content

🦾 Action 基类方法

tongsim.entity.action.base

ActionBase dataclass

Bases: ABC

动作抽象基类。

每个 Action 将由若干个 Animation 组成,submit() 只负责提交,不阻塞等待结果。 结果收集与异常处理将在 collect_results() 中统一完成。

子类必须实现 __call__() 方法,用于描述 Action 的 Animation 组成。

Source code in src\tongsim\entity\action\base.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@dataclass
class ActionBase(ABC):
    """
    动作抽象基类。

    每个 Action 将由若干个 Animation 组成,`submit()` 只负责提交,不阻塞等待结果。
    结果收集与异常处理将在 `collect_results()` 中统一完成。

    子类必须实现 `__call__()` 方法,用于描述 Action 的 Animation 组成。
    """

    _anim_ids: list[int] = field(default_factory=list, init=False)
    _streamer: AnimationStreamer | None = field(default=None, init=False)
    _context: WorldContext | None = field(default=None, init=False)
    _action_ability: AgentActionAbility | None = field(default=None, init=False)
    _track_result = False

    @property
    def anim_ids(self):
        return self._anim_ids

    @property
    def streamer(self):
        return self._streamer

    @abstractmethod
    async def execute(self) -> None:
        """
        异步执行动作,将 Animation 提交到 streamer 中,不阻塞等待结果。
        """
        raise NotImplementedError("Subclasses must implement the `execute` method.")

    @abstractmethod
    def validate(self) -> None:
        """
        验证 Action 的输入参数。

        子类可覆盖此方法以实现具体的参数校验逻辑。
        """
        raise NotImplementedError("Subclasses must implement the `validate` method.")

    def initialize(
        self,
        action_ability: AgentActionAbility,
        streamer: AnimationStreamer,
        context: WorldContext,
        track_result: bool,
    ) -> None:
        """
        设置 AnimationStreamer,用于提交动画命令。
        """
        self._streamer = streamer
        self._context = context
        self._track_result = track_result
        self._action_ability = action_ability

    async def submit(self, cmd_spec: CommandSpec) -> None:
        """
        提交单个 Animation 命令,并记录其 command_id。
        """
        if not self._streamer:
            raise RuntimeError("Streamer not set for action.")

        try:
            command_id = await self._streamer.submit(cmd_spec, self._track_result)
            self._anim_ids.append(command_id)
            _logger.debug(
                f"Action {self.__class__.__name__}: Submitted animation command {command_id}"
            )

        except Exception as e:
            tag = getattr(cmd_spec, "tag", "<unknown>")
            _logger.error(f"Error in submitting command {tag}, error_msg: {e}")

    async def wait_any_started(self) -> AnimResultInfo:
        """
        等待本 Action 中任意一个 Animation 的 BEGIN 阶段到达。

        Returns:
            AnimResultInfo: 最先开始的动画信息。
        """
        if not self._streamer:
            raise RuntimeError("Streamer not set for action.")

        return await self._streamer.wait_any_begin(self._anim_ids)

    async def collect_results(
        self, cancel_on_error: bool = False
    ) -> list[AnimResultInfo]:
        """
        收集当前 Action 中所有 Animation 的结果。

        Args:
            cancel_on_error (bool): 若为 True,当任一 Animation 发生错误时立即取消剩余动画。

        Returns:
            List[AnimResultInfo]: 每个 Animation 的执行结果。
        """
        if not self._streamer:
            raise RuntimeError("Streamer not set for action.")

        # TODO:  cancel_on_error

        results: list[AnimResultInfo] = await self._streamer.wait_all_end(
            self._anim_ids
        )
        self._anim_ids.clear()
        return results

    async def _cancel_pending(self) -> None:
        """
        取消当前 Action 内所有未完成的 Animation。
        """
        if not self._streamer:
            raise RuntimeError("Streamer not set for action.")
        # TODO:
        raise NotImplementedError("Cancellation logic is not implemented.")

    async def run(self) -> None:
        """
        统一执行入口。

        - 首先进行参数验证。
        - 然后执行 Animation 提交。
        """
        try:
            self.validate()
            await self.execute()
        except Exception as e:
            _logger.error(f"Action {self.__class__.__name__} execution failed: {e}")
            raise

execute abstractmethod async

execute() -> None

异步执行动作,将 Animation 提交到 streamer 中,不阻塞等待结果。

Source code in src\tongsim\entity\action\base.py
43
44
45
46
47
48
@abstractmethod
async def execute(self) -> None:
    """
    异步执行动作,将 Animation 提交到 streamer 中,不阻塞等待结果。
    """
    raise NotImplementedError("Subclasses must implement the `execute` method.")

validate abstractmethod

validate() -> None

验证 Action 的输入参数。

子类可覆盖此方法以实现具体的参数校验逻辑。

Source code in src\tongsim\entity\action\base.py
50
51
52
53
54
55
56
57
@abstractmethod
def validate(self) -> None:
    """
    验证 Action 的输入参数。

    子类可覆盖此方法以实现具体的参数校验逻辑。
    """
    raise NotImplementedError("Subclasses must implement the `validate` method.")

initialize

initialize(
    action_ability: AgentActionAbility,
    streamer: AnimationStreamer,
    context: WorldContext,
    track_result: bool,
) -> None

设置 AnimationStreamer,用于提交动画命令。

Source code in src\tongsim\entity\action\base.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def initialize(
    self,
    action_ability: AgentActionAbility,
    streamer: AnimationStreamer,
    context: WorldContext,
    track_result: bool,
) -> None:
    """
    设置 AnimationStreamer,用于提交动画命令。
    """
    self._streamer = streamer
    self._context = context
    self._track_result = track_result
    self._action_ability = action_ability

submit async

submit(cmd_spec: CommandSpec) -> None

提交单个 Animation 命令,并记录其 command_id。

Source code in src\tongsim\entity\action\base.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
async def submit(self, cmd_spec: CommandSpec) -> None:
    """
    提交单个 Animation 命令,并记录其 command_id。
    """
    if not self._streamer:
        raise RuntimeError("Streamer not set for action.")

    try:
        command_id = await self._streamer.submit(cmd_spec, self._track_result)
        self._anim_ids.append(command_id)
        _logger.debug(
            f"Action {self.__class__.__name__}: Submitted animation command {command_id}"
        )

    except Exception as e:
        tag = getattr(cmd_spec, "tag", "<unknown>")
        _logger.error(f"Error in submitting command {tag}, error_msg: {e}")

wait_any_started async

wait_any_started() -> AnimResultInfo

等待本 Action 中任意一个 Animation 的 BEGIN 阶段到达。

Returns:

Name Type Description
AnimResultInfo AnimResultInfo

最先开始的动画信息。

Source code in src\tongsim\entity\action\base.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def wait_any_started(self) -> AnimResultInfo:
    """
    等待本 Action 中任意一个 Animation 的 BEGIN 阶段到达。

    Returns:
        AnimResultInfo: 最先开始的动画信息。
    """
    if not self._streamer:
        raise RuntimeError("Streamer not set for action.")

    return await self._streamer.wait_any_begin(self._anim_ids)

collect_results async

collect_results(
    cancel_on_error: bool = False,
) -> list[AnimResultInfo]

收集当前 Action 中所有 Animation 的结果。

Parameters:

Name Type Description Default
cancel_on_error bool

若为 True,当任一 Animation 发生错误时立即取消剩余动画。

False

Returns:

Type Description
list[AnimResultInfo]

List[AnimResultInfo]: 每个 Animation 的执行结果。

Source code in src\tongsim\entity\action\base.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def collect_results(
    self, cancel_on_error: bool = False
) -> list[AnimResultInfo]:
    """
    收集当前 Action 中所有 Animation 的结果。

    Args:
        cancel_on_error (bool): 若为 True,当任一 Animation 发生错误时立即取消剩余动画。

    Returns:
        List[AnimResultInfo]: 每个 Animation 的执行结果。
    """
    if not self._streamer:
        raise RuntimeError("Streamer not set for action.")

    # TODO:  cancel_on_error

    results: list[AnimResultInfo] = await self._streamer.wait_all_end(
        self._anim_ids
    )
    self._anim_ids.clear()
    return results

run async

run() -> None

统一执行入口。

  • 首先进行参数验证。
  • 然后执行 Animation 提交。
Source code in src\tongsim\entity\action\base.py
136
137
138
139
140
141
142
143
144
145
146
147
148
async def run(self) -> None:
    """
    统一执行入口。

    - 首先进行参数验证。
    - 然后执行 Animation 提交。
    """
    try:
        self.validate()
        await self.execute()
    except Exception as e:
        _logger.error(f"Action {self.__class__.__name__} execution failed: {e}")
        raise