Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added tasks/HRI/HRI/__init__.py
Empty file.
137 changes: 137 additions & 0 deletions tasks/HRI/HRI/state_machine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from typing import List, Tuple, Dict

import rclpy
import smach
import smach_ros
from geometry_msgs.msg import Point, PointStamped, Pose

# skills to be imported
# from lasr_skills import (
# )

from HRI.states import *

from shapely.geometry import Polygon
from std_msgs.msg import Empty

from Base.tasks.HRI.HRI.states.check_sofa import CheckSofa


class HRI(smach.StateMachine):
def __init__(self, node):
smach.StateMachine.__init__(self, outcomes=["succeeded", "failed"])

# commented incase Detect Doorbell was not implemented
# smach.StateMachine.add(
# "DETECT DOORBELL",
# DetectDoorbell(node),
# transitions={
# "valid": "APPROACH_GUEST",
# "invalid": "DETECT_DOORBELL",
# "preempted": "DETECT_DOORBELL",
# },
# )

# start door state machine goes here (by Fadi and Aldrich)

smach.StateMachine.add(
"APPROACH_GUEST",
ApproachGuest(node),
transitions={
"valid": "FACE_GUEST",
"invalid": "GREET_GUEST",
"preempted": "GREET_GUEST",
},
)

# face guest and greet them concurrently state machine (by Fadi)

smach.StateMachine.add(
"GET_NAME_AND_DRINK",
GetNameAndDrink(node),
transitions={
"valid": "GUIDE_GUEST_TO_LIVING_ROOM",
"invalid": "GET_NAME_AND_DRINK",
"preempted": "GET_NAME_AND_DRINK",
},
)

smach.StateMachine.add(
"GUIDE_GUEST_TO_LIVING_ROOM",
GuideGuestsToLivingroom(node),
transitions={
"valid": "OFFER_A_FREE_SEAT",
"invalid": "GUIDE_GUEST_TO_LIVING_ROOM",
"preempted": "GUIDE_GUEST_TO_LIVING_ROOM",
},
)

smach.StateMachine.add(
"CHECK_SOFA",
CheckSofa(node),
transitions={
"valid": "OFFER_A_FREE_SEAT",
"invalid": "CHECK_SOFA",
"preempted": "CHECK_SOFA",
},
)

smach.StateMachine.add(
"OFFER_A_FREE_SEAT",
OfferFreeSeat(node),
transitions={
"valid": "INTRODUCE_GUESTS_TO_EACHOTHER",
"invalid": "OFFER_A_FREE_SEAT",
"preempted": "OFFER_A_FREE_SEAT",
},
)

smach.StateMachine.add(
"INTRODUCE_GUESTS_TO_EACHOTHER",
IntroduceGuestsToEachother(node),
transitions={
"valid": "ASK_SECOND_GUEST_FOR_BAG_TO_HOST",
"invalid": "INTRODUCE_GUESTS_TO_EACHOTHER",
"preempted": "INTRODUCE_GUESTS_TO_EACHOTHER",
},
)

smach.StateMachine.add(
"ASK_SECOND_GUEST_FOR_BAG_TO_HOST",
AskSecondGuestForBagToHost(node),
transitions={
"valid": "PICK_UP_BAG",
"invalid": "ASK_SECOND_GUEST_FOR_BAG_TO_HOST",
"preempted": "ASK_SECOND_GUEST_FOR_BAG_TO_HOST",
},
)

smach.StateMachine.add(
"PICK_UP_BAG",
PickUpBag(node),
transitions={
"valid": "FOLLOW_HOST",
"invalid": "LISTEN_TO_HOST_DROP_INSTRUCTION",
"preempted": "LISTEN_TO_HOST_DROP_INSTRUCTION",
},
)

smach.StateMachine.add(
"LISTEN_TO_HOST_DROP_BAG_INSTRUCTION",
ListenToHostDropBagInstruction(node),
transitions={
"valid": "DROP_BAG",
"invalid": "LISTEN_TO_HOST_DROP_BAG_INSTRUCTION",
"preempted": "LISTEN_TO_HOST_DROP_BAG_INSTRUCTION",
},
)

smach.StateMachine.add(
"DROP_BAG",
DropBag(node),
transitions={
"valid": "succeeded",
"invalid": "DROP_BAG",
"preempted": "DROP_BAG",
},
)
64 changes: 64 additions & 0 deletions tasks/HRI/HRI/states/check_sofa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import smach
import smach_ros

from shapely.geometry import Polygon as ShapelyPolygon
from geometry_msgs.msg import Point, Polygon

from lasr_vision_interfaces.srv import CroppedDetection
from lasr_vision_interfaces.msg import CDRequest

# TODO: test this SM; check services are called correctly


class CheckSofa(smach.StateMachine):
def __init__(self, sofa_area: ShapelyPolygon, max_people_on_sofa: int):

smach.StateMachine.__init__(self, outcomes=["has_free_space", "no_free_space"])

self.sofa_area = sofa_area
self.max_people_on_sofa = max_people_on_sofa

with self:

smach.StateMachine.add(
"RUN_DETECTIONS",
smach_ros.ServiceState(
"vision/cropped_detection",
CroppedDetection,
request=CroppedDetection.Request(
requests=[
CDRequest(
method="closest",
use_mask=True,
yolo_model="yolov8x-seg.pt",
yolo_model_confidence=0.5,
yolo_nms_threshold=0.3,
return_sensor_reading=False,
polygons=[
Polygon(
points=[
Point(x=point[0], y=point[1], z=0.0)
for point in sofa_area.exterior.coords
]
)
],
)
]
),
response_cb=self.detections_cb,
),
transitions={
"succeeded": "has_free_space",
"aborted": "no_free_space",
"preempted": "no_free_space",
},
)

