From 7748c64b44a5bf16f4ecf910b44ac3819ed127e7 Mon Sep 17 00:00:00 2001
From: MagDish <2717360869@qq.com>
Date: Fri, 15 Nov 2024 14:07:51 +0800
Subject: [PATCH] Add Sprint gym environment; rename sprint.py to sprint_demo.py

---
 scripts/gyms/Sprint.py                     | 424 +++++++++++++++++++++
 scripts/gyms/{sprint.py => sprint_demo.py} |  13 +-
 2 files changed, 432 insertions(+), 5 deletions(-)
 create mode 100644 scripts/gyms/Sprint.py
 rename scripts/gyms/{sprint.py => sprint_demo.py} (97%)

diff --git a/scripts/gyms/Sprint.py b/scripts/gyms/Sprint.py
new file mode 100644
index 0000000..db6a722
--- /dev/null
+++ b/scripts/gyms/Sprint.py
@@ -0,0 +1,424 @@
+import math
+import random
+
+from agent.Base_Agent import Base_Agent as Agent
+from behaviors.custom.Step.Step import Step
+from world.commons.Draw import Draw
+from stable_baselines3 import PPO
+from stable_baselines3.common.vec_env import SubprocVecEnv
+from scripts.commons.Server import Server
+from scripts.commons.Train_Base import Train_Base
+from time import sleep
+import os, gym
+import numpy as np
+from math_ops.Math_Ops import Math_Ops as U
+from math_ops.Math_Ops import Math_Ops as M
+from behaviors.custom.Step.Step_Generator import Step_Generator
+
+'''
+Objective:
+Learn how to run towards a target using the step primitive
+----------
+- class sprint: implements a custom OpenAI Gym environment
+- class Train: implements algorithms to train a new model or test an existing model
+'''
+
+
+class sprint(gym.Env):
+    def __init__(self, ip, server_p, monitor_p, r_type, enable_draw) -> None:
+
+        self.Gen_player_pos = None
+        self.internal_target = None
+        self.values_l = None
+        self.values_r = None
+        self.reset_time = None
+        self.behavior = None
+        self.bias_dir = None
+        self.robot_type = r_type
+        self.kick_ori = 0
+        self.terminal = False
+        # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw
+        self.player = Agent(ip, server_p, monitor_p, 1, self.robot_type, "Gym", True, enable_draw)
+        self.step_counter = 0  # to limit episode size
+        self.ik = self.player.inv_kinematics
+        self.target = None
+
+        # Step behavior defaults
+        self.STEP_DUR = 10
+        self.STEP_Z_SPAN = 0.2
+        self.STEP_Z_MAX = 0.7
+        nao_specs = self.ik.NAO_SPECS
+        self.leg_length = nao_specs[1] + nao_specs[3]  # upper leg height + lower leg height
+        feet_y_dev = nao_specs[0] * 2  # wider step
+        sample_time = self.player.world.robot.STEPTIME
+        max_ankle_z = nao_specs[5] * 1.8
+        self.step_generator = Step_Generator(feet_y_dev, sample_time, max_ankle_z)
+        self.DEFAULT_ARMS = np.array([-90, -90, 8, 8, 90, 90, 70, 70], np.float32)
+
+        self.walk_rel_orientation = None
+        self.walk_rel_target = None
+        self.walk_distance = None
+        self.act = np.zeros(16, np.float32)  # memory variable
+
+        # State space
+        obs_size = 63
+        self.obs = np.zeros(obs_size, np.float32)
+        self.observation_space = gym.spaces.Box(low=np.full(obs_size, -np.inf, np.float32),
+                                                high=np.full(obs_size, np.inf, np.float32), dtype=np.float32)
+
+        # Action space
+        MAX = np.finfo(np.float32).max
+        self.no_of_actions = act_size = 16
+        self.action_space = gym.spaces.Box(low=np.full(act_size, -MAX, np.float32),
+                                           high=np.full(act_size, MAX, np.float32), dtype=np.float32)
+
+        # Place ball far away to keep landmarks in FoV (head follows ball while using Step behavior)
+        self.player.scom.unofficial_move_ball((14, 0, 0.042))
+        self.ball_pos = np.array([0, 0, 0])
+
+        self.player.scom.unofficial_set_play_mode("PlayOn")
+        self.player.scom.unofficial_move_ball((0, 0, 0))
+
+        self.gait: Step_Generator = self.player.behavior.get_custom_behavior_object("Walk").env.step_generator
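+
+    # Summary of the state and action layout, derived from observe() and step() below
+    # (see those methods for the exact scaling factors):
+    #   obs[0]      step counter            obs[1:3]    torso height and z-velocity
+    #   obs[3:5]    torso roll/pitch        obs[5:11]   gyroscope + accelerometer
+    #   obs[11:23]  foot contact points and force vectors (left, right)
+    #   obs[23:35]  ankle positions and foot rotations relative to the torso
+    #   obs[35:39]  arm joint positions     obs[39:55]  last joint target speeds
+    #   obs[55:58]  step progress and active-leg flags
+    #   obs[58:63]  smoothed internal target (position, orientation, velocity)
+    #   act[0:6]    ankle position offsets (left xyz, right xyz)
+    #   act[6:12]   foot rotation offsets (left, right)
+    #   act[12:16]  arm offsets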
+
+    def observe(self, init=False):
+        r = self.player.world.robot
+
+        if init:  # reset variables
+            self.act = np.zeros(16, np.float32)  # memory variable
+            self.step_counter = 0
+
+        # index       observation             naive normalization
+        self.obs[0] = min(self.step_counter, 15 * 8) / 100  # simple counter: 0,1,2,3...
+        self.obs[1] = r.loc_head_z * 3  # z coordinate (torso)
+        self.obs[2] = r.loc_head_z_vel / 2  # z velocity (torso)
+        self.obs[3] = r.imu_torso_roll / 15  # absolute torso roll in deg
+        self.obs[4] = r.imu_torso_pitch / 15  # absolute torso pitch in deg
+        self.obs[5:8] = r.gyro / 100  # gyroscope
+        self.obs[8:11] = r.acc / 10  # accelerometer
+
+        self.obs[11:17] = r.frp.get('lf', np.zeros(6)) * (10, 10, 10, 0.01, 0.01,
+                                                          0.01)  # left foot: relative point of origin (p) and force vector (f) -> (px,py,pz,fx,fy,fz)*
+        self.obs[17:23] = r.frp.get('rf', np.zeros(6)) * (10, 10, 10, 0.01, 0.01,
+                                                          0.01)  # right foot: relative point of origin (p) and force vector (f) -> (px,py,pz,fx,fy,fz)*
+        # *if foot is not touching the ground, then (px=0,py=0,pz=0,fx=0,fy=0,fz=0)
+
+        # Joints: Forward kinematics for ankles + feet rotation + arms (pitch + roll)
+        rel_lankle = self.player.inv_kinematics.get_body_part_pos_relative_to_hip(
+            "lankle")  # ankle position relative to center of both hip joints
+        rel_rankle = self.player.inv_kinematics.get_body_part_pos_relative_to_hip(
+            "rankle")  # ankle position relative to center of both hip joints
+        lf = r.head_to_body_part_transform("torso", r.body_parts['lfoot'].transform)  # foot transform relative to torso
+        rf = r.head_to_body_part_transform("torso", r.body_parts['rfoot'].transform)  # foot transform relative to torso
+        lf_rot_rel_torso = np.array(
+            [lf.get_roll_deg(), lf.get_pitch_deg(), lf.get_yaw_deg()])  # foot rotation relative to torso
+        rf_rot_rel_torso = np.array(
+            [rf.get_roll_deg(), rf.get_pitch_deg(), rf.get_yaw_deg()])  # foot rotation relative to torso
+
+        # pose
+        self.obs[23:26] = rel_lankle * (8, 8, 5)
+        self.obs[26:29] = rel_rankle * (8, 8, 5)
+        self.obs[29:32] = lf_rot_rel_torso / 20
+        self.obs[32:35] = rf_rot_rel_torso / 20
+        self.obs[35:39] = r.joints_position[14:18] / 100  # arms (pitch + roll)
+
+        # velocity
+        self.obs[39:55] = r.joints_target_last_speed[2:18]  # predictions == last action
+
+        '''
+        Expected observations for walking state:
+        Time step        R   0    1    2    3    4    5    6    7    0
+        Progress         1   0   .14  .28  .43  .57  .71  .86   1    0
+        Left leg active  T   F    F    F    F    F    F    F    F    T
+        '''
+
+        if init:  # the walking parameters refer to the last parameters in effect (after a reset, they are pointless)
+            self.obs[55] = 1  # step progress
+            self.obs[56] = 1  # 1 if left leg is active
+            self.obs[57] = 0  # 1 if right leg is active
+        else:
+            self.obs[55] = self.step_generator.external_progress  # step progress
+            self.obs[56] = float(self.step_generator.state_is_left_active)  # 1 if left leg is active
+            self.obs[57] = float(not self.step_generator.state_is_left_active)  # 1 if right leg is active
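+
+        # Note: the block below low-pass filters the raw walk target so the policy sees a target
+        # that changes smoothly between control steps. With the constants below, and assuming the
+        # simulator's usual 20 ms control cycle, the internal target can drift by at most
+        # 0.014 m/step (~0.7 m/s) and the internal orientation by 1.6 deg/step (~80 deg/s),
+        # while being clipped to 0.5 m and 45 deg around the robot.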
+
+        '''
+        Create internal target with a smoother variation
+        '''
+
+        MAX_LINEAR_DIST = 0.5
+        MAX_LINEAR_DIFF = 0.014  # max difference (meters) per step
+        MAX_ROTATION_DIFF = 1.6  # max difference (degrees) per step
+        MAX_ROTATION_DIST = 45
+
+        if init:
+            self.internal_rel_orientation = 0
+            self.internal_target = np.zeros(2)
+
+        previous_internal_target = np.copy(self.internal_target)
+
+        # ---------------------------------------------------------------- compute internal linear target
+
+        rel_raw_target_size = np.linalg.norm(self.walk_rel_target)
+
+        if rel_raw_target_size == 0:
+            rel_target = self.walk_rel_target
+        else:
+            rel_target = self.walk_rel_target / rel_raw_target_size * min(self.walk_distance, MAX_LINEAR_DIST)
+
+        internal_diff = rel_target - self.internal_target
+        internal_diff_size = np.linalg.norm(internal_diff)
+
+        if internal_diff_size > MAX_LINEAR_DIFF:
+            self.internal_target += internal_diff * (MAX_LINEAR_DIFF / internal_diff_size)
+        else:
+            self.internal_target[:] = rel_target
+
+        # ---------------------------------------------------------------- compute internal rotation target
+
+        internal_ori_diff = np.clip(M.normalize_deg(self.walk_rel_orientation - self.internal_rel_orientation),
+                                    -MAX_ROTATION_DIFF, MAX_ROTATION_DIFF)
+        self.internal_rel_orientation = np.clip(M.normalize_deg(self.internal_rel_orientation + internal_ori_diff),
+                                                -MAX_ROTATION_DIST, MAX_ROTATION_DIST)
+
+        # ----------------------------------------------------------------- observations
+
+        internal_target_vel = self.internal_target - previous_internal_target
+
+        self.obs[58] = self.internal_target[0] / MAX_LINEAR_DIST
+        self.obs[59] = self.internal_target[1] / MAX_LINEAR_DIST
+        self.obs[60] = self.internal_rel_orientation / MAX_ROTATION_DIST
+        self.obs[61] = internal_target_vel[0] / MAX_LINEAR_DIFF
+        self.obs[62] = internal_target_vel[1] / MAX_LINEAR_DIFF
+
+        return self.obs
+
+    def execute_ik(self, l_pos, l_rot, r_pos, r_rot):
+        r = self.player.world.robot
+        # Apply IK to each leg + Set joint targets
+
+        # Left leg
+        indices, self.values_l, error_codes = self.ik.leg(l_pos, l_rot, True, dynamic_pose=False)
+        r.set_joints_target_position_direct(indices, self.values_l, harmonize=False)
+
+        # Right leg
+        indices, self.values_r, error_codes = self.ik.leg(r_pos, r_rot, False, dynamic_pose=False)
+        r.set_joints_target_position_direct(indices, self.values_r, harmonize=False)
+
+    def sync(self):
+        ''' Run a single simulation step '''
+        r = self.player.world.robot
+        self.player.scom.commit_and_send(r.get_command())
+        self.player.scom.receive()
+
+    def reset(self):
+        # print("reset")
+        '''
+        Reset and stabilize the robot
+        Note: for some behaviors it would be better to reduce stabilization or add noise
+        '''
+
+        self.player.scom.unofficial_set_play_mode("PlayOn")
+
+        Gen_ball_pos = [15, 0, 0]
+        self.Gen_player_pos = (random.random() * 3 - 15, 10 - random.random() * 20, 0.5)
+
+        self.ball_pos = np.array(Gen_ball_pos)
+        self.player.scom.unofficial_move_ball((Gen_ball_pos[0], Gen_ball_pos[1], Gen_ball_pos[2]))
+        self.step_counter = 0
+        self.behavior = self.player.behavior
+        r = self.player.world.robot
+        w = self.player.world
+        t = w.time_local_ms
+        self.reset_time = t
+
+        self.generate_random_target(self.Gen_player_pos)
+        distance = np.linalg.norm(self.target - self.Gen_player_pos[:2])
+        self.walk_rel_target = self.target - self.Gen_player_pos[:2]
+        self.walk_distance = distance
+        self.walk_rel_orientation = M.vector_angle(self.walk_rel_target)
+
+        for _ in range(25):
+            self.player.scom.unofficial_beam(self.Gen_player_pos, 0)  # beam player continuously (floating above ground)
+            self.player.behavior.execute("Zero_Bent_Knees")
+            self.sync()
+
+        # beam player to ground
+        self.player.scom.unofficial_beam(self.Gen_player_pos, 0)
+        r.joints_target_speed[0] = 0.01  # move head to trigger physics update (rcssserver3d bug when no joint is moving)
+        self.sync()
+
+        # stabilize on ground
+        for _ in range(7):
+            self.player.behavior.execute("Zero_Bent_Knees")
+            self.sync()
+
+        # walk towards the random target for ~2.5 s so the episode starts from a steady gait
+        while True:
+
+            if w.time_local_ms - self.reset_time > 2500 and not self.gait.state_is_left_active and self.gait.state_current_ts == 2:
+                break
+            else:
+                if self.player.behavior.is_ready("Get_Up"):
+                    self.player.behavior.execute_to_completion("Get_Up")
+
+                reset_walk = self.behavior.previous_behavior != "Walk"  # reset walk if it wasn't the previous behavior
+                self.behavior.execute_sub_behavior("Walk", reset_walk, self.walk_rel_target, True, None, True,
+                                                   None)  # target, is_target_abs, ori, is_ori_abs, distance
+
+            self.sync()
+
+        self.act = np.zeros(self.no_of_actions, np.float32)
+        return self.observe(True)
+
+    def render(self, mode='human', close=False):
+        return
+
+    def close(self):
+        Draw.clear_all()
+        self.player.terminate()
+
+    def generate_random_target(self, position, x_range=(-15, 15), y_range=(-10, 10)):
+        while True:
+            x = np.random.uniform(x_range[0], x_range[1])
+            y = np.random.uniform(y_range[0], y_range[1])
+
+            if np.linalg.norm(np.array([x, y]) - position[:2]) >= 10:
+                break
+        self.target = np.array([x, y])
+
+    def step(self, action):
+
+        r = self.player.world.robot
+
+        w = self.player.world
+        internal_dist = np.linalg.norm(self.internal_target)
+        action_mult = 1 if internal_dist > 0.2 else (0.7 / 0.2) * internal_dist + 0.3
+        self.walk_rel_target = M.rotate_2d_vec(
+            (self.target[0] - r.loc_head_position[0], self.target[1] - r.loc_head_position[1]), -r.imu_torso_orientation)
+        self.walk_distance = np.linalg.norm(self.walk_rel_target)
+        self.walk_rel_orientation = M.vector_angle(self.walk_rel_target) * 0.3
+        if self.walk_distance <= 0.5:
+            self.generate_random_target(r.loc_head_position)
+            self.walk_rel_target = M.rotate_2d_vec(
+                (self.target[0] - r.loc_head_position[0], self.target[1] - r.loc_head_position[1]),
+                -r.imu_torso_orientation)
+            self.walk_distance = np.linalg.norm(self.walk_rel_target)
+            self.walk_rel_orientation = M.vector_angle(self.walk_rel_target) * 0.3
+
+        # exponential moving average
+        self.act = 0.8 * self.act + 0.2 * action * action_mult * 0.7
+
+        # execute Step behavior to extract the target positions of each leg (we will override these targets)
+        lfy, lfz, rfy, rfz = self.step_generator.get_target_positions(self.step_counter == 0, self.STEP_DUR,
+                                                                      self.STEP_Z_SPAN,
+                                                                      self.leg_length * self.STEP_Z_MAX)
+
+        # Leg IK
+        a = self.act
+        l_ankle_pos = (a[0] * 0.02, max(0.01, a[1] * 0.02 + lfy), a[2] * 0.01 + lfz)  # limit y to avoid self collision
+        r_ankle_pos = (a[3] * 0.02, min(a[4] * 0.02 + rfy, -0.01), a[5] * 0.01 + rfz)  # limit y to avoid self collision
+        l_foot_rot = a[6:9] * (3, 3, 5)
+        r_foot_rot = a[9:12] * (3, 3, 5)
+
+        # Limit leg yaw/pitch
+        l_foot_rot[2] = max(0, l_foot_rot[2] + 7)
+        r_foot_rot[2] = min(0, r_foot_rot[2] - 7)
+
+        # Arms actions
+        arms = np.copy(self.DEFAULT_ARMS)  # default arms pose
+        arm_swing = math.sin(self.step_generator.state_current_ts / self.STEP_DUR * math.pi) * 6
+        inv = 1 if self.step_generator.state_is_left_active else -1
+        arms[0:4] += a[12:16] * 4 + (-arm_swing * inv, arm_swing * inv, 0, 0)  # arms pitch+roll
+
+        # Set target positions
+        self.execute_ik(l_ankle_pos, l_foot_rot, r_ankle_pos, r_foot_rot)  # legs
+        r.set_joints_target_position_direct(slice(14, 22), arms, harmonize=False)  # arms
+        self.sync()
+        self.step_counter += 1
+        obs = self.observe()
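+
+        # Note: reward shaping used below -- direction_error is the absolute difference between the
+        # velocity heading and the target heading (both damped by the 0.3 factor), capped at 10;
+        # the per-step reward is robot_speed * (1.5 - direction_error / 10), so the speed factor
+        # ranges from 1.5 down to 0.5, plus a one-off bonus of 10 whenever the robot gets within
+        # 0.5 m of the current random target.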
+        velocity_rel_orientation = M.vector_angle(M.rotate_2d_vec((r.loc_torso_velocity[0], r.loc_torso_velocity[1]), r.imu_torso_orientation)) * 0.3
+        direction_error = min(abs(velocity_rel_orientation - self.walk_rel_orientation), 10)
+        robot_speed = np.linalg.norm(r.loc_torso_velocity[:2])
+        reward = robot_speed * (1.5 - direction_error / 10)
+        if self.walk_distance < 0.5:
+            reward += 10
+
+        if self.player.behavior.is_ready("Get_Up"):
+            self.terminal = True
+        elif w.time_local_ms - self.reset_time > 25000 * 2:
+            self.terminal = True
+        else:
+            self.terminal = False
+        return obs, reward, self.terminal, {}
+
+
+class Train(Train_Base):
+    def __init__(self, script) -> None:
+        super().__init__(script)
+
+    def train(self, args):
+
+        # --------------------------------------- Learning parameters
+        n_envs = min(12, os.cpu_count())
+        n_steps_per_env = 1024  # RolloutBuffer is of size (n_steps_per_env * n_envs)
+        minibatch_size = 64  # should be a factor of (n_steps_per_env * n_envs)
+        total_steps = 50000000
+        learning_rate = 3e-4
+        folder_name = f'Sprint_R{self.robot_type}'
+        model_path = f'./scripts/gyms/logs/{folder_name}/'
+
+        # print("Model path:", model_path)
+
+        # --------------------------------------- Run algorithm
+        def init_env(i_env):
+            def thunk():
+                return sprint(self.ip, self.server_p + i_env, self.monitor_p_1000 + i_env, self.robot_type, False)
+
+            return thunk
+
+        servers = Server(self.server_p, self.monitor_p_1000, n_envs + 1)  # include 1 extra server for testing
+
+        env = SubprocVecEnv([init_env(i) for i in range(n_envs)])
+        eval_env = SubprocVecEnv([init_env(n_envs)])
+
+        try:
+            if "model_file" in args:  # retrain
+                model = PPO.load(args["model_file"], env=env, device="cuda", n_envs=n_envs, n_steps=n_steps_per_env,
+                                 batch_size=minibatch_size, learning_rate=learning_rate)
+            else:  # train new model
+                model = PPO("MlpPolicy", env=env, verbose=1, n_steps=n_steps_per_env, batch_size=minibatch_size,
+                            learning_rate=learning_rate, device="cuda")
+
+            model_path = self.learn_model(model, total_steps, model_path, eval_env=eval_env,
+                                          eval_freq=n_steps_per_env * 20, save_freq=n_steps_per_env * 200,
+                                          backup_env_file=__file__)
+        except KeyboardInterrupt:
+            sleep(1)  # wait for child processes
+            print("\nctrl+c pressed, aborting...\n")
+            servers.kill()
+            return
+
+        env.close()
+        eval_env.close()
+        servers.kill()
+
+    def test(self, args):
+
+        # Uses different server and monitor ports
+        server = Server(self.server_p - 1, self.monitor_p, 1)
+        env = sprint(self.ip, self.server_p - 1, self.monitor_p, self.robot_type, True)
+        model = PPO.load(args["model_file"], env=env)
+
+        try:
+            self.export_model(args["model_file"], args["model_file"] + ".pkl",
+                              False)  # Export to pkl to create custom behavior
+            self.test_model(model, env, log_path=args["folder_dir"], model_path=args["folder_dir"])
+        except KeyboardInterrupt:
+            print()
+
+        env.close()
+        server.kill()
\ No newline at end of file
diff --git a/scripts/gyms/sprint.py b/scripts/gyms/sprint_demo.py
similarity index 97%
rename from scripts/gyms/sprint.py
rename to scripts/gyms/sprint_demo.py
index 695aefb..99a7f6c 100644
--- a/scripts/gyms/sprint.py
+++ b/scripts/gyms/sprint_demo.py
@@ -37,6 +37,7 @@ class sprint(gym.Env):
         self.robot_type = r_type
         self.kick_ori = 0
         self.terminal = False
+        self.distance = None
         # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw
         self.player = Agent(ip, server_p, monitor_p, 1, self.robot_type, "Gym", True, enable_draw)
         self.step_counter = 0  # to limit episode size
@@ -290,8 +291,8 @@ class sprint(gym.Env):
 
             if np.linalg.norm(np.array([x, y]) - position) >= 10:
                 break
-        self.walk_target = np.array([x, y])
+
 
     def step(self, action):
 
         r = (self.
@@ -302,13 +303,15 @@ class sprint(gym.Env):
         action_mult = 1 if internal_dist > 0.2 else (0.7 / 0.2) * internal_dist + 0.3
         self.walk_rel_target = M.rotate_2d_vec(
             (self.walk_target[0] - r.loc_head_position[0], self.walk_target[1] - r.loc_head_position[1]), -r.imu_torso_orientation)
-        self.walk_distance = np.linalg.norm(self.walk_target - r.loc_head_position[:2])
-        if self.walk_distance <= 0.5:
+        self.distance = np.linalg.norm(self.walk_target - r.loc_head_position[:2])
+        self.walk_distance = np.linalg.norm(self.walk_rel_target)
+        if self.distance <= 0.5:
             self.generate_random_target(r.loc_head_position[:2])
             self.walk_rel_target = M.rotate_2d_vec(
                 (self.walk_target[0] - r.loc_head_position[0], self.walk_target[1] - r.loc_head_position[1]),
                 -r.imu_torso_orientation)
-            self.walk_distance = np.linalg.norm(self.walk_target - r.loc_head_position[:2])
+            self.distance = np.linalg.norm(self.walk_target - r.loc_head_position[:2])
+            self.walk_distance = np.linalg.norm(self.walk_rel_target)
         self.walk_rel_orientation = M.vector_angle(self.walk_rel_target) * 0.3
         # exponential moving average
         self.act = 0.8 * self.act + 0.2 * action * action_mult * 0.7
@@ -345,7 +348,7 @@ class sprint(gym.Env):
         direction_error = abs(self.walk_rel_orientation)
         direction_error = min(direction_error, 10)
         reward = robot_speed * (1 - direction_error / 10) * 0.6
-        if self.walk_distance < 0.5:
+        if self.distance < 0.5:
             reward += 10
 
         if self.player.behavior.is_ready("Get_Up"):