You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Dribble/behaviors/Behavior.py

183 lines
7.0 KiB
Python

import numpy as np
class Behavior():
def __init__(self, base_agent) -> None:
from agent.Base_Agent import Base_Agent # for type hinting
self.base_agent : Base_Agent = base_agent
self.world = self.base_agent.world
self.state_behavior_name = None
self.state_behavior_init_ms = 0
self.previous_behavior = None
self.previous_behavior_duration = None
#Initialize standard behaviors
from behaviors.Poses import Poses
from behaviors.Slot_Engine import Slot_Engine
from behaviors.Head import Head
self.poses = Poses(self.world)
self.slot_engine = Slot_Engine(self.world)
self.head = Head(self.world)
def create_behaviors(self):
'''
Behaviors dictionary:
creation: key: ( description, auto_head, lambda reset[,a,b,c,..]: self.execute(...), lambda: self.is_ready(...) )
usage: key: ( description, auto_head, execute_func(reset, *args), is_ready_func )
'''
self.behaviors = self.poses.get_behaviors_callbacks()
self.behaviors.update(self.slot_engine.get_behaviors_callbacks())
self.behaviors.update(self.get_custom_callbacks())
def get_custom_callbacks(self):
'''
Searching custom behaviors could be implemented automatically
However, for code distribution, loading code dynamically is not ideal (unless we load byte code or some other import solution)
Currently, adding custom behaviors is a manual process:
1. Add import statement below
2. Add class to 'classes' list
'''
# Declaration of behaviors
from behaviors.custom.Basic_Kick.Basic_Kick import Basic_Kick
from behaviors.custom.Dribble.Dribble import Dribble
from behaviors.custom.Fall.Fall import Fall
from behaviors.custom.Get_Up.Get_Up import Get_Up
from behaviors.custom.Step.Step import Step
from behaviors.custom.Walk.Walk import Walk
classes = [Basic_Kick,Dribble,Fall,Get_Up,Step,Walk]
'''---- End of manual declarations ----'''
# Prepare callbacks
self.objects = {cls.__name__ : cls(self.base_agent) for cls in classes}
return {name: (o.description,o.auto_head,
lambda reset,*args,o=o: o.execute(reset,*args), lambda *args,o=o: o.is_ready(*args)) for name, o in self.objects.items()}
def get_custom_behavior_object(self, name):
''' Get unique object from class "name" ("name" must represent a custom behavior) '''
assert name in self.objects, f"There is no custom behavior called {name}"
return self.objects[name]
def get_all_behaviors(self):
''' Get name and description of all behaviors '''
return [ key for key in self.behaviors ], [ val[0] for val in self.behaviors.values() ]
def get_current(self):
''' Get name and duration (in seconds) of current behavior '''
duration = (self.world.time_local_ms - self.state_behavior_init_ms) / 1000.0
return self.state_behavior_name, duration
def get_previous(self):
''' Get name and duration (in seconds) of previous behavior '''
return self.previous_behavior, self.previous_behavior_duration
def force_reset(self):
''' Force reset next executed behavior '''
self.state_behavior_name = None
def execute(self, name, *args) -> bool:
'''
Execute one step of behavior `name` with arguments `*args`
- Automatically resets behavior on first call
- Call get_current() to get the current behavior (and its duration)
Returns
-------
finished : bool
True if behavior has finished
'''
assert name in self.behaviors, f"Behavior {name} does not exist!"
# Check if transitioning from other behavior
reset = bool(self.state_behavior_name != name)
if reset:
if self.state_behavior_name is not None:
self.previous_behavior = self.state_behavior_name # Previous behavior was interrupted (did not finish)
self.previous_behavior_duration = (self.world.time_local_ms - self.state_behavior_init_ms) / 1000.0
self.state_behavior_name = name
self.state_behavior_init_ms = self.world.time_local_ms
# Control head orientation if behavior allows it
if self.behaviors[name][1]:
self.head.execute()
# Execute behavior
if not self.behaviors[name][2](reset,*args):
return False
# The behavior has finished
self.previous_behavior = self.state_behavior_name # Store current behavior name
self.state_behavior_name = None
return True
def execute_sub_behavior(self, name, reset, *args):
'''
Execute one step of behavior `name` with arguments `*args`
Useful for custom behaviors that call other behaviors
- Behavior reset is performed manually
- Calling get_current() will return the main behavior (not the sub behavior)
- Poses ignore the reset argument
Returns
-------
finished : bool
True if behavior has finished
'''
assert name in self.behaviors, f"Behavior {name} does not exist!"
# Control head orientation if behavior allows it
if self.behaviors[name][1]:
self.head.execute()
# Execute behavior
return self.behaviors[name][2](reset,*args)
def execute_to_completion(self, name, *args):
'''
Execute steps and communicate with server until completion
- Slot behaviors indicate that the behavior has finished when sending the last command (which is promptly sent)
- Poses are finished when the server returns the desired robot state (so the last command is irrelevant)
- For custom behaviors, we assume the same logic, and so, the last command is ignored
Notes
-----
- Before exiting, the `Robot.joints_target_speed` array is reset to avoid polluting the next command
- For poses and custom behaviors that indicate a finished behavior on the 1st call, nothing is committed or sent
- Warning: this function may get stuck in an infinite loop if the behavior never ends
'''
r = self.world.robot
skip_last = name not in self.slot_engine.behaviors
while True:
done = self.execute(name, *args)
if done and skip_last: break # Exit here if last command is irrelevant
self.base_agent.scom.commit_and_send( r.get_command() )
self.base_agent.scom.receive()
if done: break # Exit here if last command is part of the behavior
# reset to avoid polluting the next command
r.joints_target_speed = np.zeros_like(r.joints_target_speed)
def is_ready(self, name, *args) -> bool:
''' Checks if behavior is ready to start under current game/robot conditions '''
assert name in self.behaviors, f"Behavior {name} does not exist!"
return self.behaviors[name][3](*args)