You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Dribble/communication/World_Parser.py

431 lines
22 KiB
Python

9 months ago
from math_ops.Math_Ops import Math_Ops as M
from world.Robot import Robot
from world.World import World
import math
import numpy as np
class World_Parser():
def __init__(self, world:World, hear_callback) -> None:
self.LOG_PREFIX = "World_Parser.py: "
self.world = world
self.hear_callback = hear_callback
self.exp = None
self.depth = None
self.LEFT_SIDE_FLAGS = {b'F2L':(-15,-10,0),
b'F1L':(-15,+10,0),
b'F2R':(+15,-10,0),
b'F1R':(+15,+10,0),
b'G2L':(-15,-1.05,0.8),
b'G1L':(-15,+1.05,0.8),
b'G2R':(+15,-1.05,0.8),
b'G1R':(+15,+1.05,0.8)} #mapping between flag names and their corrected location, when playing on the left side
self.RIGHT_SIDE_FLAGS = {b'F2L':(+15,+10,0),
b'F1L':(+15,-10,0),
b'F2R':(-15,+10,0),
b'F1R':(-15,-10,0),
b'G2L':(+15,+1.05,0.8),
b'G1L':(+15,-1.05,0.8),
b'G2R':(-15,+1.05,0.8),
b'G1R':(-15,-1.05,0.8)}
self.play_mode_to_id = None
self.LEFT_PLAY_MODE_TO_ID = {"KickOff_Left":World.M_OUR_KICKOFF, "KickIn_Left":World.M_OUR_KICK_IN, "corner_kick_left":World.M_OUR_CORNER_KICK,
"goal_kick_left":World.M_OUR_GOAL_KICK, "free_kick_left":World.M_OUR_FREE_KICK, "pass_left":World.M_OUR_PASS,
"direct_free_kick_left": World.M_OUR_DIR_FREE_KICK, "Goal_Left": World.M_OUR_GOAL, "offside_left": World.M_OUR_OFFSIDE,
"KickOff_Right":World.M_THEIR_KICKOFF, "KickIn_Right":World.M_THEIR_KICK_IN, "corner_kick_right":World.M_THEIR_CORNER_KICK,
"goal_kick_right":World.M_THEIR_GOAL_KICK, "free_kick_right":World.M_THEIR_FREE_KICK, "pass_right":World.M_THEIR_PASS,
"direct_free_kick_right": World.M_THEIR_DIR_FREE_KICK, "Goal_Right": World.M_THEIR_GOAL, "offside_right": World.M_THEIR_OFFSIDE,
"BeforeKickOff": World.M_BEFORE_KICKOFF, "GameOver": World.M_GAME_OVER, "PlayOn": World.M_PLAY_ON }
self.RIGHT_PLAY_MODE_TO_ID = {"KickOff_Left":World.M_THEIR_KICKOFF, "KickIn_Left":World.M_THEIR_KICK_IN, "corner_kick_left":World.M_THEIR_CORNER_KICK,
"goal_kick_left":World.M_THEIR_GOAL_KICK, "free_kick_left":World.M_THEIR_FREE_KICK, "pass_left":World.M_THEIR_PASS,
"direct_free_kick_left": World.M_THEIR_DIR_FREE_KICK, "Goal_Left": World.M_THEIR_GOAL, "offside_left": World.M_THEIR_OFFSIDE,
"KickOff_Right":World.M_OUR_KICKOFF, "KickIn_Right":World.M_OUR_KICK_IN, "corner_kick_right":World.M_OUR_CORNER_KICK,
"goal_kick_right":World.M_OUR_GOAL_KICK, "free_kick_right":World.M_OUR_FREE_KICK, "pass_right":World.M_OUR_PASS,
"direct_free_kick_right": World.M_OUR_DIR_FREE_KICK, "Goal_Right": World.M_OUR_GOAL, "offside_right": World.M_OUR_OFFSIDE,
"BeforeKickOff": World.M_BEFORE_KICKOFF, "GameOver": World.M_GAME_OVER, "PlayOn": World.M_PLAY_ON }
def find_non_digit(self,start):
while True:
if (self.exp[start] < ord('0') or self.exp[start] > ord('9')) and self.exp[start] != ord('.'): return start
start+=1
def find_char(self,start,char):
while True:
if self.exp[start] == char : return start
start+=1
def read_float(self, start):
if self.exp[start:start+3] == b'nan': return float('nan'), start+3 #handle nan values (they exist)
end = self.find_non_digit(start+1) #we assume the first one is a digit or minus sign
try:
retval = float(self.exp[start:end])
except:
self.world.log(f"{self.LOG_PREFIX}String to float conversion failed: {self.exp[start:end]} at msg[{start},{end}], \nMsg: {self.exp.decode()}")
retval = 0
return retval, end
def read_int(self, start):
end = self.find_non_digit(start+1) #we assume the first one is a digit or minus sign
return int(self.exp[start:end]), end
def read_bytes(self, start):
end = start
while True:
if self.exp[end] == ord(' ') or self.exp[end] == ord(')'): break
end+=1
return self.exp[start:end], end
def read_str(self, start):
b, end = self.read_bytes(start)
return b.decode(), end
def get_next_tag(self, start):
min_depth = self.depth
while True:
if self.exp[start] == ord(")") : #monitor xml element depth
self.depth -= 1
if min_depth > self.depth: min_depth = self.depth
elif self.exp[start] == ord("(") : break
start+=1
if start >= len(self.exp): return None, start, 0
self.depth += 1
start += 1
end = self.find_char(start, ord(" "))
return self.exp[start:end], end, min_depth
def parse(self, exp):
self.exp = exp #used by other member functions
self.depth = 0 #xml element depth
self.world.step += 1
self.world.line_count = 0
self.world.robot.frp = dict()
self.world.flags_posts = dict()
self.world.flags_corners = dict()
self.world.vision_is_up_to_date = False
self.world.ball_is_visible = False
self.world.robot.feet_toes_are_touching = dict.fromkeys(self.world.robot.feet_toes_are_touching, False)
self.world.time_local_ms += World.STEPTIME_MS
for p in self.world.teammates: p.is_visible = False
for p in self.world.opponents: p.is_visible = False
tag, end, _ = self.get_next_tag(0)
while end < len(exp):
if tag==b'time':
while True:
tag, end, min_depth = self.get_next_tag(end)
if min_depth == 0: break
if tag==b'now':
#last_time = self.world.time_server
self.world.time_server, end = self.read_float(end+1)
#Test server time reliability
#increment = self.world.time_server - last_time
#if increment < 0.019: print ("down",last_time,self.world.time_server)
#if increment > 0.021: print ("up",last_time,self.world.time_server)
else:
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'time': {tag} at {end}, \nMsg: {exp.decode()}")
elif tag==b'GS':
while True:
tag, end, min_depth = self.get_next_tag(end)
if min_depth == 0: break
if tag==b'unum':
_, end = self.read_int(end+1) #We already know our unum
elif tag==b'team':
aux, end = self.read_str(end+1)
is_left = bool(aux == "left")
if self.world.team_side_is_left != is_left:
self.world.team_side_is_left = is_left
self.play_mode_to_id = self.LEFT_PLAY_MODE_TO_ID if is_left else self.RIGHT_PLAY_MODE_TO_ID
self.world.draw.set_team_side(not is_left)
self.world.team_draw.set_team_side(not is_left)
elif tag==b'sl':
if self.world.team_side_is_left:
self.world.goals_scored, end = self.read_int(end+1)
else:
self.world.goals_conceded, end = self.read_int(end+1)
elif tag==b'sr':
if self.world.team_side_is_left:
self.world.goals_conceded, end = self.read_int(end+1)
else:
self.world.goals_scored, end = self.read_int(end+1)
elif tag==b't':
self.world.time_game, end = self.read_float(end+1)
elif tag==b'pm':
aux, end = self.read_str(end+1)
if self.play_mode_to_id is not None:
self.world.play_mode = self.play_mode_to_id[aux]
else:
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'GS': {tag} at {end}, \nMsg: {exp.decode()}")
elif tag==b'GYR':
while True:
tag, end, min_depth = self.get_next_tag(end)
if min_depth == 0: break
'''
The gyroscope measures the robot's torso angular velocity (rotation rate vector)
The angular velocity's orientation is given by the right-hand rule.
Original reference frame:
X:left(-)/right(+) Y:back(-)/front(+) Z:down(-)/up(+)
New reference frame:
X:back(-)/front(+) Y:right(-)/left(+) Z:down(-)/up(+)
'''
if tag==b'n':
pass
elif tag==b'rt':
self.world.robot.gyro[1], end = self.read_float(end+1)
self.world.robot.gyro[0], end = self.read_float(end+1)
self.world.robot.gyro[2], end = self.read_float(end+1)
self.world.robot.gyro[1] *= -1
else:
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'GYR': {tag} at {end}, \nMsg: {exp.decode()}")
elif tag==b'ACC':
while True:
tag, end, min_depth = self.get_next_tag(end)
if min_depth == 0: break
'''
The accelerometer measures the acceleration relative to freefall. It will read zero during any type of free fall.
When at rest relative to the Earth's surface, it will indicate an upwards acceleration of 9.81m/s^2 (in SimSpark).
Original reference frame:
X:left(-)/right(+) Y:back(-)/front(+) Z:down(-)/up(+)
New reference frame:
X:back(-)/front(+) Y:right(-)/left(+) Z:down(-)/up(+)
'''
if tag==b'n':
pass
elif tag==b'a':
self.world.robot.acc[1], end = self.read_float(end+1)
self.world.robot.acc[0], end = self.read_float(end+1)
self.world.robot.acc[2], end = self.read_float(end+1)
self.world.robot.acc[1] *= -1
else:
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'ACC': {tag} at {end}, \nMsg: {exp.decode()}")
elif tag==b'HJ':
while True:
tag, end, min_depth = self.get_next_tag(end)
if min_depth == 0: break
if tag==b'n':
joint_name, end = self.read_str(end+1)
joint_index = Robot.MAP_PERCEPTOR_TO_INDEX[joint_name]
elif tag==b'ax':
joint_angle, end = self.read_float(end+1)
#Fix symmetry issues 2/4 (perceptors)
if joint_name in Robot.FIX_PERCEPTOR_SET: joint_angle = -joint_angle
old_angle = self.world.robot.joints_position[joint_index]
self.world.robot.joints_speed[joint_index] = (joint_angle - old_angle) / World.STEPTIME * math.pi / 180
self.world.robot.joints_position[joint_index] = joint_angle
else:
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'HJ': {tag} at {end}, \nMsg: {exp.decode()}")
elif tag==b'FRP':
while True:
tag, end, min_depth = self.get_next_tag(end)
if min_depth == 0: break
'''
The reference frame is used for the contact point and force vector applied to that point
Note: The force vector is applied to the foot, so it usually points up
Original reference frame:
X:left(-)/right(+) Y:back(-)/front(+) Z:down(-)/up(+)
New reference frame:
X:back(-)/front(+) Y:right(-)/left(+) Z:down(-)/up(+)
'''
if tag==b'n':
foot_toe_id, end = self.read_str(end+1)
self.world.robot.frp[foot_toe_id] = foot_toe_ref = np.empty(6)
self.world.robot.feet_toes_last_touch[foot_toe_id] = self.world.time_local_ms
self.world.robot.feet_toes_are_touching[foot_toe_id] = True
elif tag==b'c':
foot_toe_ref[1], end = self.read_float(end+1)
foot_toe_ref[0], end = self.read_float(end+1)
foot_toe_ref[2], end = self.read_float(end+1)
foot_toe_ref[1] *= -1
elif tag==b'f':
foot_toe_ref[4], end = self.read_float(end+1)
foot_toe_ref[3], end = self.read_float(end+1)
foot_toe_ref[5], end = self.read_float(end+1)
foot_toe_ref[4] *= -1
else:
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'FRP': {tag} at {end}, \nMsg: {exp.decode()}")
elif tag==b'See':
self.world.vision_is_up_to_date = True
self.world.vision_last_update = self.world.time_local_ms
while True:
tag, end, min_depth = self.get_next_tag(end)
if min_depth == 0: break
tag_bytes = bytes(tag) #since bytearray is not hashable, it cannot be used as key for dictionaries
if tag==b'G1R' or tag==b'G2R' or tag==b'G1L' or tag==b'G2L':
_, end, _ = self.get_next_tag(end)
c1, end = self.read_float(end+1)
c2, end = self.read_float(end+1)
c3, end = self.read_float(end+1)
aux = self.LEFT_SIDE_FLAGS[tag_bytes] if self.world.team_side_is_left else self.RIGHT_SIDE_FLAGS[tag_bytes]
self.world.flags_posts[aux] = (c1,c2,c3)
elif tag==b'F1R' or tag==b'F2R' or tag==b'F1L' or tag==b'F2L':
_, end, _ = self.get_next_tag(end)
c1, end = self.read_float(end+1)
c2, end = self.read_float(end+1)
c3, end = self.read_float(end+1)
aux = self.LEFT_SIDE_FLAGS[tag_bytes] if self.world.team_side_is_left else self.RIGHT_SIDE_FLAGS[tag_bytes]
self.world.flags_corners[aux] = (c1,c2,c3)
elif tag==b'B':
_, end, _ = self.get_next_tag(end)
self.world.ball_rel_head_sph_pos[0], end = self.read_float(end+1)
self.world.ball_rel_head_sph_pos[1], end = self.read_float(end+1)
self.world.ball_rel_head_sph_pos[2], end = self.read_float(end+1)
self.world.ball_rel_head_cart_pos = M.deg_sph2cart(self.world.ball_rel_head_sph_pos)
self.world.ball_is_visible = True
self.world.ball_last_seen = self.world.time_local_ms
elif tag==b'mypos':
self.world.robot.cheat_abs_pos[0], end = self.read_float(end+1)
self.world.robot.cheat_abs_pos[1], end = self.read_float(end+1)
self.world.robot.cheat_abs_pos[2], end = self.read_float(end+1)
elif tag==b'myorien':
self.world.robot.cheat_ori, end = self.read_float(end+1)
elif tag==b'ballpos':
c1, end = self.read_float(end+1)
c2, end = self.read_float(end+1)
c3, end = self.read_float(end+1)
self.world.ball_cheat_abs_vel[0] = (c1 - self.world.ball_cheat_abs_pos[0]) / World.VISUALSTEP
self.world.ball_cheat_abs_vel[1] = (c2 - self.world.ball_cheat_abs_pos[1]) / World.VISUALSTEP
self.world.ball_cheat_abs_vel[2] = (c3 - self.world.ball_cheat_abs_pos[2]) / World.VISUALSTEP
self.world.ball_cheat_abs_pos[0] = c1
self.world.ball_cheat_abs_pos[1] = c2
self.world.ball_cheat_abs_pos[2] = c3
elif tag==b'P':
while True:
previous_depth = self.depth
previous_end = end
tag, end, min_depth = self.get_next_tag(end)
if min_depth < 2: #if =1 we are still inside 'See', if =0 we are already outside 'See'
end = previous_end #The "P" tag is special because it's the only variable particle inside 'See'
self.depth = previous_depth
break # we restore the previous tag, and let 'See' handle it
if tag==b'team':
player_team, end = self.read_str(end+1)
is_teammate = bool(player_team == self.world.team_name)
if self.world.team_name_opponent is None and not is_teammate: #register opponent team name
self.world.team_name_opponent = player_team
elif tag==b'id':
player_id, end = self.read_int(end+1)
player = self.world.teammates[player_id-1] if is_teammate else self.world.opponents[player_id-1]
player.body_parts_cart_rel_pos = dict() #reset seen body parts
player.is_visible = True
elif tag==b'llowerarm' or tag==b'rlowerarm' or tag==b'lfoot' or tag==b'rfoot' or tag==b'head':
tag_str = tag.decode()
_, end, _ = self.get_next_tag(end)
c1, end = self.read_float(end+1)
c2, end = self.read_float(end+1)
c3, end = self.read_float(end+1)
if is_teammate:
self.world.teammates[player_id-1].body_parts_sph_rel_pos[tag_str] = (c1,c2,c3)
self.world.teammates[player_id-1].body_parts_cart_rel_pos[tag_str] = M.deg_sph2cart((c1,c2,c3))
else:
self.world.opponents[player_id-1].body_parts_sph_rel_pos[tag_str] = (c1,c2,c3)
self.world.opponents[player_id-1].body_parts_cart_rel_pos[tag_str] = M.deg_sph2cart((c1,c2,c3))
else:
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'P': {tag} at {end}, \nMsg: {exp.decode()}")
elif tag==b'L':
l = self.world.lines[self.world.line_count]
_, end, _ = self.get_next_tag(end)
l[0], end = self.read_float(end+1)
l[1], end = self.read_float(end+1)
l[2], end = self.read_float(end+1)
_, end, _ = self.get_next_tag(end)
l[3], end = self.read_float(end+1)
l[4], end = self.read_float(end+1)
l[5], end = self.read_float(end+1)
if np.isnan(l).any():
self.world.log(f"{self.LOG_PREFIX}Received field line with NaNs {l}")
else:
self.world.line_count += 1 #accept field line if there are no NaNs
else:
self.world.log(f"{self.LOG_PREFIX}Unknown tag inside 'see': {tag} at {end}, \nMsg: {exp.decode()}")
elif tag==b'hear':
team_name, end = self.read_str(end+1)
if team_name == self.world.team_name: # discard message if it's not from our team
timestamp, end = self.read_float(end+1)
if self.exp[end+1] == ord('s'): # this message was sent by oneself
direction, end = "self", end+5
else: # this message was sent by teammate
direction, end = self.read_float(end+1)
msg, end = self.read_bytes(end+1)
self.hear_callback(msg, direction, timestamp)
tag, end, _ = self.get_next_tag(end)
else:
self.world.log(f"{self.LOG_PREFIX}Unknown root tag: {tag} at {end}, \nMsg: {exp.decode()}")
tag, end, min_depth = self.get_next_tag(end)