You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
431 lines
22 KiB
Python
431 lines
22 KiB
Python
7 months ago
|
from math_ops.Math_Ops import Math_Ops as M
|
||
|
from world.Robot import Robot
|
||
|
from world.World import World
|
||
|
import math
|
||
|
import numpy as np
|
||
|
|
||
|
|
||
|
class World_Parser():
|
||
|
def __init__(self, world:World, hear_callback) -> None:
|
||
|
self.LOG_PREFIX = "World_Parser.py: "
|
||
|
self.world = world
|
||
|
self.hear_callback = hear_callback
|
||
|
self.exp = None
|
||
|
self.depth = None
|
||
|
self.LEFT_SIDE_FLAGS = {b'F2L':(-15,-10,0),
|
||
|
b'F1L':(-15,+10,0),
|
||
|
b'F2R':(+15,-10,0),
|
||
|
b'F1R':(+15,+10,0),
|
||
|
b'G2L':(-15,-1.05,0.8),
|
||
|
b'G1L':(-15,+1.05,0.8),
|
||
|
b'G2R':(+15,-1.05,0.8),
|
||
|
b'G1R':(+15,+1.05,0.8)} #mapping between flag names and their corrected location, when playing on the left side
|
||
|
self.RIGHT_SIDE_FLAGS = {b'F2L':(+15,+10,0),
|
||
|
b'F1L':(+15,-10,0),
|
||
|
b'F2R':(-15,+10,0),
|
||
|
b'F1R':(-15,-10,0),
|
||
|
b'G2L':(+15,+1.05,0.8),
|
||
|
b'G1L':(+15,-1.05,0.8),
|
||
|
b'G2R':(-15,+1.05,0.8),
|
||
|
b'G1R':(-15,-1.05,0.8)}
|
||
|
self.play_mode_to_id = None
|
||
|
self.LEFT_PLAY_MODE_TO_ID = {"KickOff_Left":World.M_OUR_KICKOFF, "KickIn_Left":World.M_OUR_KICK_IN, "corner_kick_left":World.M_OUR_CORNER_KICK,
|
||
|
"goal_kick_left":World.M_OUR_GOAL_KICK, "free_kick_left":World.M_OUR_FREE_KICK, "pass_left":World.M_OUR_PASS,
|
||
|
"direct_free_kick_left": World.M_OUR_DIR_FREE_KICK, "Goal_Left": World.M_OUR_GOAL, "offside_left": World.M_OUR_OFFSIDE,
|
||
|
"KickOff_Right":World.M_THEIR_KICKOFF, "KickIn_Right":World.M_THEIR_KICK_IN, "corner_kick_right":World.M_THEIR_CORNER_KICK,
|
||
|
"goal_kick_right":World.M_THEIR_GOAL_KICK, "free_kick_right":World.M_THEIR_FREE_KICK, "pass_right":World.M_THEIR_PASS,
|
||
|
"direct_free_kick_right": World.M_THEIR_DIR_FREE_KICK, "Goal_Right": World.M_THEIR_GOAL, "offside_right": World.M_THEIR_OFFSIDE,
|
||
|
"BeforeKickOff": World.M_BEFORE_KICKOFF, "GameOver": World.M_GAME_OVER, "PlayOn": World.M_PLAY_ON }
|
||
|
self.RIGHT_PLAY_MODE_TO_ID = {"KickOff_Left":World.M_THEIR_KICKOFF, "KickIn_Left":World.M_THEIR_KICK_IN, "corner_kick_left":World.M_THEIR_CORNER_KICK,
|
||
|
"goal_kick_left":World.M_THEIR_GOAL_KICK, "free_kick_left":World.M_THEIR_FREE_KICK, "pass_left":World.M_THEIR_PASS,
|
||
|
"direct_free_kick_left": World.M_THEIR_DIR_FREE_KICK, "Goal_Left": World.M_THEIR_GOAL, "offside_left": World.M_THEIR_OFFSIDE,
|
||
|
"KickOff_Right":World.M_OUR_KICKOFF, "KickIn_Right":World.M_OUR_KICK_IN, "corner_kick_right":World.M_OUR_CORNER_KICK,
|
||
|
"goal_kick_right":World.M_OUR_GOAL_KICK, "free_kick_right":World.M_OUR_FREE_KICK, "pass_right":World.M_OUR_PASS,
|
||
|
"direct_free_kick_right": World.M_OUR_DIR_FREE_KICK, "Goal_Right": World.M_OUR_GOAL, "offside_right": World.M_OUR_OFFSIDE,
|
||
|
"BeforeKickOff": World.M_BEFORE_KICKOFF, "GameOver": World.M_GAME_OVER, "PlayOn": World.M_PLAY_ON }
|
||
|
|
||
|
|
||
|
def find_non_digit(self,start):
|
||
|
while True:
|
||
|
if (self.exp[start] < ord('0') or self.exp[start] > ord('9')) and self.exp[start] != ord('.'): return start
|
||
|
start+=1
|
||
|
|
||
|
def find_char(self,start,char):
|
||
|
while True:
|
||
|
if self.exp[start] == char : return start
|
||
|
start+=1
|
||
|
|
||
|
def read_float(self, start):
|
||
|
if self.exp[start:start+3] == b'nan': return float('nan'), start+3 #handle nan values (they exist)
|
||
|
end = self.find_non_digit(start+1) #we assume the first one is a digit or minus sign
|
||
|
try:
|
||
|
retval = float(self.exp[start:end])
|
||
|
except:
|
||
|
self.world.log(f"{self.LOG_PREFIX}String to float conversion failed: {self.exp[start:end]} at msg[{start},{end}], \nMsg: {self.exp.decode()}")
|
||
|
retval = 0
|
||
|
return retval, end
|
||
|
|
||
|
def read_int(self, start):
|
||
|
end = self.find_non_digit(start+1) #we assume the first one is a digit or minus sign
|
||
|
return int(self.exp[start:end]), end
|
||
|
|
||
|
def read_bytes(self, start):
|
||
|
end = start
|
||
|
while True:
|
||
|
if self.exp[end] == ord(' ') or self.exp[end] == ord(')'): break
|
||
|
end+=1
|
||
|
|
||
|
return self.exp[start:end], end
|
||
|
|
||
|
def read_str(self, start):
|
||
|
b, end = self.read_bytes(start)
|
||
|
return b.decode(), end
|
||
|
|
||
|
def get_next_tag(self, start):
|
||
|
min_depth = self.depth
|
||
|
while True:
|
||
|
if self.exp[start] == ord(")") : #monitor xml element depth
|
||
|
self.depth -= 1
|
||
|
if min_depth > self.depth: min_depth = self.depth
|
||
|
elif self.exp[start] == ord("(") : break
|
||
|
start+=1
|
||
|
if start >= len(self.exp): return None, start, 0
|
||
|
|
||
|
self.depth += 1
|
||
|
start += 1
|
||
|
end = self.find_char(start, ord(" "))
|
||
|
return self.exp[start:end], end, min_depth
|
||
|
|
||
|
|
||
|
def parse(self, exp):
|
||
|
|
||
|
self.exp = exp #used by other member functions
|
||
|
self.depth = 0 #xml element depth
|
||
|
self.world.step += 1
|
||
|
self.world.line_count = 0
|
||
|
self.world.robot.frp = dict()
|
||
|
self.world.flags_posts = dict()
|
||
|
self.world.flags_corners = dict()
|
||
|
self.world.vision_is_up_to_date = False
|
||
|
self.world.ball_is_visible = False
|
||
|
self.world.robot.feet_toes_are_touching = dict.fromkeys(self.world.robot.feet_toes_are_touching, False)
|
||
|
self.world.time_local_ms += World.STEPTIME_MS
|
||
|
|
||
|
for p in self.world.teammates: p.is_visible = False
|
||
|
for p in self.world.opponents: p.is_visible = False
|
||
|
|
||
|
tag, end, _ = self.get_next_tag(0)
|
||
|
|
||
|
while end < len(exp):
|
||
|
|
||
|
if tag==b'time':
|
||
|
while True:
|
||
|
tag, end, min_depth = self.get_next_tag(end)
|
||
|
if min_depth == 0: break
|
||
|
|
||
|
if tag==b'now':
|
||
|
#last_time = self.world.time_server
|
||
|
self.world.time_server, end = self.read_float(end+1)
|
||
|
|
||
|
#Test server time reliability
|
||
|
#increment = self.world.time_server - last_time
|
||
|
#if increment < 0.019: print ("down",last_time,self.world.time_server)
|
||
|
#if increment > 0.021: print ("up",last_time,self.world.time_server)
|
||
|
else:
|
||
|
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'time': {tag} at {end}, \nMsg: {exp.decode()}")
|
||
|
|
||
|
|
||
|
elif tag==b'GS':
|
||
|
while True:
|
||
|
tag, end, min_depth = self.get_next_tag(end)
|
||
|
if min_depth == 0: break
|
||
|
|
||
|
if tag==b'unum':
|
||
|
_, end = self.read_int(end+1) #We already know our unum
|
||
|
elif tag==b'team':
|
||
|
aux, end = self.read_str(end+1)
|
||
|
is_left = bool(aux == "left")
|
||
|
if self.world.team_side_is_left != is_left:
|
||
|
self.world.team_side_is_left = is_left
|
||
|
self.play_mode_to_id = self.LEFT_PLAY_MODE_TO_ID if is_left else self.RIGHT_PLAY_MODE_TO_ID
|
||
|
self.world.draw.set_team_side(not is_left)
|
||
|
self.world.team_draw.set_team_side(not is_left)
|
||
|
elif tag==b'sl':
|
||
|
if self.world.team_side_is_left:
|
||
|
self.world.goals_scored, end = self.read_int(end+1)
|
||
|
else:
|
||
|
self.world.goals_conceded, end = self.read_int(end+1)
|
||
|
elif tag==b'sr':
|
||
|
if self.world.team_side_is_left:
|
||
|
self.world.goals_conceded, end = self.read_int(end+1)
|
||
|
else:
|
||
|
self.world.goals_scored, end = self.read_int(end+1)
|
||
|
elif tag==b't':
|
||
|
self.world.time_game, end = self.read_float(end+1)
|
||
|
elif tag==b'pm':
|
||
|
aux, end = self.read_str(end+1)
|
||
|
if self.play_mode_to_id is not None:
|
||
|
self.world.play_mode = self.play_mode_to_id[aux]
|
||
|
else:
|
||
|
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'GS': {tag} at {end}, \nMsg: {exp.decode()}")
|
||
|
|
||
|
|
||
|
elif tag==b'GYR':
|
||
|
while True:
|
||
|
tag, end, min_depth = self.get_next_tag(end)
|
||
|
if min_depth == 0: break
|
||
|
|
||
|
'''
|
||
|
The gyroscope measures the robot's torso angular velocity (rotation rate vector)
|
||
|
The angular velocity's orientation is given by the right-hand rule.
|
||
|
|
||
|
Original reference frame:
|
||
|
X:left(-)/right(+) Y:back(-)/front(+) Z:down(-)/up(+)
|
||
|
|
||
|
New reference frame:
|
||
|
X:back(-)/front(+) Y:right(-)/left(+) Z:down(-)/up(+)
|
||
|
|
||
|
'''
|
||
|
|
||
|
if tag==b'n':
|
||
|
pass
|
||
|
elif tag==b'rt':
|
||
|
self.world.robot.gyro[1], end = self.read_float(end+1)
|
||
|
self.world.robot.gyro[0], end = self.read_float(end+1)
|
||
|
self.world.robot.gyro[2], end = self.read_float(end+1)
|
||
|
self.world.robot.gyro[1] *= -1
|
||
|
else:
|
||
|
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'GYR': {tag} at {end}, \nMsg: {exp.decode()}")
|
||
|
|
||
|
|
||
|
elif tag==b'ACC':
|
||
|
while True:
|
||
|
tag, end, min_depth = self.get_next_tag(end)
|
||
|
if min_depth == 0: break
|
||
|
|
||
|
'''
|
||
|
The accelerometer measures the acceleration relative to freefall. It will read zero during any type of free fall.
|
||
|
When at rest relative to the Earth's surface, it will indicate an upwards acceleration of 9.81m/s^2 (in SimSpark).
|
||
|
|
||
|
Original reference frame:
|
||
|
X:left(-)/right(+) Y:back(-)/front(+) Z:down(-)/up(+)
|
||
|
|
||
|
New reference frame:
|
||
|
X:back(-)/front(+) Y:right(-)/left(+) Z:down(-)/up(+)
|
||
|
'''
|
||
|
|
||
|
if tag==b'n':
|
||
|
pass
|
||
|
elif tag==b'a':
|
||
|
self.world.robot.acc[1], end = self.read_float(end+1)
|
||
|
self.world.robot.acc[0], end = self.read_float(end+1)
|
||
|
self.world.robot.acc[2], end = self.read_float(end+1)
|
||
|
self.world.robot.acc[1] *= -1
|
||
|
else:
|
||
|
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'ACC': {tag} at {end}, \nMsg: {exp.decode()}")
|
||
|
|
||
|
|
||
|
elif tag==b'HJ':
|
||
|
while True:
|
||
|
tag, end, min_depth = self.get_next_tag(end)
|
||
|
if min_depth == 0: break
|
||
|
|
||
|
if tag==b'n':
|
||
|
joint_name, end = self.read_str(end+1)
|
||
|
joint_index = Robot.MAP_PERCEPTOR_TO_INDEX[joint_name]
|
||
|
elif tag==b'ax':
|
||
|
joint_angle, end = self.read_float(end+1)
|
||
|
|
||
|
#Fix symmetry issues 2/4 (perceptors)
|
||
|
if joint_name in Robot.FIX_PERCEPTOR_SET: joint_angle = -joint_angle
|
||
|
|
||
|
old_angle = self.world.robot.joints_position[joint_index]
|
||
|
self.world.robot.joints_speed[joint_index] = (joint_angle - old_angle) / World.STEPTIME * math.pi / 180
|
||
|
self.world.robot.joints_position[joint_index] = joint_angle
|
||
|
else:
|
||
|
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'HJ': {tag} at {end}, \nMsg: {exp.decode()}")
|
||
|
|
||
|
elif tag==b'FRP':
|
||
|
while True:
|
||
|
tag, end, min_depth = self.get_next_tag(end)
|
||
|
if min_depth == 0: break
|
||
|
|
||
|
'''
|
||
|
The reference frame is used for the contact point and force vector applied to that point
|
||
|
Note: The force vector is applied to the foot, so it usually points up
|
||
|
|
||
|
Original reference frame:
|
||
|
X:left(-)/right(+) Y:back(-)/front(+) Z:down(-)/up(+)
|
||
|
|
||
|
New reference frame:
|
||
|
X:back(-)/front(+) Y:right(-)/left(+) Z:down(-)/up(+)
|
||
|
|
||
|
'''
|
||
|
|
||
|
if tag==b'n':
|
||
|
foot_toe_id, end = self.read_str(end+1)
|
||
|
self.world.robot.frp[foot_toe_id] = foot_toe_ref = np.empty(6)
|
||
|
self.world.robot.feet_toes_last_touch[foot_toe_id] = self.world.time_local_ms
|
||
|
self.world.robot.feet_toes_are_touching[foot_toe_id] = True
|
||
|
elif tag==b'c':
|
||
|
foot_toe_ref[1], end = self.read_float(end+1)
|
||
|
foot_toe_ref[0], end = self.read_float(end+1)
|
||
|
foot_toe_ref[2], end = self.read_float(end+1)
|
||
|
foot_toe_ref[1] *= -1
|
||
|
elif tag==b'f':
|
||
|
foot_toe_ref[4], end = self.read_float(end+1)
|
||
|
foot_toe_ref[3], end = self.read_float(end+1)
|
||
|
foot_toe_ref[5], end = self.read_float(end+1)
|
||
|
foot_toe_ref[4] *= -1
|
||
|
else:
|
||
|
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'FRP': {tag} at {end}, \nMsg: {exp.decode()}")
|
||
|
|
||
|
|
||
|
elif tag==b'See':
|
||
|
self.world.vision_is_up_to_date = True
|
||
|
self.world.vision_last_update = self.world.time_local_ms
|
||
|
|
||
|
while True:
|
||
|
tag, end, min_depth = self.get_next_tag(end)
|
||
|
if min_depth == 0: break
|
||
|
|
||
|
tag_bytes = bytes(tag) #since bytearray is not hashable, it cannot be used as key for dictionaries
|
||
|
|
||
|
if tag==b'G1R' or tag==b'G2R' or tag==b'G1L' or tag==b'G2L':
|
||
|
_, end, _ = self.get_next_tag(end)
|
||
|
|
||
|
c1, end = self.read_float(end+1)
|
||
|
c2, end = self.read_float(end+1)
|
||
|
c3, end = self.read_float(end+1)
|
||
|
|
||
|
aux = self.LEFT_SIDE_FLAGS[tag_bytes] if self.world.team_side_is_left else self.RIGHT_SIDE_FLAGS[tag_bytes]
|
||
|
self.world.flags_posts[aux] = (c1,c2,c3)
|
||
|
|
||
|
elif tag==b'F1R' or tag==b'F2R' or tag==b'F1L' or tag==b'F2L':
|
||
|
_, end, _ = self.get_next_tag(end)
|
||
|
|
||
|
c1, end = self.read_float(end+1)
|
||
|
c2, end = self.read_float(end+1)
|
||
|
c3, end = self.read_float(end+1)
|
||
|
|
||
|
aux = self.LEFT_SIDE_FLAGS[tag_bytes] if self.world.team_side_is_left else self.RIGHT_SIDE_FLAGS[tag_bytes]
|
||
|
self.world.flags_corners[aux] = (c1,c2,c3)
|
||
|
|
||
|
elif tag==b'B':
|
||
|
_, end, _ = self.get_next_tag(end)
|
||
|
|
||
|
self.world.ball_rel_head_sph_pos[0], end = self.read_float(end+1)
|
||
|
self.world.ball_rel_head_sph_pos[1], end = self.read_float(end+1)
|
||
|
self.world.ball_rel_head_sph_pos[2], end = self.read_float(end+1)
|
||
|
self.world.ball_rel_head_cart_pos = M.deg_sph2cart(self.world.ball_rel_head_sph_pos)
|
||
|
self.world.ball_is_visible = True
|
||
|
self.world.ball_last_seen = self.world.time_local_ms
|
||
|
|
||
|
elif tag==b'mypos':
|
||
|
|
||
|
self.world.robot.cheat_abs_pos[0], end = self.read_float(end+1)
|
||
|
self.world.robot.cheat_abs_pos[1], end = self.read_float(end+1)
|
||
|
self.world.robot.cheat_abs_pos[2], end = self.read_float(end+1)
|
||
|
|
||
|
elif tag==b'myorien':
|
||
|
|
||
|
self.world.robot.cheat_ori, end = self.read_float(end+1)
|
||
|
|
||
|
elif tag==b'ballpos':
|
||
|
|
||
|
c1, end = self.read_float(end+1)
|
||
|
c2, end = self.read_float(end+1)
|
||
|
c3, end = self.read_float(end+1)
|
||
|
|
||
|
self.world.ball_cheat_abs_vel[0] = (c1 - self.world.ball_cheat_abs_pos[0]) / World.VISUALSTEP
|
||
|
self.world.ball_cheat_abs_vel[1] = (c2 - self.world.ball_cheat_abs_pos[1]) / World.VISUALSTEP
|
||
|
self.world.ball_cheat_abs_vel[2] = (c3 - self.world.ball_cheat_abs_pos[2]) / World.VISUALSTEP
|
||
|
|
||
|
self.world.ball_cheat_abs_pos[0] = c1
|
||
|
self.world.ball_cheat_abs_pos[1] = c2
|
||
|
self.world.ball_cheat_abs_pos[2] = c3
|
||
|
|
||
|
elif tag==b'P':
|
||
|
|
||
|
while True:
|
||
|
previous_depth = self.depth
|
||
|
previous_end = end
|
||
|
tag, end, min_depth = self.get_next_tag(end)
|
||
|
if min_depth < 2: #if =1 we are still inside 'See', if =0 we are already outside 'See'
|
||
|
end = previous_end #The "P" tag is special because it's the only variable particle inside 'See'
|
||
|
self.depth = previous_depth
|
||
|
break # we restore the previous tag, and let 'See' handle it
|
||
|
|
||
|
if tag==b'team':
|
||
|
player_team, end = self.read_str(end+1)
|
||
|
is_teammate = bool(player_team == self.world.team_name)
|
||
|
if self.world.team_name_opponent is None and not is_teammate: #register opponent team name
|
||
|
self.world.team_name_opponent = player_team
|
||
|
elif tag==b'id':
|
||
|
player_id, end = self.read_int(end+1)
|
||
|
player = self.world.teammates[player_id-1] if is_teammate else self.world.opponents[player_id-1]
|
||
|
player.body_parts_cart_rel_pos = dict() #reset seen body parts
|
||
|
player.is_visible = True
|
||
|
elif tag==b'llowerarm' or tag==b'rlowerarm' or tag==b'lfoot' or tag==b'rfoot' or tag==b'head':
|
||
|
tag_str = tag.decode()
|
||
|
_, end, _ = self.get_next_tag(end)
|
||
|
|
||
|
c1, end = self.read_float(end+1)
|
||
|
c2, end = self.read_float(end+1)
|
||
|
c3, end = self.read_float(end+1)
|
||
|
|
||
|
if is_teammate:
|
||
|
self.world.teammates[player_id-1].body_parts_sph_rel_pos[tag_str] = (c1,c2,c3)
|
||
|
self.world.teammates[player_id-1].body_parts_cart_rel_pos[tag_str] = M.deg_sph2cart((c1,c2,c3))
|
||
|
else:
|
||
|
self.world.opponents[player_id-1].body_parts_sph_rel_pos[tag_str] = (c1,c2,c3)
|
||
|
self.world.opponents[player_id-1].body_parts_cart_rel_pos[tag_str] = M.deg_sph2cart((c1,c2,c3))
|
||
|
else:
|
||
|
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'P': {tag} at {end}, \nMsg: {exp.decode()}")
|
||
|
|
||
|
elif tag==b'L':
|
||
|
l = self.world.lines[self.world.line_count]
|
||
|
|
||
|
_, end, _ = self.get_next_tag(end)
|
||
|
l[0], end = self.read_float(end+1)
|
||
|
l[1], end = self.read_float(end+1)
|
||
|
l[2], end = self.read_float(end+1)
|
||
|
_, end, _ = self.get_next_tag(end)
|
||
|
l[3], end = self.read_float(end+1)
|
||
|
l[4], end = self.read_float(end+1)
|
||
|
l[5], end = self.read_float(end+1)
|
||
|
|
||
|
if np.isnan(l).any():
|
||
|
self.world.log(f"{self.LOG_PREFIX}Received field line with NaNs {l}")
|
||
|
else:
|
||
|
self.world.line_count += 1 #accept field line if there are no NaNs
|
||
|
|
||
|
else:
|
||
|
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'see': {tag} at {end}, \nMsg: {exp.decode()}")
|
||
|
|
||
|
|
||
|
elif tag==b'hear':
|
||
|
|
||
|
team_name, end = self.read_str(end+1)
|
||
|
|
||
|
if team_name == self.world.team_name: # discard message if it's not from our team
|
||
|
|
||
|
timestamp, end = self.read_float(end+1)
|
||
|
|
||
|
if self.exp[end+1] == ord('s'): # this message was sent by oneself
|
||
|
direction, end = "self", end+5
|
||
|
else: # this message was sent by teammate
|
||
|
direction, end = self.read_float(end+1)
|
||
|
|
||
|
msg, end = self.read_bytes(end+1)
|
||
|
self.hear_callback(msg, direction, timestamp)
|
||
|
|
||
|
|
||
|
tag, end, _ = self.get_next_tag(end)
|
||
|
|
||
|
|
||
|
else:
|
||
|
self.world.log(f"{self.LOG_PREFIX}Unknown root tag: {tag} at {end}, \nMsg: {exp.decode()}")
|
||
|
tag, end, min_depth = self.get_next_tag(end)
|
||
|
|