119 changes: 78 additions & 41 deletions gymnasium/envs/toy_text/taxi.py
@@ -41,13 +41,13 @@ class TaxiEnv(Env):

Map:

+---------+
|R: | : :G|
| : | : : |
| : : : : |
| | : | : |
|Y| : |B: |
+---------+

From "Hierarchical Reinforcement Learning with the MAXQ Value Function Decomposition"
by Tom Dietterich [<a href="#taxi_ref">1</a>].
@@ -68,7 +68,7 @@ class TaxiEnv(Env):
locations of the passenger (including the case when the passenger is in the
taxi), and 4 destination locations.

Destination on the map are represented with the first letter of the color.
Destinations on the ansi-rendered map are represented with the first letter of the color.

Passenger locations:
- 0: Red
@@ -100,7 +100,7 @@ class TaxiEnv(Env):
and 3 destinations (excluding the passenger's current location).

## Rewards
- -1 per step unless other reward is triggered.
- -1 per step unless another reward is triggered.
- +20 delivering passenger.
- -10 executing "pickup" and "drop-off" actions illegally.

@@ -111,16 +111,16 @@ class TaxiEnv(Env):
The episode ends if the following happens:

- Termination:
1. The taxi drops off the passenger.

- Truncation (when using the time_limit wrapper):
1. The length of the episode is 200.

## Information

`step()` and `reset()` return a dict with the following keys:
- p - transition probability for the state.
- action_mask - if actions will cause a transition to a new state.
- "p": transition probability for the state.
- "action_mask": whether each action will cause a transition to a new state. This was added in v0.25.0.

For some cases, taking an action will have no effect on the state of the episode.
In v0.25.0, ``info["action_mask"]`` contains a np.ndarray for each of the actions specifying
@@ -136,22 +136,30 @@ class TaxiEnv(Env):
gym.make('Taxi-v3')
```

<a id="is_raining"></a>`is_raining=False`: If True the cab will move in intended direction with
probability of 80% else will move in either left or right of target direction with
equal probability of 10% in both directions.
<a id="is_rainy"></a>`is_rainy=False`: If True, the cab moves in the intended direction with probability
80% (controlled by `rainy_probability`); otherwise it moves to the left or right of the intended direction,
with equal probability for each side. Pickup and dropoff actions remain deterministic (probability 1.0).

<a id="fickle_passenger"></a>`fickle_passenger=False`: If true the passenger has a 30% chance of changing
destinations when the cab has moved one square away from the passenger's source location. Passenger fickleness
only happens on the first pickup and successful movement. If the passenger is dropped off at the source location
and picked up again, it is not triggered again.
<a id="rainy_probability"></a>`rainy_probability=0.8`: When `is_rainy=True`, the probability of
moving in the intended direction. Each lateral direction is taken with probability `(1 - rainy_probability) / 2`.
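As an illustration of the resulting three-way split, the sampling can be sketched standalone (this is not the environment's own code; `sample_rainy_move` and its arguments are hypothetical names):

```python
import random

def sample_rainy_move(forward, left, right, rainy_probability=0.8, rng=random):
    """Pick the realized movement under rain: the intended direction with
    probability ``rainy_probability``, otherwise one of the two lateral
    directions with probability ``(1 - rainy_probability) / 2`` each."""
    lateral = (1.0 - rainy_probability) / 2.0
    draw = rng.random()
    if draw < rainy_probability:
        return forward
    return left if draw < rainy_probability + lateral else right

# Heading south: forward is (1, 0); the lateral moves are east and west.
realized = sample_rainy_move((1, 0), (0, 1), (0, -1))
```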

<a id="fickle_passenger"></a>`fickle_passenger=False`: If True, the passenger has a 30% chance (controlled by
`fickle_probability`) of changing destinations when the cab has moved one square away from the
passenger's source location. Passenger fickleness only happens on the first pickup and successful movement.
If the passenger is dropped off at the source location and picked up again, it is not triggered again.

<a id="fickle_probability"></a>`fickle_probability=0.3`: When `fickle_passenger=True`, the probability
that the passenger changes destination on the first move after pickup.
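A rough sketch of the fickle draw (illustrative only: `maybe_fickle_destination` is a hypothetical helper, and the uniform choice among the other destinations is an assumption of this sketch, not something the diff confirms):

```python
import random

def maybe_fickle_destination(dest_idx, fickle_probability=0.3, rng=random):
    # With probability fickle_probability, switch to one of the other three
    # destination indices (uniform choice is an assumption of this sketch).
    if rng.random() < fickle_probability:
        return rng.choice([d for d in range(4) if d != dest_idx])
    return dest_idx
```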

## References
<a id="taxi_ref"></a>[1] T. G. Dietterich, “Hierarchical Reinforcement Learning with the MAXQ Value Function Decomposition,”
Journal of Artificial Intelligence Research, vol. 13, pp. 227–303, Nov. 2000, doi: 10.1613/jair.639.

## Version History
* v3: Map Correction + Cleaner Domain Description, v0.25.0 action masking added to the reset and step information
* v3: Map Correction + Cleaner Domain Description,
- v0.25.0 action masking added to the reset and step information
- In Gymnasium `1.2.0` the `is_rainy` and `fickle_passenger` arguments were added to align with Dietterich, 2000
- Subsequently the `rainy_probability` and `fickle_probability` arguments were added to tune the stochastic behaviour
* v2: Disallow Taxi start location = goal location, Update Taxi observations in the rollout, Update Taxi reward threshold.
* v1: Remove (3,2) from locs, add passidx<4 check
* v0: Initial version release
@@ -217,15 +225,21 @@ def _build_dry_transitions(self, row, col, pass_idx, dest_idx, action):
new_state = self.encode(new_row, new_col, new_pass_idx, dest_idx)
self.P[state][action].append((1.0, new_state, reward, terminated))

def _calc_new_position(self, row, col, movement, offset=0):
"""Calculates the new position for a row and col to the movement."""
def _calc_new_position(self, row, col, movement):
"""Calculates the new position for a row and col given the movement.

East/west moves are checked against the interior wall characters in
``self.desc``; north/south moves have no walls so only the grid
boundary clamp applies.
"""
dr, dc = movement
new_row = max(0, min(row + dr, self.max_row))
new_col = max(0, min(col + dc, self.max_col))
if self.desc[1 + new_row, 2 * new_col + offset] == b":":
return new_row, new_col
else: # Default to current position if not traversable
return row, col
if dc == 1 and self.desc[1 + new_row, 2 * new_col] != b":":
return row, col # east wall blocks
if dc == -1 and self.desc[1 + new_row, 2 * new_col + 2] != b":":
return row, col # west wall blocks
return new_row, new_col
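The wall check can be exercised standalone; this sketch duplicates the logic above, assuming the module-level `MAP` layout shown in the class docstring:

```python
import numpy as np

MAP = [
    "+---------+",
    "|R: | : :G|",
    "| : | : : |",
    "| : : : : |",
    "| | : | : |",
    "|Y| : |B: |",
    "+---------+",
]
desc = np.asarray(MAP, dtype="c")
max_row, max_col = 4, 4

def calc_new_position(row, col, movement):
    # Clamp to the 5x5 grid, then consult the wall characters for east/west.
    dr, dc = movement
    new_row = max(0, min(row + dr, max_row))
    new_col = max(0, min(col + dc, max_col))
    if dc == 1 and desc[1 + new_row, 2 * new_col] != b":":
        return row, col  # east wall blocks
    if dc == -1 and desc[1 + new_row, 2 * new_col + 2] != b":":
        return row, col  # west wall blocks
    return new_row, new_col
```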

def _build_rainy_transitions(self, row, col, pass_idx, dest_idx, action):
"""Computes the transitions for a state (row, col, pass_idx, dest_idx) and action when `is_rainy` is enabled."""
@@ -236,24 +250,33 @@ def _build_rainy_transitions(self, row, col, pass_idx, dest_idx, action):
reward = -1 # default reward when there is no pickup/dropoff
terminated = False

# (forward, left, right) relative to each heading.
# Left/right follow the standard navigation convention:
# south → left=east, right=west
# north → left=west, right=east
# east → left=north, right=south
# west → left=south, right=north
moves = {
0: ((1, 0), (0, -1), (0, 1)), # Down
1: ((-1, 0), (0, -1), (0, 1)), # Up
2: ((0, 1), (1, 0), (-1, 0)), # Right
3: ((0, -1), (1, 0), (-1, 0)), # Left
0: ((1, 0), (0, 1), (0, -1)), # Down (south): left=east, right=west
1: ((-1, 0), (0, -1), (0, 1)), # Up (north): left=west, right=east
2: ((0, 1), (-1, 0), (1, 0)), # Right (east): left=north, right=south
3: ((0, -1), (1, 0), (-1, 0)), # Left (west): left=south, right=north
}
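The left/right entries can be sanity-checked with a rotation identity (an illustrative snippet, not part of the environment): for a heading `(dr, dc)` in row/col coordinates, a left turn is `(-dc, dr)` and a right turn is `(dc, -dr)`:

```python
moves = {
    0: ((1, 0), (0, 1), (0, -1)),   # Down (south)
    1: ((-1, 0), (0, -1), (0, 1)),  # Up (north)
    2: ((0, 1), (-1, 0), (1, 0)),   # Right (east)
    3: ((0, -1), (1, 0), (-1, 0)),  # Left (west)
}

for forward, left, right in moves.values():
    dr, dc = forward
    assert left == (-dc, dr)   # 90-degree left turn in grid coordinates
    assert right == (dc, -dr)  # 90-degree right turn
```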

# Check if movement is allowed
# Check if the primary move is possible. When it is not (wall or grid
# boundary), no lateral drift occurs either — consistent across all four
# directions.
if (
action in {0, 1}
(action == 0 and row < self.max_row)
or (action == 1 and row > 0)
or (action == 2 and self.desc[1 + row, 2 * col + 2] == b":")
or (action == 3 and self.desc[1 + row, 2 * col] == b":")
):
dr, dc = moves[action][0]
new_row = max(0, min(row + dr, self.max_row))
new_col = max(0, min(col + dc, self.max_col))

left_pos = self._calc_new_position(row, col, moves[action][1], offset=2)
left_pos = self._calc_new_position(row, col, moves[action][1])
right_pos = self._calc_new_position(row, col, moves[action][2])
elif action == 4: # pickup
new_pass_idx, reward = self._pickup(taxi_loc, new_pass_idx, reward)
@@ -269,9 +292,15 @@ def _build_rainy_transitions(self, row, col, pass_idx, dest_idx, action):
right_pos[0], right_pos[1], new_pass_idx, dest_idx
)

self.P[state][action].append((0.8, intended_state, -1, terminated))
self.P[state][action].append((0.1, left_state, -1, terminated))
self.P[state][action].append((0.1, right_state, -1, terminated))
self.P[state][action].append(
(self.rainy_probability, intended_state, -1, terminated)
)
self.P[state][action].append(
(self._rainy_lateral_probability, left_state, -1, terminated)
)
self.P[state][action].append(
(self._rainy_lateral_probability, right_state, -1, terminated)
)
else:
self.P[state][action].append((1.0, intended_state, reward, terminated))

@@ -280,12 +309,18 @@ def __init__(
render_mode: str | None = None,
is_rainy: bool = False,
fickle_passenger: bool = False,
rainy_probability: float = 0.8,
fickle_probability: float = 0.3,
):
self.desc = np.asarray(MAP, dtype="c")

self.locs = locs = [(0, 0), (0, 4), (4, 0), (4, 3)]
self.locs_colors = [(255, 0, 0), (0, 255, 0), (255, 255, 0), (0, 0, 255)]

self.rainy_probability = rainy_probability
self._rainy_lateral_probability = (1.0 - rainy_probability) / 2.0
self.fickle_probability = fickle_probability

num_states = 500
num_rows = 5
num_columns = 5
@@ -328,7 +363,7 @@ def __init__(

self.render_mode = render_mode
self.fickle_passenger = fickle_passenger
self.fickle_step = self.fickle_passenger and self.np_random.random() < 0.3
self.fickle_step = False

# pygame utils
self.window = None
@@ -356,7 +391,7 @@ def encode(self, taxi_row, taxi_col, pass_loc, dest_idx):
i += dest_idx
return i

def decode(self, i):
def decode(self, i) -> tuple[int, int, int, int]:
out = []
out.append(i % 4)
i = i // 4
@@ -366,7 +401,7 @@ def decode(self, i):
i = i // 5
out.append(i)
assert 0 <= i < 5
return reversed(out)
return tuple(reversed(out))
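The mixed-radix packing behind this pair (5 rows x 5 columns x 5 passenger locations x 4 destinations = 500 states) can be round-tripped with a standalone reimplementation, given here for illustration:

```python
def encode(taxi_row, taxi_col, pass_loc, dest_idx):
    # Mixed-radix packing: ((row * 5 + col) * 5 + pass_loc) * 4 + dest_idx
    return ((taxi_row * 5 + taxi_col) * 5 + pass_loc) * 4 + dest_idx

def decode(i):
    # Peel digits off in reverse order of encoding.
    out = [i % 4]
    i //= 4
    out.append(i % 5)
    i //= 5
    out.append(i % 5)
    i //= 5
    assert 0 <= i < 5
    out.append(i)
    return tuple(reversed(out))
```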

def action_mask(self, state: int):
"""Computes an action mask for the action space using the state information."""
@@ -429,7 +464,9 @@ def reset(
super().reset(seed=seed)
self.s = categorical_sample(self.initial_state_distrib, self.np_random)
self.lastaction = None
self.fickle_step = self.fickle_passenger and self.np_random.random() < 0.3
self.fickle_step = (
self.fickle_passenger and self.np_random.random() < self.fickle_probability
)
self.taxi_orientation = 0

if self.render_mode == "human":
100 changes: 1 addition & 99 deletions tests/envs/test_env_implementation.py
@@ -4,7 +4,7 @@
import gymnasium as gym
from gymnasium.envs.box2d import BipedalWalker, CarRacing
from gymnasium.envs.box2d.lunar_lander import demo_heuristic_lander
from gymnasium.envs.toy_text import CliffWalkingEnv, TaxiEnv
from gymnasium.envs.toy_text import CliffWalkingEnv
from gymnasium.envs.toy_text.frozen_lake import generate_random_map
from gymnasium.error import InvalidAction

@@ -184,104 +184,6 @@ def test_frozenlake_map_generation_with_seed(map_size: int, seed: int):
assert map1 != map2


def test_taxi_action_mask():
env = TaxiEnv()

for state in env.P:
mask = env.action_mask(state)
for action, possible in enumerate(mask):
_, next_state, _, _ = env.P[state][action][0]
assert state != next_state if possible else state == next_state


def test_taxi_encode_decode():
env = TaxiEnv()

state, info = env.reset()
for _ in range(100):
assert env.encode(*env.decode(state)) == state, (
f"state={state}, encode(decode(state))={env.encode(*env.decode(state))}"
)
state, _, _, _, _ = env.step(env.action_space.sample())


def test_taxi_is_rainy():
env = TaxiEnv(is_rainy=True)
for state_dict in env.P.values():
for action, transitions in state_dict.items():
if action <= 3:
assert sum([t[0] for t in transitions]) == 1
assert {t[0] for t in transitions} == {0.8, 0.1}
else:
assert len(transitions) == 1
assert transitions[0][0] == 1.0

state, _ = env.reset()
_, _, _, _, info = env.step(0)
assert info["prob"] in {0.8, 0.1}

env = TaxiEnv(is_rainy=False)
for state_dict in env.P.values():
for transitions in state_dict.values():
assert len(transitions) == 1
assert transitions[0][0] == 1.0

state, _ = env.reset()
_, _, _, _, info = env.step(0)
assert info["prob"] == 1.0


def test_taxi_disallowed_transitions():
disallowed_transitions = [
((0, 1), (0, 3)),
((0, 3), (0, 1)),
((1, 0), (1, 2)),
((1, 2), (1, 0)),
((3, 1), (3, 3)),
((3, 3), (3, 1)),
((3, 3), (3, 5)),
((3, 5), (3, 3)),
((4, 1), (4, 3)),
((4, 3), (4, 1)),
((4, 3), (4, 5)),
((4, 5), (4, 3)),
]
for rain in {True, False}:
env = TaxiEnv(is_rainy=rain)
for state, state_dict in env.P.items():
start_row, start_col, _, _ = env.decode(state)
for transitions in state_dict.values():
for transition in transitions:
end_row, end_col, _, _ = env.decode(transition[1])
assert (
(start_row, start_col),
(end_row, end_col),
) not in disallowed_transitions


def test_taxi_fickle_passenger():
env = TaxiEnv(fickle_passenger=True)
# This is a fickle seed; if randomness or the draws from the PRNG were recently updated, find a new seed
env.reset(seed=43)
state, *_ = env.step(0)
taxi_row, taxi_col, pass_idx, orig_dest_idx = env.decode(state)
# force taxi to passenger location
env.s = env.encode(
env.locs[pass_idx][0], env.locs[pass_idx][1], pass_idx, orig_dest_idx
)
# pick up the passenger
env.step(4)
if env.locs[pass_idx][0] == 0:
# if we're on the top row, move down
state, *_ = env.step(0)
else:
# otherwise move up
state, *_ = env.step(1)
taxi_row, taxi_col, pass_idx, dest_idx = env.decode(state)
# check that passenger has changed their destination
assert orig_dest_idx != dest_idx


@pytest.mark.parametrize(
"env_name",
["Acrobot-v1", "CartPole-v1", "MountainCar-v0", "MountainCarContinuous-v0"],
Empty file added tests/envs/toy_text/__init__.py