You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
98 lines
4.7 KiB
Python
98 lines
4.7 KiB
Python
9 months ago
|
from agent.Agent import Agent
|
||
|
from itertools import count
|
||
|
from scripts.commons.Script import Script
|
||
|
from typing import List
|
||
|
from world.commons.Draw import Draw
|
||
|
|
||
|
class Radio_Localization():
|
||
|
def __init__(self,script:Script) -> None:
|
||
|
self.script = script
|
||
|
|
||
|
def draw_objects(self, p:Agent, pos, is_down, was_seen, last_update, is_self=False):
|
||
|
w = p.world
|
||
|
me = w.robot.loc_head_position
|
||
|
|
||
|
# get draw object from same player to always overwrite previous drawing
|
||
|
# could also use team channel but with this approach we could draw for both teams
|
||
|
d:Draw = self.script.players[0].world.draw
|
||
|
|
||
|
# VISUALSTEP_MS is the time it takes to get a visual update
|
||
|
is_current = last_update > w.time_local_ms - w.VISUALSTEP_MS
|
||
|
|
||
|
# 0.12s is the time it takes to do a full broadcast with all positions if every group is completely visible
|
||
|
# here we use >= instead of > because the radio message comes with a delay of 20ms
|
||
|
is_recent = last_update >= w.time_local_ms - 120
|
||
|
|
||
|
if is_current and was_seen:
|
||
|
c = d.Color.green_light # I've seen this object in the current or previous time step
|
||
|
elif is_recent and was_seen:
|
||
|
c = d.Color.green # I've seen this object in the last 0.12s
|
||
|
elif is_current:
|
||
|
c = d.Color.yellow # I've heard about this object in the current or previous time step (and it was not seen in the same period)
|
||
|
elif is_recent:
|
||
|
c = d.Color.yellow_light # I've heard about this object in the last 0.12s (the last available info was not obtained from vision)
|
||
|
else:
|
||
|
c = d.Color.red # I haven't seen or heard about this object in the last 0.12s
|
||
|
|
||
|
if is_self:
|
||
|
if w.robot.radio_fallen_state:
|
||
|
d.annotation(me, "Fallen (radio)", d.Color.yellow, "objects", False) # I heard I've fallen (but I missed the last 2 visual steps)
|
||
|
elif w.robot.loc_head_z < 0.3:
|
||
|
d.annotation(me, "Fallen (internal)", d.Color.white, "objects", False) # I have detected I've fallen
|
||
|
d.sphere(me, 0.06, c, "objects", False)
|
||
|
else:
|
||
|
if is_down:
|
||
|
d.annotation((me[:2]+pos[:2])/2,"Fallen",d.Color.white,"objects",False)
|
||
|
d.arrow(me, pos, 0.1, 3, c, "objects", False)
|
||
|
|
||
|
|
||
|
def draw(self,p:Agent):
|
||
|
w = p.world
|
||
|
others = w.teammates + w.opponents
|
||
|
|
||
|
#----------------------------------------------------------- draw other players
|
||
|
|
||
|
for o in others:
|
||
|
if o.is_self or o.state_last_update==0: # do not draw self or never before seen players
|
||
|
continue
|
||
|
|
||
|
pos = o.state_abs_pos
|
||
|
is_down = o.state_fallen
|
||
|
# 3D head position means head is visible, 2D means some body parts are visible but not the head, or the head position comes from radio
|
||
|
is_3D = pos is not None and len(pos)==3
|
||
|
|
||
|
self.draw_objects(p, pos, is_down, is_3D, o.state_last_update)
|
||
|
|
||
|
#----------------------------------------------------------- draw self
|
||
|
|
||
|
is_pos_from_vision = w.robot.loc_head_position_last_update == w.robot.loc_last_update
|
||
|
self.draw_objects(p, None, None, is_pos_from_vision, w.robot.loc_head_position_last_update, True)
|
||
|
|
||
|
#----------------------------------------------------------- draw ball and flush drawings
|
||
|
|
||
|
self.draw_objects(p, w.ball_abs_pos, False, w.is_ball_abs_pos_from_vision, w.ball_abs_pos_last_update)
|
||
|
self.script.players[0].world.draw.flush("objects")
|
||
|
|
||
|
|
||
|
def execute(self):
|
||
|
a = self.script.args
|
||
|
|
||
|
# Args: Server IP, Agent Port, Monitor Port, Uniform No., Robot Type, Team Name, Enable Log, Enable Draw
|
||
|
self.script.batch_create(Agent, ((a.i,a.p,a.m,u,a.t, False,u==1) for u in range(1,12)))
|
||
|
self.script.batch_create(Agent, ((a.i,a.p,a.m,u,"Opponent",False,False) for u in range(1,12)))
|
||
|
players : List[Agent] = self.script.players
|
||
|
|
||
|
# Beam opponents
|
||
|
beam_pos = [(-(i//2)-3,(i%2*2-1)*(i//2+1),0) for i in range(11)]
|
||
|
self.script.batch_commit_beam( beam_pos, slice(11,None) )
|
||
|
print("\nPress ctrl+c to return.")
|
||
|
|
||
|
# Execute
|
||
|
for j in count():
|
||
|
self.script.batch_execute_agent( slice(11) ) # run our agents (think and send)
|
||
|
self.script.batch_commit_and_send( slice(11,None) ) # run their agents (don't think, just send)
|
||
|
|
||
|
self.draw(players[j//15%11]) # draw knowledge, iterate through our team, 15 time steps per player
|
||
|
self.script.batch_receive(slice(11)) # receive & update our team's world state
|
||
|
self.script.batch_receive(slice(11,None), False) # receive & don't update opponent's world state (to save cpu resources)
|