This commit is contained in:
2025-11-19 08:08:22 -05:00
commit eaaa5519bd
256 changed files with 46657 additions and 0 deletions

60
scripts/utils/Beam.py Normal file
View File

@@ -0,0 +1,60 @@
from agent.Base_Agent import Base_Agent as Agent
from scripts.commons.Script import Script
from time import sleep
class Beam():
def __init__(self, script:Script) -> None:
self.script = script
def ask_for_input(self,prompt, default):
try:
inp=input(prompt)
return float(inp)
except ValueError:
if inp != '':
print("Illegal input:", inp, "\n")
return default
def beam_and_update(self,x,y,rot):
r = self.player.world.robot
d = self.player.world.draw
d.annotation((x,y,0.7), f"x:{x} y:{y} r:{rot}", d.Color.yellow, "pos_label")
self.player.scom.unofficial_beam((x,y,r.beam_height),rot)
for _ in range(10): # run multiple times to beam and then simulate eventual collisions (e.g. goal posts)
sleep(0.03)
self.player.behavior.execute("Zero")
self.player.scom.commit_and_send( r.get_command() )
self.player.scom.receive()
def execute(self):
a = self.script.args
self.player = Agent(a.i, a.p, a.m, a.u, a.r, a.t) # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name
d = self.player.world.draw
self.player.scom.unofficial_set_play_mode("PlayOn")
# Draw grid
for x in range(-15,16):
for y in range(-10,11):
d.point((x,y), 6, d.Color.red, "grid", False)
d.flush("grid")
for _ in range(10): # Initialize
self.player.scom.send()
self.player.scom.receive()
print("\nBeam player to coordinates + orientation:")
x=y=a=0
while True: # Beam self.player to given position
x = self.ask_for_input(f"\nInput x coordinate ('' to send {x:5} again, ctrl+c to return): ",x)
self.beam_and_update(x,y,a)
y = self.ask_for_input( f"Input y coordinate ('' to send {y:5} again, ctrl+c to return): ",y)
self.beam_and_update(x,y,a)
a = self.ask_for_input( f"Orientation -180 to 180 ('' to send {a:5} again, ctrl+c to return): ",a)
self.beam_and_update(x,y,a)

View File

@@ -0,0 +1,55 @@
from agent.Base_Agent import Base_Agent as Agent
from scripts.commons.Script import Script
from scripts.commons.UI import UI
class Behaviors():
def __init__(self,script:Script) -> None:
self.script = script
self.player : Agent = None
def ask_for_behavior(self):
names, descriptions = self.player.behavior.get_all_behaviors()
UI.print_table( [names,descriptions], ["Behavior Name","Description"], numbering=[True,False])
choice, is_str_opt = UI.read_particle('Choose behavior ("" to skip 2 time steps, "b" to beam, ctrl+c to return): ',["","b"],int,[0,len(names)])
if is_str_opt: return choice #skip 2 time steps or quit
return names[choice]
def sync(self):
self.player.scom.commit_and_send( self.player.world.robot.get_command() )
self.player.scom.receive()
def beam(self):
self.player.scom.unofficial_beam((-2.5,0,self.player.world.robot.beam_height),0)
for _ in range(5):
self.sync()
def execute(self):
a = self.script.args
self.player = Agent(a.i, a.p, a.m, a.u, a.r, a.t) # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name
behavior = self.player.behavior
self.beam()
self.player.scom.unofficial_set_play_mode("PlayOn")
# Special behaviors
special_behaviors = {"Step":(), "Basic_Kick":(0,), "Walk":((0.5,0),False,0,False,None), "Dribble":(None,None)}
while True:
behavior_name = self.ask_for_behavior()
if behavior_name == 0: # skip 2 time steps (user request)
self.sync()
self.sync()
elif behavior_name == 1: # beam
self.beam()
else:
if behavior_name in special_behaviors: # not using execute_to_completion to abort behavior after a timeout
duration = UI.read_int("For how many time steps [1,1000]? ", 1, 1001)
for _ in range(duration):
if behavior.execute(behavior_name, *special_behaviors[behavior_name]):
break # break if behavior ends
self.sync()
else:
behavior.execute_to_completion(behavior_name)

35
scripts/utils/Drawings.py Normal file
View File

@@ -0,0 +1,35 @@
from time import sleep
from world.commons.Draw import Draw
class Drawings():
def __init__(self,script) -> None:
self.script = script
def execute(self):
# Creating a draw object is done automatically for each agent
# This is a shortcut to draw shapes without creating an agent
# Usually, we can access the object through player.world.draw
a = self.script.args
draw = Draw(True, 0, a.i, 32769)
print("\nPress ctrl+c to return.")
while True:
for i in range(100):
sleep(0.02)
draw.circle( (0,0),i/10,2, Draw.Color.green_light,"green")
draw.circle( (0,0),i/9,2, Draw.Color.red,"red")
draw.sphere( (0,0,5-i/20),0.2, Draw.Color.red,"ball" )
draw.annotation((0,0,1), "Hello!", Draw.Color.cyan, "text")
draw.arrow( (0,0,5), (0,0,5-i/25), 0.5, 4, Draw.Color.get(127,50,255), "my_arrow")
#draw pyramid
draw.polygon(((2,0,0),(3,0,0),(3,1,0),(2,1,0)), Draw.Color.blue, 255, "solid", False)
draw.line( (2,0,0), (2.5,0.5,1), 2, Draw.Color.cyan, "solid", False)
draw.line( (3,0,0), (2.5,0.5,1), 2, Draw.Color.cyan, "solid", False)
draw.line( (2,1,0), (2.5,0.5,1), 2, Draw.Color.cyan, "solid", False)
draw.line( (3,1,0), (2.5,0.5,1), 2, Draw.Color.cyan, "solid", True)

54
scripts/utils/Dribble.py Normal file
View File

@@ -0,0 +1,54 @@
from agent.Agent import Agent
from agent.Base_Agent import Base_Agent
from scripts.commons.Script import Script
import numpy as np
'''
Objective:
----------
Dribble and score
'''
class Dribble():
def __init__(self, script:Script) -> None:
self.script = script
def execute(self):
a = self.script.args
# Args: Server IP, Agent Port, Monitor Port, Uniform No., [Robot Type] (for Base_Agent), Team name, Enable Log, Enable Draw
self.script.batch_create(Base_Agent, ((a.i,a.p,a.m,a.u,a.r,a.t,True,True),)) # one dribbler
self.script.batch_create(Agent, ((a.i,a.p,a.m,u,"Opponent",False,False) for u in range(1,2))) # 1 opponent (normal agent)
p : Base_Agent = self.script.players[0]
p.path_manager.draw_options(enable_obstacles=True, enable_path=True)
behavior = p.behavior
w = p.world
r = w.robot
d = w.draw
p.scom.unofficial_beam((-3,0,r.beam_height),0)
p.scom.unofficial_set_play_mode("PlayOn")
print("\nPress ctrl+c to return.")
while True:
if w.play_mode == w.M_THEIR_KICKOFF:
p.scom.unofficial_set_play_mode("PlayOn")
# execute dribbler
if behavior.is_ready("Get_Up") or w.play_mode_group in [w.MG_ACTIVE_BEAM, w.MG_PASSIVE_BEAM]:
p.scom.unofficial_beam((*(w.ball_abs_pos[:2]-(1,0)),r.beam_height),0)
behavior.execute("Zero_Bent_Knees")
else:
behavior.execute("Dribble",None,None)
d.annotation(r.loc_head_position+(0,0,0.2),f"{np.linalg.norm(r.get_head_abs_vel(40)[:2]):.2f}",d.Color.white,"vel_annotation")
p.scom.commit_and_send( r.get_command() )
# execute opponents as normal agents
self.script.batch_execute_agent(slice(1,None))
# all players wait for server to send feedback
self.script.batch_receive()

View File

