You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Dribble/scripts/utils/Radio_Localization.py

98 lines
4.7 KiB
Python

9 months ago
from agent.Agent import Agent
from itertools import count
from scripts.commons.Script import Script
from typing import List
from world.commons.Draw import Draw
class Radio_Localization():
def __init__(self,script:Script) -> None:
self.script = script
def draw_objects(self, p:Agent, pos, is_down, was_seen, last_update, is_self=False):
w = p.world
me = w.robot.loc_head_position
# get draw object from same player to always overwrite previous drawing
# could also use team channel but with this approach we could draw for both teams
d:Draw = self.script.players[0].world.draw
# VISUALSTEP_MS is the time it takes to get a visual update
is_current = last_update > w.time_local_ms - w.VISUALSTEP_MS
# 0.12s is the time it takes to do a full broadcast with all positions if every group is completely visible
# here we use >= instead of > because the radio message comes with a delay of 20ms
is_recent = last_update >= w.time_local_ms - 120
if is_current and was_seen:
c = d.Color.green_light # I've seen this object in the current or previous time step
elif is_recent and was_seen:
c = d.Color.green # I've seen this object in the last 0.12s
elif is_current:
c = d.Color.yellow # I've heard about this object in the current or previous time step (and it was not seen in the same period)
elif is_recent:
c = d.Color.yellow_light # I've heard about this object in the last 0.12s (the last available info was not obtained from vision)
else:
c = d.Color.red # I haven't seen or heard about this object in the last 0.12s
if is_self:
if w.robot.radio_fallen_state:
d.annotation(me, "Fallen (radio)", d.Color.yellow, "objects", False) # I heard I've fallen (but I missed the last 2 visual steps)
elif w.robot.loc_head_z < 0.3:
d.annotation(me, "Fallen (internal)", d.Color.white, "objects", False) # I have detected I've fallen
d.sphere(me, 0.06, c, "objects", False)
else:
if is_down:
d.annotation((me[:2]+pos[:2])/2,"Fallen",d.Color.white,"objects",False)
d.arrow(me, pos, 0.1, 3, c, "objects", False)
def draw(self,p:Agent):
w = p.world
others = w.teammates + w.opponents
#----------------------------------------------------------- draw other players
for o in others:
if o.is_self or o.state_last_update==0: # do not draw self or never before seen players
continue
pos = o.state_abs_pos
is_down = o.state_fallen
# 3D head position means head is visible, 2D means some body parts are visible but not the head, or the head position comes from radio
is_3D = pos is not None and len(pos)==3
self.draw_objects(p, pos, is_down, is_3D, o.state_last_update)
#----------------------------------------------------------- draw self
is_pos_from_vision = w.robot.loc_head_position_last_update == w.robot.loc_last_update
self.draw_objects(p, None, None, is_pos_from_vision, w.robot.loc_head_position_last_update, True)
#----------------------------------------------------------- draw ball and flush drawings
self.draw_objects(p, w.ball_abs_pos, False, w.is_ball_abs_pos_from_vision, w.ball_abs_pos_last_update)
self.script.players[0].world.draw.flush("objects")
def execute(self):
a = self.script.args
# Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw
self.script.batch_create(Agent, ((a.i,a.p,a.m,u,a.t, False,u==1) for u in range(1,12)))
self.script.batch_create(Agent, ((a.i,a.p,a.m,u,"Opponent",False,False) for u in range(1,12)))
players : List[Agent] = self.script.players
# Beam opponents
beam_pos = [(-(i//2)-3,(i%2*2-1)*(i//2+1),0) for i in range(11)]
self.script.batch_commit_beam( beam_pos, slice(11,None) )
print("\nPress ctrl+c to return.")
# Execute
for j in count():
self.script.batch_execute_agent( slice(11) ) # run our agents (think and send)
self.script.batch_commit_and_send( slice(11,None) ) # run their agents (don't think, just send)
self.draw(players[j//15%11]) # draw knowledge, iterate through our team, 15 time steps per player
self.script.batch_receive(slice(11)) # receive & update our team's world state
self.script.batch_receive(slice(11,None), False) # receive & don't update opponent's world state (to save cpu resources)