import math
import pickle
import random
from imitation.rewards.reward_nets import BasicRewardNet
from stable_baselines3.common.callbacks import EvalCallback
from agent.Base_Agent import Base_Agent as Agent
from behaviors.custom.Step.Step import Step
from world.commons.Draw import Draw
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv
from scripts.commons.Server import Server
from scripts.commons.Train_Base import Train_Base
from time import sleep
import os, gym
import numpy as np
from math_ops.Math_Ops import Math_Ops as M
from behaviors.custom.Step.Step_Generator import Step_Generator
from imitation.data import rollout
from imitation.algorithms.adversarial.gail import GAIL
'''
Objective:
Learn how to dribble forward using the step primitive (imitation learning with GAIL)
----------
- class dribble: implements a custom OpenAI Gym environment
- class Train: implements algorithms to train a new model or test an existing model
'''
class dribble(gym.Env):
def __init__(self, ip, server_p, monitor_p, r_type, enable_draw) -> None:
self.abs_ori = 75
self.ball_dist_hip_center_2d = 0
self.ball_dist_hip_center = None
self.internal_rel_orientation = None
self.dribble_speed = 1
self.gym_last_internal_abs_ori = None
self.internal_target_vel = None
self.Gen_player_pos = None
self.internal_target = None
self.values_l = None
self.values_r = None
self.reset_time = None
self.behavior = None
self.robot_type = r_type
self.kick_ori = 0
self.terminal = False
# Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw
self.player = Agent(ip, server_p, monitor_p, 1, self.robot_type, "Gym", True, enable_draw)
self.step_counter = 0 # to limit episode size
self.ik = self.player.inv_kinematics
self.dribble_rel_orientation = 0 # relative to imu_torso_orientation (in degrees)
# Step behavior defaults
self.STEP_DUR = 8
self.STEP_Z_SPAN = 0.02
self.STEP_Z_MAX = 0.70
nao_specs = self.ik.NAO_SPECS
self.leg_length = nao_specs[1] + nao_specs[3] # upper leg height + lower leg height
feet_y_dev = nao_specs[0] * 1.12 # wider step
sample_time = self.player.world.robot.STEPTIME
max_ankle_z = nao_specs[5]
self.step_generator = Step_Generator(feet_y_dev, sample_time, max_ankle_z)
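# Step_Generator produces the cyclic swing-foot trajectory used by the Step behavior;
# feet_y_dev widens the lateral stance and max_ankle_z bounds how high the swing ankle is lifted.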
self.DEFAULT_ARMS = np.array([-90, -90, 8, 8, 90, 90, 70, 70], np.float32)
self.act = np.zeros(16, np.float32) # memory variable
self.path_manager = self.player.path_manager
# State space
obs_size = 76
self.obs = np.zeros(obs_size, np.float32)
self.observation_space = gym.spaces.Box(low=np.full(obs_size, -np.inf, np.float32),
high=np.full(obs_size, np.inf, np.float32), dtype=np.float32)
self.ball_x_center = 0.21
self.ball_y_center = -0.045
# Action space
MAX = np.finfo(np.float32).max
self.no_of_actions = act_size = 16
self.action_space = gym.spaces.Box(low=np.full(act_size, -MAX, np.float32),
high=np.full(act_size, MAX, np.float32), dtype=np.float32)
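# Action layout (consumed in step()): a[0:3] left ankle position offsets, a[3:6] right ankle position offsets,
# a[6:9] left foot rotation, a[9:12] right foot rotation, a[12:16] arm pitch/roll offsets.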
# Place ball far away to keep landmarks in FoV (head follows ball while using Step behavior)
self.player.scom.unofficial_move_ball((14, 0, 0.042))
self.player.scom.unofficial_set_play_mode("PlayOn")
self.player.scom.unofficial_move_ball((0, 0, 0))
def observe(self, init=False, virtual_ball=False):
r = self.player.world.robot
w = self.player.world
if init: # reset variables
self.step_counter = 0
self.act = np.zeros(16, np.float32) # memory variable
# index observation naive normalization
self.obs[0] = min(self.step_counter, 12 * 8) / 100 # simple counter: 0,1,2,3...
self.obs[1] = r.loc_head_z * 3 # z coordinate (torso)
self.obs[2] = r.loc_head_z_vel / 2 # z velocity (torso)
self.obs[3] = r.imu_torso_roll / 15 # absolute torso roll in deg
self.obs[4] = r.imu_torso_pitch / 15 # absolute torso pitch in deg
self.obs[5:8] = r.gyro / 100 # gyroscope
self.obs[8:11] = r.acc / 10 # accelerometer
self.obs[11:17] = r.frp.get('lf', np.zeros(6)) * (10, 10, 10, 0.01, 0.01,
0.01) # left foot: relative point of origin (p) and force vector (f) -> (px,py,pz,fx,fy,fz)*
self.obs[17:23] = r.frp.get('rf', np.zeros(6)) * (10, 10, 10, 0.01, 0.01,
0.01) # right foot: relative point of origin (p) and force vector (f) -> (px,py,pz,fx,fy,fz)*
# *if foot is not touching the ground, then (px=0,py=0,pz=0,fx=0,fy=0,fz=0)
self.obs[23:43] = r.joints_position[2:22] / 100 # position of all joints except head & toes (for robot type 4)
self.obs[43:63] = r.joints_speed[2:22] / 6.1395 # speed of all joints except head & toes (for robot type 4)
if init: # the walking parameters refer to the last parameters in effect (after a reset, they are pointless)
self.obs[63] = 1 # step progress
self.obs[64] = 1 # 1 if left leg is active
self.obs[65] = 0 # 1 if right leg is active
self.obs[66] = 0
else:
self.obs[63] = self.step_generator.external_progress # step progress
self.obs[64] = float(self.step_generator.state_is_left_active) # 1 if left leg is active
self.obs[65] = float(not self.step_generator.state_is_left_active) # 1 if right leg is active
self.obs[66] = math.sin(self.step_generator.state_current_ts / self.step_generator.ts_per_step * math.pi)
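# obs[66] encodes the step phase as sin(pi * current_ts / ts_per_step): 0 at the start/end of a step, 1 at mid-step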
# Ball
ball_rel_hip_center = self.ik.torso_to_hip_transform(w.ball_rel_torso_cart_pos)
self.ball_dist_hip_center = np.linalg.norm(ball_rel_hip_center)
ball_rel_torso_xy = w.ball_rel_torso_cart_pos[:2] # take the X and Y components
self.ball_dist_hip_center_2d = np.linalg.norm(ball_rel_torso_xy)
if init:
self.obs[67:70] = (0, 0, 0) # Initial velocity is 0
elif w.ball_is_visible:
self.obs[67:70] = (ball_rel_hip_center - self.obs[
70:73]) * 10 # Ball velocity, relative to ankle's midpoint
self.obs[70:73] = ball_rel_hip_center # Ball position, relative to hip
self.obs[73] = self.ball_dist_hip_center * 2
if virtual_ball: # simulate the ball between the robot's feet
self.obs[67:74] = (0, 0, 0, 0.05, 0, -0.175, 0.36)
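# The fixed values above correspond to zero ball velocity and a ball roughly 0.18 m from the hip center
# (slightly in front of and below it), i.e. the observation the robot would see with the ball between its feet.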
'''
Create internal target with a smoother variation
'''
MAX_ROTATION_DIFF = 20 # max difference (degrees) per visual step
MAX_ROTATION_DIST = 80
if init:
self.internal_rel_orientation = 0
self.internal_target_vel = 0
self.gym_last_internal_abs_ori = r.imu_torso_orientation # for training purposes (reward)
# ---------------------------------------------------------------- compute internal target
if w.vision_is_up_to_date:
previous_internal_rel_orientation = np.copy(self.internal_rel_orientation)
internal_ori_diff = np.clip(M.normalize_deg(self.dribble_rel_orientation - self.internal_rel_orientation),
-MAX_ROTATION_DIFF, MAX_ROTATION_DIFF)
self.internal_rel_orientation = np.clip(M.normalize_deg(self.internal_rel_orientation + internal_ori_diff),
-MAX_ROTATION_DIST, MAX_ROTATION_DIST)
# Observations
self.internal_target_vel = self.internal_rel_orientation - previous_internal_rel_orientation
self.gym_last_internal_abs_ori = self.internal_rel_orientation + r.imu_torso_orientation
# ----------------------------------------------------------------- observations
self.obs[74] = self.internal_rel_orientation / MAX_ROTATION_DIST
self.obs[75] = self.internal_target_vel / MAX_ROTATION_DIFF
return self.obs
def execute_ik(self, l_pos, l_rot, r_pos, r_rot):
r = self.player.world.robot
# Apply IK to each leg + Set joint targets
# Left leg
indices, self.values_l, error_codes = self.ik.leg(l_pos, l_rot, True, dynamic_pose=False)
r.set_joints_target_position_direct(indices, self.values_l, harmonize=False)
# Right leg
indices, self.values_r, error_codes = self.ik.leg(r_pos, r_rot, False, dynamic_pose=False)
r.set_joints_target_position_direct(indices, self.values_r, harmonize=False)
def sync(self):
''' Run a single simulation step '''
r = self.player.world.robot
self.player.scom.commit_and_send(r.get_command())
self.player.scom.receive()
def reset(self):
# print("reset")
'''
Reset and stabilize the robot
Note: for some behaviors it would be better to reduce stabilization or add noise
'''
self.abs_ori = 45
self.player.scom.unofficial_set_play_mode("PlayOn")
Gen_ball_pos = [-9, 0, 0]
self.Gen_player_pos = (Gen_ball_pos[0] - 1.5, Gen_ball_pos[1], 0.5)
self.player.scom.unofficial_move_ball((Gen_ball_pos[0], Gen_ball_pos[1], Gen_ball_pos[2]))
self.step_counter = 0
self.behavior = self.player.behavior
r = self.player.world.robot
w = self.player.world
t = w.time_local_ms
self.reset_time = t
self.b_rel = w.ball_rel_torso_cart_pos[:2]
self.path_manager = self.player.path_manager
for _ in range(25):
self.player.scom.unofficial_beam(self.Gen_player_pos, 0) # beam player continuously (floating above ground)
self.player.behavior.execute("Zero_Bent_Knees")
self.sync()
# beam player to ground
self.player.scom.unofficial_beam(self.Gen_player_pos, 0)
r.joints_target_speed[0] = 0.01 # move head to trigger physics update (rcssserver3d bug when no joint is moving)
self.sync()
# stabilize on ground
for _ in range(7):
self.player.behavior.execute("Zero_Bent_Knees")
self.sync()
# walk to ball
while True:
self.b_rel = w.ball_rel_torso_cart_pos[:2]
if self.player.behavior.is_ready("Get_Up"):
self.player.behavior.execute_to_completion("Get_Up")
if 0.26 > self.b_rel[0] > 0.18 and abs(self.b_rel[1]) < 0.04 and w.ball_is_visible:
break
else:
if self.player.behavior.is_ready("Get_Up"):
self.player.behavior.execute_to_completion("Get_Up")
reset_walk = self.behavior.previous_behavior != "Walk" # reset walk if it wasn't the previous behavior
rel_target = self.b_rel + (-0.23, 0) # relative target is a circle (center: ball, radius:0.23m)
rel_ori = M.vector_angle(self.b_rel) # relative orientation of ball, NOT the target!
dist = max(0.08, np.linalg.norm(rel_target) * 0.7) # slow approach
self.behavior.execute_sub_behavior("Walk", reset_walk, rel_target, False, rel_ori, False,
dist) # target, is_target_abs, ori, is_ori_abs, distance
self.sync()
self.act = np.zeros(self.no_of_actions, np.float32)
return self.observe(True)
def render(self, mode='human', close=False):
return
def close(self):
Draw.clear_all()
self.player.terminate()
def step(self, action):
r = self.player.world.robot
w = self.player.world
d = w.draw
if w.ball_abs_pos[1] > 0: # pick a dribble target on the same side of the field as the ball
dribble_target = (15, 5)
else:
dribble_target = (15, -5)
self.dribble_rel_orientation = self.path_manager.get_dribble_path(optional_2d_target=dribble_target)[1]
# exponential moving average
self.act = 0.85 * self.act + 0.15 * action * 0.7 * 0.95 * self.dribble_speed
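# The 0.85/0.15 exponential moving average low-pass filters the raw policy output between control steps;
# the additional 0.7 * 0.95 * dribble_speed factor scales down the action amplitude (dribble_speed is 1 by default).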
# execute Step behavior to extract the target positions of each leg (we will override these targets)
lfy, lfz, rfy, rfz = self.step_generator.get_target_positions(self.step_counter == 0, self.STEP_DUR,
self.STEP_Z_SPAN,
self.leg_length * self.STEP_Z_MAX)
# Leg IK
a = self.act
l_ankle_pos = (a[0] * 0.025 - 0.01, a[1] * 0.01 + lfy, a[2] * 0.01 + lfz)
r_ankle_pos = (a[3] * 0.025 - 0.01, a[4] * 0.01 + rfy, a[5] * 0.01 + rfz)
l_foot_rot = a[6:9] * (2, 2, 3)
r_foot_rot = a[9:12] * (2, 2, 3)
# Limit leg yaw/pitch (and add bias)
l_foot_rot[2] = max(0, l_foot_rot[2] + 18.3)
r_foot_rot[2] = min(0, r_foot_rot[2] - 18.3)
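# The +/-18.3 deg bias opens the feet outward (assuming the usual sign convention for leg yaw),
# and the max/min clamps keep each foot from yawing inward past neutral.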
# Arms actions
arms = np.copy(self.DEFAULT_ARMS) # default arms pose
arms[0:4] += a[12:16] * 4 # arms pitch+roll
# Set target positions
self.execute_ik(l_ankle_pos, l_foot_rot, r_ankle_pos, r_foot_rot) # legs
r.set_joints_target_position_direct(slice(14, 22), arms, harmonize=False) # arms
self.sync()
self.step_counter += 1
# collect observations
obs = self.observe()
angle_rad = np.radians(self.gym_last_internal_abs_ori) # convert the internal target orientation to radians
unit_vector = np.array([np.cos(angle_rad), np.sin(angle_rad)]) # unit vector along the internal target direction
if np.linalg.norm(w.ball_cheat_abs_vel[:2]) != 0:
cos_theta = np.dot(unit_vector, w.ball_cheat_abs_vel[:2]) / (
np.linalg.norm(unit_vector) * np.linalg.norm(w.ball_cheat_abs_vel[:2]))
else:
cos_theta = 0
# compute reward
reward = np.linalg.norm(w.ball_cheat_abs_vel) * cos_theta
if self.ball_dist_hip_center_2d < 0.115:
reward = 0
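# Reward rationale: project the ground-truth (cheat) ball velocity onto the internal target direction,
# so faster dribbling in the commanded direction earns more reward; the reward is zeroed while the ball
# is within 0.115 m of the hip center in the XY plane, presumably to avoid rewarding states where the
# ball is merely trapped between the feet.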
# check termination
if self.player.behavior.is_ready("Get_Up"):
self.terminal = True
elif w.time_local_ms - self.reset_time > 7500 * 3 or np.linalg.norm(
w.ball_cheat_abs_pos[:2] - r.loc_head_position[:2]) > 0.45 or not w.is_ball_abs_pos_from_vision:
self.terminal = True
else:
self.terminal = False
return obs, reward, self.terminal, {}
class Train(Train_Base):
def __init__(self, script) -> None:
super().__init__(script)
def load_expert_data(self, path: str):
# load expert data
expert_data = np.load(path, allow_pickle=True)
observations, actions = [], []
for item in expert_data:
if isinstance(item, dict):
# handle dict-formatted items
observations.append(item['observations'])
actions.append(item['actions'])
elif isinstance(item, tuple) and len(item) == 2:
# handle tuple-formatted items
observations.append(item[0])
actions.append(item[1])
else:
print("Unexpected item format:", item)
# convert the lists to numpy arrays
return {
'observations': np.array(observations, dtype=np.float32),
'actions': np.array(actions, dtype=np.float32)
}
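# Note (assumption): imitation's GAIL normally expects demonstrations as Trajectory or Transitions
# objects rather than a plain dict of arrays. If the dict returned above is rejected, a minimal
# conversion sketch (treating the data as one contiguous demonstration, with 'data' being that dict) could be:
#
#   from imitation.data.types import Transitions
#   n = len(data['actions']) - 1
#   demos = Transitions(obs=data['observations'][:n], acts=data['actions'][:n],
#                       next_obs=data['observations'][1:n + 1],
#                       dones=np.zeros(n, dtype=bool), infos=np.array([{}] * n))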
def train(self, args):
# --------------------------------------- learning parameters
n_envs = min(1, os.cpu_count())
n_steps_per_env = 1024 # RolloutBuffer is of size (n_steps_per_env * n_envs)
minibatch_size = 64 # should be a factor of (n_steps_per_env * n_envs)
learning_rate = 3e-4
total_steps = 50000000
folder_name = f'Sprint_Gail_R{self.robot_type}'
model_path = f'./scripts/gyms/logs/{folder_name}/'
if not os.path.exists(model_path):
os.makedirs(model_path)
# load expert data
expert_data = self.load_expert_data(
"/home/her-darling/APOLLO_PRO/TEST_Dribble/scripts/gyms/dribble_expert_data.pkl")
# --------------------------------------- run the GAIL algorithm
def init_env(i_env):
def thunk():
return dribble(self.ip, self.server_p + i_env, self.monitor_p_1000 + i_env, self.robot_type, False)
return thunk
servers = Server(self.server_p, self.monitor_p_1000, n_envs + 1) # include 1 extra server for testing
env = SubprocVecEnv([init_env(i) for i in range(n_envs)])
eval_env = SubprocVecEnv([init_env(n_envs)])
reward_net = BasicRewardNet(observation_space=env.observation_space, action_space=env.action_space)
try:
if "model_file" in args: # retrain
model = PPO.load(args["model_file"], env=env, device="cuda", n_envs=n_envs, n_steps=n_steps_per_env,
batch_size=minibatch_size, learning_rate=learning_rate)
else: # train new model
model = PPO("MlpPolicy", env=env, verbose=1, n_steps=n_steps_per_env, batch_size=minibatch_size,
learning_rate=learning_rate, device="cuda")
gail_trainer = GAIL(
demonstrations=expert_data,
demo_batch_size=32,
venv=env,
gen_algo=model, # pass the PPO instance here, not a string
reward_net=reward_net,
)
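# GAIL alternates between updating reward_net to discriminate expert transitions from the generator's
# rollouts and updating the PPO generator (gen_algo) against the learned reward, using demo_batch_size
# expert samples per discriminator minibatch.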
eval_callback = EvalCallback(
eval_env,
best_model_save_path='./logs/',
log_path='./logs/',
eval_freq=10000, # evaluate every 10000 training steps
deterministic=True,
render=False,
)
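# Note (assumption): imitation's AdversarialTrainer.train(total_timesteps, callback) expects a plain
# callable invoked once per training round, not an SB3 callback object, so EvalCallback may need to be
# attached to the generator's own .learn() loop instead.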
gail_trainer.train(total_timesteps=total_steps, callback=eval_callback)
except KeyboardInterrupt:
sleep(1) # wait for child processes to exit
print("\nctrl+c pressed, aborting...\n")
servers.kill()
return
except Exception as e:
print(f"训练过程中出错: {e}")
env.close()
eval_env.close()
servers.kill()
def test(self, args):
server = Server(self.server_p - 1, self.monitor_p, 1)
env = dribble(self.ip, self.server_p - 1, self.monitor_p, self.robot_type, True)
model = GAIL.load(args["model_file"], env=env) # load the GAIL model
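# Note (assumption): imitation's GAIL does not ship an SB3-style load(); a common alternative is to
# load the saved generator policy directly, e.g. model = PPO.load(args["model_file"], env=env).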
try:
self.export_model(args["model_file"], args["model_file"] + ".pkl", False)
self.test_model(model, env, log_path=args["folder_dir"], model_path=args["folder_dir"])
except KeyboardInterrupt:
print()
env.close()
server.kill()