@@ -0,0 +1,115 @@
from agent.Base_Agent import Base_Agent as Agent
from scripts.commons.Script import Script
from world.commons.Draw import Draw
import numpy as np
class Fwd_Kinematics():
def __init__(self,script:Script) -> None:
self.script = script
self.cycle_duration = 200 #steps
def draw_cycle(self):
#Draw position of body parts
for _ in range(self.cycle_duration):
self.script.batch_execute_behavior("Squat")
self.script.batch_commit_and_send()
self.script.batch_receive()
p : Agent
for p in self.script.players:
if p.world.vision_is_up_to_date and not p.world.robot.loc_is_up_to_date:
p.world.draw.annotation(p.world.robot.cheat_abs_pos, "Not enough visual data! Using IMU", Draw.Color.red,"localization")
for key, val in p.world.robot.body_parts.items():
rp = val.transform.get_translation()
pos = p.world.robot.loc_head_to_field_transform(rp,False)
label_rp = np.array([rp[0]-0.0001,rp[1]*0.5,0])
label_rp /= np.linalg.norm(label_rp) / 0.4 #labels at 0.4m from body part
label = p.world.robot.loc_head_to_field_transform(rp+label_rp,False)
p.world.draw.line(pos,label,2,Draw.Color.green_light,key,False)
p.world.draw.annotation(label,key,Draw.Color.red,key)
rp = p.world.robot.body_parts['lfoot'].transform((0.08,0,0))
ap = p.world.robot.loc_head_to_field_transform(rp,False)
p.world.draw.line(ap,ap+(0,0,0.1),1,Draw.Color.red,"soup",False)
rp = p.world.robot.body_parts['lfoot'].transform((-0.08,0,0))
ap = p.world.robot.loc_head_to_field_transform(rp,False)
p.world.draw.line(ap,ap+(0,0,0.1),1,Draw.Color.red,"soup",False)
rp = p.world.robot.body_parts['lfoot'].transform((0,0.04,0))
ap = p.world.robot.loc_head_to_field_transform(rp,False)
p.world.draw.line(ap,ap+(0,0,0.1),1,Draw.Color.red,"soup",False)
rp = p.world.robot.body_parts['lfoot'].transform((0,-0.04,0))
ap = p.world.robot.loc_head_to_field_transform(rp,False)
p.world.draw.line(ap,ap+(0,0,0.1),1,Draw.Color.red,"soup",True)
Draw.clear_all()
#Draw position of joints
for _ in range(self.cycle_duration):
self.script.batch_execute_behavior("Squat")
self.script.batch_commit_and_send()
self.script.batch_receive()
for p in self.script.players:
if p.world.vision_is_up_to_date and not p.world.robot.loc_is_up_to_date:
p.world.draw.annotation(p.world.robot.cheat_abs_pos, "Not enough visual data! Using IMU", Draw.Color.red,"localization")
zstep = 0.05
label_z = [0,0,0,0,zstep,zstep,2*zstep,2*zstep,0,0,0,0,zstep,zstep,0,0,zstep,zstep,2*zstep,2*zstep,3*zstep,3*zstep,0,0]
for j, transf in enumerate(p.world.robot.joints_transform):
rp = transf.get_translation()
pos = p.world.robot.loc_head_to_field_transform(rp,False)
j_name = str(j)
label_rp = np.array([rp[0]-0.0001,rp[1]*0.5,0])
label_rp /= np.linalg.norm(label_rp) / 0.4 #labels at 0.4m from body part
label_rp += (0,0,label_z[j])
label = p.world.robot.loc_head_to_field_transform(rp+label_rp,False)
p.world.draw.line( pos,label,2,Draw.Color.green_light,j_name,False)
p.world.draw.annotation( label,j_name,Draw.Color.cyan,j_name)
Draw.clear_all()
#Draw orientation of body parts
for _ in range(self.cycle_duration):
self.script.batch_execute_behavior("Squat")
self.script.batch_commit_and_send()
self.script.batch_receive()
p : Agent
for p in self.script.players:
if p.world.vision_is_up_to_date and not p.world.robot.loc_is_up_to_date:
p.world.draw.annotation(p.world.robot.cheat_abs_pos, "Not enough visual data! Using IMU", Draw.Color.red,"localization")
for key in p.world.robot.body_parts:
#Select only some body parts
if key not in ['head', 'torso', 'llowerarm', 'rlowerarm', 'lthigh', 'rthigh', 'lshank', 'rshank', 'lfoot', 'rfoot']: continue
bpart_abs_pos = p.world.robot.get_body_part_to_field_transform(key).translate((0.1,0,0)) #10cm in front of body part
x_axis = bpart_abs_pos((0.05,0,0),False)
y_axis = bpart_abs_pos((0,0.05,0),False)
z_axis = bpart_abs_pos((0,0,0.05),False)
axes_0 = bpart_abs_pos.get_translation()
p.world.draw.line( axes_0,x_axis,2,Draw.Color.green_light,key,False)
p.world.draw.line( axes_0,y_axis,2,Draw.Color.blue,key,False)
p.world.draw.line( axes_0,z_axis,2,Draw.Color.red,key)
Draw.clear_all()
def execute(self):
a = self.script.args
# Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw
self.script.batch_create(Agent, ((a.i,a.p,a.m,u,u-1,a.t,True,True) for u in range(1,6)) )
#Beam players
self.script.batch_unofficial_beam( [(-2,i*4-10,0.5,i*45) for i in range(5)] )
print("\nPress ctrl+c to return.")
while True:
self.draw_cycle()

45
scripts/utils/Get_Up.py Normal file
View File

@@ -0,0 +1,45 @@
from agent.Base_Agent import Base_Agent as Agent
from itertools import count
from scripts.commons.Script import Script
import numpy as np
'''
Objective:
----------
Fall and get up
'''
class Get_Up():
def __init__(self, script:Script) -> None:
self.script = script
self.player : Agent = None
def sync(self):
r = self.player.world.robot
self.player.scom.commit_and_send( r.get_command() )
self.player.scom.receive()
def execute(self):
a = self.script.args
player = self.player = Agent(a.i, a.p, a.m, a.u, a.r, a.t) # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name
behavior = player.behavior
r = player.world.robot
player.scom.commit_beam((-3,0),0)
print("\nPress ctrl+c to return.")
for i in count():
rnd = np.random.uniform(-6,6,r.no_of_joints)
# Fall
while r.loc_head_z > 0.3 and r.imu_torso_inclination < 50:
if i < 4:
behavior.execute(["Fall_Front","Fall_Back","Fall_Left","Fall_Right"][i % 4]) # First, fall deterministically
else:
r.joints_target_speed[:] = rnd # Second, fall randomly
self.sync()
# Get up
behavior.execute_to_completion("Get_Up")
behavior.execute_to_completion("Zero_Bent_Knees")

179
scripts/utils/IMU.py Normal file
View File

