upstage_des.communications package#

Submodules#

upstage_des.communications.comms module#

Comms message and commander classes.

class CommsManagerBase(*, name: str, mode: str | None = None, init_entities: list[tuple[Actor, str]] | None = None, send_time: float = 0.0, retry_max_time: float = 1.0, retry_rate: float = 0.166667, debug_logging: bool = False)#

Bases: UpstageBase

A class to manage point-to-point transfer of communications.

Works through simpy.Store or similar interfaces. Allows for degraded comms and comms retry.

If an Actor contains a CommunicationStore, this object will detect that and use it as a destination. In that case, you also do not need to connect the actor to this object.

Example

>>> class Talker(UP.Actor):
>>>     comms = UP.ResourceState[SIM.Store](default=SIM.Store)
>>>
>>> talker1 = Talker(name='MacReady')
>>> talker2 = Talker(name='Childs')
>>>
>>> comm_station = UP.CommsManager(name="Outpost 31", mode="voice")
>>> comm_station.connect(talker1, "comms")
>>> comm_station.connect(talker2, "comms")
>>>
>>> comm_station.run()
>>>
>>> # Typically, do this inside a task or somewhere else
>>> putter = comm_station.make_put(
>>>     message="Grab your flamethrower!",
>>>     source=talker1,
>>>     destination=talker2,
>>>     rehearsal_time_to_complete=0.0,
>>> )
>>> yield putter
...
>>> env.run()
>>> talker2.comms.items
    [Message(sender=Talker: MacReady, message='Grab your flamethrower!',
    destination=Talker: Childs)]

static clean_message(message: str | Message) MessageContent#

Test to see if an object is a message.

If it is, return the message contents only. Otherwise return the message.

Parameters:

message (str | Message) – The message to clean

Returns:

The message as a message content object.

Return type:

MessageContent
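
As a rough illustration of the unwrapping described above, the sketch below uses stand-in dataclasses that mirror only the documented fields of Message and MessageContent. It is not the library's implementation. In particular, how a bare string is wrapped (here: stored in the `message` field with empty `data`) is an assumption made for this example.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class MessageContent:
    """Stand-in with the documented fields of MessageContent."""
    data: dict
    message: Optional[str] = None


@dataclass
class Message:
    """Stand-in with a subset of the documented fields of Message."""
    sender: Any
    content: MessageContent
    destination: Any


def clean_message(message):
    """Return only the content of a Message; wrap anything else."""
    if isinstance(message, Message):
        # A full Message is reduced to its content.
        return message.content
    if isinstance(message, MessageContent):
        return message
    # Assumption: a plain string becomes the `message` field.
    return MessageContent(data={}, message=message)


content = MessageContent(data={"weapon": "flamethrower"})
wrapped = Message(sender="MacReady", content=content, destination="Childs")
print(clean_message(wrapped) is content)
print(clean_message("Grab your flamethrower!").message)
```

The point of the pattern is that downstream code can always work with a MessageContent, whether the caller supplied a full Message or just raw text.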

connect(entity: Actor, comms_store_name: str) None#

Connect an actor and its comms store to this comms manager.

Parameters:
  • entity (Actor) – The actor that will send/receive.

  • comms_store_name (str) – The name of the store state on the actor that receives messages

make_put(message: str | Message | MessageContent | dict, source: Actor, destination: Actor, rehearsal_time_to_complete: float = 0.0) Put#

Create a Put request for a message into the CommsManager.

Parameters:
  • source – The message sender

  • destination – The message receiver, who must be connected to the CommsManager

  • message – Arbitrary data to send

  • rehearsal_time_to_complete (float, optional) – Planning time to complete the event (see Put), by default 0.0

Returns:

UPSTAGE Put event object to yield from a task.

Return type:

Put

run() Generator[Event, Any, None]#

Run the communications message passing.

Yields:

Generator[SimpyEvent, Any, None] – Simpy Process

store_from_actor(actor: Actor) Store#

Retrieve a communications store from an actor.

Parameters:

actor (Actor) – The actor

Returns:

A Comms store.

Return type:

Store

class Message(sender: Actor, content: MessageContent, destination: Actor, header: str | None = None, time_sent: float | None = None, time_received: float | None = None, mode: str | None = None)#

Bases: object

A message data object.

content: MessageContent#
destination: Actor#
header: str | None = None#
mode: str | None = None#
sender: Actor#
time_received: float | None = None#
time_sent: float | None = None#

class MessageContent(data: dict, message: str | None = None)#

Bases: object

Message content data object.

data: dict#
message: str | None = None#

class PointToPointCommsManager(*, name: str, mode: str | None = None, init_entities: list[tuple[Actor, str]] | None = None, send_time: float = 0.0, retry_max_time: float = 1.0, retry_rate: float = 0.166667, debug_logging: bool = False)#

Bases: CommsManagerBase

A class to manage point-to-point transfer of communications.

Works through simpy.Store or similar interfaces. Allows for degraded comms and comms retry.

If an Actor contains a CommunicationStore, this object will detect that and use it as a destination. In that case, you also do not need to connect the actor to this object.

Example

>>> class Talker(UP.Actor):
>>>     comms = UP.ResourceState[SIM.Store](default=SIM.Store)
>>>
>>> talker1 = Talker(name='MacReady')
>>> talker2 = Talker(name='Childs')
>>>
>>> comm_station = UP.CommsManager(name="Outpost 31", mode="voice")
>>> comm_station.connect(talker1, "comms")
>>> comm_station.connect(talker2, "comms")
>>>
>>> comm_station.run()
>>>
>>> # Typically, do this inside a task or somewhere else
>>> putter = comm_station.make_put(
>>>     message="Grab your flamethrower!",
>>>     source=talker1,
>>>     destination=talker2,
>>>     rehearsal_time_to_complete=0.0,
>>> )
>>> yield putter
...
>>> env.run()
>>> talker2.comms.items
    [Message(sender=Talker: MacReady, message='Grab your flamethrower!',
    destination=Talker: Childs)]

upstage_des.communications.processes module#

Communications process helper.

generate_comms_wait(incoming_store: Store, callback: Callable[[MessageContent], Any]) Callable[[], Process]#

Create a process function to transfer communications to a callback.

This hides cleanup and other stability functions from the user.

Parameters:
  • incoming_store (A simpy or upstage store) – The store that is linked to a CommsManager instance.

  • callback (function) – The function to call with a received message

Returns:

An UPSTAGE process function that passes messages.

Return type:

Callable[[], Process]

upstage_des.communications.routing module#

Comms routing from a routing table lookup.

class RoutingCommsManagerBase(*, name: str, mode: str | None = None, send_time: float = 0.0, retry_max_time: float = 1.0, retry_rate: float = 0.166667, global_ignore: bool = False, debug_logging: bool = False)#

Bases: CommsManagerBase

A comms manager that routes messages according to a network.

connect(entity: Actor, comms_store_name: str) None#

RoutingCommsManagerBase does not allow this method.

select_hop(source: Actor, dest: Actor, ignore_nodes: list[Actor] | None = None) Actor | None#

Subclassable method for selecting the next hop in a route.

Parameters:
  • source (Actor) – Current point

  • dest (Actor) – Message Destination

  • ignore_nodes (list[Actor], optional) – Nodes to exclude.

Returns:

The next hop to make, or None if the path is blocked.

Return type:

Actor | None

class RoutingTableCommsManager(*, name: str, mode: str | None = None, send_time: float = 0.0, retry_max_time: float = 1.0, retry_rate: float = 0.166667, global_ignore: bool = False, debug_logging: bool = False)#

Bases: RoutingCommsManagerBase

Route comms according to a pre-defined network.

Nodes (Actors) must be explicitly connected, and this manager will route messages along the path with the fewest hops.

Allows for degraded comms and comms retry. If a link is degraded and the retry fails, the network will re-plan a route, assuming the intermediate destination node is no longer available.

The behavior is:

  1. Ask for transmit from SOURCE to DEST

  2. Set CURRENT to SOURCE

  3. Find the NEXT in the shortest path from CURRENT to DEST

  4. If there is no path, stop trying to send and end.

  5. Attempt to send to NEXT (this is the degraded comms/retry step)

  6. If it can send, do so. Set CURRENT = NEXT. If NEXT is DEST, Goto 8. Otherwise, Goto 3.

  7. If it can’t send, drop NEXT from the route options. Goto 3

  8. Place message in DEST and end.

Since this is time-based, a link can re-open during transmission. If the network has paths:

A -> B -> C
A -> D -> E -> F -> G -> H -> C
E -> B -> C

and we want to send from A to C, but B is blocked, a retry will have the network attempt to take the long way through DEFGHC. If B comes back online after the message gets to E, the routing will choose EBC instead.

If B does not come back online, the router will still try to go to B from E since that is shorter. If B is still down, it will take longer due to the retry. Set the input global_ignore to True to ignore a bad node for the entire routing and avoid this behavior.
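
The numbered steps and the A-to-C scenario above can be traced with a small standalone sketch. It uses plain strings instead of Actors and a breadth-first search for the fewest-hops choice. The helper names (`build_graph`, `next_hop`, `route`, `can_send`, `max_steps`) are hypothetical and are not part of UPSTAGE. The sketch also assumes that, without global_ignore, a failed node is forgotten after each successful hop, which is one reading of the behavior described above.

```python
from collections import deque

# One-way links of the network above (strings stand in for Actors).
EDGES = ["AB", "BC", "AD", "DE", "EF", "FG", "GH", "HC", "EB"]


def build_graph(edges):
    """Map each node to the set of nodes it can send to directly."""
    graph = {}
    for u, v in edges:
        graph.setdefault(u, set()).add(v)
        graph.setdefault(v, set())
    return graph


def next_hop(graph, source, dest, ignore=()):
    """Breadth-first search: first hop of a fewest-hops path, else None."""
    blocked = set(ignore)
    seen = {source}
    queue = deque()
    for node in sorted(graph[source]):
        if node not in blocked:
            seen.add(node)
            queue.append((node, node))  # (node reached, first hop used)
    while queue:
        node, first = queue.popleft()
        if node == dest:
            return first
        for nxt in sorted(graph[node]):
            if nxt not in seen and nxt not in blocked:
                seen.add(nxt)
                queue.append((nxt, first))
    return None


def route(graph, source, dest, can_send, global_ignore=False, max_steps=100):
    """Follow steps 1-8; return (path or None, list of failed links)."""
    current, path, failed, ignored = source, [source], [], set()
    for _ in range(max_steps):  # guard against endless re-planning
        if current == dest:
            return path, failed  # step 8
        nxt = next_hop(graph, current, dest, ignored)  # step 3
        if nxt is None:
            return None, failed  # step 4: no path left
        if can_send(current, nxt):  # steps 5 and 6
            current = nxt
            path.append(nxt)
            if not global_ignore:
                ignored.clear()  # assumed: bad nodes are forgotten per hop
        else:  # step 7
            failed.append((current, nxt))
            ignored.add(nxt)
    return None, failed


def b_down(current, nxt):
    """B never accepts a message."""
    return nxt != "B"


def b_back_at_e(current, nxt):
    """B is down at first, but reachable again once the message is at E."""
    return nxt != "B" or current == "E"


graph = build_graph(EDGES)
print(route(graph, "A", "C", b_back_at_e))
print(route(graph, "A", "C", b_down))
print(route(graph, "A", "C", b_down, global_ignore=True))
```

In this sketch the second call records two failed links, A to B and E to B, while the third records only A to B. That difference is the extra retry delay that global_ignore is meant to avoid.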

Example:

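from upstage_des.api import Actor, CommunicationStore, EnvironmentContext
from upstage_des.communications.routing import RoutingTableCommsManager

class CommNode(Actor):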
    messages = CommunicationStore(modes=None)

with EnvironmentContext() as env:
    nodes = {
        name: CommNode(name=name, messages={"modes":["cup-and-string"]})
        for name in "ABCDEFGH"
    }
    mgr = RoutingTableCommsManager(
        name="StaticManager",
        mode="cup-and-string",
        send_time=1/3600.,
        retry_max_time=20/3600.,
        retry_rate=4/3600.,
    )
    for u, v in ["AB", "BC", "AD", "DE", "EF", "FG", "GH", "HC", "EB"]:
        mgr.connect_nodes(nodes[u], nodes[v])
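
The timing arguments in the example above are easier to read after a unit conversion. The sketch below rests on two assumptions that this page does not state: that the simulation clock counts hours, and that retry_rate is the spacing between retries inside the retry_max_time window. Under those assumptions the values correspond to a 1 second send time and a retry every 4 seconds for up to 20 seconds.

```python
from fractions import Fraction

# Values copied from the example above; exact fractions avoid float rounding.
send_time = Fraction(1, 3600)
retry_max_time = Fraction(20, 3600)
retry_rate = Fraction(4, 3600)

SECONDS_PER_HOUR = 3600  # assumption: the simulation clock counts hours


def as_seconds(sim_time):
    """Convert a simulation time in hours to seconds."""
    return float(sim_time * SECONDS_PER_HOUR)


# Assumption: retry_rate is the spacing between retries within the window.
retry_intervals = retry_max_time // retry_rate

print(as_seconds(send_time), as_seconds(retry_max_time), as_seconds(retry_rate))
print(retry_intervals)
```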

connect_nodes(u: Actor, v: Actor, two_way: bool = False) None#

Connect node u to v (one-way).

Make the connection two-way with the last argument.

Parameters:
  • u (Actor) – The source actor

  • v (Actor) – Destination actor

  • two_way (bool, optional) – If the connection is two way. Defaults to False.
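
The difference between a one-way and a two-way connection can be pictured with a directed adjacency table. The `LinkTable` class and its `can_send` helper below are hypothetical stand-ins written for this illustration. They do not show how the manager stores its network internally.

```python
class LinkTable:
    """Hypothetical stand-in for a directed link table."""

    def __init__(self):
        self.links = {}

    def connect_nodes(self, u, v, two_way=False):
        """Record a link from u to v, and from v to u if two_way is set."""
        self.links.setdefault(u, set()).add(v)
        if two_way:
            self.links.setdefault(v, set()).add(u)

    def can_send(self, u, v):
        """True if u has a direct link to v."""
        return v in self.links.get(u, set())


table = LinkTable()
table.connect_nodes("A", "B")                # one-way: A can reach B only
table.connect_nodes("B", "C", two_way=True)  # both directions

print(table.can_send("A", "B"), table.can_send("B", "A"))
print(table.can_send("B", "C"), table.can_send("C", "B"))
```

With the default one-way setting, a network built like the class example carries messages in one direction only, so replies need their own links or two_way=True.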

select_hop(source: Actor, dest: Actor, ignore_nodes: list[Actor] | None = None) Actor | None#

Method for selecting the next hop to make.

This selects the shortest number of hops.

Parameters:
  • source (Actor) – Starting or current point

  • dest (Actor) – Destination

  • ignore_nodes (list[Actor], optional) – Nodes to exclude.

Returns:

The next place to go or None if no route.

Return type:

Actor | None

Module contents#

Module for communications processes and data objects.