diff --git a/rl_coach/agents/clipped_ppo_agent.py b/rl_coach/agents/clipped_ppo_agent.py
index cc29f33..4f1a7d9 100644
--- a/rl_coach/agents/clipped_ppo_agent.py
+++ b/rl_coach/agents/clipped_ppo_agent.py
@@ -182,7 +182,7 @@ class ClippedPPOAgent(ActorCriticAgent):
             screen.warning("WARNING: The requested policy gradient rescaler is not available")

         # standardize
-        advantages = (advantages - np.mean(advantages)) / np.std(advantages)
+        advantages = (advantages - np.mean(advantages)) / (np.std(advantages) + 1e-8)

         for transition, advantage, value_target in zip(batch.transitions, advantages, value_targets):
             transition.info['advantage'] = advantage
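Note on the standardization change above: when every advantage in the batch is identical, np.std(advantages) is zero and the division yields NaNs that propagate into the policy loss. A minimal sketch of the guarded standardization, using illustrative values (not taken from the patch):

import numpy as np

def standardize(advantages, epsilon=1e-8):
    # epsilon keeps the division finite when the batch has zero variance
    return (advantages - np.mean(advantages)) / (np.std(advantages) + epsilon)

flat_batch = np.array([0.5, 0.5, 0.5, 0.5])  # zero-variance batch: np.std(...) == 0
print(standardize(flat_batch))               # [0. 0. 0. 0.] instead of a NaN-filled array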
diff --git a/rl_coach/architectures/architecture.py b/rl_coach/architectures/architecture.py
index 90dbd6e..8d457a9 100644
--- a/rl_coach/architectures/architecture.py
+++ b/rl_coach/architectures/architecture.py
@@ -46,8 +46,9 @@ class Architecture(object):
         """
         self.spaces = spaces
         self.name = name
-        self.network_wrapper_name = self.name.split('/')[0]  # e.g. 'main/online' --> 'main'
-        self.full_name = "{}/{}".format(agent_parameters.full_name_id, name)
+        self.network_wrapper_name = self.name.split('/')[1]  # e.g. 'agent/main/online' --> 'main'
+        self.full_name = "{}/{}".format(agent_parameters.full_name_id, '/'.join(name.split('/')[1:]))
+        # self.full_name = "{}/{}".format(agent_parameters.full_name_id, name)
         self.network_parameters = agent_parameters.network_wrappers[self.network_wrapper_name]
         self.batch_size = self.network_parameters.batch_size
         self.learning_rate = self.network_parameters.learning_rate
diff --git a/rl_coach/architectures/network_wrapper.py b/rl_coach/architectures/network_wrapper.py
index dfefc41..a31dbf4 100644
--- a/rl_coach/architectures/network_wrapper.py
+++ b/rl_coach/architectures/network_wrapper.py
@@ -68,7 +68,7 @@ class NetworkWrapper(object):
             self.global_network = general_network(variable_scope=variable_scope,
                                                   devices=force_list(replicated_device),
                                                   agent_parameters=agent_parameters,
-                                                  name='{}/global'.format(name),
+                                                  name='{}/{}/global'.format(agent_parameters.name, name),
                                                   global_network=None,
                                                   network_is_local=False,
                                                   spaces=spaces,
@@ -79,7 +79,7 @@
         self.online_network = general_network(variable_scope=variable_scope,
                                               devices=force_list(worker_device),
                                               agent_parameters=agent_parameters,
-                                              name='{}/online'.format(name),
+                                              name='{}/{}/online'.format(agent_parameters.name, name),
                                               global_network=self.global_network,
                                               network_is_local=True,
                                               spaces=spaces,
@@ -91,7 +91,7 @@
             self.target_network = general_network(variable_scope=variable_scope,
                                                   devices=force_list(worker_device),
                                                   agent_parameters=agent_parameters,
-                                                  name='{}/target'.format(name),
+                                                  name='{}/{}/target'.format(agent_parameters.name, name),
                                                   global_network=self.global_network,
                                                   network_is_local=True,
                                                   spaces=spaces,
diff --git a/rl_coach/architectures/tensorflow_components/architecture.py b/rl_coach/architectures/tensorflow_components/architecture.py
index 68420fe..1fcb912 100644
--- a/rl_coach/architectures/tensorflow_components/architecture.py
+++ b/rl_coach/architectures/tensorflow_components/architecture.py
@@ -97,7 +97,7 @@ class TensorFlowArchitecture(Architecture):
             self.optimizer_type = self.network_parameters.optimizer_type
         if self.ap.task_parameters.seed is not None:
             tf.set_random_seed(self.ap.task_parameters.seed)
-        with tf.variable_scope("/".join(self.name.split("/")[1:]), initializer=tf.contrib.layers.xavier_initializer(),
+        with tf.variable_scope("/".join(self.name.split("/")[2:]), initializer=tf.contrib.layers.xavier_initializer(),
                                custom_getter=local_getter if network_is_local and global_network else None):
             self.global_step = tf.train.get_or_create_global_step()

diff --git a/rl_coach/architectures/tensorflow_components/general_network.py b/rl_coach/architectures/tensorflow_components/general_network.py
index 8821ac6..fc0b3ac 100644
--- a/rl_coach/architectures/tensorflow_components/general_network.py
+++ b/rl_coach/architectures/tensorflow_components/general_network.py
@@ -105,7 +105,7 @@ class GeneralTensorFlowNetwork(TensorFlowArchitecture):
         """
         self.global_network = global_network
         self.network_is_local = network_is_local
-        self.network_wrapper_name = name.split('/')[0]
+        self.network_wrapper_name = name.split('/')[1]
         self.network_parameters = agent_parameters.network_wrappers[self.network_wrapper_name]
         self.num_heads_per_network = 1 if self.network_parameters.use_separate_networks_per_head else \
             len(self.network_parameters.heads_parameters)
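The four changes above prepend the agent name to every network name, so a name such as 'main/online' becomes '<agent_name>/main/online'; the index used to recover the wrapper name shifts from 0 to 1, and the TensorFlow variable scope is built from the components after the agent and wrapper names. A rough sketch of the assumed naming convention ('agent' below is an illustrative agent name, not a value from the patch):

# Assumed layout after this change: "<agent_name>/<network_wrapper>/<copy>"
name = "agent/main/online"

network_wrapper_name = name.split('/')[1]       # 'main'   (was index 0 before the change)
variable_scope = "/".join(name.split('/')[2:])  # 'online' (was index 1 before the change)
print(network_wrapper_name, variable_scope)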
diff --git a/rl_coach/architectures/tensorflow_components/heads/ppo_head.py b/rl_coach/architectures/tensorflow_components/heads/ppo_head.py
index 63f95a3..d476c10 100644
--- a/rl_coach/architectures/tensorflow_components/heads/ppo_head.py
+++ b/rl_coach/architectures/tensorflow_components/heads/ppo_head.py
@@ -25,6 +25,11 @@ from rl_coach.spaces import BoxActionSpace, DiscreteActionSpace
 from rl_coach.spaces import SpacesDefinition
 from rl_coach.utils import eps

+# Since we are using log probabilities, it is possible to encounter a 0 * log(0) condition,
+# which will tank the training by producing NaNs. It is therefore necessary
+# to add a small offset to all networks with discrete distributions to prevent
+# this issue.
+ZERO_OFFSET = 1e-8

 class PPOHead(Head):
     def __init__(self, agent_parameters: AgentParameters, spaces: SpacesDefinition, network_name: str,
@@ -107,7 +112,8 @@ class PPOHead(Head):
         # Policy Head
         self.input = [self.actions, self.old_policy_mean]
         policy_values = self.dense_layer(num_actions)(input_layer, name='policy_fc')
-        self.policy_mean = tf.nn.softmax(policy_values, name="policy")
+        # Prevent distributions with 0 values
+        self.policy_mean = tf.maximum(tf.nn.softmax(policy_values, name="policy"), ZERO_OFFSET)

         # define the distributions for the policy and the old policy
         self.policy_distribution = tf.contrib.distributions.Categorical(probs=self.policy_mean)
@@ -123,9 +129,8 @@
         self.old_policy_std = tf.placeholder(tf.float32, [None, num_actions], "old_policy_std")

         self.input = [self.actions, self.old_policy_mean, self.old_policy_std]
-        self.policy_mean = self.dense_layer(num_actions)(input_layer, name='policy_mean',
-                                                          kernel_initializer=normalized_columns_initializer(0.01))
-
+        self.policy_mean = tf.identity(self.dense_layer(num_actions)(input_layer, name='policy_mean',
+                                                          kernel_initializer=normalized_columns_initializer(0.01)), name="policy")
         # for local networks in distributed settings, we need to move variables we create manually to the
         # tf.GraphKeys.LOCAL_VARIABLES collection, since the variable scope custom getter which is set in
         # Architecture does not apply to them
@@ -135,7 +140,7 @@
         else:
             self.policy_logstd = tf.Variable(np.zeros((1, num_actions)), dtype='float32', name="policy_log_std")

-        self.policy_std = tf.tile(tf.exp(self.policy_logstd), [tf.shape(input_layer)[0], 1], name='policy_std')
+        self.policy_std = tf.tile(tf.exp(tf.clip_by_value(self.policy_logstd, -20.0, 3.0)), [tf.shape(input_layer)[0], 1], name='policy_std')

         # define the distributions for the policy and the old policy
         self.policy_distribution = tf.contrib.distributions.MultivariateNormalDiag(self.policy_mean, self.policy_std + eps)
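Two numerical guards appear above: the softmax output is clamped away from exact zero before it feeds the Categorical distribution (whose log-probability would otherwise be -inf), and the continuous head's log-std is clipped before exponentiation so the std cannot overflow or collapse. A small NumPy illustration of the idea, with made-up values rather than the actual TensorFlow graph:

import numpy as np

ZERO_OFFSET = 1e-8

probs = np.array([1.0, 0.0, 0.0])             # a degenerate softmax output; log(0) would be -inf
print(np.log(np.maximum(probs, ZERO_OFFSET))) # finite log-probabilities after the clamp

log_std = np.array([50.0, -50.0])             # an unbounded log-std variable
print(np.exp(np.clip(log_std, -20.0, 3.0)))   # stds stay within [e**-20, e**3]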
diff --git a/rl_coach/architectures/tensorflow_components/savers.py b/rl_coach/architectures/tensorflow_components/savers.py
index 531c523..4bde2ad 100644
--- a/rl_coach/architectures/tensorflow_components/savers.py
+++ b/rl_coach/architectures/tensorflow_components/savers.py
@@ -28,11 +28,11 @@ class GlobalVariableSaver(Saver):
         self._names = [name]
         # if graph is finalized, savers must have already been added. This happens
         # in the case of a MonitoredSession
-        self._variables = tf.global_variables()
+        self._variables = tf.trainable_variables()

         # target network is never saved or restored directly from checkpoint, so we are removing all its variables from the list
         # the target network would be synced back from the online network in graph_manager.improve(...), at the beginning of the run flow.
-        self._variables = [v for v in self._variables if "/target" not in v.name]
+        self._variables = [v for v in self._variables if ('/target' not in v.name and name.split('/')[0] + '/' in v.name)]

         # Using a placeholder to update the variable during restore to avoid memory leak.
         # Ref: https://github.com/tensorflow/tensorflow/issues/4151
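The saver filter above now keeps only trainable variables whose names carry this saver's top-level prefix, and still drops the target network, presumably so that a checkpoint only captures the owning agent's weights when several agents share one graph. A rough illustration of the name-based filter; the variable names below are made up:

name = "agent_0/main"  # hypothetical saver name: "<agent>/<network_wrapper>"
variable_names = [
    "agent_0/main/online/policy_fc/weights",
    "agent_0/main/target/policy_fc/weights",
    "agent_1/main/online/policy_fc/weights",
]
kept = [v for v in variable_names
        if '/target' not in v and name.split('/')[0] + '/' in v]
print(kept)  # only the first entry survives: this agent's online weights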
diff --git a/rl_coach/agents/actor_critic_agent.py b/rl_coach/agents/actor_critic_agent.py
index 35c8bf9..4f3ce60 100644
--- a/rl_coach/agents/actor_critic_agent.py
+++ b/rl_coach/agents/actor_critic_agent.py
@@ -94,11 +94,14 @@ class ActorCriticAgentParameters(AgentParameters):
 class ActorCriticAgent(PolicyOptimizationAgent):
     def __init__(self, agent_parameters, parent: Union['LevelManager', 'CompositeAgent']=None):
         super().__init__(agent_parameters, parent)
+        print("[RL] ActorCriticAgent init")
         self.last_gradient_update_step_idx = 0
         self.action_advantages = self.register_signal('Advantages')
         self.state_values = self.register_signal('Values')
         self.value_loss = self.register_signal('Value Loss')
         self.policy_loss = self.register_signal('Policy Loss')
+        print("[RL] ActorCriticAgent init successful")
+

     # Discounting function used to calculate discounted returns.
     def discount(self, x, gamma):
diff --git a/rl_coach/agents/agent.py b/rl_coach/agents/agent.py
index 866fe8a..cf0873a 100644
--- a/rl_coach/agents/agent.py
+++ b/rl_coach/agents/agent.py
@@ -28,6 +28,8 @@ from rl_coach.base_parameters import AgentParameters, Device, DeviceType, Distri
 from rl_coach.core_types import RunPhase, PredictionType, EnvironmentEpisodes, ActionType, Batch, Episode, StateType
 from rl_coach.core_types import Transition, ActionInfo, TrainingSteps, EnvironmentSteps, EnvResponse
 from rl_coach.logger import screen, Logger, EpisodeLogger
+from rl_coach.memories.memory import Memory
+from rl_coach.memories.non_episodic.experience_replay import ExperienceReplay
 from rl_coach.memories.episodic.episodic_experience_replay import EpisodicExperienceReplay
 from rl_coach.saver import SaverCollection
 from rl_coach.spaces import SpacesDefinition, VectorObservationSpace, GoalsSpace, AttentionActionSpace
@@ -74,7 +76,7 @@
         self.imitation = False
         self.agent_logger = Logger()
         self.agent_episode_logger = EpisodeLogger()
-
+        print("[RL] Created agent loggers")
         # get the memory
         # - distributed training + shared memory:
         #   * is chief? -> create the memory and add it to the scratchpad
@@ -84,22 +86,30 @@
         memory_name = self.ap.memory.path.split(':')[1]
         self.memory_lookup_name = self.full_name_id + '.' + memory_name
         if self.shared_memory and not self.is_chief:
+            print("[RL] Creating shared memory")
             self.memory = self.shared_memory_scratchpad.get(self.memory_lookup_name)
         else:
+            print("[RL] Dynamic import of memory: ", self.ap.memory)
             # modules
             self.memory = dynamic_import_and_instantiate_module_from_params(self.ap.memory)
+            print("[RL] Dynamically imported memory", self.memory)

         if hasattr(self.ap.memory, 'memory_backend_params'):
+            print("[RL] Getting memory backend", self.ap.memory.memory_backend_params)
             self.memory_backend = get_memory_backend(self.ap.memory.memory_backend_params)
+            print("[RL] Memory backend", self.memory_backend)

             if self.ap.memory.memory_backend_params.run_type != 'trainer':
+                print("[RL] Setting memory backend", self.memory_backend)
                 self.memory.set_memory_backend(self.memory_backend)

         if self.shared_memory and self.is_chief:
+            print("[RL] Shared memory scratchpad")
             self.shared_memory_scratchpad.add(self.memory_lookup_name, self.memory)

         # set devices
         if type(agent_parameters.task_parameters) == DistributedTaskParameters:
+            print("[RL] Setting distributed devices")
             self.has_global = True
             self.replicated_device = agent_parameters.task_parameters.device
             self.worker_device = "/job:worker/task:{}".format(self.task_id)
@@ -108,6 +118,7 @@
             else:
                 self.worker_device += "/device:GPU:0"
         else:
+            print("[RL] Setting devices")
             self.has_global = False
             self.replicated_device = None
             if agent_parameters.task_parameters.use_cpu:
@@ -115,7 +126,7 @@
             else:
                 self.worker_device = [Device(DeviceType.GPU, i)
                                       for i in range(agent_parameters.task_parameters.num_gpu)]
-
+        print("[RL] Setting filters")
         # filters
         self.input_filter = self.ap.input_filter
         self.input_filter.set_name('input_filter')
@@ -134,21 +145,26 @@
         # 3. Single worker (=both TF and Mxnet) - no data sharing needed + numpy arithmetic backend

         if hasattr(self.ap.memory, 'memory_backend_params') and self.ap.algorithm.distributed_coach_synchronization_type:
+            print("[RL] Setting filter devices: distributed")
             self.input_filter.set_device(device, memory_backend_params=self.ap.memory.memory_backend_params, mode='numpy')
             self.output_filter.set_device(device, memory_backend_params=self.ap.memory.memory_backend_params, mode='numpy')
             self.pre_network_filter.set_device(device, memory_backend_params=self.ap.memory.memory_backend_params, mode='numpy')
         elif (type(agent_parameters.task_parameters) == DistributedTaskParameters and
               agent_parameters.task_parameters.framework_type == Frameworks.tensorflow):
+            print("[RL] Setting filter devices: tf")
             self.input_filter.set_device(device, mode='tf')
             self.output_filter.set_device(device, mode='tf')
             self.pre_network_filter.set_device(device, mode='tf')
         else:
+            print("[RL] Setting filter devices: numpy")
             self.input_filter.set_device(device, mode='numpy')
             self.output_filter.set_device(device, mode='numpy')
             self.pre_network_filter.set_device(device, mode='numpy')

         # initialize all internal variables
+        print("[RL] Setting Phase")
         self._phase = RunPhase.HEATUP
+        print("[RL] After setting Phase")
         self.total_shaped_reward_in_current_episode = 0
         self.total_reward_in_current_episode = 0
         self.total_steps_counter = 0
@@ -180,7 +196,7 @@
         # environment parameters
         self.spaces = None
         self.in_action_space = self.ap.algorithm.in_action_space
-
+        print("[RL] Setting signals")
         # signals
         self.episode_signals = []
         self.step_signals = []
@@ -195,6 +211,8 @@

         # batch rl
         self.ope_manager = OpeManager() if self.ap.is_batch_rl_training else None

+        print("[RL] Agent init successful")
+
     @property
     def parent(self) -> 'LevelManager':
@@ -572,7 +590,8 @@
         self.current_episode += 1

         if self.phase != RunPhase.TEST:
-            if isinstance(self.memory, EpisodicExperienceReplay):
+            if isinstance(self.memory, EpisodicExperienceReplay) or \
+                    (isinstance(self.memory, Memory) and not isinstance(self.memory, ExperienceReplay)):
                 self.call_memory('store_episode', self.current_episode_buffer)
             elif self.ap.algorithm.store_transitions_only_when_episodes_are_terminated:
                 for transition in self.current_episode_buffer.transitions:
@@ -618,7 +637,8 @@
         self.input_filter.reset()
         self.output_filter.reset()
         self.pre_network_filter.reset()
-        if isinstance(self.memory, EpisodicExperienceReplay):
+        if isinstance(self.memory, EpisodicExperienceReplay) or \
+                (isinstance(self.memory, Memory) and not isinstance(self.memory, ExperienceReplay)):
             self.call_memory('verify_last_episode_is_closed')

         for network in self.networks.values():
@@ -953,7 +973,7 @@
         # for episodic memories we keep the transitions in a local buffer until the episode is ended.
         # for regular memories we insert the transitions directly to the memory
         self.current_episode_buffer.insert(transition)
-        if not isinstance(self.memory, EpisodicExperienceReplay) \
+        if isinstance(self.memory, ExperienceReplay) \
                 and not self.ap.algorithm.store_transitions_only_when_episodes_are_terminated:
             self.call_memory('store', transition)

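The isinstance checks above change how transitions are routed: episodic memories, and any Memory that is not a plain ExperienceReplay, receive whole episodes at episode end, while flat experience-replay buffers receive transitions one by one. A condensed sketch of that dispatch with stand-in classes (not the real rl_coach types):

# Stand-in classes only; the real types live under rl_coach.memories.
class Memory: ...
class ExperienceReplay(Memory): ...
class EpisodicExperienceReplay(ExperienceReplay): ...

def stores_whole_episodes(memory):
    # mirrors the end-of-episode condition used in agent.py above
    return isinstance(memory, EpisodicExperienceReplay) or \
        (isinstance(memory, Memory) and not isinstance(memory, ExperienceReplay))

print(stores_whole_episodes(EpisodicExperienceReplay()))  # True
print(stores_whole_episodes(ExperienceReplay()))          # False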
diff --git a/rl_coach/core_types.py b/rl_coach/core_types.py
index c173318..58fd0bc 100644
--- a/rl_coach/core_types.py
+++ b/rl_coach/core_types.py
@@ -182,6 +182,7 @@ class RunPhase(Enum):
     TRAIN = "Training"
     TEST = "Testing"
     UNDEFINED = "Undefined"
+    WAITING = "Waiting"


 # transitions