Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 142 additions & 39 deletions alf/algorithms/data_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@

import alf
from alf.data_structures import AlgStep, Experience, namedtuple, StepType, TimeStep
from alf.environments import suite_socialbot
from alf.experience_replayers.replay_buffer import ReplayBuffer, BatchInfo
from alf.nest.utils import convert_device
from alf.utils.normalizers import WindowNormalizer, EMNormalizer, AdaptiveNormalizer
from alf.utils import common
from alf.utils.math_ops import l2_dist_close_reward_fn
from alf.utils.normalizers import ScalarAdaptiveNormalizer

FrameStackState = namedtuple('FrameStackState', ['steps', 'prev_frames'])
Expand Down Expand Up @@ -180,6 +182,7 @@ def __init__(self,
observation_spec,
stack_size=4,
stack_axis=0,
convert_only_minibatch_to_device=False,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this argument mean? docstring should explain its actual meaning.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment

fields=None):
"""Create a FrameStacker object.

Expand All @@ -198,6 +201,7 @@ def __init__(self,
self._frames = dict()
self._fields = fields if (fields is not None) else [None]
self._exp_fields = []
self._convert_only_minibatch_to_device = convert_only_minibatch_to_device
prev_frames_spec = []
stacked_observation_spec = observation_spec
for field in self._fields:
Expand Down Expand Up @@ -350,9 +354,14 @@ def transform_experience(self, experience: Experience):
B = torch.arange(batch_size)
obs_index = (B.unsqueeze(-1).unsqueeze(-1), obs_index.unsqueeze(0))

if self._convert_only_minibatch_to_device:
obs_index = convert_device(obs_index, device=replay_buffer.device)

def _stack_frame(obs, i):
prev_obs = replay_buffer.get_field(self._exp_fields[i], env_ids,
prev_positions)
if not self._convert_only_minibatch_to_device:
prev_obs = convert_device(prev_obs)
stacked_shape = alf.nest.get_field(
self._transformed_observation_spec, self._fields[i]).shape
# [batch_size, mini_batch_length + stack_size - 1, ...]
Expand Down Expand Up @@ -702,27 +711,6 @@ def forward(self, reward):
return reward * self._scale


@alf.configurable
def l2_dist_close_reward_fn(achieved_goal, goal, threshold=.05):
    """Compute a -1/0 reward from the L2 distance between achieved and goal states.

    A step receives reward ``0`` when the L2 distance (taken over dim 2)
    between ``achieved_goal`` and ``goal`` is strictly below ``threshold``,
    and ``-1`` otherwise (a distance exactly equal to ``threshold`` yields
    ``-1``).

    Args:
        achieved_goal (Tensor): achieved state, of shape
            ``[batch_size, batch_length, ...]``.
        goal (Tensor): goal state, of shape ``[batch_size, batch_length, ...]``.
        threshold (float): L2 distance threshold for the reward.

    Returns:
        Tensor: float32 tensor of -1/0 rewards of shape
        ``[batch_size, batch_length]``.
    """
    goals_are_scalar = goal.dim() == 2
    if goals_are_scalar:
        # 1-dimensional goals carry no feature axis; both tensors must have
        # the same rank before a trailing axis is appended to each.
        assert achieved_goal.dim() == goal.dim()
        achieved_goal, goal = achieved_goal.unsqueeze(2), goal.unsqueeze(2)

    # Reduce over dim 2 to get one distance per (batch, step) entry.
    distance = torch.norm(achieved_goal - goal, dim=2)
    too_far = distance >= threshold
    return -too_far.to(torch.float32)


@alf.configurable
class HindsightExperienceTransformer(DataTransformer):
"""Randomly transform her_proportion of `batch_size` trajectories with hindsight relabel.
Expand Down Expand Up @@ -751,6 +739,9 @@ def __init__(self,
her_proportion=0.8,
achieved_goal_field="time_step.observation.achieved_goal",
desired_goal_field="time_step.observation.desired_goal",
sparse_reward=False,
add_noise_to_goals=False,
threshold=.05,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can merge add_noise_to_goals and threshold into just one relabeled_goal_noise, and when it's 0 there is no noise added.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done.

reward_fn=l2_dist_close_reward_fn):
"""
Args:
Expand All @@ -759,6 +750,9 @@ def __init__(self,
exp nest.
desired_goal_field (str): path to the desired_goal field in the
exp nest.
sparse_reward (bool): Whether to transform reward from -1/0 to 0/1.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This transform from -1/0 to 0/1 seems specific to certain environments and is not a general thing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Environment rewards can be 0/1 or -1/0 or any form. But because the Hindsight relabeler needs to know whether goal is achieved to be able to relabel discount and step_type correctly, here, we assume the reward_fn always returns -1/0 reward, and it needs to be transformed into 0/1 form when needed.

Since it relabels discount to 0, this also relabels infinite horizon tasks to episodic. Updated comment.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Environment rewards can be 0/1 or -1/0 or any form. But because the Hindsight relabeler needs to know whether goal is achieved to be able to relabel discount and step_type correctly, here, we assume the reward_fn always returns -1/0 reward, and it needs to be transformed into 0/1 form when needed.

Since it relabels discount to 0, this also relabels infinite horizon tasks to episodic. Updated comment.

From the docstring, the observation is assumed to have a field "achieved_goal" indicating what goal has been achieved. If this is true, then there is no need to decide goal achieving from the reward, which is not a general setup. (on the other hand, -1/0 is as sparse as 0/1 so the name 'sparse_reward' could be improved).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about goal achieved. It's still hard to find a general way though. The reward function is general, and may not return anything about goal achievement, e.g. when it is a dense reward based on distance to goal.

Calling the variable relabel_with_episodic_rewards, since -1/0 is for continuous and 0/1 is for episodic.

add_noise_to_goals (bool): Whether to add noise around relabeled goal.
threshold (float): noise added to relabeled goals.
reward_fn (Callable): function to recompute reward based on
achieve_goal and desired_goal. Default gives reward 0 when
L2 distance less than 0.05 and -1 otherwise, same as is done in
Expand All @@ -770,6 +764,9 @@ def __init__(self,
self._her_proportion = her_proportion
self._achieved_goal_field = achieved_goal_field
self._desired_goal_field = desired_goal_field
self._sparse_reward = sparse_reward
self._add_noise_to_goals = add_noise_to_goals
self._threshold = threshold
self._reward_fn = reward_fn

def transform_timestep(self, timestep: TimeStep, state):
Expand Down Expand Up @@ -819,32 +816,96 @@ def transform_experience(self, experience: Experience):
her_cond = torch.rand(batch_size) < her_proportion
(her_indices, ) = torch.where(her_cond)

last_step_pos = start_pos[her_indices] + batch_length - 1
last_env_ids = env_ids[her_indices]
# Get x, y indices of LAST steps
has_her = torch.any(her_cond)
last_step_pos = start_pos + batch_length - 1
last_env_ids = env_ids
# Get x, y indices of LAST steps for the whole batch, not just the HER part.
dist = buffer.steps_to_episode_end(last_step_pos, last_env_ids)
if alf.summary.should_record_summaries():
alf.summary.scalar(
"replayer/" + buffer._name + ".mean_steps_to_episode_end",
torch.mean(dist.type(torch.float32)))

def _add_noise(t):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function transform_experience() has already over 200 lines, and it's really difficult to read. I suggest splitting it into several member functions to make it more modular.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Cleaned up a bit.

if not self._add_noise_to_goals:
return t
bs, bl, dim = t.shape
# rejection sample from unit ball
assert dim < 20, "Cannot rejection sample from high dim ball yet."
n_samples, i = 0, 0
while n_samples == 0:
_sample = torch.rand((bs * 2, dim))
in_ball = torch.norm(_sample, dim=1) < 1.
if torch.any(in_ball):
sample = _sample[in_ball]
nsample = sample.shape[0]
if nsample < bs:
sample = sample.expand(bs // nsample + 1, nsample,
dim).reshape(-1, dim)
if sample.shape[0] > bs:
sample = sample[:bs, :]
break
assert i < 10, "shouldn't take 10 iterations"
i += 1
return t + self._threshold * sample.reshape(bs, 1, dim)

# get random future state
future_idx = last_step_pos + (torch.rand(*dist.shape) *
(dist + 1)).to(torch.int64)
future_ag = buffer.get_field(self._achieved_goal_field,
last_env_ids, future_idx).unsqueeze(1)
future_dist = (torch.rand(*dist.shape) * (dist + 1)).to(
torch.int64)
future_idx = last_step_pos + future_dist
future_ag = _add_noise(
buffer.get_field(self._achieved_goal_field, last_env_ids,
future_idx).unsqueeze(1))

# relabel desired goal
result_desired_goal = alf.nest.get_field(result,
self._desired_goal_field)
relabed_goal = result_desired_goal.clone()
relabeled_goal = result_desired_goal.clone()
her_batch_index_tuple = (her_indices.unsqueeze(1),
torch.arange(batch_length).unsqueeze(0))
relabed_goal[her_batch_index_tuple] = future_ag
if has_her:
relabeled_goal[her_batch_index_tuple] = future_ag[her_indices]

# recompute rewards
result_ag = alf.nest.get_field(result, self._achieved_goal_field)
relabeled_rewards = self._reward_fn(result_ag, relabed_goal)
relabeled_rewards = self._reward_fn(
result_ag, relabeled_goal, threshold=self._threshold)
if alf.summary.should_record_summaries():
alf.summary.scalar(
"replayer/" + buffer._name +
".discount_mean_before_relabel",
torch.mean(result.discount[:, 1:]))
if self._sparse_reward:
reward_achieved = relabeled_rewards >= 0
# Cut off episode for any goal reached.
end = reward_achieved
discount = torch.where(end, torch.tensor(0.), result.discount)
step_type = torch.where(end, torch.tensor(StepType.LAST),
result.step_type)
# Also relabel ``LAST`` steps to ``MID`` where aux goals were not
# achieved but env ended episode due to position goal achieved.
# -1/0 reward doesn't end episode on achieving position goal, and
# doesn't need to do this relabeling.
goal_reward = result.reward
if len(result.reward.shape) > 2:
goal_reward = result.reward[..., 0]
mid = (
result.step_type == StepType.LAST) & ~reward_achieved & (
goal_reward > 0) # assumes no multi dim goal reward.
discount = torch.where(mid, torch.tensor(1.), discount)
step_type = torch.where(mid, torch.tensor(StepType.MID),
step_type)

if alf.summary.should_record_summaries():
alf.summary.scalar(
"replayer/" + buffer._name +
".discount_mean_after_relabel",
torch.mean(discount[:, 1:]))

result = result._replace(discount=discount)
result = result._replace(step_type=step_type)
relabeled_rewards = suite_socialbot.transform_reward_tensor(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that HindsightExperienceTransformer is a general data transformer independent of the environment. So it feels strange to include suite_socialbot here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This transform_reward_tensor function is used to translate -1/0 reward into 0/1 reward. We also depend on the -1/0 reward assumption to determine the relabeled LAST (episode end) steps (also relabel discount 0).

Removed the dependency here, added it in the task gin or alf configs. This runs the risk of people forgetting to set it in a new task config.

For episodic tasks (sparse_reward==True), the Hindsight transformer does depend on the original reward being -1/0. In this case, maybe I should move ``transform_reward_tensor`` out of suite_socialbot, into a more general place like reward_utils.py? Would you prefer this over the current solution of relying on the task's alf config to set up the transform_reward_tensor function?

relabeled_rewards)

non_her_or_fst = ~her_cond.unsqueeze(1) & (result.step_type !=
StepType.FIRST)
Expand Down Expand Up @@ -874,21 +935,60 @@ def transform_experience(self, experience: Experience):
alf.summary.scalar(
"replayer/" + buffer._name + ".reward_mean_before_relabel",
torch.mean(result.reward[her_indices][:-1]))
alf.summary.scalar(
"replayer/" + buffer._name + ".reward_mean_after_relabel",
torch.mean(relabeled_rewards[her_indices][:-1]))
if has_her:
alf.summary.scalar(
"replayer/" + buffer._name + ".reward_mean_after_relabel",
torch.mean(relabeled_rewards[her_indices][:-1]))
alf.summary.scalar("replayer/" + buffer._name + ".future_distance",
torch.mean(future_dist.float()))

goal_rewards = result.reward
if result.reward.ndim > 2:
goal_rewards = result.reward[:, :, 0]

# assert reward function is the same as used by the environment.
if not torch.allclose(relabeled_rewards[non_her_or_fst],
goal_rewards[non_her_or_fst]):
not_close = torch.abs(relabeled_rewards[non_her_or_fst] -
goal_rewards[non_her_or_fst]) > 0.01
msg = ("hindsight_relabel:\nrelabeled_reward\n{}\n!=\n" +
"env_reward\n{}\nag:\n{}\ndg:\n{}\nenv_ids:\n{}\nstart_pos:"
+ "\n{}").format(
relabeled_rewards[non_her_or_fst][not_close],
goal_rewards[non_her_or_fst][not_close],
result_ag[non_her_or_fst][not_close],
result_desired_goal[non_her_or_fst][not_close],
env_ids.unsqueeze(1).expand(
shape[:2])[non_her_or_fst][not_close],
start_pos.unsqueeze(1).expand(
shape[:2])[non_her_or_fst][not_close])
logging.warning(msg)
# assert False, msg
# relabeled_rewards[non_her_or_fst] = goal_rewards[non_her_or_fst]

final_relabeled_rewards = relabeled_rewards
if result.reward.ndim > 2:
# multi dimensional env reward, first dim is goal related reward.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this reward assumption too strict and easily violated by a user?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. Made a note about the assumption upfront in the class description.
The proper way is maybe to divide the current multi-dim reward into a nested reward field. Added TODO.

final_relabeled_rewards = result.reward.clone()
final_relabeled_rewards[:, :, 0] = relabeled_rewards
result = result.update_time_step_field('reward',
final_relabeled_rewards)

result = alf.nest.transform_nest(
result, self._desired_goal_field, lambda _: relabed_goal)

result = result.update_time_step_field('reward', relabeled_rewards)
result, self._desired_goal_field, lambda _: relabeled_goal)

info = info._replace(her=her_cond, future_distance=future_dist)
if alf.get_default_device() != buffer.device:
for f in accessed_fields:
result = alf.nest.transform_nest(
result, f, lambda t: convert_device(t))
result = alf.nest.transform_nest(
result, "batch_info.replay_buffer", lambda _: buffer)
info = convert_device(info)
info = info._replace(
her=info.her.unsqueeze(1).expand(result.reward.shape[:2]),
future_distance=info.future_distance.unsqueeze(1).expand(
result.reward.shape[:2]),
replay_buffer=buffer)
result = alf.data_structures.add_batch_info(result, info)
return result


Expand Down Expand Up @@ -922,4 +1022,7 @@ def create_data_transformer(data_transformer_ctor, observation_spec):
if len(data_transformer_ctor) == 1:
return data_transformer_ctor[0](observation_spec)

if HindsightExperienceTransformer in data_transformer_ctor:
assert HindsightExperienceTransformer == data_transformer_ctor[0], \
"Hindsight relabeling should happen before all other transforms."

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer true after Break's recent PR on SequentialDataTransformer. Basically UntransformedTimeStep could appear before any other data transformer. I suggest either justifying why HindsightExperienceTransformer has to appear before UntransformedTimeStep or removing this assertion here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Removed.

return SequentialDataTransformer(data_transformer_ctor, observation_spec)
12 changes: 10 additions & 2 deletions alf/algorithms/ddpg_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,12 @@ def _sample(a, ou):
noisy_action, self._action_spec)
state = empty_state._replace(
actor=DdpgActorState(actor=state, critics=()))
# action_distribution is not supported for continuous actions for now.
# Returns empty action_distribution to fail early.
return AlgStep(
output=noisy_action,
state=state,
info=DdpgInfo(action=noisy_action, action_distribution=action))
info=DdpgInfo(action=noisy_action, action_distribution=()))

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this change? By default we could think of a deterministic action distribution as an action tensor.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to fail early and clearly. action is a tensor, not distribution. Putting action directly there could cause confusion when debugging. See comment 3 lines above.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is intended to return action_distribution here so that some other algorithm can use it (e.g. TracAlgorithm). Do you find that it causes a problem?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, when used with e.g. Retrace, a distribution is needed, but action is not a distribution.


def rollout_step(self, time_step: TimeStep, state=None):
if self.need_full_rollout_state():
Expand Down Expand Up @@ -330,7 +332,8 @@ def train_step(self, inputs: TimeStep, state: DdpgState,
reward=inputs.reward,
step_type=inputs.step_type,
discount=inputs.discount,
action_distribution=policy_step.output,
action=policy_step.output,
action_distribution=(),
critic=critic_info,
actor_loss=policy_step.info,
discounted_return=rollout_info.discounted_return))
Expand All @@ -355,6 +358,11 @@ def calc_loss(self, info: DdpgInfo):

actor_loss = info.actor_loss

if self._critic_losses[0]._improve_w_nstep_bootstrap:

@hnyu hnyu May 5, 2022

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This masking seems quite hacky. CriticLoss should not be responsible for actor losses or other losses in general. So this flag really belongs to either an algorithm or some other places.

@le-horizon le-horizon May 9, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

# Ignore 2nd - nth step actor losses.
actor_loss.loss[1:] = 0

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to ignore the actor loss for those steps? It seems to me it doesn't hurt to keep them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Since we compare with n-step return methods anyways, it may be more fair to keep the other steps' action and alpha losses.

actor_loss.extra[1:] = 0

return LossInfo(
loss=critic_loss + actor_loss.loss,
priority=priority,
Expand Down
Loading