Complex Cashier Full Source#

# Copyright (C) 2024 by the Georgia Tech Research Institute (GTRI)

# Licensed under the BSD 3-Clause License.
# See the LICENSE file in the project root for license terms.

from collections.abc import Generator
from typing import Any

import simpy as SIM

import upstage_des.api as UP
from upstage_des.task import InterruptStates
from upstage_des.type_help import SIMPY_GEN, TASK_GEN


class Cashier(UP.Actor):
    scan_speed = UP.State[float](
        valid_types=(float,),
        frozen=True,
    )
    time_until_break = UP.State[float](
        default=120.0,
        valid_types=(float,),
        frozen=True,
    )
    breaks_until_done = UP.State[int](default=2, valid_types=int)
    breaks_taken = UP.State[int](default=0, valid_types=int, recording=True)
    items_scanned = UP.State[int](
        default=0,
        valid_types=(int,),
        recording=True,
    )
    time_scanning = UP.LinearChangingState(
        default=0.0,
        valid_types=(float,),
    )
    messages = UP.ResourceState[UP.SelfMonitoringStore](
        default=UP.SelfMonitoringStore,
    )

    def time_left_to_break(self) -> float:
        elapsed = self.env.now - float(self.get_knowledge("start_time", must_exist=True))
        return self.time_until_break - elapsed


class CheckoutLane(UP.Actor):
    customer_queue = UP.ResourceState[UP.SelfMonitoringStore](
        default=UP.SelfMonitoringStore,
    )


class StoreBoss(UP.UpstageBase):
    def __init__(self, lanes: list[CheckoutLane]) -> None:
        self.lanes = lanes
        self._lane_map: dict[CheckoutLane, Cashier] = {}

    def get_lane(self, cashier: Cashier) -> CheckoutLane:
        possible = [lane for lane in self.lanes if lane not in self._lane_map]
        lane = self.stage.random.choice(possible)
        self._lane_map[lane] = cashier
        return lane

    def clear_lane(self, cashier: Cashier) -> None:
        to_del = [name for name, cash in self._lane_map.items() if cash is cashier]
        for name in to_del:
            del self._lane_map[name]


class CashierBreakTimer(UP.Task):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        yield UP.Wait(actor.time_until_break)
        actor.interrupt_network("CashierJob", cause=dict(reason="BREAK TIME"))


class InterruptibleTask(UP.Task):
    def on_interrupt(self, *, actor: Cashier, cause: dict[str, Any]) -> InterruptStates:
        # We will only interrupt with a dictionary of data
        assert isinstance(cause, dict)
        job_list: list[str]

        if cause["reason"] == "BREAK TIME":
            job_list = ["Break"]
        elif cause["reason"] == "NEW JOB":
            job_list = cause["job_list"]
        else:
            raise UP.SimulationError("Unexpected interrupt cause")

        # determine time until break
        time_left = actor.time_left_to_break()
        # if there are only five minutes left, take the break and queue the task.
        if time_left <= 5.0 and "Break" not in job_list:
            job_list = ["Break"] + job_list

        # Ignore the interrupt, unless we've marked it to know otherwise
        marker = self.get_marker() or "none"
        if marker == "on break":
            if "Break" in job_list:
                job_list.remove("Break")

        self.clear_actor_task_queue(actor)
        self.set_actor_task_queue(actor, job_list)
        if marker == "cancellable":
            return self.INTERRUPT.END
        return self.INTERRUPT.IGNORE


class GoToWork(UP.Task):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Go to work"""
        yield UP.Wait(15.0)


class TalkToBoss(UP.DecisionTask):
    def make_decision(self, *, actor: Cashier) -> None:
        """Zero-time task to get information."""
        boss: StoreBoss = self.stage.boss
        lane = boss.get_lane(actor)
        self.set_actor_knowledge(actor, "checkout_lane", lane, overwrite=False)
        actor.breaks_taken = 0
        self.set_actor_knowledge(actor, "start_time", self.env.now, overwrite=True)
        # Convenient spot to run the timer.
        CashierBreakTimer().run(actor=actor)


class WaitInLane(InterruptibleTask):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Wait until break time, or a customer."""
        lane: CheckoutLane = self.get_actor_knowledge(
            actor,
            "checkout_lane",
            must_exist=True,
        )
        customer_arrival = UP.Get(lane.customer_queue)

        self.set_marker(marker="cancellable")
        yield customer_arrival

        customer: int = customer_arrival.get_value()
        self.set_actor_knowledge(actor, "customer", customer, overwrite=True)


class DoCheckout(InterruptibleTask):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Do the checkout"""
        items: int = self.get_actor_knowledge(
            actor,
            "customer",
            must_exist=True,
        )
        per_item_time = actor.scan_speed / items
        actor.activate_linear_state(
            state="time_scanning",
            rate=1.0,
            task=self,
        )
        for _ in range(items):
            yield UP.Wait(per_item_time)
            actor.items_scanned += 1
        actor.deactivate_all_states(task=self)
        # assume 2 minutes to take payment
        yield UP.Wait(2.0)


class Break(UP.DecisionTask):
    def make_decision(self, *, actor: Cashier) -> None:
        """Decide what kind of break we are taking."""
        actor.breaks_taken += 1

        # we might have jobs queued
        queue = self.get_actor_task_queue(actor) or []
        if "Break" in queue:
            raise UP.SimulationError("Odd task network state")
        self.clear_actor_task_queue(actor)

        if actor.breaks_taken == actor.breaks_until_done:
            self.set_actor_task_queue(actor, ["NightBreak"])
        elif actor.breaks_taken > actor.breaks_until_done:
            raise UP.SimulationError("Too many breaks taken")
        else:
            self.set_actor_task_queue(actor, ["ShortBreak"] + queue)


class ShortBreak(InterruptibleTask):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Take a short break."""
        self.set_marker("on break")
        yield UP.Wait(15.0)
        self.set_actor_knowledge(actor, "start_time", self.env.now, overwrite=True)
        CashierBreakTimer().run(actor=actor)