@@ -0,0 +1,179 @@
from agent.Base_Agent import Base_Agent as Agent
from math_ops.Matrix_3x3 import Matrix_3x3
from math_ops.Matrix_4x4 import Matrix_4x4
from scripts.commons.Script import Script
from world.commons.Draw import Draw
from world.Robot import Robot
import numpy as np
'''
Objective:
----------
Demonstrate the accuracy of the IMU
Robot.imu_(...) variables are based on the visual localizer algorithm and the IMU when no visual data is available.
If visual data is not available for longer than 0.2 seconds, the robot's position is frozen and the velocity decays to zero.
The rotation computed by the IMU is so accurate that it is never frozen, no matter how long the robot goes without visual data.
It is almost always safe to use IMU data for rotation.
Known issues: the accelerometer is not reliable in the presence of "instant" acceleration peaks, due to its low sample rate (50Hz)
this limitation impacts the translation estimation during crashes (e.g. falling, crashing against other players)
'''
class IMU():
def __init__(self,script:Script) -> None:
self.script = script
self.player : Agent = None
self.cycle = 0
self.imu_torso_to_field_rotation = [Matrix_3x3() for _ in range(3)]
self.imu_torso_to_field_transform = [Matrix_4x4() for _ in range(3)]
self.imu_head_to_field_transform = [Matrix_4x4() for _ in range(3)]
self.imu_torso_position = np.zeros((3,3))
self.imu_torso_velocity = np.zeros((3,3))
self.imu_torso_acceleration = np.zeros((3,3))
self.imu_torso_next_position = np.zeros((3,3))
self.imu_torso_next_velocity = np.zeros((3,3))
self.imu_CoM_position = np.zeros((3,3))
self.colors = [Draw.Color.green_light, Draw.Color.yellow, Draw.Color.red]
def act(self):
r = self.player.world.robot
joint_indices = [r.J_LLEG_PITCH,
r.J_LKNEE,
r.J_LFOOT_PITCH,
r.J_LARM_ROLL,
r.J_RLEG_PITCH,
r.J_RKNEE,
r.J_RFOOT_PITCH,
r.J_RARM_ROLL]
amplitude = [1,0.93,1,1,1][r.type]
self.cycle += 1
if self.cycle < 50:
r.set_joints_target_position_direct(joint_indices, np.array([32+10,-64,32, 45, 40+10,-80,40, 0])*amplitude)
elif self.cycle < 100:
r.set_joints_target_position_direct(joint_indices, np.array([ -10, 0, 0, 0, -10, 0, 0, 0])*amplitude)
elif self.cycle < 150:
r.set_joints_target_position_direct(joint_indices, np.array([40+10,-80,40, 0, 32+10,-64,32, 45])*amplitude)
elif self.cycle < 200:
r.set_joints_target_position_direct(joint_indices, np.array([ -10, 0, 0, 0, -10, 0, 0, 0])*amplitude)
else:
self.cycle = 0
self.player.scom.commit_and_send( r.get_command() )
self.player.scom.receive()
def act2(self):
r = self.player.world.robot
self.player.behavior.execute("Walk", (0.2,0), False, 5, False, None ) # Args: target, is_target_abs, ori, is_ori_abs, distance
self.player.scom.commit_and_send( r.get_command() )
self.player.scom.receive()
def draw_player_reference_frame(self,i):
pos = self.imu_torso_position[i]
xvec = self.imu_torso_to_field_rotation[i].multiply((1,0,0)) + pos
yvec = self.imu_torso_to_field_rotation[i].multiply((0,1,0)) + pos
zvec = self.imu_torso_to_field_rotation[i].multiply((0,0,1)) + pos
self.player.world.draw.arrow(pos, xvec, 0.2, 2, self.colors[i], "IMU"+str(i), False)
self.player.world.draw.arrow(pos, yvec, 0.2, 2, self.colors[i], "IMU"+str(i), False)
self.player.world.draw.arrow(pos, zvec, 0.2, 2, self.colors[i], "IMU"+str(i), False)
self.player.world.draw.annotation(xvec, "x", Draw.Color.white, "IMU"+str(i), False)
self.player.world.draw.annotation(yvec, "y", Draw.Color.white, "IMU"+str(i), False)
self.player.world.draw.annotation(zvec, "z", Draw.Color.white, "IMU"+str(i), False)
self.player.world.draw.sphere(self.imu_CoM_position[i],0.04,self.colors[i],"IMU"+str(i), True)
def compute_local_IMU(self):
r = self.player.world.robot
g = r.gyro / 50 # convert degrees per second to degrees per step
self.imu_torso_to_field_rotation[2].multiply( Matrix_3x3.from_rotation_deg(g), in_place=True, reverse_order=True)
self.imu_torso_position[2][:] = self.imu_torso_next_position[2]
if self.imu_torso_position[2][2] < 0: self.imu_torso_position[2][2] = 0 #limit z coordinate to positive values
self.imu_torso_velocity[2][:] = self.imu_torso_next_velocity[2]
# convert proper acceleration to coordinate acceleration and fix rounding bias
self.imu_torso_acceleration[2] = self.imu_torso_to_field_rotation[2].multiply(r.acc) + Robot.GRAVITY
self.imu_torso_to_field_transform[2] = Matrix_4x4.from_3x3_and_translation(self.imu_torso_to_field_rotation[2],self.imu_torso_position[2])
self.imu_head_to_field_transform[2] = self.imu_torso_to_field_transform[2].multiply(r.body_parts["torso"].transform.invert())
self.imu_CoM_position[2][:] = self.imu_head_to_field_transform[2](r.rel_cart_CoM_position)
# Next Position = x0 + v0*t + 0.5*a*t^2, Next velocity = v0 + a*t
self.imu_torso_next_position[2] = self.imu_torso_position[2] + self.imu_torso_velocity[2] * 0.02 + self.imu_torso_acceleration[2] * 0.0002
self.imu_torso_next_velocity[2] = self.imu_torso_velocity[2] + self.imu_torso_acceleration[2] * 0.02
self.imu_torso_next_velocity[2] *= Robot.IMU_DECAY #stability tradeoff
def compute_local_IMU_rotation_only(self):
r = self.player.world.robot
g = r.gyro / 50 # convert degrees per second to degrees per step
self.imu_torso_to_field_rotation[1].multiply( Matrix_3x3.from_rotation_deg(g), in_place=True, reverse_order=True)
self.imu_torso_position[1][:] = r.loc_torso_position
self.imu_torso_to_field_transform[1] = Matrix_4x4.from_3x3_and_translation(self.imu_torso_to_field_rotation[1],self.imu_torso_position[1])
self.imu_head_to_field_transform[1] = self.imu_torso_to_field_transform[1].multiply(r.body_parts["torso"].transform.invert())
self.imu_CoM_position[1][:] = self.imu_head_to_field_transform[1](r.rel_cart_CoM_position)
def update_local_IMU(self, i):
r = self.player.world.robot
self.imu_torso_to_field_rotation[i].m[:] = r.imu_torso_to_field_rotation.m
self.imu_torso_to_field_transform[i].m[:] = r.imu_weak_torso_to_field_transform.m
self.imu_head_to_field_transform[i].m[:] = r.imu_weak_head_to_field_transform.m
self.imu_torso_position[i][:] = r.imu_weak_torso_position
self.imu_torso_velocity[i][:] = r.imu_weak_torso_velocity
self.imu_torso_acceleration[i][:] = r.imu_weak_torso_acceleration
self.imu_torso_next_position[i] = self.imu_torso_position[i] + self.imu_torso_velocity[i] * 0.02 + self.imu_torso_acceleration[i] * 0.0002
self.imu_torso_next_velocity[i] = self.imu_torso_velocity[i] + self.imu_torso_acceleration[i] * 0.02
self.imu_CoM_position[i][:] = r.imu_weak_CoM_position
def execute(self):
a = self.script.args
self.player = Agent(a.i, a.p, a.m, a.u, a.r, a.t) # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name
self.player.scom.unofficial_beam((-3,0,self.player.world.robot.beam_height),15)
for _ in range(10): #beam to place
self.player.scom.commit_and_send()
self.player.scom.receive()
self.player.world.draw.annotation((-3,1,1.1), "IMU + Localizer", self.colors[0], "note_IMU_1", True)
for _ in range(150):
self.act()
self.update_local_IMU(0)
self.draw_player_reference_frame(0)
self.player.world.draw.annotation((-3,1,0.9), "IMU for rotation", self.colors[1], "note_IMU_2", True)
self.update_local_IMU(1)
for _ in range(200):
self.act()
self.update_local_IMU(0)
self.draw_player_reference_frame(0)
self.compute_local_IMU_rotation_only()
self.draw_player_reference_frame(1)
self.player.world.draw.annotation((-3,1,0.7), "IMU for rotation & position", self.colors[2], "note_IMU_3", True)
self.update_local_IMU(2)
for _ in range(200):
self.act()
self.update_local_IMU(0)
self.draw_player_reference_frame(0)
self.compute_local_IMU_rotation_only()
self.draw_player_reference_frame(1)
self.compute_local_IMU()
self.draw_player_reference_frame(2)
print("\nPress ctrl+c to return.")
# Still "IMU for rotation & position" but now start walking
self.update_local_IMU(2)
while True:
self.act2()
self.update_local_IMU(0)
self.draw_player_reference_frame(0)
self.compute_local_IMU_rotation_only()
self.draw_player_reference_frame(1)
self.compute_local_IMU()
self.draw_player_reference_frame(2)

View File

