Skip to content

Wrappers

envs.freeciv_wrapper.tensor_wrapper.TensorWrapper

Bases: Wrapper

TensorWrapper is used to make Civrealm environment tensorized by converting observations from FreecivBaseEnv into tensors and tensor actions back to actions compatible with FreecivBaseEnv.

TensorWrapper is composed of TensorBase, TensorAction, TensorObservation and CacheLastObs.

Parameters:

Name Type Description Default
env FreecivBaseEnv
required
config dict

tensor env configuration

default_tensor_config

Attributes:

Name Type Description
config dict

tensor wrapper configuration

Source code in src/civrealm/envs/freeciv_wrapper/tensor_wrapper.py
class TensorWrapper(Wrapper):
    """
    Tensorize a Civrealm environment.

    Observations coming from FreecivBaseEnv are converted into tensors, and
    tensor actions are converted back into actions compatible with
    FreecivBaseEnv.

    The wrapper is a stack of `TensorBase`, `TensorAction`,
    `TensorObservation` and `CacheLastObs`, with `TensorBase` innermost and
    `CacheLastObs` outermost.

    Parameters
    ----------
    env
        The environment to wrap.
    config:
        tensor env configuration

    Attributes
    ----------
    config: dict
        tensor wrapper configuration

    """

    def __init__(self, env: FreecivBaseEnv, config: dict = default_tensor_config):
        self.config = config

        # Build the stack from the inside out: every layer wraps the result
        # of the previous one.
        stacked = TensorBase(env, config=config)
        for layer in (TensorAction, TensorObservation, CacheLastObs):
            stacked = layer(stacked)

        super().__init__(stacked)

envs.freeciv_wrapper.tensor_wrapper.TensorBase

Bases: Wrapper

A basic wrapper that deals with config loading and entity id recording, required by all tensor-related wrappers.

Parameters:

Name Type Description Default
env FreecivBaseEnv
required
config dict

tensor env configuration

default_tensor_config

Attributes:

Name Type Description
config dict

A dict that specifies all configurations related to tensor wrapper.

my_player_id int

My player id.

unit_ids list

A sorted list of my unit ids.

city_ids list

A sorted list of my city ids.

others_unit_ids list

A sorted list of others unit ids.

others_city_ids list

A sorted list of others city ids.

dipl_ids list

A list of others player ids.

units dict

ruleset information about units.

unit_types list

A list of all unit types.

unit_costs list

A list of int indicating unit costs.

improvements dict

Ruleset information about city improvements.

impr_costs list

A list of int indicating city improvements costs.

Source code in src/civrealm/envs/freeciv_wrapper/tensor_base_wrapper.py
class TensorBase(Wrapper):
    """
    Innermost tensor wrapper: keeps the tensor config and records entity ids.

    All other tensor-related wrappers rely on the attributes maintained here.

    Parameters
    ----------
    env: FreecivBaseEnv
    config: dict
        tensor env configuration

    Attributes
    ----------
    config: dict
        A dict that specifies all configurations related to tensor wrapper.
    my_player_id: int
        My player id (-1 until the first `reset`).
    unit_ids: list
        A sorted list of my unit ids.
    city_ids: list
        A sorted list of my city ids.
    others_unit_ids: list
        A sorted list of others unit ids.
    others_city_ids: list
        A sorted list of others city ids.
    dipl_ids: list
        A sorted list of others player ids.
    units: dict
        Ruleset information about units.
    unit_types: list
        A list of all unit type names.
    unit_costs: list
        A list of unit build costs.
    improvements: dict
        Ruleset information about city improvements.
    impr_costs: list
        A list of city improvement build costs.

    """

    def __init__(self, env: FreecivBaseEnv, config: dict = default_tensor_config):
        self.config = config
        self.my_player_id = -1

        # Ids of entities that come and go during a game; refreshed from every
        # observation by update_sequence_ids().
        self.unit_ids, self.city_ids = [], []
        self.others_unit_ids, self.others_city_ids = [], []
        self.dipl_ids = []

        # Ruleset-derived data; filled in by update_config() on reset.
        self.units, self.unit_types, self.unit_costs = {}, [], []
        self.improvements, self.impr_costs = {}, []

        super().__init__(env)

    def update_sequence_ids(self, observation):
        """
        Refresh the id lists from the unit, city and dipl parts of `observation`.

        Units and cities are split by whether their "owner" equals
        `my_player_id`; missing sections are treated as empty.
        """
        me = self.my_player_id
        units = observation.get("unit", {})
        cities = observation.get("city", {})

        self.unit_ids = sorted(
            uid for uid, unit in units.items() if unit["owner"] == me
        )
        self.others_unit_ids = sorted(
            uid for uid, unit in units.items() if unit["owner"] != me
        )
        self.city_ids = sorted(
            cid for cid, city in cities.items() if city["owner"] == me
        )
        self.others_city_ids = sorted(
            cid for cid, city in cities.items() if city["owner"] != me
        )
        self.dipl_ids = [
            pid for pid in sorted(observation.get("dipl", {}).keys()) if pid != me
        ]

    def update_config(self):
        """
        Pull ruleset data from the controller and install the observation
        operators in the config that depend on it.
        """
        rule_ctrl = self.unwrapped.civ_controller.rule_ctrl

        # Both containers are indexed by consecutive integers 0..len-1.
        self.units = rule_ctrl.unit_types
        unit_indices = range(len(self.units))
        self.unit_types = [self.units[idx]["name"] for idx in unit_indices]
        self.unit_costs = [self.units[idx]["build_cost"] for idx in unit_indices]

        self.improvements = rule_ctrl.improvements
        self.impr_costs = [
            self.improvements[idx]["build_cost"]
            for idx in range(len(self.improvements))
        ]

        type_onehot = onehotifier_maker(self.unit_types)
        self.config["obs_ops"]["unit"]["type_rule_name"] = type_onehot
        # The lambda reads the cost lists at call time, so it always reflects
        # the most recent update_config().
        self.config["obs_ops"]["rules"]["build_cost"] = lambda _: np.array(
            self.unit_costs + self.impr_costs
        )

    def reset(self, *args, **kwargs):
        """Reset the wrapped env, then refresh player id, config and id lists."""
        observation, info = self.env.reset(*args, **kwargs)
        self.my_player_id = self.unwrapped.civ_controller.player_ctrl.my_player_id

        self.update_config()
        self.update_sequence_ids(observation)
        return observation, info

    def step(self, *args, **kwargs):
        """Step the wrapped env and refresh the id lists from the new observation."""
        observation, reward, terminated, truncated, info = self.env.step(
            *args, **kwargs
        )
        self.update_sequence_ids(observation)
        return observation, reward, terminated, truncated, info

update_config()

Update config using ruleset information at the start of the turn.

Source code in src/civrealm/envs/freeciv_wrapper/tensor_base_wrapper.py
def update_config(self):
    """
    Pull ruleset data from the controller and install the observation
    operators in the config that depend on it.
    """
    rule_ctrl = self.unwrapped.civ_controller.rule_ctrl

    # Both containers are indexed by consecutive integers 0..len-1.
    self.units = rule_ctrl.unit_types
    unit_indices = range(len(self.units))
    self.unit_types = [self.units[idx]["name"] for idx in unit_indices]
    self.unit_costs = [self.units[idx]["build_cost"] for idx in unit_indices]

    self.improvements = rule_ctrl.improvements
    self.impr_costs = [
        self.improvements[idx]["build_cost"]
        for idx in range(len(self.improvements))
    ]

    type_onehot = onehotifier_maker(self.unit_types)
    self.config["obs_ops"]["unit"]["type_rule_name"] = type_onehot
    # The lambda reads the cost lists at call time, so it always reflects the
    # most recent update_config().
    self.config["obs_ops"]["rules"]["build_cost"] = lambda _: np.array(
        self.unit_costs + self.impr_costs
    )

update_sequence_ids(observation)

Use city, unit and dipl information in observation to update ids.

Source code in src/civrealm/envs/freeciv_wrapper/tensor_base_wrapper.py
def update_sequence_ids(self, observation):
    """
    Refresh the id lists from the unit, city and dipl parts of `observation`.

    Units and cities are split by whether their "owner" equals
    `my_player_id`; missing sections are treated as empty.
    """
    me = self.my_player_id
    units = observation.get("unit", {})
    cities = observation.get("city", {})

    self.unit_ids = sorted(
        uid for uid, unit in units.items() if unit["owner"] == me
    )
    self.others_unit_ids = sorted(
        uid for uid, unit in units.items() if unit["owner"] != me
    )
    self.city_ids = sorted(
        cid for cid, city in cities.items() if city["owner"] == me
    )
    self.others_city_ids = sorted(
        cid for cid, city in cities.items() if city["owner"] != me
    )
    self.dipl_ids = [
        pid for pid in sorted(observation.get("dipl", {}).keys()) if pid != me
    ]

envs.freeciv_wrapper.action_wrapper.TensorAction

Bases: Wrapper

A wrapper that defines tensor action spaces, transforms tensor actions into actions that can be handled by a FreecivBaseEnv instance, and adds masks to observations.

TensorAction wrapper is composed of five wrappers, including TruncateDiplCity, DiplomacyLoop, CombineTechResearchGoal, PersistentCityProduction, and EmbarkWrapper.

Parameters:

Name Type Description Default
env TensorBase

A FreecivBaseEnv instance that has been wrapped by TensorBase.

required

Attributes:

Name Type Description
action_config dict

a dict of configs that specify the sizes of mutable entities and the action layout.

mask dict