class NightBreak(UP.Task):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Go home and rest."""
        self.clear_actor_knowledge(actor, "checkout_lane")
        self.stage.boss.clear_lane(actor)
        yield UP.Wait(60 * 12.0)


class Restock(UP.Task):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Restock."""
        yield UP.Wait(10.0)


task_classes = {
    "GoToWork": GoToWork,
    "TalkToBoss": TalkToBoss,
    "WaitInLane": WaitInLane,
    "DoCheckout": DoCheckout,
    "Break": Break,
    "ShortBreak": ShortBreak,
    "NightBreak": NightBreak,
    "Restock": Restock,
}

task_links = {
    "GoToWork": UP.TaskLinks(default="TalkToBoss", allowed=["TalkToBoss"]),
    "TalkToBoss": UP.TaskLinks(default="WaitInLane", allowed=["WaitInLane"]),
    "WaitInLane": UP.TaskLinks(default="DoCheckout", allowed=["DoCheckout", "Break"]),
    "DoCheckout": UP.TaskLinks(default="WaitInLane", allowed=["WaitInLane", "Break"]),
    "Break": UP.TaskLinks(default="ShortBreak", allowed=["ShortBreak", "NightBreak"]),
    "ShortBreak": UP.TaskLinks(default="WaitInLane", allowed=["WaitInLane"]),
    "NightBreak": UP.TaskLinks(default="GoToWork", allowed=["GoToWork"]),
    "Restock": UP.TaskLinks(default="WaitInLane", allowed=["WaitInLane", "Break"]),
}

cashier_task_network = UP.TaskNetworkFactory(
    name="CashierJob",
    task_classes=task_classes,
    task_links=task_links,
)


class CashierMessages(UP.Task):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        getter = UP.Get(actor.messages)
        yield getter
        tasks_needed: list[str] | str = getter.get_value()
        tasks_needed = [tasks_needed] if isinstance(tasks_needed, str) else tasks_needed
        actor.interrupt_network("CashierJob", cause=dict(reason="NEW JOB", job_list=tasks_needed))


cashier_message_net = UP.TaskNetworkFactory.from_single_looping("Messages", CashierMessages)


def customer_spawner(
    env: SIM.Environment,
    lanes: list[CheckoutLane],
) -> Generator[SIM.Event, None, None]:
    # sneaky way to get access to stage
    stage = lanes[0].stage
    while True:
        hrs = env.now / 60
        time_of_day = hrs // 24
        if time_of_day <= 8 or time_of_day >= 15.5:
            time_until_open = (24 - time_of_day) + 8
            yield env.timeout(time_until_open)

        lane_pick = stage.random.choice(lanes)
        number_pick = stage.random.randint(3, 17)
        yield lane_pick.customer_queue.put(number_pick)
        yield UP.Wait.from_random_uniform(5.0, 30.0).as_event()


def manager_process(boss: StoreBoss, cashiers: list[Cashier]) -> SIMPY_GEN:
    while True:
        # Use the random uniform feature, but convert the UPSTAGE event to simpy
        # because this is a simpy only process
        yield UP.Wait.from_random_uniform(30.0, 90.0).as_event()
        possible = [
            cash
            for cash in cashiers
            if getattr(cash.get_running_task("CashierJob"), "name", "") != "NightBreak"
        ]
        if not possible:
            return
        cash = boss.stage.random.choice(possible)
        yield cash.messages.put(["Restock"])


def test_cashier_example() -> None:
    with UP.EnvironmentContext(initial_time=8 * 60) as env:
        UP.add_stage_variable("time_unit", "min")
        cashier = Cashier(
            name="Bob",
            scan_speed=1.0,
            time_until_break=120.0,
            breaks_until_done=4,
            debug_log=True,
        )
        lane_1 = CheckoutLane(name="Lane 1")
        lane_2 = CheckoutLane(name="Lane 2")
        boss = StoreBoss(lanes=[lane_1, lane_2])

        UP.add_stage_variable("boss", boss)

        net = cashier_task_network.make_network()
        cashier.add_task_network(net)
        cashier.start_network_loop(net.name, "GoToWork")

        net = cashier_message_net.make_network()
        cashier.add_task_network(net)
        cashier.start_network_loop(net.name, "CashierMessages")

        customer_proc = customer_spawner(env, [lane_1, lane_2])
        _ = env.process(customer_proc)

        _ = env.process(manager_process(boss, [cashier]))

        env.run(until=20 * 60)

    for line in cashier.get_log():
        if "Interrupt" in line:
            print(line)

    print(cashier.items_scanned)


if __name__ == "__main__":
    test_cashier_example()

This file is auto-generated.