@@ -0,0 +1,146 @@
from agent.Base_Agent import Base_Agent as Agent
from itertools import count
from math_ops.Inverse_Kinematics import Inverse_Kinematics
from scripts.commons.Script import Script
from world.commons.Draw import Draw
import numpy as np
class Inv_Kinematics():
def __init__(self, script:Script) -> None:
self.args = script.args
self.last_action = (0,0,0)
self.gravity = True
# Initial pose is a neutral pose where all angles are 0
leg_y_dev, upper_leg_height, upper_leg_depth, lower_leg_len, _, _ = Inverse_Kinematics.NAO_SPECS_PER_ROBOT[self.args.r]
leg_height = upper_leg_height + lower_leg_len
self.feet_pose = [ [[upper_leg_depth,leg_y_dev,-leg_height],[0,0,0]], [[upper_leg_depth,-leg_y_dev,-leg_height], [0,0,0]] ]
def _user_control(self):
while True:
inp = input("Command:")
if inp == "": return 2
elif inp == ".": return 1
elif inp == "h": self.print_help(); continue
elif inp == "g":
self.gravity = not self.gravity
print("Using gravity:",self.gravity)
if self.gravity:
return 6 # extra steps for beam to take effect
else:
return 1
#Check if user input is a value
try:
val = float(inp)
self.feet_pose[self.last_action[0]][self.last_action[1]][self.last_action[2]] = val
continue
except:
pass
if inp[0] not in ['l','r'] or inp[1] not in ['x','y','z','X','Y','Z']:
print("Illegal command!")
continue
side = 0 if inp[0]=='l' else 1
pos_rot = 0 if inp[1].islower() else 1
axis = {'x':0,'y':1,'z':2}[inp[1].lower()]
self.last_action = (side,pos_rot,axis)
try:
val = float(inp[2:])
self.feet_pose[side][pos_rot][axis] = val
except:
print("Illegal value conversion!")
def _draw_labels(self, player:Agent):
r = player.world.robot
robot_pos = r.loc_head_position
for i, body_part in enumerate(['lankle','rankle']):
pos = r.get_body_part_abs_position(body_part)
label_rel_pos = np.array([-0.2,(0.5-i),0])
label_rel_pos /= np.linalg.norm(label_rel_pos) / 1.0 #labels at 1.0m from body part
player.world.draw.line( pos,pos+label_rel_pos,2,Draw.Color.green_light,body_part,False)
p = self.feet_pose[i]
pose_text = f"x:{p[0][0]:.4f} y:{p[0][1]:.4f} z:{p[0][2]:.4f}", f"rol:{p[1][0]:.2f} (bias) pit:{p[1][1]:.2f} (bias) yaw:{p[1][2]:.2f} "
player.world.draw.annotation( pos+label_rel_pos+[0,0,0.2], pose_text[0], Draw.Color.cyan,body_part,False)
player.world.draw.annotation( pos+label_rel_pos+[0,0,0.1], pose_text[1], Draw.Color.cyan,body_part,False)
# Draw forward kinematics (ankles positions + feet rotation)
p = player.inv_kinematics.get_body_part_pos_relative_to_hip(body_part) # ankle relative to center of both hip joints
foot_rel_torso = r.head_to_body_part_transform("torso", r.body_parts[['lfoot','rfoot'][i]].transform )
w = foot_rel_torso.get_roll_deg(), foot_rel_torso.get_pitch_deg(), foot_rel_torso.get_yaw_deg()
pose_text = f"x:{p[0]:.4f} y:{p[1]:.4f} z:{p[2]:.4f}", f"rol:{w[0]:.4f} pit:{w[1]:.4f} yaw:{w[2]:.4f}"
player.world.draw.annotation( pos+label_rel_pos+[0,0,-0.2], pose_text[0], Draw.Color.red,body_part,False)
player.world.draw.annotation( pos+label_rel_pos+[0,0,-0.3], pose_text[1], Draw.Color.red,body_part,False)
player.world.draw.annotation( pos+label_rel_pos+[0,0,-0.4], "(forward kinematics data)", Draw.Color.red,body_part,True)
note = f"Torso roll: {r.imu_torso_roll:.2f} Torso pitch: {r.imu_torso_pitch:.2f}"
player.world.draw.annotation( robot_pos+[0,0,0.10],note,Draw.Color.red,"Torso")
def print_help(self):
print("""
---------------- Inverse kinematics demonstration ----------------
INPUT: ankle positions + feet rotations (relative coordinates)
OUTPUT: angular positions of both legs' joints
------------------------------------------------------------------
Command: {action/option}
action: [side:{l/r} axis*:{x/y/z/X/Y/Z}] value
*for position use x/y/z, for rotation use X/Y/Z
option: {"",.,g,h}
Examples:
"lz-0.12" - move left ankle to -0.1m in the z-axis
"rX30.5" - rotate right foot to 30.5 deg in the x-axis (roll)
"20" - repeat last action but change value to 20
"" - advance 2 simulation step
"." - advance 1 simulation step
"g" - toggle gravity
"h" - help, display this message
"ctrl+c" - quit demonstration
------------------------------------------------------------------""")
def execute(self):
self.state = 0
a = self.args
self.print_help()
player = Agent(a.i, a.p, a.m, a.u, a.r, a.t) # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name
player.scom.unofficial_beam((-3,0,0.42),0)
next_control_step = 20
for i in count():
if self.gravity:
player.scom.unofficial_beam((-3,0,0.42),0)
self._draw_labels(player)
if i == next_control_step:
next_control_step += self._user_control()
for i in range(2): #left and right legs
indices, values, error_codes = player.inv_kinematics.leg(self.feet_pose[i][0], self.feet_pose[i][1], bool(i==0), False)
if -1 in error_codes:
print("Position is out of reach!")
error_codes.remove(-1)
for j in error_codes:
print(f"Joint {j} is out of range!")
player.world.robot.set_joints_target_position_direct(indices,values)
player.scom.commit_and_send( player.world.robot.get_command() )
player.scom.receive()

151
scripts/utils/Joints.py Normal file
View File

@@ -0,0 +1,151 @@
from agent.Base_Agent import Base_Agent as Agent
from scripts.commons.Script import Script
from world.commons.Draw import Draw
import numpy as np
class Joints():
def __init__(self,script:Script) -> None:
self.script = script
self.agent_pos = (-3,0,0.45)
self.enable_pos = True
self.enable_gravity = False
self.enable_labels = True
self.enable_harmonize = True
self.active_joint = 0
self.joints_value = None #position or speed
def _draw_joints(self,player:Agent):
zstep = 0.05
label_z = [3*zstep,5*zstep,0,0,zstep,zstep,2*zstep,2*zstep,0,0,0,0,zstep,zstep,0,0,zstep,zstep,4*zstep,4*zstep,5*zstep,5*zstep,0,0]
for j, transf in enumerate(player.world.robot.joints_transform):
rp = transf.get_translation()
pos = player.world.robot.loc_head_to_field_transform(rp,False)
j_id = f"{j}"
j_name = f"{j}"
color = Draw.Color.cyan
if player.world.robot.joints_position[j] != 0:
j_name += f" ({int(player.world.robot.joints_position[j])})"
color = Draw.Color.red
label_rp = np.array([rp[0]-0.0001,rp[1]*0.5,0])
label_rp /= np.linalg.norm(label_rp) / 0.5 #labels at 0.5m from body part
label_rp += (0,0,label_z[j])
label = player.world.robot.loc_head_to_field_transform(rp+label_rp,False)
player.world.draw.line( pos,label,2,Draw.Color.green_light,j_id,False)
player.world.draw.annotation( label,j_name,color,j_id)
def print_help(self):
print(f"""
---------------------- Joints demonstration ----------------------
Command: {{action/actions/option}}
action : [joint:{{int}}] value
actions: value0,value1,...,valueN
e.g. if N=10, you control all joints from j0 to j10
option: {{h,s,g,l,w,r,"",.}}
Examples:
"6 90" - move joint 6 to 90deg or move joint 6 at 90deg/step
"4" - move last joint to 4deg or apply speed of 4deg/step
"1,9,-35"- move joints 0,1,2 to 1deg, 9deg, -35deg (or speed)
"h" - help, display this message
"s" - toggle position/speed control ({"Posi" if self.enable_pos else "Spee"})
"g" - toggle gravity ({self.enable_gravity})
"l" - toggle labels ({self.enable_labels})
"w" - toggle harmonize* ({self.enable_harmonize})
"r" - reset (position mode + reset joints)
"" - advance 2 simulation step
"." - advance 1 simulation step
"ctrl+c" - quit demonstration
*all joints end moving at the same time when harmonize is True
------------------------------------------------------------------""")
def _user_control_step(self,player:Agent):
while True:
inp = input("Command: ")
if inp == "s":
self.enable_pos = not self.enable_pos
print("Using", "position" if self.enable_pos else "velocity", "control.")
if self.enable_pos:
self.joints_value[:] = player.world.robot.joints_position
else:
self.joints_value.fill(0)
continue
elif inp == "g":
self.enable_gravity = not self.enable_gravity
print("Using gravity:",self.enable_gravity)
continue
elif inp == "l":
self.enable_labels = not self.enable_labels
print("Using labels:",self.enable_labels)
continue
elif inp == "w":
self.enable_harmonize = not self.enable_harmonize
print("Using harmonize:",self.enable_harmonize)
continue
elif inp == "r":
self.enable_pos = True
self.joints_value.fill(0)
print("Using position control. All joints are set to zero.")
continue
elif inp == "h":
self.print_help(); continue
elif inp == "": return 1
elif inp == ".": return 0
try:
if " " in inp:
self.active_joint, value = map(float, inp.split())
self.joints_value[int(self.active_joint)] = value
elif "," in inp:
values = inp.split(",")
self.joints_value[0:len(values)] = values
else:
self.joints_value[self.active_joint] = float(inp)
except:
print("Illegal command!")
continue
def execute(self):
a = self.script.args
player = Agent(a.i, a.p, a.m, a.u, a.r, a.t) # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name
self.joints_no = player.world.robot.no_of_joints
self.joints_value = np.zeros(self.joints_no) # initialize
player.scom.commit_beam(self.agent_pos[0:2],0)
self.print_help()
# initialize (+beam)
for _ in range(8):
player.scom.commit_and_send()
player.scom.receive()
self._draw_joints(player)
skip_next = 0 # variable to advance more than 1 step
while True:
if skip_next == 0:
skip_next = self._user_control_step(player)
else:
skip_next -= 1
if self.enable_labels:
self._draw_joints(player)
if self.enable_pos:
player.world.robot.set_joints_target_position_direct(slice(self.joints_no), self.joints_value, harmonize=self.enable_harmonize)
else:
player.world.robot.joints_target_speed[:]=self.joints_value * 0.87266463 #deg/step to rad/s
if not self.enable_gravity: player.scom.unofficial_beam(self.agent_pos,0)
player.scom.commit_and_send( player.world.robot.get_command() )
player.scom.receive()

