upstage_des.communications package#
Submodules#
upstage_des.communications.comms module#
Comms message and commander classes.
- class CommsManagerBase(*, name: str, mode: str | None = None, init_entities: list[tuple[Actor, str]] | None = None, send_time: float = 0.0, retry_max_time: float = 1.0, retry_rate: float = 0.166667, debug_logging: bool = False)#
Bases:
UpstageBase
A class to manage point to point transfer of communications.
Works through simpy.Store or similar interfaces. Allows for degraded comms and comms retry.
If an Actor contains a CommunicationStore, this object will detect that and use it as a destination. In that case, you also do not need to connect the actor to this object.
Example
>>> class Talker(UP.Actor): >>> comms = UP.ResourceState[SIM.Store](default=SIM.Store) >>> >>> talker1 = Talker(name='MacReady') >>> talker2 = Talker(name='Childs') >>> >>> comm_station = UP.CommsManager(name="Outpost 31", mode="voice") >>> comm_station.connect(talker1, talker1.comms) >>> comm_station.connect(talker2, talker2.comms) >>> >>> comm_station.run() >>> >>> # Typically, do this inside a task or somewhere else >>> putter = comm_station.make_put( >>> message="Grab your flamethrower!", >>> source=talker1, >>> destination=talker2, >>> rehearsal_time_to_complete=0.0, >>> ) >>> yield putter ... >>> env.run() >>> talker2.comms.items [Message(sender=Talker: MacReady, message='Grab your flamethrower!', destination=Talker: Childs)]
- static clean_message(message: str | Message) MessageContent #
Test to see if an object is a message.
If it is, return the message contents only. Otherwise return the message.
- Parameters:
message (str | Message) – The message to clean
- Returns:
The message as a message content object.
- Return type:
- connect(entity: Actor, comms_store_name: str) None #
Connect an actor and its comms store to this comms manager.
- Parameters:
entity (Actor) – The actor that will send/receive.
comms_store_name (str) – The store state name for receiving
- make_put(message: str | Message | MessageContent | dict, source: Actor, destination: Actor, rehearsal_time_to_complete: float = 0.0) Put #
Create a Put request for a message into the CommsManager.
- Parameters:
source – The message sender
destination – The message receiver, who must be connected to the CommsManager
message – Arbitrary data to send
rehearsal_time_to_complete (float, optional) – Planning time to complete the event (see Put), by default 0.0
Returns
-------
Put – UPSTAGE Put event object to yield from a task
- run() Generator[Event, Any, None] #
Run the communications message passing.
- Yields:
Generator[SimpyEvent, Any, None] – Simpy Process
- class Message(sender: Actor, content: MessageContent, destination: Actor, header: str | None = None, time_sent: float | None = None, time_received: float | None = None, mode: str | None = None)#
Bases:
object
A message data object.
- content: MessageContent#
- header: str | None = None#
- mode: str | None = None#
- time_received: float | None = None#
- time_sent: float | None = None#
- class MessageContent(data: dict, message: str | None = None)#
Bases:
object
Message content data object.
- data: dict#
- message: str | None = None#
- class PointToPointCommsManager(*, name: str, mode: str | None = None, init_entities: list[tuple[Actor, str]] | None = None, send_time: float = 0.0, retry_max_time: float = 1.0, retry_rate: float = 0.166667, debug_logging: bool = False)#
Bases:
CommsManagerBase
A class to manage point to point transfer of communications.
Works through simpy.Store or similar interfaces. Allows for degraded comms and comms retry.
If an Actor contains a CommunicationStore, this object will detect that and use it as a destination. In that case, you also do not need to connect the actor to this object.
Example
>>> class Talker(UP.Actor): >>> comms = UP.ResourceState[SIM.Store](default=SIM.Store) >>> >>> talker1 = Talker(name='MacReady') >>> talker2 = Talker(name='Childs') >>> >>> comm_station = UP.CommsManager(name="Outpost 31", mode="voice") >>> comm_station.connect(talker1, talker1.comms) >>> comm_station.connect(talker2, talker2.comms) >>> >>> comm_station.run() >>> >>> # Typically, do this inside a task or somewhere else >>> putter = comm_station.make_put( >>> message="Grab your flamethrower!", >>> source=talker1, >>> destination=talker2, >>> rehearsal_time_to_complete=0.0, >>> ) >>> yield putter ... >>> env.run() >>> talker2.comms.items [Message(sender=Talker: MacReady, message='Grab your flamethrower!', destination=Talker: Childs)]
upstage_des.communications.processes module#
Communications process helper.
- generate_comms_wait(incoming_store: Store, callback: Callable[[MessageContent], Any]) Callable[[], Process] #
Create a process function to transfer communications to a callback.
This hides cleanup and other stability functions from the user.
- Parameters:
incoming_store (A simpy or upstage store) – The store that is linked to a CommsManager instance.
callback (function) – The function to call with a received message
Returns
-------
function – An UPSTAGE process function that passes messages
upstage_des.communications.routing module#
Comms routing from a routing table lookup.
- class RoutingCommsManagerBase(*, name: str, mode: str | None = None, send_time: float = 0.0, retry_max_time: float = 1.0, retry_rate: float = 0.166667, global_ignore: bool = False, debug_logging: bool = False)#
Bases:
CommsManagerBase
A comms manager that routes messages according to a network.
- class RoutingTableCommsManager(*, name: str, mode: str | None = None, send_time: float = 0.0, retry_max_time: float = 1.0, retry_rate: float = 0.166667, global_ignore: bool = False, debug_logging: bool = False)#
Bases:
RoutingCommsManagerBase
Route comms according to a pre-defined network.
Nodes (Actors) must be explicitly connected, and this manager will route through shortest number of hops.
Allows for degraded comms and comms retry. If a link is not degraded, after the retry fails the network will re-plan a route assuming the intermediate destination node is no longer available.
The behavior is:
Ask for transmit from SOURCE to DEST
Set CURRENT to SOURCE
Find the NEXT in the shortest path from CURRENT to DEST
If there is no path, stop trying to send and end.
Attempt to send to NEXT (this is the degraded comms/retry step)
If it can send, do so. Set CURRENT = NEXT. If NEXT is DEST, Goto 8. Otherwise, Goto 3.
If it can’t send, drop NEXT from the route options. Goto 3
Place message in DEST and end.
Since this is time-based, a link can re-open during transmission. If the network has paths:
A -> B -> C A -> D -> E -> F -> G -> H -> C E -> B -> C
and we want to send from A to C, but B is blocked, a retry will have the network attempt to take the long way through DEFGHC. If B comes back online after the message gets to E, the routing will choose EBC instead.
If B does not come back online, the router will still try to go to B from E since that is shorter. If B is still down, it will take longer due to the retry. Set the input
global_ignore
toTrue
to ignore a bad node for the entire routing and avoid this behavior.Example:
class CommNode(Actor): messages = CommunicationStore(modes=None) with EnvironmentContext() as env: nodes = { name: CommNode(name=name, messages={"modes":["cup-and-string"]}) for name in "ABCDEFGH" } mgr = RoutingTableCommsManager( name="StaticManager", mode="cup-and-string", send_time=1/3600., retry_max_time=20/3600., retry_rate=4/3600., ) for u, v in ["AB", "BC", "AD", "DE", "EF", "FG", "GH", "HC", "EB"]: mgr.connect_nodes(nodes[u], nodes[v])
- connect_nodes(u: Actor, v: Actor, two_way: bool = False) None #
Connect node u to v (one-way).
Make the connection two-way with the last argument.
Module contents#
Module for communications processes and data objects.