You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
183 lines
7.0 KiB
Python
183 lines
7.0 KiB
Python
9 months ago
|
import numpy as np
|
||
|
|
||
|
|
||
|
class Behavior():
    '''
    Registry and dispatcher for the agent's behaviors

    Behaviors come from three sources that are merged into a single dictionary
    (`self.behaviors`) by `create_behaviors()`: poses, slot behaviors and custom
    behaviors. This class also keeps track of the behavior currently being
    executed and of the previous one.

    Each entry of `self.behaviors` is a tuple:
        ( description, auto_head, execute_func(reset, *args), is_ready_func(*args) )
    '''

    def __init__(self, base_agent) -> None:
        '''
        Store references to the agent/world and build the standard behavior providers

        Parameters
        ----------
        base_agent : Base_Agent
            owner of this object; its `world` attribute is read here, and its
            `scom` attribute is used later by `execute_to_completion()`

        Notes
        -----
        The behaviors dictionary is empty until `create_behaviors()` is called
        '''
        # Imported locally and used only for the annotation below
        # NOTE(review): presumably local to avoid a circular import - confirm
        from agent.Base_Agent import Base_Agent # for type hinting
        self.base_agent : Base_Agent = base_agent
        self.world = self.base_agent.world
        self.state_behavior_name = None          # name of the behavior in progress (None if idle / reset forced)
        self.state_behavior_init_ms = 0          # world.time_local_ms when the current behavior started
        self.previous_behavior = None            # name of the last finished or interrupted behavior
        self.previous_behavior_duration = None   # duration of the previous behavior (seconds)

        # Start with empty registries so that using this object before create_behaviors()
        # triggers the descriptive assertions below instead of an AttributeError
        self.behaviors = {}
        self.objects = {}

        # Initialize standard behaviors
        from behaviors.Poses import Poses
        from behaviors.Slot_Engine import Slot_Engine
        from behaviors.Head import Head

        self.poses = Poses(self.world)
        self.slot_engine = Slot_Engine(self.world)
        self.head = Head(self.world)


    def create_behaviors(self):
        '''
        Build the behaviors dictionary from poses, slot behaviors and custom behaviors

        Behaviors dictionary:
            creation:   key: ( description, auto_head, lambda reset[,a,b,c,..]: self.execute(...), lambda: self.is_ready(...) )
            usage:      key: ( description, auto_head, execute_func(reset, *args), is_ready_func )

        Later sources override earlier ones on duplicate keys (poses < slot behaviors < custom behaviors)
        '''
        self.behaviors = self.poses.get_behaviors_callbacks()
        self.behaviors.update(self.slot_engine.get_behaviors_callbacks())
        self.behaviors.update(self.get_custom_callbacks())


    def get_custom_callbacks(self):
        '''
        Instantiate every custom behavior and return its callbacks

        Searching custom behaviors could be implemented automatically
        However, for code distribution, loading code dynamically is not ideal (unless we load byte code or some other import solution)
        Currently, adding custom behaviors is a manual process:
            1. Add import statement below
            2. Add class to 'classes' list

        Returns
        -------
        callbacks : dict
            class name -> ( description, auto_head, execute_func(reset, *args), is_ready_func(*args) )

        Notes
        -----
        The created objects are stored in `self.objects` (one instance per class, indexed by class name)
        '''

        # Declaration of behaviors
        from behaviors.custom.Basic_Kick.Basic_Kick import Basic_Kick
        from behaviors.custom.Dribble.Dribble import Dribble
        from behaviors.custom.Fall.Fall import Fall
        from behaviors.custom.Get_Up.Get_Up import Get_Up
        from behaviors.custom.Step.Step import Step
        from behaviors.custom.Walk.Walk import Walk
        classes = [Basic_Kick,Dribble,Fall,Get_Up,Step,Walk]

        # ---- End of manual declarations ----

        # Prepare callbacks
        self.objects = {cls.__name__ : cls(self.base_agent) for cls in classes}

        # `o=o` binds each object at lambda creation time (a plain closure would
        # make every callback refer to the last object of the loop)
        return {name: (o.description,o.auto_head,
                       lambda reset,*args,o=o: o.execute(reset,*args), lambda *args,o=o: o.is_ready(*args)) for name, o in self.objects.items()}


    def get_custom_behavior_object(self, name):
        ''' Get unique object from class "name" ("name" must represent a custom behavior) '''
        assert name in self.objects, f"There is no custom behavior called {name}"
        return self.objects[name]


    def get_all_behaviors(self):
        ''' Get name and description of all behaviors (two lists in matching order) '''
        return list(self.behaviors), [ val[0] for val in self.behaviors.values() ]


    def get_current(self):
        '''
        Get name and duration (in seconds) of current behavior

        The name is None if the last behavior has finished or if a reset was forced;
        in that case the duration is still measured from the start of the last behavior
        '''
        duration = (self.world.time_local_ms - self.state_behavior_init_ms) / 1000.0
        return self.state_behavior_name, duration


    def get_previous(self):
        '''
        Get name and duration (in seconds) of previous behavior

        The duration is only refreshed when the next behavior starts (see `execute()`)
        '''
        return self.previous_behavior, self.previous_behavior_duration


    def force_reset(self):
        ''' Force reset next executed behavior '''
        self.state_behavior_name = None


    def execute(self, name, *args) -> bool:
        '''
        Execute one step of behavior `name` with arguments `*args`
        - Automatically resets behavior on first call
        - Call get_current() to get the current behavior (and its duration)

        Returns
        -------
        finished : bool
            True if behavior has finished

        Raises
        ------
        AssertionError
            if `name` is not a registered behavior
        '''

        assert name in self.behaviors, f"Behavior {name} does not exist!"

        # Check if transitioning from other behavior
        reset = bool(self.state_behavior_name != name)
        if reset:
            if self.state_behavior_name is not None:
                self.previous_behavior = self.state_behavior_name # Previous behavior was interrupted (did not finish)
            # Measured from the start of the last-started behavior until this transition
            # NOTE(review): nesting restored from a listing without indentation; this line is
            # assumed to belong to `if reset:` (not to the inner `if`) - confirm
            self.previous_behavior_duration = (self.world.time_local_ms - self.state_behavior_init_ms) / 1000.0
            self.state_behavior_name = name
            self.state_behavior_init_ms = self.world.time_local_ms

        # Control head orientation if behavior allows it
        if self.behaviors[name][1]:
            self.head.execute()

        # Execute behavior
        if not self.behaviors[name][2](reset,*args):
            return False

        # The behavior has finished
        self.previous_behavior = self.state_behavior_name # Store current behavior name
        self.state_behavior_name = None # Next call to execute() will perform a reset
        return True


    def execute_sub_behavior(self, name, reset, *args):
        '''
        Execute one step of behavior `name` with arguments `*args`
        Useful for custom behaviors that call other behaviors
        - Behavior reset is performed manually
        - Calling get_current() will return the main behavior (not the sub behavior)
        - Poses ignore the reset argument

        Returns
        -------
        finished : bool
            True if behavior has finished

        Raises
        ------
        AssertionError
            if `name` is not a registered behavior
        '''

        assert name in self.behaviors, f"Behavior {name} does not exist!"

        # Control head orientation if behavior allows it
        if self.behaviors[name][1]:
            self.head.execute()

        # Execute behavior (current/previous behavior tracking is deliberately left untouched)
        return self.behaviors[name][2](reset,*args)


    def execute_to_completion(self, name, *args):
        '''
        Execute steps and communicate with server until completion
        - Slot behaviors indicate that the behavior has finished when sending the last command (which is promptly sent)
        - Poses are finished when the server returns the desired robot state (so the last command is irrelevant)
        - For custom behaviors, we assume the same logic, and so, the last command is ignored

        Notes
        -----
        - Before exiting, the `Robot.joints_target_speed` array is reset to avoid polluting the next command
          (this also happens if an exception interrupts the loop)
        - For poses and custom behaviors that indicate a finished behavior on the 1st call, nothing is committed or sent
        - Warning: this function may get stuck in an infinite loop if the behavior never ends
        '''

        r = self.world.robot
        skip_last = name not in self.slot_engine.behaviors

        try:
            while True:
                done = self.execute(name, *args)
                if done and skip_last:
                    break # Exit here if last command is irrelevant
                self.base_agent.scom.commit_and_send( r.get_command() )
                self.base_agent.scom.receive()
                if done:
                    break # Exit here if last command is part of the behavior
        finally:
            # reset to avoid polluting the next command
            r.joints_target_speed = np.zeros_like(r.joints_target_speed)


    def is_ready(self, name, *args) -> bool:
        '''
        Checks if behavior is ready to start under current game/robot conditions

        Raises
        ------
        AssertionError
            if `name` is not a registered behavior
        '''

        assert name in self.behaviors, f"Behavior {name} does not exist!"
        return self.behaviors[name][3](*args)