53
scripts/utils/Kick.py Normal file
View File

@@ -0,0 +1,53 @@
from agent.Base_Agent import Base_Agent as Agent
from math_ops.Math_Ops import Math_Ops as M
from scripts.commons.Script import Script
import numpy as np
'''
Objective:
----------
Demonstrate kick
'''
class Kick():
def __init__(self, script:Script) -> None:
self.script = script
def execute(self):
a = self.script.args
player = Agent(a.i, a.p, a.m, a.u, a.r, a.t) # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name
player.path_manager.draw_options(enable_obstacles=True, enable_path=True) # enable drawings of obstacles and path to ball
behavior = player.behavior
w = player.world
r = w.robot
print("\nThe robot will kick towards the center of the field")
print("Try to manually relocate the ball")
print("Press ctrl+c to return\n")
player.scom.unofficial_set_play_mode("PlayOn")
player.scom.unofficial_beam((-3,0,r.beam_height),0)
vec = (1,0)
while True:
player.scom.unofficial_set_game_time(0)
b = w.ball_abs_pos[:2]
if 0 < np.linalg.norm(w.get_ball_abs_vel(6)) < 0.02: # speed of zero is likely to indicate prolongued inability to see the ball
if np.linalg.norm(w.ball_rel_head_cart_pos[:2]) > 0.5: # update kick if ball is further than 0.5 m
if max(abs(b)) < 0.5:
vec = np.array([6,0])
else:
vec = M.normalize_vec((0,0)-b) * 6
w.draw.point(b+vec, 8, w.draw.Color.pink, "target")
behavior.execute("Basic_Kick", M.vector_angle(vec))
player.scom.commit_and_send( r.get_command() )
player.scom.receive()
if behavior.is_ready("Get_Up"):
player.scom.unofficial_beam((*r.loc_head_position[0:2],r.beam_height),0)
behavior.execute_to_completion("Zero_Bent_Knees")

View File

@@ -0,0 +1,103 @@
from agent.Agent import Agent as Agent
from cpp.localization import localization
from math_ops.Math_Ops import Math_Ops as M
from scripts.commons.Script import Script
from world.commons.Draw import Draw
from world.commons.Other_Robot import Other_Robot
class Localization():
def __init__(self,script:Script) -> None:
self.script = script
def execute(self):
a = self.script.args
d = self.draw = Draw(True, 0, a.i, 32769) # using independent draw object so that the internal agent drawings can be disabled
# Args: Server IP, Agent Port, Monitor Port, Uniform No., Team name, Enable Log, Enable Draw
self.script.batch_create(Agent, ((a.i,a.p,a.m,1,a.t,False,False),)) # one teammate (dummy goalkeeper without communication)
self.script.batch_create(Agent, ((a.i,a.p,a.m,5,"Opponent",False,False),)) # one opponent
self.script.batch_create(Agent, ((a.i,a.p,a.m,9,a.t,False,False),)) # one main agent (the one who draws its world)
# Beam dummy goalkeeper
self.script.batch_unofficial_beam( ((-14,0,0.5,0),), slice(0,1))
p : Agent = self.script.players[-1] # p identifies the main agent
p.scom.unofficial_set_play_mode("PlayOn")
# Execute
while True:
self.script.batch_commit_and_send(slice(0,1)) # dummy agent does not think
self.script.batch_execute_agent(slice(1,None)) # execute normal agents
self.script.batch_receive(slice(0,1), False) # receive & don't update dummy's world state (to save cpu resources)
self.script.batch_receive(slice(1,None)) # receive & update world state
if p.world.vision_is_up_to_date:
if p.world.robot.loc_is_up_to_date: # localization will draw the world of the last agent to be executed
localization.print_python_data() # print data received by the localization module
localization.draw_visible_elements(not p.world.team_side_is_left) # draw visible elements
localization.print_report() # print report with stats
print("\nPress ctrl+c to return.")
d.circle( p.world.ball_abs_pos, 0.1,6,Draw.Color.purple_magenta,"world", False)
else:
d.annotation( p.world.robot.cheat_abs_pos, "Not enough visual data!", Draw.Color.red,"world", False)
for o in p.world.teammates:
if o.state_last_update != 0 and not o.is_self: # skip if other robot was not yet seen
self._draw_other_robot(p, o, Draw.Color.white)
for o in p.world.opponents:
if o.state_last_update != 0: # skip if other robot was not yet seen
self._draw_other_robot(p, o, Draw.Color.red)
d.flush("world")
def _draw_other_robot(self, p:Agent, o:Other_Robot, team_color):
#p - player that sees
#o - other robot (player that is seen)
d = self.draw
white = Draw.Color.white
green = Draw.Color.green_light
gray = Draw.Color.gray_20
time_diff = p.world.time_local_ms - o.state_last_update
if time_diff > 0:
white = Draw.Color.gray_40
green = Draw.Color.get(107, 139, 107)
gray = Draw.Color.gray_50
#orientation
if len(o.state_abs_pos)==3:
line_tip = o.state_abs_pos + (0.5*M.deg_cos(o.state_orientation),0.5*M.deg_sin(o.state_orientation),0)
d.line( o.state_abs_pos, line_tip, 3, white, "world", False)
else:
temp_pos = M.to_3d(o.state_abs_pos, 0.3)
line_tip = temp_pos + (0.5*M.deg_cos(o.state_orientation),0.5*M.deg_sin(o.state_orientation),0)
d.line( temp_pos, line_tip, 3, Draw.Color.yellow, "world", False)
#body parts
for pos in o.state_body_parts_abs_pos.values():
d.sphere( pos, 0.07, green,"world", False)
#player ground area
d.circle( o.state_ground_area[0], o.state_ground_area[1], 6, team_color,"world", False)
#distance
midpoint = (o.state_abs_pos[0:2] + p.world.robot.loc_head_position[0:2])/2
d.line( o.state_abs_pos[0:2], p.world.robot.loc_head_position[0:2], 1, gray, "world", False)
d.annotation( midpoint, f'{o.state_horizontal_dist:.2f}m', white, "world", False)
#velocity
arrow_tip = o.state_abs_pos[0:2] + o.state_filtered_velocity[0:2]
d.arrow( o.state_abs_pos[0:2], arrow_tip, 0.2, 4, green, "world", False)
#state
state_color = white if not o.state_fallen else Draw.Color.yellow
d.annotation( (o.state_abs_pos[0],o.state_abs_pos[1],1),
f"({o.unum}) {'Fallen' if o.state_fallen else 'Normal'}", state_color, "world", False)

View File

