import math
import os
import random
from time import sleep

import gym
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv

from agent.Base_Agent import Base_Agent as Agent
from behaviors.custom.Step.Step_Generator import Step_Generator
from math_ops.Math_Ops import Math_Ops as M
from scripts.commons.Server import Server
from scripts.commons.Train_Base import Train_Base
from world.commons.Draw import Draw


class Dribble(gym.Env):
    def __init__(self, ip, server_p, monitor_p, r_type, enable_draw) -> None:
        self.abs_ori = 75
        self.ball_dist_hip_center_2d = 0
        self.ball_dist_hip_center = None
        self.internal_rel_orientation = None
        self.dribble_speed = 1
        self.gym_last_internal_abs_ori = None
        self.internal_target_vel = None
        self.Gen_player_pos = None
        self.internal_target = None
        self.values_l = None
        self.values_r = None
        self.reset_time = None
        self.behavior = None
        self.robot_type = r_type
        self.kick_ori = 0
        self.terminal = False

        self.player = Agent(ip, server_p, monitor_p, 1, self.robot_type, "Gym", True, True)
        self.step_counter = 0
        self.ik = self.player.inv_kinematics
        self.dribble_rel_orientation = 0

        # Step behavior parameters
        self.STEP_DUR = 8
        self.STEP_Z_SPAN = 0.02
        self.STEP_Z_MAX = 0.70

        nao_specs = self.ik.NAO_SPECS
        self.leg_length = nao_specs[1] + nao_specs[3]
        feet_y_dev = nao_specs[0] * 1.12
        sample_time = self.player.world.robot.STEPTIME
        max_ankle_z = nao_specs[5]
        self.step_generator = Step_Generator(feet_y_dev, sample_time, max_ankle_z)
        self.DEFAULT_ARMS = np.array([-90, -90, 8, 8, 90, 90, 70, 70], np.float32)

        self.act = np.zeros(16, np.float32)
        self.path_manager = self.player.path_manager

        obs_size = 76
        self.obs = np.zeros(obs_size, np.float32)
        self.observation_space = gym.spaces.Box(
            low=np.full(obs_size, -np.inf, np.float32),
            high=np.full(obs_size, np.inf, np.float32),
            dtype=np.float32
        )

        self.ball_x_center = 0.21
        self.ball_y_center = -0.045

        MAX = np.finfo(np.float32).max
        self.no_of_actions = act_size = 16
        self.action_space = gym.spaces.Box(
            low=np.full(act_size, -MAX, np.float32),
            high=np.full(act_size, MAX, np.float32),
            dtype=np.float32
        )

        self.player.scom.unofficial_move_ball((0, 0, 0.042))
        self.player.scom.unofficial_set_play_mode("PlayOn")
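    # Summary of the 76-dim observation vector assembled by observe() below
    # (indices and meanings are taken from the assignments in that method;
    # per-element scaling factors are omitted here):
    #   [0]      step counter (saturated)
    #   [1]      head height              [2]      head vertical velocity
    #   [3:5]    torso roll / pitch (IMU)
    #   [5:8]    gyroscope                [8:11]   accelerometer
    #   [11:17]  left foot contact: rel. origin + force vector (frp)
    #   [17:23]  right foot contact: rel. origin + force vector (frp)
    #   [23:43]  joint positions          [43:63]  joint speeds
    #   [63]     step progress            [64:66]  left / right leg active flags
    #   [66]     sine of the step phase
    #   [67:70]  change in ball position rel. to hip center
    #   [70:73]  ball position rel. to hip center
    #   [73]     ball distance to hip center
    #   [74]     internal relative target orientation
    #   [75]     internal target angular velocity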
    def observe(self, init=False, virtual_ball=False):
        r = self.player.world.robot
        w = self.player.world

        if init:
            self.step_counter = 0
            self.act = np.zeros(16, np.float32)

        self.obs[0] = min(self.step_counter, 12 * 8) / 100
        self.obs[1] = r.loc_head_z * 3
        self.obs[2] = r.loc_head_z_vel / 2
        self.obs[3] = r.imu_torso_roll / 15
        self.obs[4] = r.imu_torso_pitch / 15
        self.obs[5:8] = r.gyro / 100
        self.obs[8:11] = r.acc / 10
        self.obs[11:17] = r.frp.get('lf', np.zeros(6)) * (10, 10, 10, 0.01, 0.01, 0.01)
        self.obs[17:23] = r.frp.get('rf', np.zeros(6)) * (10, 10, 10, 0.01, 0.01, 0.01)
        self.obs[23:43] = r.joints_position[2:22] / 100
        self.obs[43:63] = r.joints_speed[2:22] / 6.1395

        if init:
            self.obs[63] = 1
            self.obs[64] = 1
            self.obs[65] = 0
            self.obs[66] = 0
        else:
            self.obs[63] = self.step_generator.external_progress
            self.obs[64] = float(self.step_generator.state_is_left_active)
            self.obs[65] = float(not self.step_generator.state_is_left_active)
            self.obs[66] = math.sin(self.step_generator.state_current_ts / self.step_generator.ts_per_step * math.pi)

        ball_rel_hip_center = self.ik.torso_to_hip_transform(w.ball_rel_torso_cart_pos)
        self.ball_dist_hip_center = np.linalg.norm(ball_rel_hip_center)
        ball_rel_torso_xy = w.ball_rel_torso_cart_pos[:2]
        self.ball_dist_hip_center_2d = np.linalg.norm(ball_rel_torso_xy)

        if init:
            self.obs[67:70] = (0, 0, 0)
        elif w.ball_is_visible:
            self.obs[67:70] = (ball_rel_hip_center - self.obs[70:73]) * 10
        self.obs[70:73] = ball_rel_hip_center
        self.obs[73] = self.ball_dist_hip_center * 2

        if virtual_ball:
            self.obs[67:74] = (0, 0, 0, 0.05, 0, -0.175, 0.36)

        MAX_ROTATION_DIFF = 20
        MAX_ROTATION_DIST = 80

        if init:
            self.internal_rel_orientation = 0
            self.internal_target_vel = 0
            self.gym_last_internal_abs_ori = r.imu_torso_orientation

        if w.vision_is_up_to_date:
            previous_internal_rel_orientation = np.copy(self.internal_rel_orientation)
            internal_ori_diff = np.clip(
                M.normalize_deg(self.dribble_rel_orientation - self.internal_rel_orientation),
                -MAX_ROTATION_DIFF, MAX_ROTATION_DIFF
            )
            self.internal_rel_orientation = np.clip(
                M.normalize_deg(self.internal_rel_orientation + internal_ori_diff),
                -MAX_ROTATION_DIST, MAX_ROTATION_DIST
            )
            self.internal_target_vel = self.internal_rel_orientation - previous_internal_rel_orientation
            self.gym_last_internal_abs_ori = self.internal_rel_orientation + r.imu_torso_orientation

        self.obs[74] = self.internal_rel_orientation / MAX_ROTATION_DIST
        self.obs[75] = self.internal_target_vel / MAX_ROTATION_DIFF

        return self.obs

    def execute_ik(self, l_pos, l_rot, r_pos, r_rot):
        r = self.player.world.robot
        indices, self.values_l, error_codes = self.ik.leg(l_pos, l_rot, True, dynamic_pose=False)
        r.set_joints_target_position_direct(indices, self.values_l, harmonize=False)
        indices, self.values_r, error_codes = self.ik.leg(r_pos, r_rot, False, dynamic_pose=False)
        r.set_joints_target_position_direct(indices, self.values_r, harmonize=False)

    def sync(self):
        r = self.player.world.robot
        self.player.scom.commit_and_send(r.get_command())
        self.player.scom.receive()

    def reset(self):
        self.abs_ori = 45
        self.player.scom.unofficial_set_play_mode("PlayOn")

        Gen_ball_pos = [-9, 0, 0]
        a = random.uniform(0, 3)
        b = random.uniform(-3, 3)
        self.Gen_player_pos = (Gen_ball_pos[0] - a, b, 0.5)
        self.player.scom.unofficial_move_ball((Gen_ball_pos[0], Gen_ball_pos[1], Gen_ball_pos[2]))

        self.step_counter = 0
        self.behavior = self.player.behavior
        r = self.player.world.robot
        w = self.player.world
        t = w.time_local_ms
        self.reset_time = t
        self.b_rel = w.ball_rel_torso_cart_pos[:2]
        self.path_manager = self.player.path_manager

        for _ in range(25):
            self.player.scom.unofficial_beam(self.Gen_player_pos, 0)
            self.player.behavior.execute("Zero_Bent_Knees")
            self.sync()

        self.player.scom.unofficial_beam(self.Gen_player_pos, 0)
        r.joints_target_speed[0] = 0.01
        self.sync()

        for _ in range(7):
            self.player.behavior.execute("Zero_Bent_Knees")
            self.sync()

        while True:
            self.b_rel = w.ball_rel_torso_cart_pos[:2]
            if self.player.behavior.is_ready("Get_Up"):
                self.player.behavior.execute_to_completion("Get_Up")
            if np.linalg.norm(self.b_rel) < 0.25 and w.ball_is_visible:
                break
            else:
                if self.player.behavior.is_ready("Get_Up"):
                    self.player.behavior.execute_to_completion("Get_Up")
                reset_walk = self.behavior.previous_behavior != "Walk"
                rel_target = self.b_rel - 0.25 * M.normalize_vec(self.b_rel)
                rel_ori = M.vector_angle(self.b_rel)
                dist = max(0.08, float(np.linalg.norm(rel_target)) * 0.7)
                self.behavior.execute_sub_behavior("Walk", reset_walk, rel_target, False, rel_ori, False, dist)
                self.sync()

        self.act = np.zeros(self.no_of_actions, np.float32)
        return self.observe(True)

    def render(self, mode='human', close=False):
        return

    def close(self):
        Draw.clear_all()
        self.player.terminate()
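    # Summary of how the 16-dim action is applied in step() below (derived from the
    # code in that method; component meanings follow the order used there):
    #   * the raw action is low-pass filtered (75% previous command, 25% new) and
    #     scaled by 0.7 * 0.95 * dribble_speed before use
    #   * act[0:3] / act[3:6]: left / right ankle position offsets added to the
    #     Step_Generator targets
    #   * act[6:9] / act[9:12]: left / right foot rotation, with the third component
    #     biased by +18.3 deg / -18.3 deg and clamped to keep its sign
    #   * act[12:16]: offsets (x4) added to the first four arm joints on top of a
    #     sinusoidal arm swing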
    def step(self, action):
        r = self.player.world.robot
        w = self.player.world
        t = self.step_counter

        dribble_target = (15, random.choice([5, -5]))
        if w.ball_abs_pos[1] > 5:
            dribble_target = (15, -5)
        if w.ball_abs_pos[1] < -5:
            dribble_target = (15, 5)
        self.dribble_rel_orientation = self.path_manager.get_dribble_path(optional_2d_target=dribble_target)[1]

        self.act = 0.75 * self.act + 0.25 * action * 0.7 * 0.95 * self.dribble_speed

        lfy, lfz, rfy, rfz = self.step_generator.get_target_positions(
            self.step_counter == 0, self.STEP_DUR, self.STEP_Z_SPAN, self.leg_length * self.STEP_Z_MAX
        )

        a = self.act
        l_ankle_pos = (a[0] * 0.025 - 0.01, a[1] * 0.01 + lfy, a[2] * 0.01 + lfz)
        r_ankle_pos = (a[3] * 0.025 - 0.01, a[4] * 0.01 + rfy, a[5] * 0.01 + rfz)
        l_foot_rot = a[6:9] * (2, 2, 3)
        r_foot_rot = a[9:12] * (2, 2, 3)
        l_foot_rot[2] = max(0, l_foot_rot[2] + 18.3)
        r_foot_rot[2] = min(0, r_foot_rot[2] - 18.3)

        arms = np.copy(self.DEFAULT_ARMS)
        amplitude = 15
        period = 25
        forward_arm_movement = amplitude * math.sin(2 * math.pi * t / period)
        arms[0] += forward_arm_movement
        arms[1] += forward_arm_movement
        inward_arm_movement = 10
        arms[2] += inward_arm_movement
        arms[3] -= inward_arm_movement
        arms[0:4] += a[12:16] * 4

        self.execute_ik(l_ankle_pos, l_foot_rot, r_ankle_pos, r_foot_rot)
        r.set_joints_target_position_direct(slice(14, 22), arms, harmonize=False)

        self.sync()
        self.step_counter += 1
        obs = self.observe()

        angle_rad = np.radians(self.gym_last_internal_abs_ori)
        unit_vector = np.array([np.cos(angle_rad), np.sin(angle_rad)])
        if np.linalg.norm(w.ball_cheat_abs_vel[:2]) != 0:
            cos_theta = np.dot(unit_vector, w.ball_cheat_abs_vel[:2]) / (
                np.linalg.norm(unit_vector) * np.linalg.norm(w.ball_cheat_abs_vel[:2])
            )
        else:
            cos_theta = 0
        reward = np.linalg.norm(w.ball_cheat_abs_vel) * cos_theta

        if abs(w.ball_rel_torso_cart_pos[1]) > 0.02:
            reward = 0
        if self.ball_dist_hip_center_2d < 0.115 or self.ball_dist_hip_center_2d > 0.2:
            reward = 0
        if abs(r.imu_torso_roll) > 10 or abs(r.imu_torso_pitch) > 10:
            reward = 0

        if self.player.behavior.is_ready("Get_Up"):
            self.terminal = True
        elif (w.time_local_ms - self.reset_time > 7500 * 15
              or np.linalg.norm(w.ball_cheat_abs_pos[:2] - r.loc_head_position[:2]) > 0.25
              or not w.is_ball_abs_pos_from_vision):
            self.terminal = True
        else:
            self.terminal = False

        return obs, reward, self.terminal, {}
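    # Reward and termination summary (mirrors the logic at the end of step() above):
    #   * reward = ball speed scaled by the cosine between the internal dribble
    #     heading and the ball's planar velocity
    #   * reward is zeroed when the ball drifts sideways relative to the torso,
    #     leaves the 0.115-0.2 m band around the hip center, or the torso tilts
    #     more than 10 deg in roll or pitch
    #   * the episode ends when the robot falls (Get_Up becomes ready), the time
    #     limit is exceeded, the ball moves more than 0.25 m from the head's ground
    #     projection, or the ball position is no longer obtained from vision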
class Train(Train_Base):
    def __init__(self, script) -> None:
        super().__init__(script)

    def train(self, args):
        n_envs = min(8, os.cpu_count())
        n_steps_per_env = 128
        minibatch_size = 64
        total_steps = 500000000
        learning_rate = 3e-4
        folder_name = f'Dribble_LL_R{self.robot_type}'
        model_path = f'./scripts/gyms/logs/{folder_name}/'

        def init_env(i_env):
            def thunk():
                return Dribble(self.ip, self.server_p + i_env, self.monitor_p_1000 + i_env, self.robot_type, False)
            return thunk

        servers = Server(self.server_p, self.monitor_p_1000, n_envs + 1)
        env = SubprocVecEnv([init_env(i) for i in range(n_envs)])
        eval_env = SubprocVecEnv([init_env(n_envs)])

        try:
            if "model_file" in args:
                model = PPO.load(
                    args["model_file"], env=env, device="cuda", n_envs=n_envs,
                    n_steps=n_steps_per_env, batch_size=minibatch_size, learning_rate=learning_rate
                )
            else:
                model = PPO(
                    "MlpPolicy", env=env, verbose=1, n_steps=n_steps_per_env,
                    batch_size=minibatch_size, learning_rate=learning_rate, device="cuda"
                )

            model_path = self.learn_model(
                model, total_steps, model_path, eval_env=eval_env,
                eval_freq=n_steps_per_env * 10, save_freq=n_steps_per_env * 10,
                backup_env_file=__file__
            )
        except KeyboardInterrupt:
            sleep(1)
            print("\nctrl+c pressed, aborting...\n")
            servers.kill()
            return

        env.close()
        eval_env.close()
        servers.kill()

    def test(self, args):
        server = Server(self.server_p - 1, self.monitor_p, 1)
        env = Dribble(self.ip, self.server_p - 1, self.monitor_p, self.robot_type, True)
        model = PPO.load(args["model_file"], env=env)

        try:
            self.export_model(args["model_file"], args["model_file"] + ".pkl", False)
            self.test_model(model, env, log_path=args["folder_dir"], model_path=args["folder_dir"])
        except KeyboardInterrupt:
            print()

        env.close()
        server.kill()
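

if __name__ == "__main__":
    # Standalone illustration only (not part of the training pipeline and not run
    # when this module is imported by the framework): the directional term of the
    # reward in Dribble.step() is the ball speed scaled by the cosine between the
    # internal target heading and the ball's planar velocity. The heading angle and
    # velocity below are made-up example values.
    heading_deg = 30.0
    ball_vel_xy = np.array([0.8, 0.3])
    heading = np.array([np.cos(np.radians(heading_deg)), np.sin(np.radians(heading_deg))])
    speed = np.linalg.norm(ball_vel_xy)
    cos_theta = float(np.dot(heading, ball_vel_xy) / speed) if speed > 0 else 0.0
    print(f"directional reward term: {speed * cos_theta:.3f}")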