Source code for lfd.environment.environment

from __future__ import division

import numpy as np
from lfd.rapprentice import eval_util
from lfd.demonstration import demonstration
from lfd.environment import simulation_object

[docs]class LfdEnvironment(object): def __init__(self, world, sim, downsample_size=0): """Inits LfdEnvironment Args: world: RobotWorld sim: StaticSimulation that contains a robot downsample_size: if downsample_size is positive, the clouds are downsampled to a voxel size of downsample_size, else they are not downsampled """ self.world = world self.sim = sim self.downsample_size = downsample_size
[docs] def execute_augmented_trajectory(self, aug_traj, step_viewer=1, interactive=False, sim_callback=None, check_feasible=False): open_or_close_finger_traj = np.zeros(aug_traj.n_steps, dtype=bool) if aug_traj.lr2open_finger_traj is not None: for lr in aug_traj.lr2open_finger_traj.keys(): open_or_close_finger_traj = np.logical_or(open_or_close_finger_traj, aug_traj.lr2open_finger_traj[lr]) if aug_traj.lr2close_finger_traj is not None: for lr in aug_traj.lr2close_finger_traj.keys(): open_or_close_finger_traj = np.logical_or(open_or_close_finger_traj, aug_traj.lr2close_finger_traj[lr]) open_or_close_inds = np.where(open_or_close_finger_traj)[0] traj, dof_inds = aug_traj.get_full_traj(self.sim.robot) feasible = True misgraspl = False misgraspr = False lr2gripper_open = {'l':True, 'r':True} for (start_ind, end_ind) in zip(np.r_[0, open_or_close_inds], np.r_[open_or_close_inds+1, aug_traj.n_steps]): if aug_traj.lr2open_finger_traj is not None: for lr in aug_traj.lr2open_finger_traj.keys(): if aug_traj.lr2open_finger_traj[lr][start_ind]: target_val = None joint_ind = self.sim.robot.GetJoint("%s_gripper_l_finger_joint"%lr).GetDOFIndex() if joint_ind in dof_inds: target_val = traj[start_ind, dof_inds.index(joint_ind)] self.world.open_gripper(lr, target_val=target_val, step_viewer=step_viewer) lr2gripper_open[lr] = True if aug_traj.lr2close_finger_traj is not None: for lr in aug_traj.lr2close_finger_traj.keys(): if aug_traj.lr2close_finger_traj[lr][start_ind]: n_cnts = len(self.sim.constraints[lr]) self.world.close_gripper(lr, step_viewer=step_viewer) if len(self.sim.constraints[lr]) == n_cnts and lr == 'l': misgraspl = True elif lr == 'l': misgraspl = False elif len(self.sim.constraints[lr]) == n_cnts and lr=='r': misgraspr = True else: misgraspr = False #misgrasp |= len(self.sim.constraints[lr]) == n_cnts lr2gripper_open[lr] = False misgrasp = misgraspl or misgraspr # don't execute trajectory for finger joint if the corresponding gripper is closed active_inds = np.ones(len(dof_inds), dtype=bool) for lr in 'lr': if not lr2gripper_open[lr]: joint_ind = self.sim.robot.GetJoint("%s_gripper_l_finger_joint"%lr).GetDOFIndex() if joint_ind in dof_inds: active_inds[dof_inds.index(joint_ind)] = False miniseg_traj = traj[start_ind:end_ind, active_inds] miniseg_dof_inds = list(np.asarray(dof_inds)[active_inds]) full_traj = (miniseg_traj, miniseg_dof_inds) feasible &= eval_util.traj_is_safe(self.sim, full_traj, 0) if check_feasible and not feasible: break self.world.execute_trajectory(full_traj, step_viewer=step_viewer, interactive=interactive, sim_callback=sim_callback) return feasible, misgrasp
[docs] def observe_scene(self): full_cloud = self.world.observe_cloud() return demonstration.SceneState(full_cloud, downsample_size=self.downsample_size)
[docs]class GroundTruthRopeLfdEnvironment(LfdEnvironment): def __init__(self, world, sim, upsample=0, upsample_rad=1, downsample_size=0): """Inits GroundTruthRopeLfdEnvironment Args: world: RobotWorld sim: DynamicSimulation that should containing exactly one rope when observe_scene is called downsample_size: if downsample_size is positive, the clouds are downsampled to a voxel size of downsample_size, else they are not downsampled """ super(GroundTruthRopeLfdEnvironment, self).__init__(world, sim, downsample_size=downsample_size) self.upsample = upsample self.upsample_rad = upsample_rad
[docs] def observe_scene(self): rope_sim_objs = [sim_obj for sim_obj in self.sim.sim_objs if isinstance(sim_obj, simulation_object.RopeSimulationObject)] assert len(rope_sim_objs) == 1 rope_sim_obj = rope_sim_objs[0] return demonstration.GroundTruthRopeSceneState(rope_sim_obj.rope.GetControlPoints(), rope_sim_obj.rope_params.radius, upsample=self.upsample, upsample_rad=self.upsample_rad, downsample_size=self.downsample_size)