def detections_cb(self, userdata, response):
if len(response.responses[0].detections) == 0:
return "aborted"

if len(response.responses[0].detections) >= self.max_people_on_sofa:
return "aborted"

return "succeeded"
159 changes: 159 additions & 0 deletions tasks/HRI/HRI/states/get_name_and_drink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""
State for parsing the transcription of the guests' name and favourite drink, and adding this
to the guest data userdata
"""

from rclpy.node import Node
import smach
from smach import UserData
from typing import List, Dict, Any
from receptionist.states import SpeechRecovery

# from tasks.receptionist.src.receptionist.states import SpeechRecovery


class GetNameAndDrink(smach.StateMachine):
class ParseNameAndDrink(smach.State, Node):
def __init__(self, guest_id: str, param_key: str = "/receptionist/priors"):
"""Parses the transcription of the guests' name and favourite drink.

Args:
param_key (str, optional): Name of the parameter that contains the list of
possible . Defaults to "/receptionist/priors".
"""
Node.__init__(self, "parse_name_and_drink")
smach.State.__init__(
self,
outcomes=["succeeded", "failed", "failed_name", "failed_drink"],
input_keys=["guest_transcription", "guest_data"],
output_keys=["guest_data", "guest_transcription"],
)
self._guest_id = guest_id
prior_data: Dict[str, List[str]] = self.get_parameter(param_key).value
self._possible_names = [name.lower() for name in prior_data["names"]]
self._possible_drinks = [drink.lower() for drink in prior_data["drinks"]]

def execute(self, userdata: UserData) -> str:
"""Parses the transcription of the guests' name and favourite drink.

Args:
userdata (UserData): State machine userdata assumed to contain a key
called "guest transcription" with the transcription of the guest's name and
favourite drink.

Returns:
str: state outcome. Updates the userdata with the parsed name and drink, under
the parameter "guest_data".
"""
outcome = "succeeded"
name_found = False
drink_found = False
transcription = userdata.guest_transcription.lower()

transcription = userdata["guest_transcription"].lower()

for name in self._possible_names:
if name in transcription:
userdata.guest_data[self._guest_id]["name"] = name
self.get_logger().info(f"Guest Name identified as: {name}")
name_found = True
break

for drink in self._possible_drinks:
if drink in transcription:
userdata.guest_data[self._guest_id]["drink"] = drink
self.get_logger().info(f"Guest Drink identified as: {drink}")
drink_found = True
break

if not name_found:
self.get_logger().info("Name not found in transcription")
userdata.guest_data[self._guest_id]["name"] = "unknown"
outcome = "failed"
if not drink_found:
self.get_logger().info("Drink not found in transcription")
userdata.guest_data[self._guest_id]["drink"] = "unknown"
outcome = "failed"

return outcome

class PostRecoveryDecision(smach.State, Node):
def __init__(self, guest_id: str, param_key: str = "/receptionist/priors"):
Node.__init__(self, "post_recovery_decision")
smach.State.__init__(
self,
outcomes=["succeeded", "failed", "failed_name", "failed_drink"],
input_keys=["guest_transcription", "guest_data"],
output_keys=["guest_data", "guest_transcription"],
)
self._guest_id = guest_id
prior_data: Dict[str, List[str]] = self.get_parameter(
param_key
).get_parameter_value() # TODO: check this
self._possible_names = [name.lower() for name in prior_data["names"]]
self._possible_drinks = [drink.lower() for drink in prior_data["drinks"]]

def execute(self, userdata: UserData) -> str:
if not self._recovery_name_and_drink_required(userdata):
if userdata.guest_data[self._guest_id]["name"] == "unknown":
outcome = "failed_name"
else:
outcome = "failed_drink"
else:
outcome = "failed"
return outcome

def _recovery_name_and_drink_required(self, userdata: UserData) -> bool:
"""Determine whether both the name and drink requires recovery.

Returns:
bool: True if both attributes require recovery.
"""
if userdata.guest_data[self._guest_id]["name"] == "unknown":
if userdata.guest_data[self._guest_id]["drink"] == "unknown":
return True
else:
return False

def __init__(
self, guest_id: str, last_resort: bool, param_key: str = "/receptionist/priors"
):

self._guest_id = guest_id
self._param_key = param_key
self._last_resort = last_resort

smach.StateMachine.__init__(
self,
outcomes=["succeeded", "failed", "failed_name", "failed_drink"],
input_keys=["guest_transcription", "guest_data"],
output_keys=["guest_data", "guest_transcription"],
)
with self:

smach.StateMachine.add(
"PARSE_NAME_AND_DRINK",
self.ParseNameAndDrink(
guest_id=self._guest_id, param_key=self._param_key
),
transitions={"succeeded": "succeeded", "failed": "SPEECH_RECOVERY"},
)
smach.StateMachine.add(
"SPEECH_RECOVERY",
SpeechRecovery(self._guest_id, self._last_resort),
transitions={
"succeeded": "succeeded",
"failed": "POST_RECOVERY_DECISION",
},
)
smach.StateMachine.add(
"POST_RECOVERY_DECISION",
self.PostRecoveryDecision(
guest_id=self._guest_id, param_key=self._param_key
),
transitions={
"failed": "failed",
"failed_name": "failed_name",
"failed_drink": "failed_drink",
},
)
Loading
Loading