@@ -0,0 +1,203 @@
from agent.Base_Agent import Base_Agent as Agent
from cpp.a_star import a_star
from scripts.commons.Script import Script
import numpy as np
import time
'''
::::::::::::::::::::::::::::::::::::::::::
::::::::a_star.compute(param_vec):::::::::
::::::::::::::::::::::::::::::::::::::::::
param_vec (numpy array, float32)
param_vec[0] - start x
param_vec[1] - start y
param_vec[2] - allow path to go out of bounds? (useful when player does not have the ball)
param_vec[3] - go to opposite goal? (path goes to the most efficient part of the goal)
param_vec[4] - target x (only used if param_vec[3]==0)
param_vec[5] - target y (only used if param_vec[3]==0)
param_vec[6] - timeout in us (maximum execution time)
-------------- [optional] ----------------
param_vec[ 7-11] - obstacle 1: x, y, hard radius (max:5m), soft radius (max:5m), repulsive force for soft radius (min:0)
param_vec[12-16] - obstacle 2: x, y, hard radius (max:5m), soft radius (max:5m), repulsive force for soft radius (min:0)
... - obstacle n: x, y, hard radius (max:5m), soft radius (max:5m), repulsive force for soft radius (min:0)
---------------- return ------------------
path_ret : numpy array (float32)
path_ret[:-2]
contains path from start to target (up to a maximum of 1024 positions)
each position is composed of x,y coordinates (so, up to 2048 coordinates)
the return vector is flat (1 dimension) (e.g. [x1,y1,x2,y2,x3,y3,...])
reasons why path may not end in target:
- path is longer than 1024 positions (which is at least 102 meters!)
- reaching target is impossible or timeout (in which case, the path ends in the closest position to target found)
path_ret[-2]
number indicating the path status
0 - success
1 - timeout before the target was reached (may be impossible)
2 - impossible to reach target (all options were tested)
3 - no obstacles between start and target (path_ret[:-2] contains only 2 points: the start and target)
path_ret[-1]
A* path cost
::::::::::::::::::::::::::::::::::::::::::
::::::::::::::::::Notes:::::::::::::::::::
::::::::::::::::::::::::::::::::::::::::::
Map of field:
- The algorithm has a 32m by 22m map with a precision of 10cm (same dimension as field +1 meter border)
- The map contains information about field lines, goal posts and goal net
- The path may go outside the field (out of bounds) if the user allows it,
but it may never go through goal posts or the goal net (these are considered static inaccessible obstacles)
- The user must only specify dynamic obstacles through the arguments
Repulsive force:
- The repulsive force is implemented as an extra cost for the A* algorithm
- The cost for walking 10cm is 1, and the cost for walking diagonally is sqrt(2)
- The extra cost of stepping on a position with a repulsive force f=1 is 1
- For any given position on the field, the repulsive force of >=2 objects is combined with the max function, max(f1,f2), NOT f1+f2!
- If path starts on inaccessible position, it can go to a neighbor inaccessible position but there is a cost of 100 (to avoid inaccessible paths)
Example:
Map 1 Map 2 Map 3
..x.. ..o.. ..o..
..1.. ..o.. .o1..
..o.. ..o.. ..o..
Consider 'Map 1' where 'x' is the target, 'o' is the player, and '1' is a repulsive force of 1
In 'Map 2', the player chooses to go forward, the total cost of this path is: 1+(extra=1)+1 = 3
In 'Map 3', the player avoids the repulsive force, the total cost of this path is: sqrt(2)+sqrt(2) = 2.83 (optimal solution)
Map 1 Map 2 Map 3 Map 4
...x... ..oo... ...o... ...o...
..123.. .o123.. ..o23.. ..1o3..
...o... ..oo... ...o... ...o...
Consider 'Map 1' with 3 positions with 3 distinct repulsive forces, going from 1 to 3.
In 'Map 2', the player avoids all repulsive forces, total cost: 1+sqrt(2)+sqrt(2)+1 = 4.83
In 'Map 3', the player goes through the smallest repulsive force, total cost: sqrt(2)+(extra=1)+sqrt(2) = 3.83 (optimal solution)
In 'Map 4', the player chooses to go forward, total cost: 1+(extra=2)+1 = 4.00
Obstacles:
hard radius: inaccessible obstacle radius (infinite repulsive force)
soft radius: accessible obstacle radius with user-defined repulsive force (fades with distance) (disabled if <= hard radius)
Example:
obstacle(0,0,1,3,5) -> obstacle at pos(0,0) with hard radius of 1m, soft radius of 3m with repulsive force 5
- the path cannot be at <=1m from this obstacle, unless the path were to start inside that radius
- the soft radius force is maximum at the center (5) and fades with distance until (0) at 3m from the obstacle
- so to sum up, at a distance of [0,1]m the force is infinite, [1,3]m the force goes from 3.333 to 0
obstacle(-2.1,3,0,0,0) -> obstacle at pos(-2.1,3) with hard radius of 0m, soft radius of 0m with repulsive force 0
- the path cannot go through (-2.1,3)
obstacle(-2.16,3,0,0,8) -> obstacle at pos(-2.2,3) with hard radius of 0m, soft radius of 0m with repulsive force 8
- the path cannot go through (-2.2,3), the map has a precision of 10cm, so the obstacle is placed at the nearest valid position
- the repulsive force is ignored because (soft radius <= hard radius)
'''
class Pathfinding():
def __init__(self, script:Script) -> None:
self.script = script
a_star.compute(np.zeros(6, np.float32)) # Initialize (not needed, but the first run takes a bit more time)
def draw_grid(self):
d = self.player.world.draw
MAX_RAW_COST = 0.6 # dribble cushion
for x in np.arange(-16,16.01,0.1):
for y in np.arange(-11,11.01,0.1):
s_in, cost_in = a_star.compute(np.array([x, y, 0, 0, x, y, 5000], np.float32))[-2:] # do not allow out of bounds
s_out, cost_out = a_star.compute(np.array([x, y, 1, 0, x, y, 5000], np.float32))[-2:] # allow out of bounds
#print(path_cost_in, path_cost_out)
if s_out != 3:
d.point((x,y), 5, d.Color.red, "grid", False)
elif s_in != 3:
d.point((x,y), 4, d.Color.blue_pale, "grid", False)
elif 0 < cost_in < MAX_RAW_COST + 1e-6:
d.point((x,y), 4, d.Color.get(255,(1-cost_in/MAX_RAW_COST)*255,0), "grid", False)
elif cost_in > MAX_RAW_COST:
d.point((x,y), 4, d.Color.black, "grid", False)
#else:
# d.point((x,y), 4, d.Color.white, "grid", False)
d.flush("grid")
def sync(self):
r = self.player.world.robot
self.player.behavior.head.execute()
self.player.scom.commit_and_send( r.get_command() )
self.player.scom.receive()
def draw_path_and_obstacles(self, obst, path_ret_pb, path_ret_bp):
w = self.player.world
# draw obstacles
for i in range(0,len(obst[0]),5):
w.draw.circle(obst[0][i:i+2], obst[0][i+2], 2, w.draw.Color.red, "obstacles", False)
w.draw.circle(obst[0][i:i+2], obst[0][i+3], 2, w.draw.Color.orange, "obstacles", False)
# draw path
path_pb = path_ret_pb[:-2] # create view without status
path_status_pb = path_ret_pb[-2] # extract status
path_cost_pb = path_ret_pb[-1] # extract A* cost
path_bp = path_ret_bp[:-2] # create view without status
path_status_bp = path_ret_bp[-2] # extract status
path_cost_bp = path_ret_bp[-1] # extract A* cost
c_pb = {0: w.draw.Color.green_lime, 1: w.draw.Color.yellow, 2: w.draw.Color.red, 3: w.draw.Color.blue_light}[path_status_pb]
c_bp = {0: w.draw.Color.green_pale, 1: w.draw.Color.yellow_light, 2: w.draw.Color.red_salmon, 3: w.draw.Color.blue_pale}[path_status_bp]
for i in range(2,len(path_pb)-2,2):
w.draw.line(path_pb[i-2:i],path_pb[i:i+2], 5, c_pb, "path_player_ball", False)
if len(path_pb)>=4:
w.draw.arrow(path_pb[-4:-2],path_pb[-2:],0.4, 5, c_pb, "path_player_ball", False)
for i in range(2,len(path_bp)-2,2):
w.draw.line(path_bp[i-2:i],path_bp[i:i+2], 5, c_bp, "path_ball_player", False)
if len(path_bp)>=4:
w.draw.arrow(path_bp[-4:-2],path_bp[-2:],0.4, 5, c_bp, "path_ball_player", False)
w.draw.flush("obstacles")
w.draw.flush("path_player_ball")
w.draw.flush("path_ball_player")
def move_obstacles(self, obst):
for i in range(len(obst[0])//5):
obst[0][i*5] +=obst[1][i,0]
obst[0][i*5+1]+=obst[1][i,1]
if not -16<obst[0][i*5] <16: obst[1][i,0] *=-1
if not -11<obst[0][i*5+1]<11: obst[1][i,1] *=-1
def execute(self):
a = self.script.args
self.player = Agent(a.i, a.p, a.m, a.u, a.r, a.t) # Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name
w = self.player.world
r = self.player.world.robot
timeout = 5000
go_to_goal = 0
obst_no = 50
obst = [[0,0,0.5,1,1]*obst_no, np.random.uniform(-0.01,0.01,(obst_no,2))] # obst[x,y,h,s,f] + random velocity
print("\nMove player/ball around using RoboViz!")
print("Press ctrl+c to return.")
print("\nPathfinding timeout set to", timeout, "us.")
print("Pathfinding execution time:")
self.draw_grid()
while True:
ball = w.ball_abs_pos[:2]
rpos = r.loc_head_position[:2]
self.move_obstacles(obst)
param_vec_pb = np.array([*rpos, 1, go_to_goal, *ball, timeout, *obst[0]], np.float32) # allow out of bounds (player->ball)
param_vec_bp = np.array([*ball, 0, go_to_goal, *rpos, timeout, *obst[0]], np.float32) # don't allow (ball->player)
t1 = time.time()
path_ret_pb = a_star.compute(param_vec_pb)
t2 = time.time()
path_ret_bp = a_star.compute(param_vec_bp)
t3 = time.time()
print(end=f"\rplayer->ball {int((t2-t1)*1000000):5}us (len:{len(path_ret_pb[:-2])//2:4}) ball->player {int((t3-t2)*1000000):5}us (len:{len(path_ret_bp[:-2])//2:4}) ")
self.draw_path_and_obstacles( obst, path_ret_pb, path_ret_bp )
self.sync()

View File

@@ -0,0 +1,97 @@
from agent.Agent import Agent
from itertools import count
from scripts.commons.Script import Script
from typing import List
from world.commons.Draw import Draw
class Radio_Localization():
def __init__(self,script:Script) -> None:
self.script = script
def draw_objects(self, p:Agent, pos, is_down, was_seen, last_update, is_self=False):
w = p.world
me = w.robot.loc_head_position
# get draw object from same player to always overwrite previous drawing
# could also use team channel but with this approach we could draw for both teams
d:Draw = self.script.players[0].world.draw
# VISUALSTEP_MS is the time it takes to get a visual update
is_current = last_update > w.time_local_ms - w.VISUALSTEP_MS
# 0.12s is the time it takes to do a full broadcast with all positions if every group is completely visible
# here we use >= instead of > because the radio message comes with a delay of 20ms
is_recent = last_update >= w.time_local_ms - 120
if is_current and was_seen:
c = d.Color.green_light # I've seen this object in the current or previous time step
elif is_recent and was_seen:
c = d.Color.green # I've seen this object in the last 0.12s
elif is_current:
c = d.Color.yellow # I've heard about this object in the current or previous time step (and it was not seen in the same period)
elif is_recent:
c = d.Color.yellow_light # I've heard about this object in the last 0.12s (the last available info was not obtained from vision)
else:
c = d.Color.red # I haven't seen or heard about this object in the last 0.12s
if is_self:
if w.robot.radio_fallen_state:
d.annotation(me, "Fallen (radio)", d.Color.yellow, "objects", False) # I heard I've fallen (but I missed the last 2 visual steps)
elif w.robot.loc_head_z < 0.3:
d.annotation(me, "Fallen (internal)", d.Color.white, "objects", False) # I have detected I've fallen
d.sphere(me, 0.06, c, "objects", False)
else:
if is_down:
d.annotation((me[:2]+pos[:2])/2,"Fallen",d.Color.white,"objects",False)
d.arrow(me, pos, 0.1, 3, c, "objects", False)
def draw(self,p:Agent):
w = p.world
others = w.teammates + w.opponents
#----------------------------------------------------------- draw other players
for o in others:
if o.is_self or o.state_last_update==0: # do not draw self or never before seen players
continue
pos = o.state_abs_pos
is_down = o.state_fallen
# 3D head position means head is visible, 2D means some body parts are visible but not the head, or the head position comes from radio
is_3D = pos is not None and len(pos)==3
self.draw_objects(p, pos, is_down, is_3D, o.state_last_update)
#----------------------------------------------------------- draw self
is_pos_from_vision = w.robot.loc_head_position_last_update == w.robot.loc_last_update
self.draw_objects(p, None, None, is_pos_from_vision, w.robot.loc_head_position_last_update, True)
#----------------------------------------------------------- draw ball and flush drawings
self.draw_objects(p, w.ball_abs_pos, False, w.is_ball_abs_pos_from_vision, w.ball_abs_pos_last_update)
self.script.players[0].world.draw.flush("objects")
def execute(self):
a = self.script.args
# Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw
self.script.batch_create(Agent, ((a.i,a.p,a.m,u,a.t, False,u==1) for u in range(1,12)))
self.script.batch_create(Agent, ((a.i,a.p,a.m,u,"Opponent",False,False) for u in range(1,12)))
players : List[Agent] = self.script.players
# Beam opponents
beam_pos = [(-(i//2)-3,(i%2*2-1)*(i//2+1),0) for i in range(11)]
self.script.batch_commit_beam( beam_pos, slice(11,None) )
print("\nPress ctrl+c to return.")
# Execute
for j in count():
self.script.batch_execute_agent( slice(11) ) # run our agents (think and send)
self.script.batch_commit_and_send( slice(11,None) ) # run their agents (don't think, just send)
self.draw(players[j//15%11]) # draw knowledge, iterate through our team, 15 time steps per player
self.script.batch_receive(slice(11)) # receive & update our team's world state
self.script.batch_receive(slice(11,None), False) # receive & don't update opponent's world state (to save cpu resources)

148
scripts/utils/Server.py Normal file
View File

@@ -0,0 +1,148 @@
import os
class Server():
def __init__(self,script) -> None:
if os.path.isdir("/usr/local/share/rcssserver3d/"):
self.source = "/usr/local/share/rcssserver3d/"
elif os.path.isdir("/usr/share/rcssserver3d/"):
self.source = "/usr/share/rcssserver3d/"
else:
raise FileNotFoundError("The server configuration files were not found!")
# To add options: insert into options & explations with same index, read respective value from file or from other values, add edit entry
self.options = ["Official Config", "Penalty Shootout", "Soccer Rules", "Sync Mode", "Real Time", "Cheats", "Full Vision", "Add Noise", "25Hz Monitor"]
self.descriptions = ["Configuration used in official matches", "Server's Penalty Shootout mode", "Play modes, automatic referee, etc.",
"Synchronous communication between agents and server", "Real Time (or maximum server speed)",
"Agent position & orientation, ball position", "See 360 deg instead of 120 deg (vertically & horizontally)",
"Noise added to the position of visible objects", "25Hz Monitor (or 50Hz but RoboViz will show 2x the actual speed)"]
spark_f = os.path.expanduser("~/.simspark/spark.rb")
naoneckhead_f = self.source+"rsg/agent/nao/naoneckhead.rsg"
self.files = {"Penalty Shootout" : self.source + "naosoccersim.rb",
"Soccer Rules" : self.source + "naosoccersim.rb",
"Sync Mode" : spark_f,
"Real Time" : self.source+"rcssserver3d.rb",
"Cheats" : naoneckhead_f,
"Full Vision" : naoneckhead_f,
"Add Noise" : naoneckhead_f,
"25Hz Monitor" : spark_f}
def label(self, setting_name, t_on, t_off):
with open(self.files[setting_name], "r") as sources:
content = sources.read()
if t_on in content:
self.values[setting_name] = "On"
elif t_off in content:
self.values[setting_name] = "Off"
else:
self.values[setting_name] = "Error"
def read_config(self):
v = self.values = dict()
print("Reading server configuration files...")
self.label("Penalty Shootout", "addSoccerVar('PenaltyShootout', true)", "addSoccerVar('PenaltyShootout', false)")
self.label("Soccer Rules", " gameControlServer.initControlAspect('SoccerRuleAspect')", "#gameControlServer.initControlAspect('SoccerRuleAspect')")
self.label("Real Time", "enableRealTimeMode = true", "enableRealTimeMode = false")
self.label("Cheats", "setSenseMyPos true", "setSenseMyPos false")
self.label("Full Vision", "setViewCones 360 360", "setViewCones 120 120")
self.label("Add Noise", "addNoise true", "addNoise false")
self.label("Sync Mode", "agentSyncMode = true", "agentSyncMode = false")
self.label("25Hz Monitor", "monitorStep = 0.04", "monitorStep = 0.02")
is_official_config = (v["Penalty Shootout"] == "Off" and v["Soccer Rules"] == "On" and v["Real Time"] == "On" and v["Cheats"] == "Off" and v["Full Vision"] == "Off" and
v["Add Noise"] == "On" and v["Sync Mode"] == "Off" and v["25Hz Monitor"] == "On")
print(v["Penalty Shootout"], is_official_config)
v["Official Config"] = "On" if is_official_config else "Off"
def change_config(self, setting_name, t_on, t_off, current_value=None, file=None):
if current_value is None:
current_value = self.values[setting_name]
if file is None:
file = self.files[setting_name]
with open(file, "r") as sources:
t = sources.read()
if current_value == "On":
t = t.replace(t_on, t_off, 1)
print(f"Replacing '{t_on}' with '{t_off}' in '{file}'")
elif current_value == "Off":
t = t.replace(t_off, t_on, 1)
print(f"Replacing '{t_off}' with '{t_on}' in '{file}'")
else:
print(setting_name, "was not changed because the value is unknown!")
with open(file, "w") as sources:
sources.write(t)
def execute(self):
while True:
self.read_config()
# Convert convenient values dict to list
values_list = [self.values[o] for o in self.options]
print()
UI.print_table( [self.options, values_list, self.descriptions], ["Setting", "Value", "Description"], numbering=[True, False, False])
choice = UI.read_int('Choose setting (ctrl+c to return): ',0,len(self.options))
opt = self.options[choice]
prefix = ['sudo', 'python3', 'scripts/utils/Server.py', opt]
if opt in self.files:
suffix = [self.values[opt], self.files[opt]]
if opt == "Penalty Shootout":
subprocess.call([*prefix, "addSoccerVar('PenaltyShootout', true)", "addSoccerVar('PenaltyShootout', false)", *suffix])
elif opt == "Soccer Rules":
subprocess.call([*prefix, "gameControlServer.initControlAspect('SoccerRuleAspect')", "#gameControlServer.initControlAspect('SoccerRuleAspect')", *suffix])
elif opt == "Sync Mode":
self.change_config(opt, "agentSyncMode = true", "agentSyncMode = false") # doesn't need sudo privileges
elif opt == "Real Time":
subprocess.call([*prefix, "enableRealTimeMode = true", "enableRealTimeMode = false", *suffix])
elif opt == "Cheats":
subprocess.call([*prefix, "setSenseMyPos true", "setSenseMyPos false", *suffix,
opt, "setSenseMyOrien true", "setSenseMyOrien false", *suffix,
opt, "setSenseBallPos true", "setSenseBallPos false", *suffix])
elif opt == "Full Vision":
subprocess.call([*prefix, "setViewCones 360 360", "setViewCones 120 120", *suffix])
elif opt == "Add Noise":
subprocess.call([*prefix, "addNoise true", "addNoise false", *suffix])
elif opt == "25Hz Monitor":
self.change_config(opt, "monitorStep = 0.04", "monitorStep = 0.02") # doesn't need sudo privileges
elif opt == "Official Config":
if self.values[opt] == "On":
print("The official configuration is already On!")
else: # here, the first option is the official value
subprocess.call([*prefix[:3],
"Penalty Shootout", "addSoccerVar('PenaltyShootout', false)", "addSoccerVar('PenaltyShootout', true)", "Off", self.files["Penalty Shootout"],
"Soccer Rules", "gameControlServer.initControlAspect('SoccerRuleAspect')", "#gameControlServer.initControlAspect('SoccerRuleAspect')", "Off", self.files["Soccer Rules"],
"Sync Mode", "agentSyncMode = false", "agentSyncMode = true", "Off", self.files["Sync Mode"],
"Real Time", "enableRealTimeMode = true", "enableRealTimeMode = false", "Off", self.files["Real Time"],
"Cheats", "setSenseMyPos false", "setSenseMyPos true", "Off", self.files["Cheats"],
"Cheats", "setSenseMyOrien false", "setSenseMyOrien true", "Off", self.files["Cheats"],
"Cheats", "setSenseBallPos false", "setSenseBallPos true", "Off", self.files["Cheats"],
"Full Vision", "setViewCones 120 120", "setViewCones 360 360", "Off", self.files["Full Vision"],
"Add Noise", "addNoise true", "addNoise false", "Off", self.files["Add Noise"],
"25Hz Monitor", "monitorStep = 0.04", "monitorStep = 0.02", "Off", self.files["25Hz Monitor"]])
# process with sudo privileges to change the configuration files
if __name__ == "__main__":
import sys
s = Server(None)
for i in range(1,len(sys.argv),5):
s.change_config( *sys.argv[i:i+5] )
else:
import subprocess
from scripts.commons.UI import UI

View File

@@ -0,0 +1,73 @@
from agent.Base_Agent import Base_Agent as Agent
from itertools import count
from scripts.commons.Script import Script
'''
How does communication work?
The say command allows a player to broadcast a message to everyone on the field
Message range: 50m (the field is 36m diagonally, so ignore this limitation)
The hear perceptor indicates 3 things:
- the message
- the origin team
- the origin absolute angle (set to "self" if the message was sent by oneself)
Messages are heard in the next step.
Messages are only sent every 2 steps (0.04s).
Messages sent in muted steps are only heard by oneself.
In one time step, a player can only hear one other player besides itself.
If two other players say something, only the first message is heard.
This ability exists independetly for messages from both teams.
In theory, a player can hear its own message + the 1st teammate to speak + the 1st opponent to speak
In practice, the opponent doesn't matter because our team's parser ignores messages from other teams
Message characteristics:
Maximum 20 characters, ascii between 0x20, 0x7E except ' ', '(', ')'
Accepted: letters+numbers+symbols: !"#$%&'*+,-./:;<=>?@[\]^_`{|}~
However, due to a server bug, sending ' or " ends the message sooner
'''
class Team_Communication():
def __init__(self,script:Script) -> None:
self.script = script
def player1_hear(self, msg:bytes, direction, timestamp:float) -> None:
print(f"Player 1 heard: {msg.decode():20} from:{direction:7} timestamp:{timestamp}")
def player2_hear(self, msg:bytes, direction, timestamp:float) -> None:
print(f"Player 2 heard: {msg.decode():20} from:{direction:7} timestamp:{timestamp}")
def player3_hear(self, msg:bytes, direction, timestamp:float) -> None:
print(f"Player 3 heard: {msg.decode():20} from:{direction:7} timestamp:{timestamp}")
def execute(self):
a = self.script.args
hear_callbacks = (self.player1_hear, self.player2_hear, self.player3_hear)
# Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw, Play Mode Correction, Wait for Server, Hear Callback
self.script.batch_create(Agent, ((a.i,a.p,a.m,i+1,0,a.t,True,True,False,True,clbk) for i,clbk in enumerate(hear_callbacks)))
p1:Agent = self.script.players[0]
p2:Agent = self.script.players[1]
p3:Agent = self.script.players[2]
# Beam players
self.script.batch_commit_beam( [(-2,i,45) for i in range(3)] )
for i in count():
msg1 = b"I_am_p1!_no:"+str(i).encode()
msg2 = b"I_am_p2!_no:"+str(i).encode()
msg3 = b"I_am_p3!_no:"+str(i).encode()
p1.scom.commit_announcement(msg1) # commit message
p2.scom.commit_announcement(msg2) # commit message
p3.scom.commit_announcement(msg3) # commit message
self.script.batch_commit_and_send() # send message
print(f"Player 1 sent: {msg1.decode()} HEX: {' '.join([f'{m:02X}' for m in msg1])}")
print(f"Player 2 sent: {msg2.decode()} HEX: {' '.join([f'{m:02X}' for m in msg2])}")
print(f"Player 3 sent: {msg3.decode()} HEX: {' '.join([f'{m:02X}' for m in msg3])}")
self.script.batch_receive()
input("Press enter to continue or ctrl+c to return.")

Binary file not shown.