Source code for lfd.demonstration.demonstration

from __future__ import division

import numpy as np
from lfd.rapprentice import ropesim, resampling
from lfd.rapprentice import math_utils as mu
from lfd.environment import sim_util # TODO fold in sim_util function into LfdEnvironment

[docs]class Demonstration(object): def __init__(self, name, scene_state, aug_traj): """Inits Demonstration Args: name: demonstration name, which is the same as the ones used for indexing the demonstrations scene_state: demonstration SceneState aug_traj: demonstration AugmentedTrajectory. Only lr2ee_traj and lr2finger_traj are required for the demonstration AugmentedTrajectory. If lr2arm_traj is specified, it might be used for initializing the trajectory optimization. """ self.name = name self.scene_state = scene_state if aug_traj is not None: assert aug_traj.lr2ee_traj is not None assert aug_traj.lr2finger_traj is not None self.aug_traj = aug_traj def __repr__(self): return "%s(%s, %s, %s)" % (self.__class__.__name__, self.name, self.scene_state.__repr__(), self.aug_traj.__repr__())
[docs]class SceneState(object): ids = set() def __init__(self, full_cloud, id=None, full_color=None, downsample_size=0): """Inits SceneState Args: full_cloud: full (i.e. not downsampled) cloud id: unique id for this SceneState full_color: colors for the respective points of full_cloud. Should have the same dimensions as full_cloud downsample_size: if downsample_size is positive, the full cloud and color are downsampled to a voxel size of downsample_size, else they are not downsampled """ self.full_cloud = full_cloud self.full_color = full_color if downsample_size > 0: global clouds from lfd.rapprentice import clouds if full_color is not None: cloud_color = clouds.downsample(np.c_[full_cloud, full_color], downsample_size) self.cloud = cloud_color[:,:3] self.color = cloud_color[:,3:] else: self.cloud = clouds.downsample(full_cloud, downsample_size) self.color = None else: self.cloud = full_cloud self.color = full_color if id is None: self.id = SceneState.get_unique_id() else: self.id = id @staticmethod
[docs] def get_unique_id(): id = len(SceneState.ids) assert id not in SceneState.ids SceneState.ids.add(id) return id
def __repr__(self): return "%s(..., id=%i)" % (self.__class__.__name__, self.id)
[docs]class GroundTruthRopeSceneState(SceneState): def __init__(self, rope_nodes, radius, upsample=0, upsample_rad=1, downsample_size=0): full_cloud = ropesim.observe_cloud(rope_nodes, radius, upsample=upsample, upsample_rad=upsample_rad) super(GroundTruthRopeSceneState, self).__init__(full_cloud, downsample_size=downsample_size) self.rope_nodes = rope_nodes self.crossing_info = None #TODO: optionally compute/load cached crossing_info
[docs]class AugmentedTrajectory(object): def __init__(self, lr2arm_traj=None, lr2finger_traj=None, lr2ee_traj=None, lr2open_finger_traj=None, lr2close_finger_traj=None): """Inits AugmentedTrajectory Args: lr2arm_traj: dict that maps from 'l' and/or 'r' to the left arm's and/or right arm's joint angle trajectory lr2finger_traj: dict that maps to the left gripper's and/or right gripper's finger joint angle trajectory lr2ee_traj: dict that maps to the left gripper's and/or right gripper's end-effector trajectory (i.e. a numpy.array of homogeneous matrices) lr2open_finger_traj: dict that maps to a boolean vector indicating whether there is a opening action for the left gripper and/or right gripper for every time step. By default, there is no action for all time steps. lr2close_finger_traj: same as lr2open_finger_traj but for closing action """ # make sure all trajs have the same number of steps self.n_steps = None for lr2traj in [lr2arm_traj, lr2finger_traj, lr2ee_traj, lr2open_finger_traj, lr2close_finger_traj]: if lr2traj is None: continue for lr in lr2traj.keys(): if self.n_steps is None: self.n_steps = lr2traj[lr].shape[0] else: assert lr2traj[lr].shape[0] == self.n_steps if lr2arm_traj is None: self.lr2arm_traj = {} else: self.lr2arm_traj = lr2arm_traj if lr2finger_traj is None: self.lr2finger_traj = {} else: self.lr2finger_traj = lr2finger_traj if lr2ee_traj is None: self.lr2ee_traj = {} else: self.lr2ee_traj = lr2ee_traj if lr2open_finger_traj is None: self.lr2open_finger_traj = {} for lr in 'lr': self.lr2open_finger_traj[lr] = np.zeros(self.n_steps, dtype=bool) else: self.lr2open_finger_traj = lr2open_finger_traj if lr2close_finger_traj is None: self.lr2close_finger_traj = {} for lr in 'lr': self.lr2close_finger_traj[lr] = np.zeros(self.n_steps, dtype=bool) else: self.lr2close_finger_traj = lr2close_finger_traj def __eq__(self, other): if isinstance(other, self.__class__): for (lr2traj, other_lr2traj) in [(self.lr2arm_traj, other.lr2arm_traj), (self.lr2finger_traj, other.lr2finger_traj), (self.lr2ee_traj, other.lr2ee_traj), (self.lr2open_finger_traj, other.lr2open_finger_traj), (self.lr2close_finger_traj, other.lr2close_finger_traj)]: if lr2traj is None: if other_lr2traj is None: continue else: return False if set(lr2traj.keys()) != set(other_lr2traj.keys()): return False for lr in lr2traj.keys(): if np.any(lr2traj[lr] != other_lr2traj[lr]): return False return True else: return False def __ne__(self, other): return not self.__eq__(other) @staticmethod
[docs] def create_from_full_traj(robot, full_traj, lr2open_finger_traj=None, lr2close_finger_traj=None): traj, dof_inds = full_traj lr2arm_traj = {} for lr in 'lr': manip_name = {"l":"leftarm", "r":"rightarm"}[lr] arm_inds = robot.GetManipulator(manip_name).GetArmIndices() if set(arm_inds).intersection(set(dof_inds)): if not set(arm_inds).issubset(set(dof_inds)): raise RuntimeError("Cannot create AugmentedTrajectory from incomplete full_traj") arm_traj = np.zeros((traj.shape[0], len(arm_inds))) for (j,arm_ind) in enumerate(arm_inds): arm_traj[:,j] = traj[:,dof_inds.index(arm_ind)] lr2arm_traj[lr] = arm_traj lr2finger_traj = {} for lr in 'lr': finger_ind = robot.GetJointIndex("%s_gripper_l_finger_joint"%lr) if finger_ind in dof_inds: lr2finger_traj[lr] = traj[:,dof_inds.index(finger_ind)][:,None] lr2ee_traj = {} for lr in lr2arm_traj.keys(): lr2ee_traj[lr] = sim_util.get_ee_traj(robot, lr, lr2arm_traj[lr]) return AugmentedTrajectory(lr2arm_traj=lr2arm_traj, lr2finger_traj=lr2finger_traj, lr2ee_traj=lr2ee_traj, lr2open_finger_traj=lr2open_finger_traj, lr2close_finger_traj=lr2close_finger_traj)
[docs] def get_full_traj(self, robot): """ TODO: remove sim_util.get_full_traj """ trajs = [] dof_inds = [] for lr in 'lr': if lr in self.lr2arm_traj: arm_traj = self.lr2arm_traj[lr] manip_name = {"l":"leftarm", "r":"rightarm"}[lr] trajs.append(arm_traj) dof_inds.extend(robot.GetManipulator(manip_name).GetArmIndices()) for lr in 'lr': if lr in self.lr2finger_traj: finger_traj = self.lr2finger_traj[lr] trajs.append(finger_traj) dof_inds.append(robot.GetJointIndex("%s_gripper_l_finger_joint"%lr)) if len(trajs) > 0: full_traj = (np.concatenate(trajs, axis=1), dof_inds) else: full_traj = (np.zeros((0,0)), []) return full_traj
[docs] def get_resampled_traj(self, timesteps_rs): lr2arm_traj_rs = None if self.lr2arm_traj is None else {} lr2finger_traj_rs = None if self.lr2finger_traj is None else {} for (lr2traj_rs, self_lr2traj) in [(lr2arm_traj_rs, self.lr2arm_traj), (lr2finger_traj_rs, self.lr2finger_traj)]: if self_lr2traj is None: continue for lr in self_lr2traj.keys(): lr2traj_rs[lr] = mu.interp2d(timesteps_rs, np.arange(len(self_lr2traj[lr])), self_lr2traj[lr]) if self.lr2ee_traj is None: lr2ee_traj_rs = None else: lr2ee_traj_rs = {} for lr in self.lr2ee_traj.keys(): lr2ee_traj_rs[lr] = np.asarray(resampling.interp_hmats(timesteps_rs, np.arange(len(self.lr2ee_traj[lr])), self.lr2ee_traj[lr])) lr2open_finger_traj_rs = None if self.lr2open_finger_traj is None else {} lr2close_finger_traj_rs = None if self.lr2close_finger_traj is None else {} for (lr2oc_finger_traj_rs, self_lr2oc_finger_traj) in [(lr2open_finger_traj_rs, self.lr2open_finger_traj), (lr2close_finger_traj_rs, self.lr2close_finger_traj)]: if self_lr2oc_finger_traj is None: continue for lr in self_lr2oc_finger_traj.keys(): self_oc_finger_traj = self_lr2oc_finger_traj[lr] self_oc_inds = np.where(self_oc_finger_traj)[0] oc_inds_rs = np.abs(timesteps_rs[:,None] - self_oc_inds[None,:]).argmin(axis=0) oc_finger_traj_rs = np.zeros(len(timesteps_rs), dtype=bool) oc_finger_traj_rs[oc_inds_rs] = True lr2oc_finger_traj_rs[lr] = oc_finger_traj_rs return AugmentedTrajectory(lr2arm_traj=lr2arm_traj_rs, lr2finger_traj=lr2finger_traj_rs, lr2ee_traj=lr2ee_traj_rs, lr2open_finger_traj=lr2open_finger_traj_rs, lr2close_finger_traj=lr2close_finger_traj_rs)
def __repr__(self): return "%s(...)" % (self.__class__.__name__)