a dict of masks of type numpy ndarray indicating available actions and entities. 0 -> unavailable, 1 -> available.

available_actions dict

cached info['available_actions'], a dict that indicates available actions.

action_space Dict

a gymnasium.spaces.Dict with keys ['actor_type','city_id','unit_id', 'dipl_id','city_action_type','unit_action_type','dipl_action_type', 'gov_action_type','tech_action_type']

Source code in src/civrealm/envs/freeciv_wrapper/action_wrapper.py
class TensorAction(Wrapper):
    """
    A wrapper that defines tensor action spaces, transforms tensor actions into
    actions that can be handled by a FreecivBaseEnv instance, and adds masks to
    observations.

    TensorAction wrapper is composed of five wrappers, including `TruncateDiplCity`,
    `DiplomacyLoop`, `CombineTechResearchGoal`, `PersistentCityProduction`, and `EmbarkWrapper`.

    Parameters
    ----------
    env: TensorBase
        A FreecivBaseEnv instance that has been wrapped by TensorBase.

    Attributes
    ----------
    action_config: dict
        a dict of configs that specify the sizes of mutable entities and the
        action layout. It is the "config" attribute of the wrapped env.
    actor_type_list: list
        actor type names, taken from `action_config["actor_type_list"]`; the
        tensor action's `actor_type` is an index into this list.
    mask: dict
        a dict of masks of type numpy ndarray indicating available actions and
        entities. 0 -> unavailable, 1 -> available.
    available_actions: dict
        cached info['available_actions'], a dict that indicates available actions.
    action_space: gymnasium.spaces.Dict
        a gymnasium.spaces.Dict with keys `['actor_type','city_id','unit_id',
        'dipl_id','city_action_type','unit_action_type','dipl_action_type',
        'gov_action_type','tech_action_type']`
    """

    def __init__(self, env: TensorBase):
        # NOTE: this is the config object exposed by the wrapped env, not a
        # copy, so the "dipl" entry added below is written into that dict.
        self.action_config = env.get_wrapper_attr("config")
        # "dipl" entities are sized exactly like "others_player".
        self.action_config["resize"]["dipl"] = self.action_config["resize"][
            "others_player"
        ]
        self.actor_type_list = self.action_config["actor_type_list"]
        self.available_actions = {}
        self.mask = {}
        # Last seen turn / dealing_with_incoming state; update_obs_with_mask()
        # resets the masks whenever either of them changes.
        self.__turn = -1
        self.__dealing_with_incoming = False

        super().__init__(
            TruncateDiplCity(
                DiplomacyLoop(
                    CombineTechResearchGoal(
                        PersistentCityProduction(EmbarkWrapper(env))
                    )
                )
            )
        )

        # Entity-id spaces are sized by config["resize"]; action-type spaces by
        # the sum of the per-category counts in config["action_layout"].
        self.action_space = spaces.Dict(
            {
                "actor_type": spaces.Discrete(len(self.actor_type_list)),
                "city_id": spaces.Discrete(self.action_config["resize"]["city"]),
                "city_action_type": spaces.Discrete(
                    sum(self.action_config["action_layout"]["city"].values())
                ),
                "unit_id": spaces.Discrete(self.action_config["resize"]["unit"]),
                "unit_action_type": spaces.Discrete(
                    sum(self.action_config["action_layout"]["unit"].values())
                ),
                "dipl_id": spaces.Discrete(self.action_config["resize"]["dipl"]),
                "dipl_action_type": spaces.Discrete(
                    sum(self.action_config["action_layout"]["dipl"].values())
                ),
                "gov_action_type": spaces.Discrete(
                    sum(self.action_config["action_layout"]["gov"].values())
                ),
                "tech_action_type": spaces.Discrete(
                    sum(self.action_config["action_layout"]["tech"].values())
                ),
            }
        )

    def step(self, action):
        """
        Translate the tensor `action`, step the wrapped env with it, and return
        the usual 5-tuple with the masks merged into the observation.
        """
        # Unwrap numpy arrays into plain Python scalars; other values pass through.
        action = {
            k: (v.item() if isinstance(v, np.ndarray) else v) for k, v in action.items()
        }

        base_action = self.action(action)
        if tensor_debug:
            print(base_action)
        obs, reward, terminated, truncated, info = self.env.step(base_action)
        if tensor_debug:
            print(f"reward:{reward},done:{terminated or truncated}")

        obs = self.update_obs_with_mask(obs, info, action)
        return obs, reward, terminated, truncated, info

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Reset the wrapped env and return `(obs, info)` with the masks merged
        into the observation.
        """
        obs, info = self.env.reset(seed=seed, options=options, **kwargs)
        obs = self.update_obs_with_mask(obs, info)
        return obs, info

    def action(self, action):
        """
        Translate tensor action, a dict of keys `['actor_type','city_id','unit_id',
        'dipl_id','city_action_type','unit_action_type','dipl_action_type',
        'gov_action_type','tech_action_type']` to `FreecivBaseEnv` action,
        a tuple `(actor_type, entity_id, action_name)`.

        Returns `None` when the selected actor type is "turn done".
        """
        if tensor_debug:
            self._check_action_layout()

        actor_type = action["actor_type"]
        actor_name = self.actor_type_list[actor_type]

        if actor_name == "turn done":
            return None
        if actor_name in ["gov", "tech"]:
            # gov/tech have no entity slot: the entity is my own player.
            entity_pos = None
            entity_id = self.get_wrapper_attr("my_player_id")
            action_index = action[actor_name + "_action_type"]
        else:
            # The tensor action carries a position into the "<actor>_ids"
            # list, which is mapped back to the real entity id here.
            entity_pos, action_index = (
                action[actor_name + "_id"],
                action[actor_name + "_action_type"],
            )
            entity_id = self.get_wrapper_attr(actor_name + "_ids")[
                action[actor_name + "_id"]
            ]

        if tensor_debug:
            assert (
                self.mask[actor_name +
                          "_action_type_mask"][entity_pos, action_index]
                == 1
            ), f"{actor_name} action of id pos {entity_pos}, \
                    action type index {action_index} is masked"

        # Action indices refer to the alphabetically sorted action names; the
        # same ordering is used when filling the masks in _mask_from_info().
        action_name = sorted(
            list(self.available_actions[actor_name][entity_id].keys())
        )[action_index]

        return (actor_name, entity_id, action_name)

    def update_obs_with_mask(self, observation, info, action=None):
        """
        Update self.mask using observation, info and action from the unwrapped env,
        and add self.mask to the observation of the wrapped env.

        The masks are reset first when `info["turn"]` or the
        "dealing_with_incoming" wrapper attribute differs from the value seen
        on the previous call; otherwise they are narrowed further in place.
        """
        if info[
            "turn"
        ] != self.__turn or self.__dealing_with_incoming != self.get_wrapper_attr(
            "dealing_with_incoming"
        ):
            self.reset_mask()
        # Keep a private copy so later lookups in action() do not depend on
        # the caller's info dict.
        self.available_actions = deepcopy(info["available_actions"])
        self.__turn = info["turn"]
        self.__dealing_with_incoming = self.get_wrapper_attr(
            "dealing_with_incoming")
        self._update_mask(observation, info, action)

        # A deep copy of the masks is merged in, so the returned observation
        # does not share arrays with self.mask.
        return update(observation, deepcopy(self.mask))

    def reset_mask(self):
        """
        Reset self.mask

        This is usually called at the start of a new turn to reset masks.
        Every mask is recreated as an int32 array of ones.
        """
        # Reset mask
        sizes = self.action_config["resize"]
        self.mask["actor_type_mask"] = np.ones(
            len(self.actor_type_list), dtype=np.int32
        )

        # Units/Cities/Players and others Masks, each of shape (size, 1)
        for field in ["unit", "city", "others_unit", "others_city", "others_player"]:
            self.mask[field + "_mask"] = np.ones(sizes[field], dtype=np.int32)[
                ..., np.newaxis
            ]

        # Units/Cities Id Masks same as their Masks.
        # NOTE: these are the same array objects, not copies; the aliasing
        # lasts until _mask_from_obs() rebinds unit_mask/city_mask to copies.
        self.mask["unit_id_mask"] = self.mask["unit_mask"]
        self.mask["city_id_mask"] = self.mask["city_mask"]

        # Dipl id mask
        self.mask["dipl_id_mask"] = np.ones(sizes["dipl"], dtype=np.int32)[
            ..., np.newaxis
        ]

        # Action type mask: 2-D (entity slot, action type) for city/unit/dipl,
        # 1-D (action type) for gov/tech.
        for field in ["city", "unit", "dipl"]:
            self.mask[field + "_action_type_mask"] = np.ones(
                (
                    sizes[field],
                    sum(self.action_config["action_layout"][field].values()),
                ),
                dtype=np.int32,
            )
        for field in ["gov", "tech"]:
            self.mask[field + "_action_type_mask"] = np.ones(
                (sum(self.action_config["action_layout"][field].values()),),
                dtype=np.int32,
            )

    def _update_mask(self, observation, info, action):
        """Narrow self.mask using, in order, the action, the observation and info."""
        # update self.mask using action, observation and info
        if action:
            self._mask_from_action(action)
        self._mask_from_obs(observation)
        self._mask_from_info(info)

    def _mask_from_action(self, action):
        """
        Mask out action types according to the action just taken.

        Only "gov" and "tech" are masked here (all of their action types are
        zeroed); the "unit" and "city" branches are currently no-ops.
        """
        # Mask out actions that have been performed in this turn.
        actor_type = action["actor_type"]
        actor_name = self.actor_type_list[actor_type]
        if actor_name == "unit":
            # self.mask["unit_action_type_mask"][
            #     action["unit_id"], action["unit_action_type"]
            # ] = 0
            pass
        elif actor_name == "city":
            # self.mask["city_action_type_mask"][action["city_id"], :] = 0
            pass
        elif actor_name == "gov":
            self.mask["gov_action_type_mask"][:] &= 0
        elif actor_name == "tech":
            self.mask["tech_action_type_mask"][:] &= 0

    def _mask_from_obs(self, observation):
        """
        Mask entity slots and unit actions using the observation and the id
        lists exposed by the wrapped env.
        """
        # Mask mutable entities using observation

        # Mask out trailing spaces for unit and city: slots past the number of
        # known ids do not correspond to any entity.
        self.mask["unit_id_mask"][len(
            self.get_wrapper_attr("unit_ids"))::, :] = 0
        self.mask["city_id_mask"][len(
            self.get_wrapper_attr("city_ids"))::, :] = 0
        self.mask["dipl_id_mask"][len(
            self.get_wrapper_attr("dipl_ids"))::, :] = 0
        # Rebind the entity masks to copies so the further narrowing of the
        # id masks below does not affect them.
        self.mask["unit_mask"] = self.mask["unit_id_mask"].copy()
        self.mask["city_mask"] = self.mask["city_id_mask"].copy()

        self.mask["unit_action_type_mask"][
            len(self.get_wrapper_attr("unit_ids"))::, :
        ] = 0
        self.mask["city_action_type_mask"][
            len(self.get_wrapper_attr("city_ids"))::, :
        ] = 0

        # Mask Unit: a unit is not selectable when it has no moves left or when
        # its activity is not one of the listed idle/fortify/sentry activities.
        for pos, unit_id in enumerate(
            self.get_wrapper_attr("unit_ids")[
                : self.action_config["resize"]["unit"]]
        ):
            unit = observation["unit"][unit_id]
            if unit["moves_left"] == 0 or self.unwrapped.civ_controller.unit_ctrl.units[
                unit_id
            ]["activity"] not in [
                ACTIVITY_IDLE,
                ACTIVITY_FORTIFIED,
                ACTIVITY_SENTRY,
                ACTIVITY_FORTIFYING,
            ]:  # agent busy or fortified
                self.mask["unit_id_mask"][pos] &= 0
                self.mask["unit_action_type_mask"][pos, :] &= 0

        self.mask["others_unit_mask"][
            len(self.get_wrapper_attr("others_unit_ids"))::, :
        ] &= 0
        self.mask["others_city_mask"][
            len(self.get_wrapper_attr("others_city_ids"))::, :
        ] &= 0

        # "researching" and "techs_researched" are read through
        # get_wrapper_attr; presumably provided by an inner wrapper — TODO confirm.
        if self.get_wrapper_attr("researching"):
            self.mask["tech_action_type_mask"][:] &= 0
        if not self.get_wrapper_attr("researching") and tensor_debug:
            print(
                f"techs_researched: {self.get_wrapper_attr('techs_researched')}")

    def _mask_from_info(self, info):
        """
        Narrow the masks using `info["available_actions"]`, then derive the
        actor type mask from the per-actor masks.
        """
        others_player_num = len(
            info["available_actions"].get("player", {}).keys())
        self.mask["others_player_mask"][others_player_num::, :] &= 0

        # Mask City and Unit (and Dipl)
        for mutable in ["city", "unit", "dipl"]:
            entities = info["available_actions"].get(mutable, {})
            if len(entities) == 0:
                # Nothing available for this actor type at all.
                self.mask[mutable + "_action_type_mask"][:, :] &= 0
                self.mask[mutable + "_id_mask"][:] &= 0
                continue
            for i, entity_id in enumerate(
                self.env.get_wrapper_attr(mutable + "_ids")[
                    : self.action_config["resize"][mutable]
                ]
            ):
                actions = entities.get(entity_id, {})
                if len(actions) == 0:
                    self.mask[mutable + "_action_type_mask"][i, :] &= 0
                    self.mask[mutable + "_id_mask"][i] &= 0
                    continue
                # Action type indices follow the sorted action names, matching
                # the lookup done in action().
                for action_id, act_name in enumerate(sorted(list(actions.keys()))):
                    self.mask[mutable + "_action_type_mask"][i, action_id] &= int(
                        actions[act_name]
                    )
                # An entity stays selectable only if at least one of its
                # action types is still unmasked.
                self.mask[mutable + "_id_mask"][i] &= int(
                    any(self.mask[mutable + "_action_type_mask"][i])
                )
        # An actor type stays selectable only if some entity of that type is.
        for mutable in ["city", "unit", "dipl"]:
            actor_type_index = self.actor_type_list.index(mutable)
            self.mask["actor_type_mask"][actor_type_index] &= int(
                any(self.mask[mutable + "_id_mask"])
            )

        # Mask Gov and Tech; their options are keyed by my player id.
        for immutable in ["gov", "tech"]:
            options = info["available_actions"].get(immutable, {})
            if len(options) == 0:
                self.mask[immutable + "_action_type_mask"][:] &= 0
                continue
            my_player_id = self.get_wrapper_attr("my_player_id")
            for action_id, act_name in enumerate(
                sorted(list(options[my_player_id].keys()))
            ):
                self.mask[immutable + "_action_type_mask"][action_id] &= int(
                    options[my_player_id][act_name]
                )
        for immutable in ["gov", "tech"]:
            actor_type_index = self.actor_type_list.index(immutable)
            self.mask["actor_type_mask"][actor_type_index] &= int(
                any(self.mask[immutable + "_action_type_mask"])
            )

    def _check_action_layout(self):
        """
        Debug check: assert that the cached available actions of every city and
        unit, and of my player's "gov" entry, have exactly as many entries as
        the configured action layout.
        """
        action_layout = self.action_config["action_layout"]
        for field in ["city", "unit"]:
            for id, entity in self.available_actions.get(field, {}).items():
                assert len(entity) == sum(action_layout[field].values())
        assert len(
            self.available_actions["gov"][self.get_wrapper_attr(
                "my_player_id")]
        ) == sum(action_layout["gov"].values())

action(action)

Translate tensor action, a dict of keys ['actor_type','city_id','unit_id', 'dipl_id','city_action_type','unit_action_type','dipl_action_type', 'gov_action_type','tech_action_type'] to FreecivBaseEnv action, a tuple (actor_type, entity_id, action_name).

Source code in src/civrealm/envs/freeciv_wrapper/action_wrapper.py
def action(self, action):
    """
    Convert a tensor action into a `FreecivBaseEnv` action.

    The input is a dict of keys `['actor_type','city_id','unit_id',
    'dipl_id','city_action_type','unit_action_type','dipl_action_type',
    'gov_action_type','tech_action_type']`. The result is a tuple
    `(actor_type, entity_id, action_name)`, or `None` when the selected actor
    type is "turn done".
    """
    if tensor_debug:
        self._check_action_layout()

    actor_name = self.actor_type_list[action["actor_type"]]
    if actor_name == "turn done":
        return None

    type_key = actor_name + "_action_type"
    if actor_name in ("gov", "tech"):
        # gov/tech have no entity slot: the entity is my own player.
        entity_pos = None
        entity_id = self.get_wrapper_attr("my_player_id")
        action_index = action[type_key]
    else:
        # The tensor action carries a position into the "<actor>_ids" list,
        # which is mapped back to the real entity id here.
        entity_pos = action[actor_name + "_id"]
        action_index = action[type_key]
        entity_id = self.get_wrapper_attr(actor_name + "_ids")[entity_pos]

    if tensor_debug:
        type_mask = self.mask[actor_name + "_action_type_mask"]
        assert (
            type_mask[entity_pos, action_index] == 1
        ), f"{actor_name} action of id pos {entity_pos}, \
                action type index {action_index} is masked"

    # Action indices refer to the alphabetically sorted action names.
    ordered_names = sorted(self.available_actions[actor_name][entity_id].keys())
    return (actor_name, entity_id, ordered_names[action_index])

reset_mask()

Reset self.mask

This is usually called at the start of a new turn to reset masks.

Source code in src/civrealm/envs/freeciv_wrapper/action_wrapper.py
def reset_mask(self):
    """
    Recreate every entry of self.mask as an int32 array of ones.

    This is usually called at the start of a new turn to reset masks.
    """
    resize = self.action_config["resize"]
    mask = self.mask

    def ones_column(length):
        # (length, 1) column of ones.
        return np.expand_dims(np.ones(length, dtype=np.int32), axis=-1)

    mask["actor_type_mask"] = np.ones(len(self.actor_type_list), dtype=np.int32)

    # Entity masks for my own and other players' entities.
    for entity in ("unit", "city", "others_unit", "others_city", "others_player"):
        mask[f"{entity}_mask"] = ones_column(resize[entity])

    # The unit/city id masks are the very same arrays as the entity masks
    # (aliases, not copies).
    for entity in ("unit", "city"):
        mask[f"{entity}_id_mask"] = mask[f"{entity}_mask"]

    mask["dipl_id_mask"] = ones_column(resize["dipl"])

    # Action type masks: (entity slot, action type) for city/unit/dipl and a
    # flat (action type,) vector for gov/tech.
    for entity in ("city", "unit", "dipl"):
        type_count = sum(self.action_config["action_layout"][entity].values())
        mask[f"{entity}_action_type_mask"] = np.ones(
            (resize[entity], type_count), dtype=np.int32
        )
    for entity in ("gov", "tech"):
        type_count = sum(self.action_config["action_layout"][entity].values())
        mask[f"{entity}_action_type_mask"] = np.ones((type_count,), dtype=np.int32)

update_obs_with_mask(observation, info, action=None)

Update self.mask using observation, info and action from the unwrapped env, and add self.mask to the observation of the wrapped env.

Source code in src/civrealm/envs/freeciv_wrapper/action_wrapper.py
def update_obs_with_mask(self, observation, info, action=None):
    """
    Refresh self.mask from observation, info and (optionally) the action just
    taken, then return the observation with a copy of the masks merged in.

    The masks are reset first when the turn or the "dealing_with_incoming"
    wrapper attribute differs from the value seen on the previous call.
    """
    turn_changed = info["turn"] != self.__turn
    # The incoming state is only compared when the turn is unchanged.
    if turn_changed or (
        self.__dealing_with_incoming
        != self.get_wrapper_attr("dealing_with_incoming")
    ):
        self.reset_mask()

    # Remember what this call saw, for the next comparison and for action().
    self.available_actions = deepcopy(info["available_actions"])
    self.__turn = info["turn"]
    self.__dealing_with_incoming = self.get_wrapper_attr("dealing_with_incoming")

    self._update_mask(observation, info, action)

    # Merge a deep copy so the returned observation shares no arrays with
    # self.mask.
    mask_snapshot = deepcopy(self.mask)
    return update(observation, mask_snapshot)

envs.freeciv_wrapper.action_wrapper.EmbarkWrapper

Bases: Wrapper

Unify embark actions of all units to 'embark_{dir8}' where dir8 in [0,...7] indicating 8 directions.

Sometimes a unit can embark onto multiple carriers in the same direction. In that case, the wrapper automatically chooses the carrier with the smallest unit id.

Attributes:

Name Type Description
embarkable_units dict

a dict of embarkable units with key=(embarking_unit_id, dir8) and value=[carrier_ids]

Source code in src/civrealm/envs/freeciv_wrapper/embark_wrapper.py
@wrapper_override(["action", "info"])
class EmbarkWrapper(Wrapper):
    """
    Unify embark actions of all units to 'embark_{dir8}' where dir8 in `[0,...7]`
    indicating 8 directions.

    Sometimes a unit can embark onto multiple carriers in the same direction.
    In that case, the wrapper automatically chooses the carrier with the
    smallest unit id.

    Attributes
    ----------
    embarkable_units : dict
        a dict of embarkable units with key=(embarking_unit_id, dir8) and value=[carrier_ids]
    """

    def __init__(self, env):
        self.embarkable_units = {}
        super().__init__(env)

    def action(self, action):
        """
        Translate `embark_{dir8}` action into embark actions that can be handled by FreecivBaseEnv.

        Anything that is not a unit embark action (including `None`) is
        returned unchanged. When carriers were recorded for the unit and
        direction, the action name becomes `embark_{dir8}_{carrier_id}` using
        the smallest recorded carrier id.
        """
        if action is None:
            return action

        actor_name, entity_id, action_name = action
        if actor_name != "unit" or not action_name.startswith("embark"):
            return action

        dir8 = int(action_name.split("_")[-1])
        carriers = self.embarkable_units.get((entity_id, dir8), [])
        if carriers:
            # NOTE(review): directions are documented as 0..7, so this bound
            # looks off by one — confirm before tightening.
            assert dir8 <= 8
            action_name = f"embark_{dir8}_{min(carriers)}"

        return (actor_name, entity_id, action_name)

    def info(self, info):
        """
        Complete or modify embark actions in info['available_actions']['unit']

        For every unit, after this call each of `embark_0` .. `embark_7` is
        present:

        - an existing `embark_{dir}` key is set to True;
        - an `embark_{dir}_{carrier_id}` key is removed, its carrier id is
          stored in `embarkable_units[(unit_id, dir)]`, and `embark_{dir}` is
          set to True;
        - every remaining direction is set to False.

        NOTE(review): existing embark entries are set to True regardless of
        their previous value — confirm the env only lists available ones.
        """
        self.embarkable_units = {}
        unit_actions = info["available_actions"].get("unit", {})
        if not unit_actions:
            return info

        all_directions = [f"embark_{i}" for i in range(8)]

        for unit_id, actions in unit_actions.items():
            offered = set()
            # Iterate over a snapshot: keys are added and removed below.
            for name in list(actions.keys()):
                if not name.startswith("embark"):
                    continue

                parts = name.split("_")
                if len(parts) != 3:
                    # name == embark_dir
                    assert (
                        len(parts) == 2
                    ), f"Expected embark_{{dir}}_{{target_id}},\
                            but got unsupported embark action name {name}"
                    dir8 = int(parts[-1])
                else:
                    # name == embark_dir_id: remember the carrier and drop
                    # the carrier-specific key.
                    dir8, target_id = int(parts[1]), int(parts[2])
                    self.embarkable_units.setdefault((unit_id, dir8), []).append(
                        target_id
                    )
                    actions.pop(name)

                unified = f"embark_{dir8}"
                actions[unified] = True
                offered.add(unified)

            # Directions that were never offered become explicit False entries.
            for unified in all_directions:
                if unified not in offered:
                    actions[unified] = False

        info["available_actions"]["unit"] = unit_actions

        return info

action(action)

Translate embark_{dir8} action into embark actions that can be handled by FreecivBaseEnv.

Source code in src/civrealm/envs/freeciv_wrapper/embark_wrapper.py
def action(self, action):
    """
    Resolve a direction-only `embark_{dir8}` unit action into the concrete
    `embark_{dir8}_{target_id}` name understood by FreecivBaseEnv.

    `None`, non-unit actions and non-embark unit actions are returned untouched.
    When carriers were recorded for `(entity_id, dir8)` in `self.embarkable_units`,
    the smallest carrier id is chosen as the target; otherwise the action name is
    kept as it was.
    """
    if action is None:
        return action

    actor_name, entity_id, action_name = action
    if actor_name != "unit" or not action_name.startswith("embark"):
        return action

    # the direction is the last underscore-separated token of the action name
    dir8 = int(action_name.rsplit("_", 1)[-1])
    carriers = self.embarkable_units.get((entity_id, dir8), [])

    if len(carriers) > 0:
        assert dir8 <= 8
        # deterministic choice: always board the carrier with the lowest id
        target_id = min(carriers)
        action_name = f"embark_{dir8}_{target_id}"

    return (actor_name, entity_id, action_name)

info(info)

Complete or modify embark actions in info['available_actions']['unit']

If a unit has no embark_.* action, then set all embark_{dir8} action to False

If a unit has embark_{dir}=True, set all embark_{other_dirs} action to False

If a unit has embark_{dir}_{carrier_id}=True, store that carrier_id and set its embark_{dir8} accordingly.

Source code in src/civrealm/envs/freeciv_wrapper/embark_wrapper.py
def info(self, info):
    """
    Complete or modify embark actions in info['available_actions']['unit'].

    For every unit listed under `info['available_actions']['unit']`:

    * each `embark_{dir}_{target_id}` entry is removed, `target_id` is recorded
      in `self.embarkable_units[(unit_id, dir)]`, and `embark_{dir}` is set to True;
    * each `embark_{dir}` entry is set to True;
    * every `embark_{dir8}` (dir8 in 0..7) that was not produced by the rules
      above is set to False, so all eight direction keys are always present.

    `self.embarkable_units` is rebuilt from scratch on every call.

    Parameters
    ----------
    info: dict
        info dict holding an `available_actions` mapping.

    Returns
    -------
    dict
        the same `info` object, with the per-unit action dicts updated in place.

    Raises
    ------
    AssertionError
        if an action name starting with `embark` has neither 2 nor 3
        underscore-separated parts.
    """
    self.embarkable_units = {}
    unit_actions = info["available_actions"].get("unit", {})

    if len(unit_actions) == 0:
        return info

    for unit_id, actions in unit_actions.items():
        unavailable_embarks = [f"embark_{i}" for i in range(8)]

        # iterate over a snapshot of the keys: the dict is mutated in the loop
        for action in list(actions.keys()):
            if action[:6] != "embark":
                continue

            args = action.split("_")

            if len(args) == 3:
                # action == embark_{dir}_{target_id}
                dir8, target_id = map(int, args[1:])
                self.embarkable_units.setdefault((unit_id, dir8), []).append(
                    target_id
                )
                actions.pop(action)
            else:
                # action == embark_{dir}
                assert len(args) == 2, (
                    "Expected embark_{dir} or embark_{dir}_{target_id}, "
                    f"but got unsupported embark action name {action}"
                )
                dir8 = int(args[-1])

            # NOTE(review): the value stored under the original key is never
            # inspected; any embark key that is present marks its direction
            # as available. Confirm this matches what the base env emits.
            embark_action = f"embark_{dir8}"
            actions[embark_action] = True
            if embark_action in unavailable_embarks:
                unavailable_embarks.remove(embark_action)

        for embark_action in unavailable_embarks:
            # set unavailable embark actions to False
            actions[embark_action] = False

    info["available_actions"]["unit"] = unit_actions

    return info

envs.freeciv_wrapper.observation_wrapper.TensorObservation

Bases: Wrapper

A wrapper that defines tensor observation space, transforms observations got from FreecivBaseEnv into tensor observations.

Parameters:

Name Type Description Default
env TensorBase

A FreecivBaseEnv wrapped by TensorBase wrapper

required

Attributes:

Name Type Description
observation_config dict

tensor observation configuration

observation_space Dict

a gymnasium.spaces.Dict with keys specified in configuration; observation fields whose name ends with mask are kept as well.

obs_initialized bool

whether observation spaces has been initialized

obs_layout dict

a dict that specify shapes of flattened numpy arrays in observation

Source code in src/civrealm/envs/freeciv_wrapper/observation_wrapper.py
@wrapper_override(["observation"])
class TensorObservation(Wrapper):
    """
    A wrapper that defines tensor observation space, transforms observations got from
    FreecivBaseEnv into tensor observations.

    Parameters
    ----------
    env:
        A FreecivBaseEnv wrapped by TensorBase wrapper

    Attributes
    ---------
    observation_config: dict
        tensor observation configuration
    observation_space: gymnasium.spaces.Dict
        a gymnasium.spaces.Dict with keys specified in configuration;
        observation fields whose name ends with `mask` are kept as well.
    obs_initialized: bool
        whether the observation space has been initialized
    obs_layout: dict
        a dict that specifies shapes of flattened numpy arrays in observation
    others_player_ids: list
        sorted ids of the other players, refreshed on every `_handle_dict` call
    """

    # Fields embedded by `_embed_mutable`; `apply_ops_mutable` treats each of
    # them as a dict keyed by entity id.
    mutable_fields = [
        "city",
        "unit",
        "others_city",
        "others_unit",
        "others_player",
        "dipl",
    ]
    # Fields embedded by `_embed_immutable`; each is a flat dict of properties.
    immutable_fields = ["map", "rules", "player", "gov"]

    def __init__(self, env: TensorBase):
        """
        Read the tensor config from the wrapped env and set up empty layout state.
        """
        self.obs_initialized = False
        self.observation_config = env.get_wrapper_attr("config")
        # "dipl" is resized to the same size as "others_player".
        # NOTE(review): this writes into the dict returned by
        # `get_wrapper_attr("config")`; presumably that object is shared with
        # the other tensor wrappers - confirm.
        self.observation_config["resize"]["dipl"] = self.observation_config["resize"][
            "others_player"
        ]
        self.obs_layout = {}
        self.others_player_ids = []
        super().__init__(env)

    def observation(self, observation):
        """
        convert observations obtained from `FreecivBaseEnv` into a dict of flattened numpy arrays.

        Returns None when the observation has no "player" entry (game over).
        On the first non-empty call, `observation_space` is inferred from the result.
        """
        # in case of gameover, return None as observation
        if len(observation.get("player", {})) == 0:
            return None

        # work on a copy: the helpers below pop and overwrite entries in place
        observation = deepcopy(observation)
        observation = self._merge_player_techs(observation)
        obs_dict = self._handle_dict(observation)
        obs = self._embed_immutable(deepcopy(obs_dict))
        obs = self._embed_mutable(obs)

        if not self.obs_initialized:
            self.observation_space = self._infer_obs_space(obs)
            self.obs_initialized = True
        if tensor_debug:
            self._check_obs_layout(obs)
        return obs

    def _handle_dict(self, obs):
        """
        Restructure the raw observation dict before it is embedded into arrays.

        Steps, in order: make sure "city"/"unit" exist; merge controller data and
        diplomacy state into them via `update`; encode the treaty clauses of every
        other player into `obs["dipl"]`; drop fields not listed in
        `filter_observation` (keys ending with "mask" are kept); move entities and
        players not owned by `my_player_id` into the `others_*` fields; finally run
        the configured `obs_ops` on every remaining property and drop properties
        without an op.

        Also refreshes `self.others_player_ids`.
        """
        obs["city"] = obs.get("city", {})
        obs["unit"] = obs.get("unit", {})

        # TODO: This should be the base env's reponsibility
        # Add info to city and unit from civcontroller
        update(obs["city"], self.unwrapped.civ_controller.city_ctrl.cities)
        update(obs["unit"], self.unwrapped.civ_controller.unit_ctrl.units)
        # update player info with dipl_state
        update(obs["player"], obs.get("dipl", {}))

        my_player_id = self.get_wrapper_attr("my_player_id")

        # keep only the clause list of each other player ...
        obs["dipl"] = {
            player: state["diplomacy_clause_map"]
            for player, state in obs.get("dipl", {}).items()
            if player != my_player_id
        }
        # ... and replace it by its fixed-size encoding
        for player, treaty in obs["dipl"].items():
            obs["dipl"][player] = self._encode_treaty(treaty, player)

        # remove unused fields and keep mask if given
        obs = {
            k: v
            for k, v in obs.items()
            if k in self.observation_config["filter_observation"] or k.endswith("mask")
        }

        # Add others fields and initialize

        obs["others_unit"] = {}
        obs["others_city"] = {}

        for field in ["unit", "city"]:
            # iterate over a snapshot because entries are popped inside the loop
            for key, val in list(obs[field].items()):
                if val["owner"] != my_player_id:
                    # delete others' entity from unit and city
                    obs["others_" + field][key] = obs[field].pop(key)

        obs["others_player"] = {
            key: obs["player"].pop(key)
            for key in list(obs["player"].keys())
            if key != my_player_id
        }
        # from here on "player" holds only my own player's properties
        obs["player"] = obs["player"][my_player_id]

        # Initialize build_cost with 0 for now
        obs["rules"]["build_cost"] = 0

        mutable_fields = [field for field in obs.keys() if field in self.mutable_fields]
        immutable_fields = [
            field for field in obs.keys() if field in self.immutable_fields
        ]

        ops = self.observation_config["obs_ops"]

        # Handle immutable
        # delete unused keywords and transform useful keywords
        def apply_ops(field):
            for k, val in list(obs[field].items()):
                if k in list(ops[field].keys()):
                    obs[field][k] = ops[field][k](val)
                else:
                    obs[field].pop(k)

        for field in immutable_fields:
            apply_ops(field)

        # Handle mutable
        # delete unused keywords and transform useful keywords
        def apply_ops_mutable(field):
            for entity_id, entity in list(obs[field].items()):
                for k, val in list(entity.items()):
                    if k in list(ops[field].keys()):
                        entity[k] = ops[field][k](val)
                    else:
                        entity.pop(k)

        for field in mutable_fields:
            apply_ops_mutable(field)

        self.others_player_ids = sorted(obs["others_player"].keys())

        return obs

    def _embed_immutable(self, obs):
        """
        Replace every immutable field of `obs` by a single int32 array.

        The properties of a field are concatenated along the last axis in sorted
        key order. Before the observation space is initialized, the shape of
        every property is recorded in `self.obs_layout[field]`.
        """
        immutable = {
            field: obs[field] for field in obs if field in self.immutable_fields
        }

        if not self.obs_initialized:
            for field, field_dict in immutable.items():
                self.obs_layout[field] = OrderedDict(
                    [(k, field_dict[k].shape) for k in sorted(list(field_dict.keys()))]
                )

        for field, field_dict in immutable.items():
            # check field layout is correct
            if tensor_debug:
                assert self.obs_layout[field] == {
                    k: v.shape for k, v in field_dict.items()
                }

            obs[field] = np.concatenate(
                [field_dict[k] for k in sorted(list(field_dict.keys()))], axis=-1
            ).astype(np.int32)
        return obs

    def _embed_mutable(self, obs):
        """
        Replace every mutable field of `obs` by a single int32 array.

        Each entity's properties are concatenated along the last axis in sorted
        key order, entities are stacked along the first axis in the order given
        by the `{field}_ids` wrapper attribute, and the result is passed through
        `resize_data` with the size configured under `resize`. Empty fields are
        filled with zeros. Before the observation space is initialized, the
        per-field layout is copied from the `obs_mutable_layout` config.
        """
        mutable = {field: obs[field] for field in obs if field in self.mutable_fields}
        mutable_layout = self.observation_config["obs_mutable_layout"]

        if not self.obs_initialized:
            for field, entity_dict in mutable.items():
                layout = mutable_layout[field]
                self.obs_layout[field] = OrderedDict(
                    [(key, layout[key]) for key in sorted(layout)]
                )

        for field, entity_dict in mutable.items():
            # for empty field, fill with zero
            if len(entity_dict) == 0:
                mutable[field] = np.zeros(
                    [
                        self.observation_config["resize"][field],
                        *reduce(add_shape, self.obs_layout[field].values()),
                    ],
                    dtype=np.int32,
                )
                continue
            if tensor_debug:
                # check entity layout is correct
                assert all(
                    self.obs_layout[field] == {k: v.shape for k, v in entity.items()}
                    for entity in entity_dict.values()
                )
            # combine every entity's properties into an array along the last axis
            entity_dict = {
                id: np.concatenate([entity[k] for k in sorted(entity.keys())], axis=-1)
                for id, entity in entity_dict.items()
            }
            # combine all entities in a field into an array along the first axis
            mutable[field] = np.stack(
                [entity_dict[id] for id in self.get_wrapper_attr(field + "_ids")],
                axis=0,
            ).astype(np.int32)

        # resize to maximum entity shape
        for field in mutable:
            size = self.observation_config["resize"][field]
            mutable[field] = resize_data(mutable[field], size).astype(np.int32)

        update(obs, mutable)
        return obs

    def _infer_obs_space(self, observation) -> spaces.Dict:
        """
        Build a `spaces.Dict` with one int32 `Box` (low=0, high=1000) per
        observation key, shaped like the corresponding array.
        """
        return spaces.Dict(
            [
                (key, spaces.Box(low=0, high=1000, shape=space.shape, dtype=np.int32))
                for key, space in observation.items()
            ]
        )

    def _check_obs_layout(self, obs):
        """
        Assert that the last dimension of every embedded field matches the last
        dimension of the shape obtained by folding its layout with `add_shape`.
        """
        for field, val in self.obs_layout.items():
            shape = reduce(add_shape, val.values())
            assert shape[-1] == obs[field].shape[-1]

    def _merge_player_techs(self, obs):
        """
        Collapse each player's `tech_{name}` entries into one `techs` list.

        Techs are visited in sorted order of `obs["tech"]`; each `tech_{name}`
        key is popped from the player dict and a value of None is stored as 255.
        """
        for player in obs["player"].values():
            player["techs"] = []
            for tech in sorted(obs["tech"]):
                player_tech = player.pop(f"tech_{tech}")
                player["techs"].append(player_tech if player_tech is not None else 255)
        return obs

    def _encode_treaty(self, treaty, player):
        """
        Encode the clauses negotiated with `player` into fixed-size entries.

        Returns a dict with:
          - "type": length-20 vector; index `clause type` is set for clauses
            given by `player`, index `clause type + 10` for all other clauses;
          - "give_city" / "ask_city": vectors sized by the `resize` config with a
            1 at the position of the city in `city_ids` / `others_city_ids`;
          - "give_gold" / "ask_gold": number of `GOLD_SET` levels reached by the
            clause value, or 255 when there is no gold clause.
        """
        encoded = {
            "type": np.zeros(10 * 2, dtype=np.int32),
            "give_city": np.zeros(
                self.observation_config["resize"]["city"], dtype=np.int32
            ),
            "ask_city": np.zeros(
                self.observation_config["resize"]["others_city"], dtype=np.int32
            ),
            "give_gold": 255,
            "ask_gold": 255,
        }

        for clause in treaty:
            value = clause["value"]

            if clause["type"] == player_const.CLAUSE_GOLD:
                # bucketize the amount: count how many thresholds it reaches
                gold = sum(int(value >= level) for level in GOLD_SET)
                if clause["giver"] == player:
                    encoded["ask_gold"] = gold
                else:
                    encoded["give_gold"] = gold
            elif clause["type"] == player_const.CLAUSE_CITY:
                if clause["giver"] == player:
                    city_list = self.get_wrapper_attr("others_city_ids")
                    field = "ask_city"
                else:
                    city_list = self.get_wrapper_attr("city_ids")
                    field = "give_city"
                # cities missing from the id list are silently ignored
                if value in city_list:
                    city_idx = city_list.index(value)
                    encoded[field][city_idx] = 1

            if clause["giver"] == player:
                encoded["type"][clause["type"]] = 1
            else:
                encoded["type"][clause["type"] + 10] = 1

        return encoded

observation(observation)

Convert observations obtained from FreecivBaseEnv into a dict of flattened numpy arrays.

Source code in src/civrealm/envs/freeciv_wrapper/observation_wrapper.py
def observation(self, observation):
    """
    Turn a raw `FreecivBaseEnv` observation into a dict of flattened numpy arrays.

    Returns None when the observation carries no player entry (game over).
    On the first non-empty call the observation space is inferred from the result.
    """
    players = observation.get("player", {})
    if len(players) == 0:
        # game over: there is nothing to tensorize
        return None

    # the helpers mutate their input, so operate on private copies
    raw = self._merge_player_techs(deepcopy(observation))
    handled = self._handle_dict(raw)
    tensor_obs = self._embed_mutable(self._embed_immutable(deepcopy(handled)))

    if not self.obs_initialized:
        self.observation_space = self._infer_obs_space(tensor_obs)
        self.obs_initialized = True

    if tensor_debug:
        self._check_obs_layout(tensor_obs)

    return tensor_obs

envs.freeciv_wrapper.observation_wrapper.CacheLastObs

Bases: Wrapper

Cache last observation, and override observation with cached observation if terminated or truncated.

Attributes:

Name Type Description
cached_last_obs dict

observation cached from the last call of step() or reset()

Source code in src/civrealm/envs/freeciv_wrapper/observation_wrapper.py
class CacheLastObs(Wrapper):
    """
    Remember the most recent observation and hand it back in place of the
    env's observation once the episode is terminated or truncated.

    Attributes
    -------------
    cached_last_obs: dict
        copy of the observation returned by the last reset() or by the last
        step() that did not end the episode
    """

    def __init__(self, env):
        self.cached_last_obs = None
        super().__init__(env)

    def reset(self, *args, **kwargs):
        """Reset the wrapped env and cache a copy of the initial observation."""
        first_obs, info = self.env.reset(*args, **kwargs)
        self.cached_last_obs = deepcopy(first_obs)
        return first_obs, info

    def step(self, action):
        """
        Step the wrapped env; on episode end return the cached observation
        (and an empty dict if the env returned no info) instead.
        """
        obs, reward, terminated, truncated, info = self.env.step(action)

        if not (terminated or truncated):
            # episode continues: refresh the cache and pass everything through
            self.cached_last_obs = deepcopy(obs)
            return obs, reward, terminated, truncated, info

        if info is None:
            info = {}
        return self.cached_last_obs, reward, terminated, truncated, info

envs.freeciv_wrapper.llm_wrapper.LLMWrapper

Bases: Wrapper

A wrapper for LLMs. It describes the surrounding observations of each unit and city extracted from FreecivBaseEnv, based on which an LLM can generate actions for units and cities. It transforms the action_keys of actions from FreecivBaseEnv into readable action_names that an LLM can understand. After the LLM has chosen an action and returned its action_name to the env, this wrapper transforms the action_name back into an action_key and then executes the corresponding action.

Parameters:

Name Type Description Default
env FreecivBaseEnv

A FreecivBaseEnv

required

Attributes:

Name Type Description
llm_default_settings dict

settings for llm_wrapper.

action_names dict

a dict matches action_keys from FreecivBaseEnv to readable action_names

tile_length_radius int

(length of a tile - 1) / 2

tile_width_radius int

(width of a tile - 1) / 2

tile_info_template dict

a dict describes detailed surrounding observations of a unit or a city

block_length_radius int

(length of a block - 1) / 2

block_width_radius int

(width of a block - 1) / 2

block_info_template dict

a dict describes zoomed-out surrounding observations of a unit or a city

ctrl_types list

a list describes which components of the game to control by llm; we temporarily only consider unit and city in llm_wrapper

ctrl_action_categories dict

a dict describes which categories of actions llm can take; it can be seen as an action mask

Source code in src/civrealm/envs/freeciv_wrapper/llm_wrapper.py
class LLMWrapper(Wrapper):
    """
    A wrapper for LLMs. It describes the surrounding observations of each unit and city extracted from
    FreecivBaseEnv, based on which an LLM can generate actions for units and cities. It transforms the action_keys
    of actions from FreecivBaseEnv into readable action_names that an LLM can understand. After the LLM has chosen
    an action and returned its action_name to the env, this wrapper transforms the action_name back into an
    action_key and then executes the corresponding action.

    Parameters
    ----------
    env:
        A FreecivBaseEnv

    Attributes
    ---------
    llm_default_settings: dict
        settings for llm_wrapper.
    action_names: dict
        a dict matches action_keys from FreecivBaseEnv to readable action_names
    action_keys: dict
        the inverse of `action_names` (readable action_name -> action_key)
    controller:
        the `civ_controller` of the unwrapped env
    tile_length_radius: int
        (length of a tile - 1) / 2
    tile_width_radius: int
        (width of a tile - 1) / 2
    tile_info_template: dict
        a dict describes detailed surrounding observations of a unit or a city
    block_length_radius: int
        (length of a block - 1) / 2
    block_width_radius: int
        (width of a block - 1) / 2
    block_info_template: dict
        a dict describes zoomed-out surrounding observations of a unit or a city
    ctrl_types: list
        a list describes which components of the game to control by llm; we temporarily only consider unit and city in
        llm_wrapper
    ctrl_action_categories: dict
        a dict describes which categories of actions llm can take; it can be seen as an action mask
    """

    def __init__(self, env: FreecivBaseEnv):
        """
        Load `llm_wrapper_settings.yaml` and unpack its values into attributes.
        """
        super().__init__(env)
        self.llm_default_settings = load_config('llm_wrapper_settings.yaml')

        # The settings are unpacked positionally, so this relies on the order
        # in which the loaded mapping yields its nine values.
        (self.action_names, self.tile_length_radius, self.tile_width_radius, self.tile_info_template,
         self.block_length_radius, self.block_width_radius, self.block_info_template, self.ctrl_types,
         self.ctrl_action_categories) = self.llm_default_settings.values()

        # reverse lookup used by `step` to translate readable names back
        self.action_keys = {val: key for key, val in self.action_names.items()}
        self.controller = self.unwrapped.civ_controller

    def reset(self, seed=None, options=None, **kwargs):
        """
        Reset the wrapped env and attach `llm_info` and `my_player_id` to info.

        Only `minitask_pattern` (when present in kwargs) is forwarded to the
        wrapped env.
        """
        # NOTE(review): `seed`, `options` and any other kwargs are accepted but
        # not passed on to `self.env.reset` - confirm this is intended.
        if 'minitask_pattern' in kwargs:
            observation, info = self.env.reset(
                minitask_pattern=kwargs['minitask_pattern'])
        else:
            observation, info = self.env.reset()

        info['llm_info'] = self.get_llm_info(observation, info)
        info['my_player_id'] = self.controller.player_ctrl.my_player_id
        return observation, info

    def step(self, action):
        """
        Translate the readable action name in `action[2]` back to an env action,
        step the wrapped env, and attach `llm_info` and `my_player_id` to info.

        A `None` action is passed to the wrapped env unchanged.
        """
        if action is not None:
            action_name = action[2]
            action = (action[0], action[1], get_action_from_readable_name(
                action_name, self.action_keys))

        observation, reward, terminated, truncated, info = self.env.step(
            action)
        info['llm_info'] = self.get_llm_info(observation, info)
        info['my_player_id'] = self.controller.player_ctrl.my_player_id
        return observation, reward, terminated, truncated, info

    def get_llm_info(self, obs, info):
        """
        Convert observations and available actions of all actors from `FreecivBaseEnv` into a dict of natural language

        The result has one entry per key of `info['available_actions']`; only
        the 'unit' and 'city' entries are ever filled, and only with actors for
        which `get_actor_info` returns a non-empty dict.
        """
        current_turn = info['turn']

        llm_info = dict()
        for ctrl_type, actors_can_act in info['available_actions'].items():
            llm_info[ctrl_type] = dict()

            if ctrl_type == 'unit':
                units = self.controller.unit_ctrl.units
                for unit_id in actors_can_act:
                    # skip units of type 1 unless their activity is one of the
                    # listed idle/fortify/sentry states
                    # NOTE(review): the meaning of unit type 1 is not visible
                    # here - confirm against the ruleset.
                    if (units[unit_id]['type'] == 1 and units[unit_id]['activity'] not in
                            [ACTIVITY_IDLE, ACTIVITY_FORTIFIED, ACTIVITY_SENTRY, ACTIVITY_FORTIFYING]):
                        continue

                    x = obs[ctrl_type][unit_id]['x']
                    y = obs[ctrl_type][unit_id]['y']
                    utype = obs[ctrl_type][unit_id]['type_rule_name']

                    unit_dict = self.get_actor_info(
                        x, y, obs, info, ctrl_type, unit_id, utype)
                    if unit_dict:
                        llm_info[ctrl_type][unit_id] = unit_dict

            elif ctrl_type == 'city':
                for city_id in actors_can_act:
                    # The following two conditions are used to check if 1.  the city is just built or is building
                    # coinage, and 2. the city has just built a unit or an improvement last turn and there are some
                    # production points left in stock.
                    if (obs[ctrl_type][city_id]['prod_process'] == 0 or
                            current_turn == obs[ctrl_type][city_id]['turn_last_built'] + 1):
                        x = obs[ctrl_type][city_id]['x']
                        y = obs[ctrl_type][city_id]['y']

                        city_dict = self.get_actor_info(
                            x, y, obs, info, ctrl_type, city_id)
                        if city_dict:
                            llm_info[ctrl_type][city_id] = city_dict
                    else:
                        continue
            else:
                continue

        return llm_info

    def get_actor_info(self, x, y, obs, info, ctrl_type, actor_id, utype=None):
        """
        Convert observations and available actions of a specific actor from `FreecivBaseEnv` into a dict of natural language

        Returns an empty dict when the actor has no valid action or its only
        valid action is 'keep_activity'. Otherwise the dict holds 'name',
        'available_actions' (readable names) and 'observations' ('minimap',
        'upper_map', plus 'producing' for cities).
        """
        actor_info = dict()

        actor_name = None
        if ctrl_type == 'unit':
            actor_name = utype + ' ' + str(actor_id)
        elif ctrl_type == 'city':
            actor_name = ctrl_type + ' ' + str(actor_id)
        actor_info['name'] = actor_name

        available_actions = get_valid_actions(info, ctrl_type, actor_id)
        if not available_actions or (len(available_actions) == 1 and available_actions[0] == 'keep_activity'):
            return dict()
        else:
            # apply the per-type category mask only when one is configured
            if ctrl_type not in self.ctrl_action_categories:
                actor_info['available_actions'] = make_action_list_readable(
                    available_actions, self.action_names)
            else:
                actor_info['available_actions'] = make_action_list_readable(action_mask(
                    self.ctrl_action_categories[ctrl_type], available_actions), self.action_names)

        actor_info['observations'] = dict()
        actor_info['observations']['minimap'] = self.get_mini_map_info(
            x, y, self.tile_length_radius, self.tile_width_radius, self.tile_info_template)
        actor_info['observations']['upper_map'] = self.get_mini_map_info(
            x, y, self.block_length_radius, self.block_width_radius, self.block_info_template)

        if ctrl_type == 'city':
            actor_info['observations']['producing'] = self.get_city_producing(
                obs[ctrl_type], actor_id)

        fc_logger.debug(f'actor observations: {actor_info}')

        return actor_info

    def get_city_producing(self, obs, actor_id):
        """
        Look up what city `actor_id` is producing.

        `obs` is the city part of the observation. Returns the entry of
        `unit_types_list` or `improvement_types_list` selected by the city's
        `production_value`, or None when `production_kind` is neither
        `VUT_UTYPE` nor `VUT_IMPROVEMENT`.
        """
        producing = None
        if obs[actor_id]['production_kind'] == VUT_UTYPE:
            # for units the index is shifted down by the number of improvement types
            producing = self.controller.rule_ctrl.unit_types_list[obs[actor_id]
                                                                  ['production_value'] - self.controller.rule_ctrl.ruleset_control['num_impr_types']]
        elif obs[actor_id]['production_kind'] == VUT_IMPROVEMENT:
            producing = self.controller.rule_ctrl.improvement_types_list[
                obs[actor_id]['production_value']]
        return producing

    def get_mini_map_info(self, x, y, length_r, width_r, template):
        """
        Convert observations of a specific actor from `FreecivBaseEnv` into a dict of natural language

        For the i-th key of `template`, a window of (2 * length_r + 1) by
        (2 * width_r + 1) tiles is read around a centre obtained by shifting
        (x, y) one window size in direction `DIR[i]`. The value for that key is
        a list of strings counting unexplored tiles, terrains, extras and units
        in the window and naming unit and city owners; it stays empty when the
        centre is out of the map.
        """
        mini_map_info = dict()

        # index into DIR, advanced once per template key
        tile_id = 0
        map_state = self.controller.map_ctrl.prop_state.get_state()
        for ptile in template:
            mini_map_info[ptile] = []
            pdir = DIR[tile_id]
            center_x = x + pdir[0] * (length_r * 2 + 1)
            center_y = y + pdir[1] * (width_r * 2 + 1)

            if not self.controller.map_ctrl.is_out_of_map(center_x, center_y):
                """ consider map_const.TF_WRAPX == 1 """
                start_x = center_x - length_r
                end_x = center_x + length_r + 1
                start_y = center_y - width_r
                end_y = center_y + width_r + 1

                status_arr = read_sub_arr_with_wrap(
                    map_state['status'], start_x, end_x, start_y, end_y)
                terrain_arr = read_sub_arr_with_wrap(
                    map_state['terrain'], start_x, end_x, start_y, end_y)
                extras_arr = read_sub_arr_with_wrap(
                    map_state['extras'], start_x, end_x, start_y, end_y)
                unit_arr = read_sub_arr_with_wrap(
                    map_state['unit'], start_x, end_x, start_y, end_y)
                unit_owner_arr = read_sub_arr_with_wrap(
                    map_state['unit_owner'], start_x, end_x, start_y, end_y)
                city_owner_arr = read_sub_arr_with_wrap(
                    map_state['city_owner'], start_x, end_x, start_y, end_y)

                # tiles whose status is 0 are reported as unexplored
                unexplored_tiles_num = len(list(status_arr[status_arr == 0]))
                if unexplored_tiles_num > 0:
                    status_str = str(unexplored_tiles_num) + \
                        ' ' + 'tiles unexplored'
                    mini_map_info[ptile].append(status_str)

                for terrain_id, terrain in enumerate(TERRAIN_NAMES):
                    terrains_num = len(
                        list(terrain_arr[terrain_arr == terrain_id]))
                    if terrains_num > 0:
                        terrain_str = str(terrains_num) + ' ' + terrain
                        mini_map_info[ptile].append(terrain_str)

                # extras and units are indexed by id along the last axis
                for extra_id, extra in enumerate(EXTRA_NAMES):
                    extras_of_id = extras_arr[:, :, extra_id]
                    extras_num = len(list(extras_of_id[extras_of_id != 0]))
                    if extras_num > 0:
                        extra_str = str(extras_num) + ' ' + extra
                        mini_map_info[ptile].append(extra_str)

                for unit_id, unit in enumerate(self.controller.rule_ctrl.unit_types_list):
                    units_of_id = unit_arr[:, :, unit_id]
                    units_num = np.sum(units_of_id)
                    if units_num > 0:
                        unit_str = str(int(units_num)) + ' ' + unit
                        mini_map_info[ptile].append(unit_str)

                # 255 is filtered out of both owner arrays
                # NOTE(review): presumably 255 marks "no owner" - confirm.
                unit_owners = list(unit_owner_arr[unit_owner_arr != 255])
                if len(unit_owners) != 0:
                    # owners already mentioned, so each one is listed once
                    owner_set = []
                    unit_owner_str = 'unit owners are:'
                    for unit_owner in unit_owners:
                        if unit_owner in owner_set:
                            continue

                        if unit_owner == self.controller.player_ctrl.my_player_id:
                            unit_owner_str += ' myself player_' + \
                                str(int(unit_owner))
                        else:
                            ds_of_owner = self.controller.dipl_ctrl.diplstates[unit_owner]
                            unit_owner_str += ' ' + \
                                DS_TXT[ds_of_owner] + ' player_' + \
                                str(int(unit_owner))
                        owner_set.append(unit_owner)
                    mini_map_info[ptile].append(unit_owner_str)

                city_owners = list(city_owner_arr[city_owner_arr != 255])
                for city_owner in self.controller.player_ctrl.players:
                    owner_num = city_owners.count(city_owner)
                    if owner_num == 0:
                        continue

                    if city_owner == self.controller.player_ctrl.my_player_id:
                        city_owner_str = str(
                            owner_num) + ' cities of myself player_' + str(int(city_owner))
                    else:
                        ds_of_owner = self.controller.dipl_ctrl.diplstates[city_owner]
                        city_owner_str = (str(owner_num) + ' cities of a ' + DS_TXT[ds_of_owner] +
                                          ' player_' + str(int(city_owner)))
                    mini_map_info[ptile].append(city_owner_str)

            tile_id += 1
        return mini_map_info

get_actor_info(x, y, obs, info, ctrl_type, actor_id, utype=None)

Convert observations and available actions of a specific actor from FreecivBaseEnv into a dict of natural language

Source code in src/civrealm/envs/freeciv_wrapper/llm_wrapper.py
def get_actor_info(self, x, y, obs, info, ctrl_type, actor_id, utype=None):
    """
    Describe a single unit or city in natural language.

    Returns an empty dict when the actor has no valid action or can only
    'keep_activity'. Otherwise returns a dict with the actor's 'name', its
    readable 'available_actions' and its 'observations' ('minimap',
    'upper_map' and, for cities, 'producing').
    """
    if ctrl_type == 'unit':
        label = utype + ' ' + str(actor_id)
    elif ctrl_type == 'city':
        label = ctrl_type + ' ' + str(actor_id)
    else:
        label = None

    candidates = get_valid_actions(info, ctrl_type, actor_id)
    if not candidates:
        return {}
    if len(candidates) == 1 and candidates[0] == 'keep_activity':
        return {}

    # restrict to the configured action categories when this type has a mask
    if ctrl_type in self.ctrl_action_categories:
        candidates = action_mask(
            self.ctrl_action_categories[ctrl_type], candidates)
    readable_actions = make_action_list_readable(candidates, self.action_names)

    surroundings = {
        'minimap': self.get_mini_map_info(
            x, y, self.tile_length_radius, self.tile_width_radius, self.tile_info_template),
        'upper_map': self.get_mini_map_info(
            x, y, self.block_length_radius, self.block_width_radius, self.block_info_template),
    }
    if ctrl_type == 'city':
        surroundings['producing'] = self.get_city_producing(
            obs[ctrl_type], actor_id)

    actor_info = {
        'name': label,
        'available_actions': readable_actions,
        'observations': surroundings,
    }

    fc_logger.debug(f'actor observations: {actor_info}')

    return actor_info

get_llm_info(obs, info)

Convert observations and available actions of all actors from FreecivBaseEnv into a dict of natural language

Source code in src/civrealm/envs/freeciv_wrapper/llm_wrapper.py
def get_llm_info(self, obs, info):
    """
    Collect the natural-language description of every actor that can act.

    Parameters
    ----------
    obs:
        observation dict, indexed as `obs[ctrl_type][actor_id]`
    info:
        info dict; `info['turn']` and `info['available_actions']` are read

    Returns
    -------
    dict
        One entry per control type found in `info['available_actions']`.
        Only 'unit' and 'city' entries are ever populated, mapping actor ids
        to the non-empty dicts returned by `get_actor_info`.
    """
    turn_now = info['turn']
    llm_info = {}

    for ctrl_type, actor_ids in info['available_actions'].items():
        described = llm_info[ctrl_type] = {}

        if ctrl_type == 'unit':
            unit_table = self.controller.unit_ctrl.units
            for unit_id in actor_ids:
                unit_state = unit_table[unit_id]
                # Units of type 1 are skipped unless their activity is one of
                # the idle / fortified / sentry / fortifying states.
                if (unit_state['type'] == 1 and unit_state['activity'] not in
                        [ACTIVITY_IDLE, ACTIVITY_FORTIFIED, ACTIVITY_SENTRY, ACTIVITY_FORTIFYING]):
                    continue

                unit_obs = obs[ctrl_type][unit_id]
                unit_dict = self.get_actor_info(
                    unit_obs['x'], unit_obs['y'], obs, info, ctrl_type,
                    unit_id, unit_obs['type_rule_name'])
                if unit_dict:
                    described[unit_id] = unit_dict

        elif ctrl_type == 'city':
            for city_id in actor_ids:
                city_obs = obs[ctrl_type][city_id]
                # The following two conditions are used to check if 1. the city is just built or is building
                # coinage, and 2. the city has just built a unit or an improvement last turn and there are some
                # production points left in stock.
                wants_decision = (
                    city_obs['prod_process'] == 0
                    or turn_now == city_obs['turn_last_built'] + 1)
                if not wants_decision:
                    continue

                city_dict = self.get_actor_info(
                    city_obs['x'], city_obs['y'], obs, info, ctrl_type, city_id)
                if city_dict:
                    described[city_id] = city_dict

    return llm_info

get_mini_map_info(x, y, length_r, width_r, template)

Convert the map observations around a specific actor from FreecivBaseEnv into a dict of natural-language descriptions.

Source code in src/civrealm/envs/freeciv_wrapper/llm_wrapper.py
def get_mini_map_info(self, x, y, length_r, width_r, template):
    """
    Convert observations of a specific actor from `FreecivBaseEnv` into a dict of natural language

    Each name in `template` is paired, by position, with an offset from `DIR`.
    The offset selects a rectangular block of tiles around `(x, y)`, and the
    block is summarised as a list of short strings: unexplored tile count,
    terrain counts, extras counts, unit counts, unit owners and city owners.

    Parameters
    ----------
    x, y:
        position the blocks are laid out around
    length_r, width_r:
        half extents of a block along x and y; a block spans
        `2 * length_r + 1` by `2 * width_r + 1` tiles
    template:
        iterable of block names; the i-th name uses `DIR[i]`

    Returns
    -------
    dict
        Maps every name in `template` to its list of description strings. The
        list stays empty when `is_out_of_map` rejects the block centre.
    """
    controller = self.controller
    map_ctrl = controller.map_ctrl
    map_state = map_ctrl.prop_state.get_state()
    layer_names = ('status', 'terrain', 'extras',
                   'unit', 'unit_owner', 'city_owner')

    mini_map_info = {}
    for tile_id, ptile in enumerate(template):
        descriptions = mini_map_info[ptile] = []
        pdir = DIR[tile_id]
        center_x = x + pdir[0] * (length_r * 2 + 1)
        center_y = y + pdir[1] * (width_r * 2 + 1)

        if map_ctrl.is_out_of_map(center_x, center_y):
            continue

        # consider map_const.TF_WRAPX == 1
        start_x, end_x = center_x - length_r, center_x + length_r + 1
        start_y, end_y = center_y - width_r, center_y + width_r + 1

        (status_arr, terrain_arr, extras_arr,
         unit_arr, unit_owner_arr, city_owner_arr) = (
            read_sub_arr_with_wrap(
                map_state[layer], start_x, end_x, start_y, end_y)
            for layer in layer_names)

        # Tiles whose status is 0 are reported as unexplored.
        unexplored_tiles_num = int(np.count_nonzero(status_arr == 0))
        if unexplored_tiles_num > 0:
            descriptions.append(f'{unexplored_tiles_num} tiles unexplored')

        for terrain_id, terrain in enumerate(TERRAIN_NAMES):
            terrains_num = int(np.count_nonzero(terrain_arr == terrain_id))
            if terrains_num > 0:
                descriptions.append(f'{terrains_num} {terrain}')

        # The last axis of the extras / unit layers is indexed by extra id /
        # unit type id.
        for extra_id, extra in enumerate(EXTRA_NAMES):
            extras_num = int(np.count_nonzero(extras_arr[:, :, extra_id]))
            if extras_num > 0:
                descriptions.append(f'{extras_num} {extra}')

        for unit_id, unit in enumerate(controller.rule_ctrl.unit_types_list):
            units_num = np.sum(unit_arr[:, :, unit_id])
            if units_num > 0:
                descriptions.append(f'{int(units_num)} {unit}')

        player_ctrl = controller.player_ctrl
        my_player_id = player_ctrl.my_player_id

        # Owner value 255 is filtered out (presumably the "no owner" marker;
        # verify against the map state producer).
        unit_owners = unit_owner_arr[unit_owner_arr != 255].tolist()
        if unit_owners:
            owner_parts = ['unit owners are:']
            # dict.fromkeys drops repeated owners while keeping first-seen order.
            for unit_owner in dict.fromkeys(unit_owners):
                if unit_owner == my_player_id:
                    owner_parts.append(
                        'myself player_' + str(int(unit_owner)))
                else:
                    ds_of_owner = controller.dipl_ctrl.diplstates[unit_owner]
                    owner_parts.append(
                        DS_TXT[ds_of_owner] + ' player_' + str(int(unit_owner)))
            descriptions.append(' '.join(owner_parts))

        city_owners = city_owner_arr[city_owner_arr != 255].tolist()
        for city_owner in player_ctrl.players:
            owner_num = city_owners.count(city_owner)
            if owner_num == 0:
                continue

            if city_owner == my_player_id:
                city_owner_str = (
                    f'{owner_num} cities of myself player_{int(city_owner)}')
            else:
                ds_of_owner = controller.dipl_ctrl.diplstates[city_owner]
                city_owner_str = (
                    f'{owner_num} cities of a {DS_TXT[ds_of_owner]}'
                    f' player_{int(city_owner)}')
            descriptions.append(city_owner_str)

    return mini_map_info