cicada.communicator.socket module
Functionality for communicating using the builtin socket
module.
- exception cicada.communicator.socket.BrokenPipe[source]
Bases:
Exception
Raised when trying to send to another player that no longer exists.
- exception cicada.communicator.socket.Failed(exception, traceback)[source]
Bases:
Exception
Used to indicate that a player process raised an exception.
- exception cicada.communicator.socket.NotRunning[source]
Bases:
Exception
Raised when an operation is called after the communicator has been freed.
- exception cicada.communicator.socket.Revoked[source]
Bases:
Exception
Raised when an operation is called after the communicator has been revoked.
- class cicada.communicator.socket.SocketCommunicator(*, sockets, name='world', timeout=5)[source]
Bases:
Communicator
Cicada communicator that uses Python’s builtin socket module as the transport layer.
Note
Creating a communicator is a collective operation that must be called by all players that will be members.
- Parameters:
sockets (dict of NetstringSocket, required) – Dictionary containing sockets that are connected to the other players and ready to use. The dictionary keys must be the ranks of the other players, and there must be one socket in the dictionary for every player except the caller (since players don’t need a socket to communicate with themselves). Note that the communicator world size is inferred from the size of the dictionary, and the communicator rank from whichever key doesn’t appear in the dictionary.
name (str, optional) – Human-readable name for this communicator, used for logging and debugging. Defaults to “world”.
timeout (numbers.Number) – Maximum time to wait for communication to complete, in seconds.
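The inference rule described for the sockets parameter can be illustrated with a minimal sketch. The helper name infer_rank_and_world_size is hypothetical and is not part of Cicada; it only restates the documented rule, and plain strings stand in for connected sockets.

```python
def infer_rank_and_world_size(sockets):
    """Infer (rank, world_size) from a dict keyed by the other players' ranks.

    Hypothetical helper for illustration only; not Cicada's implementation.
    """
    # One socket per *other* player, so the caller adds one to the count.
    world_size = len(sockets) + 1
    # The caller's rank is the single rank that has no socket.
    missing = sorted(set(range(world_size)) - set(sockets))
    if len(missing) != 1:
        raise ValueError(
            f"keys must cover every rank but one in range({world_size})"
        )
    return missing[0], world_size


# Placeholder strings stand in for connected sockets.
rank, world_size = infer_rank_and_world_size({0: "socket to 0", 2: "socket to 2"})
print(rank, world_size)  # prints "1 3"
```

A dictionary whose keys do not leave exactly one rank uncovered is rejected in this sketch, since no rank could then be inferred.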
- allgather(value)[source]
All-to-all communication.
Note
This method is a collective operation that must be called by all players that are members of the communicator.
- barrier()[source]
If the implementation returns without raising an exception, then every player entered the barrier. If an exception is raised then there are no guarantees about whether every player entered.
- broadcast(*, src, value)[source]
One-to-all communication.
The src player broadcasts a single object to all players.
Note
This method is a collective operation that must be called by all players that are members of the communicator.
- static connect(*, world_size=None, rank=None, address=None, root_address=None, identity=None, trusted=None, name='world', timeout=5, startup_timeout=5)[source]
High level function to create a SocketCommunicator.
This is a high level convenience function that can be used to create a communicator, given just the calling player’s address and the address of the root player. By default, the parameters will be read from environment variables that can be set permanently by the user, or temporarily using the cicada command.
- Parameters:
world_size (int, optional) – Number of players. Defaults to the value of the CICADA_WORLD_SIZE environment variable, which is automatically set by the cicada command.
rank (int, optional) – Rank of the caller. Defaults to the value of the CICADA_RANK environment variable, which is automatically set by the cicada command.
address (str, optional) – Listening address of the caller. This must be a URL of the form “tcp://{host}:{port}” for TCP sockets, or “file:///path/to/foo” for Unix domain sockets. Defaults to the value of the CICADA_ADDRESS environment variable, which is automatically set by the cicada command.
root_address (str, optional) – Listening address of the root (rank 0) player. This must be a URL of the form “tcp://{host}:{port}” for TCP sockets, or “file:///path/to/foo” for Unix domain sockets. Defaults to the value of the CICADA_ROOT_ADDRESS environment variable, which is automatically set by the cicada command.
identity (str, optional) – Path to a private key and certificate in PEM format. Defaults to the value of the CICADA_IDENTITY environment variable, which is automatically set by the cicada command.
trusted (sequence of str, optional) – Paths to certificates in PEM format. Defaults to the value of the CICADA_TRUSTED environment variable, which is automatically set by the cicada command.
name (str, optional) – Human-readable name for the new communicator. Defaults to “world”.
timeout (numbers.Number) – Communication timeout for the new communicator, in seconds. Defaults to five.
startup_timeout (numbers.Number) – Maximum time to wait for communicator setup, in seconds. Defaults to five.
- Raises:
ValueError – If there are problems with input parameters.
Timeout – If timeout seconds elapse before all connections are established.
TokenMismatch – If the players don’t all provide the same token during startup.
- Returns:
communicator – A fully-initialized communicator, ready for use.
- Return type:
SocketCommunicator
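As an illustration of the address formats and environment-variable defaults described for connect(), the following sketch reads the four documented variables and splits the address URLs using only the standard library. The helper names parse_address and resolve_settings are hypothetical, and this is not how Cicada itself resolves its parameters.

```python
import os
from urllib.parse import urlparse


def parse_address(address):
    """Split a 'tcp://host:port' or 'file:///path' URL into its parts."""
    url = urlparse(address)
    if url.scheme == "tcp":
        if url.hostname is None or url.port is None:
            raise ValueError(f"tcp address needs a host and a port: {address!r}")
        return ("tcp", url.hostname, url.port)
    if url.scheme == "file":
        return ("file", url.path)
    raise ValueError(f"unsupported address scheme: {address!r}")


def resolve_settings(env=os.environ):
    """Read the documented CICADA_* variables from a mapping (hypothetical helper)."""
    return {
        "world_size": int(env["CICADA_WORLD_SIZE"]),
        "rank": int(env["CICADA_RANK"]),
        "address": parse_address(env["CICADA_ADDRESS"]),
        "root_address": parse_address(env["CICADA_ROOT_ADDRESS"]),
    }


# An explicit dict is used here so the example does not depend on the real environment.
example_env = {
    "CICADA_WORLD_SIZE": "3",
    "CICADA_RANK": "1",
    "CICADA_ADDRESS": "tcp://127.0.0.1:25253",
    "CICADA_ROOT_ADDRESS": "tcp://127.0.0.1:25252",
}
print(resolve_settings(example_env))
```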
- free()[source]
Free the communicator.
This should be called if the communicator is no longer needed so that resources can be freed. Note that communicators cannot be reused after they have been freed; a new communicator must be created instead.
- gather(*, value, dst)[source]
All-to-one communication.
Every player sends a value to dst.
Note
This method is a collective operation that must be called by all players that are members of the communicator.
- Parameters:
- Returns:
values – For the destination player, a sequence of world_size objects received from every player in rank order. For all other players, None.
- Return type:
sequence of object, or None
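The return values described above can be illustrated with a small single-process simulation. The function simulate_gather is hypothetical and performs no communication; it only shows what each player would get back from a gather, assuming one contributed value per rank.

```python
def simulate_gather(values, dst):
    """Return what each rank would receive from a gather to rank dst.

    values[i] is the value contributed by rank i. Purely illustrative;
    nothing is sent over a network.
    """
    world_size = len(values)
    if not 0 <= dst < world_size:
        raise ValueError("dst must be a valid rank")
    # The destination gets every value in rank order; everyone else gets None.
    return [list(values) if rank == dst else None for rank in range(world_size)]


print(simulate_gather(["a", "b", "c"], dst=1))
# prints [None, ['a', 'b', 'c'], None]
```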
- gatherv(*, src, value, dst)[source]
Many-to-one communication.
A subset of players each sends a value to dst.
Note
This method is a collective operation that must be called by all players that are members of the communicator.
- irecv(*, src, tag)[source]
Non-blocking one-to-one communication.
One player (the sender) sends an object to one player (the destination).
Note
Unlike collective operations, this method is only called by the receiver. It must be matched by a call to send() by the sender.
See also
recv
Blocking one-to-one communication.
- Parameters:
- Returns:
result – A special result object that can be used to wait for and access the value sent by the sender. The result object will have a property is_completed, which returns a boolean value indicating whether the result has been received; a method wait, which will block indefinitely until the result is received; and a property value, which returns the received value or raises an exception if the value has not been received yet.
- Return type:
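To make the described interface concrete, here is a minimal sketch of an object with the same three members (is_completed, wait, and value), built on threading.Event. The class PendingResult and its _complete method are hypothetical stand-ins; the object actually returned by irecv() may be implemented quite differently.

```python
import threading


class PendingResult:
    """Hypothetical stand-in mimicking the documented result interface."""

    def __init__(self):
        self._event = threading.Event()
        self._value = None

    def _complete(self, value):
        # Called by whatever delivers the message (here, a timer thread).
        self._value = value
        self._event.set()

    @property
    def is_completed(self):
        return self._event.is_set()

    def wait(self):
        # Blocks indefinitely until the value has arrived.
        self._event.wait()

    @property
    def value(self):
        if not self._event.is_set():
            raise RuntimeError("value has not been received yet")
        return self._value


result = PendingResult()
# Simulate a message that arrives shortly after the receive is posted.
threading.Timer(0.05, result._complete, args=["hello"]).start()
result.wait()
print(result.is_completed, result.value)  # prints "True hello"
```

The point of the non-blocking form is that other work can be done between posting the receive and calling wait().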
- isend(*, value, dst, tag)[source]
Non-blocking one-to-one communication.
One player (the sender) sends an object to one player (the destination).
Note
Unlike collective operations, this method is only called by the sender. It must be matched by a call to recv() by the destination.
See also
send
Blocking one-to-one communication.
- Parameters:
- Returns:
result – A special result object that can be used to wait until the message has been sent. The result object will have a property is_completed, which returns a boolean value indicating whether the message has been sent, and a method wait, which will block until the message is sent.
- Return type:
- property name
The name of this communicator, which can be used for logging / debugging.
- Returns:
name
- Return type:
str
- override(*, timeout=None)[source]
Temporarily change communicator properties.
Use override() to temporarily modify communicator behavior in a with statement:

    with communicator.override(timeout=10):
        # Do stuff with the new timeout here.
        ...
    # The timeout will return to its previous value here.
- Parameters:
timeout (numbers.Number, optional) – If specified, override the maximum time for communications to complete, in seconds.
- Returns:
context – A context manager object that will restore the communicator state when exited.
- Return type:
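The restore-on-exit behavior described for override() can be sketched with contextlib. The class TimeoutHolder below is a hypothetical stand-in for a communicator and is not Cicada's implementation; it only shows one common way to build such a context manager.

```python
import contextlib


class TimeoutHolder:
    """Hypothetical stand-in for an object with an overridable timeout."""

    def __init__(self, timeout=5):
        self.timeout = timeout

    @contextlib.contextmanager
    def override(self, *, timeout=None):
        original = self.timeout
        if timeout is not None:
            self.timeout = timeout
        try:
            yield self
        finally:
            # Restore the previous value even if the body raised.
            self.timeout = original


holder = TimeoutHolder()
with holder.override(timeout=10):
    print(holder.timeout)  # prints 10
print(holder.timeout)      # prints 5
```

Restoring the value in a finally clause means the original timeout comes back even when the body of the with statement raises.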
- property rank
Rank of the local player.
- Returns:
rank – Player rank, in the range \([0, \text{world_size})\).
- Return type:
int
- recv(*, src, tag)[source]
Blocking one-to-one communication.
One player (the sender) sends an object to one player (the destination).
Note
Unlike collective operations, this method is only called by the receiver. It must be matched by a call to send() by the sender.
See also
irecv
Non-blocking one-to-one communication.
- revoke()[source]
Revoke the current communicator.
Revokes the communicator for this player, and any players able to receive messages. A revoked communicator cannot be used to perform any operation other than shrink(). Typically, revoke should be called by any player that detects a communication failure, to initiate a recovery phase.
- static run(*, world_size, fn, identities=None, trusted=None, args=(), kwargs={}, family='tcp', name='world', timeout=5, startup_timeout=5, show_traceback=False)[source]
Run a function in parallel using sub-processes on the local host.
This method returns when the callback functions finish, returning a list of results from each, in rank order. Special sentinel classes are used to indicate whether a process raised an exception or terminated unexpectedly. This is extremely useful for running examples and regression tests on a single machine.
The given function fn must accept a communicator as its first argument. Additional caller-provided positional and keyword arguments are passed to the function following the communicator.
To perform computation using multiple hosts, you should use connect() and the cicada command line executable instead.
- Parameters:
world_size (int, required) – The number of players that will run the function.
fn (callable(), required) – The function to execute in parallel.
identities (sequence of str, optional) – Paths to files in PEM format, each containing a private key and a certificate, one per player.
trusted (sequence of str, optional) – Paths to files in PEM format containing certificates.
args (tuple, optional) – Positional arguments to pass to fn when it is executed.
kwargs (dict, optional) – Keyword arguments to pass to fn when it is executed.
family (str, optional) – Address family that matches the scheme used in address URLs elsewhere in the API. Allowed values are “tcp” and “file”.
name (str, optional) – Human-readable name for the communicator created by this function. Defaults to “world”.
timeout (numbers.Number, optional) – Maximum time to wait for normal communication to complete, in seconds. Defaults to five seconds.
startup_timeout (numbers.Number, optional) – Maximum time allowed to set up the communicator, in seconds. Defaults to five seconds.
show_traceback (bool, optional) – If True, a traceback will be printed for every player that fails.
- Returns:
results – A value returned from fn for each player, in rank order. If a player process terminates unexpectedly, its value will be an instance of Terminated, which can be used to access the process exit code. If the player process raises a Python exception, its value will be an instance of Failed, which can be used to access the Python exception and a traceback for the failing code.
- Return type:
list
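Because the returned list mixes ordinary return values with sentinel objects, callers will usually want to separate the two. The sketch below shows one way to do that. FakeFailed and FakeTerminated are hypothetical stand-ins defined only so the example runs without Cicada, and the attribute name exitcode is an assumption; in real code the library's own Failed and Terminated classes would be passed as sentinel_types instead.

```python
class FakeFailed:
    """Hypothetical stand-in for a sentinel carrying an exception and traceback."""

    def __init__(self, exception, traceback):
        self.exception = exception
        self.traceback = traceback


class FakeTerminated:
    """Hypothetical stand-in; the attribute name exitcode is an assumption."""

    def __init__(self, exitcode):
        self.exitcode = exitcode


def partition_results(results, sentinel_types=(FakeFailed, FakeTerminated)):
    """Split rank-ordered results into {rank: value} dicts of successes and problems."""
    succeeded, problems = {}, {}
    for rank, result in enumerate(results):
        target = problems if isinstance(result, sentinel_types) else succeeded
        target[rank] = result
    return succeeded, problems


# A made-up result list: rank 0 succeeded, rank 1 raised, rank 2 died.
results = [10, FakeFailed(RuntimeError("boom"), "traceback text"), FakeTerminated(-9)]
succeeded, problems = partition_results(results)
print(succeeded)         # prints {0: 10}
print(sorted(problems))  # prints [1, 2]
```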
- static run_forever(*, world_size, fn, identities=None, trusted=None, args=(), kwargs={}, family='tcp', name='world', timeout=5, startup_timeout=5)[source]
Execute a long-running function in parallel using sub-processes on the local host.
This method returns immediately after networking has been set up and the callback function begins executing, returning a list of network addresses and a list of corresponding processes. This is particularly useful for running “MPC-as-a-service” applications on the local machine, because the caller can use the addresses to communicate with the individual processes.
The given function must accept a listening socket and a communicator as its first two arguments. Additional caller-provided positional and keyword arguments are passed to the function after the socket and communicator.
To run a service with multiple hosts, you should use connect() and the cicada command line executable instead.
- Parameters:
world_size (int, required) – The number of players that will run the function.
fn (callable(), required) – The function to execute in parallel.
identities (sequence of str, optional) – Paths to files in PEM format, each containing a private key and a certificate, one per player.
trusted (sequence of str, optional) – Paths to files in PEM format containing certificates.
args (tuple, optional) – Positional arguments to pass to fn when it is executed.
kwargs (dict, optional) – Keyword arguments to pass to fn when it is executed.
family (str, optional) – Address family that matches the scheme used in address URLs elsewhere in the API. Allowed values are “tcp” and “file”.
name (str, optional) – Human-readable name for the communicator created by this function. Defaults to “world”.
timeout (numbers.Number, optional) – Maximum time to wait for normal communication to complete, in seconds. Defaults to five seconds.
startup_timeout (numbers.Number, optional) – Maximum time allowed to set up the communicator, in seconds. Defaults to five seconds.
- Returns:
addresses (list of str) – A listening address for each player, in rank order.
processes (list of multiprocessing.Process) – An instance of multiprocessing.Process for each player, in rank order.
- scatter(*, src, values)[source]
One-to-all communication.
One player (the sender) sends a different object to every player.
Note
This method is a collective operation that must be called by all players that are members of the communicator.
- scatterv(*, src, values, dst)[source]
One-to-many communication.
One player (the sender) sends a different object to each in a subset of players.
Note
This method is a collective operation that must be called by all players that are members of the communicator.
- Parameters:
- Returns:
value – The object received by this player, or None if this player wasn’t in the list of recipients.
- Return type:
object or None
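The return value described above can be illustrated with a single-process simulation. The sketch assumes that values and dst are parallel sequences, with values[i] intended for rank dst[i]; the source does not spell this out, so treat it as an assumption. The function simulate_scatterv is hypothetical and performs no communication.

```python
def simulate_scatterv(values, dst, world_size):
    """Return what each rank would receive from a scatterv.

    Assumes values[i] is destined for rank dst[i] (an assumption, not
    confirmed by the documentation above). Purely illustrative.
    """
    if len(values) != len(dst):
        raise ValueError("values and dst must have the same length")
    # Players that are not recipients receive None.
    received = [None] * world_size
    for value, rank in zip(values, dst):
        received[rank] = value
    return received


print(simulate_scatterv(["x", "y"], dst=[2, 0], world_size=4))
# prints ['y', None, 'x', None]
```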
- send(*, value, dst, tag)[source]
Blocking one-to-one communication.
One player (the sender) sends an object to one player (the destination).
Note
Unlike collective operations, this method is only called by the sender. It must be matched by a call to recv() by the destination.
See also
isend
Non-blocking one-to-one communication.
- shrink(*, name, identity=None, trusted=None, shrink_timeout=5, startup_timeout=5, timeout=5)[source]
Create a new communicator containing surviving players.
This method should be called as part of a failure-recovery phase by as many players as possible (ideally, every player still running). It will attempt to rendezvous with the other players and return a new communicator, but the process could fail and raise an exception instead. In that case it is up to the application to decide how to proceed.
- Parameters:
name (str, required) – New communicator name.
identity (str, optional) – Path to a private key and certificate in PEM format that will identify the current player.
trusted (sequence of str, optional) – Paths to certificates in PEM format that will identify the other players in the new communicator.
shrink_timeout (numbers.Number, optional) – Maximum amount of time to spend identifying remaining members.
startup_timeout (numbers.Number, optional) – Maximum time to wait for communicator setup, in seconds.
timeout (numbers.Number, optional) – Maximum time to wait for communication, in seconds.
- Returns:
communicator (SocketCommunicator) – New communicator containing the remaining players.
oldranks (sequence of int) – Previous ranks of the remaining players, in rank order. Use this if you need to know the original rank of any member of communicator.
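The relationship between new ranks and oldranks can be sketched as follows. The example assumes that surviving players keep their relative order when they are renumbered, which the text above does not state explicitly; the helper surviving_ranks is hypothetical and does no networking.

```python
def surviving_ranks(world_size, failed):
    """Return the old ranks of the survivors, indexed by their new rank.

    Assumes survivors are renumbered in their original relative order
    (an assumption made for illustration).
    """
    return [rank for rank in range(world_size) if rank not in failed]


# Four players, of which the player with old rank 1 has failed.
oldranks = surviving_ranks(world_size=4, failed={1})
print(oldranks)  # prints [0, 2, 3]

# oldranks[new_rank] gives the rank that player held before shrinking.
new_to_old = dict(enumerate(oldranks))
print(new_to_old)  # prints {0: 0, 1: 2, 2: 3}
```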
- split(*, name, identity=None, trusted=None, timeout=5, startup_timeout=5)[source]
Return a new communicator with the given name.
If players specify different names, which can be any str, then a new communicator will be created for each unique name, with those players as members. If a player supplies a name of None, they will not be a part of any communicator, and this method will return None.
Note
This is a collective operation that must be called by every member of the communicator, even if they aren’t going to be a member of any of the resulting groups!
- Parameters:
identity (str, optional) – Path to a private key and certificate in PEM format that will identify the current player.
trusted (sequence of str, optional) – Paths to certificates in PEM format that will identify the other players in the new communicator.
timeout (numbers.Number, optional) – Maximum time to wait for communication, in seconds.
startup_timeout (numbers.Number, optional) – Maximum time to wait for communicator setup, in seconds.
- Returns:
communicator
- Return type:
a new SocketCommunicator instance, or None
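The grouping rule described for split() can be illustrated with a single-process simulation. The function simulate_split is hypothetical; it only shows which players end up together, and it lists members in their original rank order, which is an assumption about ordering that the text above does not confirm.

```python
def simulate_split(names):
    """Group ranks by the name each player supplies.

    names[i] is the name supplied by rank i, or None to opt out.
    Purely illustrative; no communicators are created.
    """
    groups = {}
    for rank, name in enumerate(names):
        if name is not None:
            groups.setdefault(name, []).append(rank)
    return groups


# Ranks 0 and 2 join "red", rank 1 joins "blue", rank 3 opts out.
print(simulate_split(["red", "blue", "red", None]))
# prints {'red': [0, 2], 'blue': [1]}
```

Note that rank 3 still takes part in the call even though it joins no group, matching the requirement that every member of the original communicator calls split().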
- property stats
Nested dict containing communication statistics for logging / debugging.
- property timeout
Amount of time allowed for communications to complete, in seconds.
- Returns:
timeout – The timeout in seconds.
- Return type:
numbers.Number