cicada.communicator.socket module

Functionality for communicating using the builtin socket module.

exception cicada.communicator.socket.BrokenPipe[source]

Bases: Exception

Raised trying to send to another player that no longer exists.

exception cicada.communicator.socket.Failed(exception, traceback)[source]

Bases: Exception

Used to indicate that a player process raised an exception.

exception cicada.communicator.socket.NotRunning[source]

Bases: Exception

Raised calling an operation after the communicator has been freed.

exception cicada.communicator.socket.Revoked[source]

Bases: Exception

Raised calling an operation after the communicator has been revoked.

class cicada.communicator.socket.SocketCommunicator(*, sockets, name='world', timeout=5)[source]

Bases: Communicator

Cicada communicator that uses Python’s builtin socket module as the transport layer.

Note

Creating a communicator is a collective operation that must be called by all players that will be members.

Parameters:
  • sockets (dict of NetstringSocket, required) – Dictionary containing sockets that are connected to the other players and ready to use. The dictionary keys must be the ranks of the other players, and there must be one socket in the dictionary for every player except the caller (since players don’t need a socket to communicate with themselves). Note that the communicator world size is inferred from the size of the dictionary, and the communicator rank from whichever key doesn’t appear in the dictionary.

  • name (str, optional) – Human-readable name for this communicator, used for logging and debugging. Defaults to “world”

  • timeout (numbers.Number) – Maximum time to wait for communication to complete, in seconds.

allgather(value)[source]

All-to-all communication.

Note

This method is a collective operation that must be called by all players that are members of the communicator.

Parameters:

value (Any picklable object, required) – Local value that will be sent to all players.

Returns:

values – Collection of objects gathered from every player, in rank order.

Return type:

sequence of object

barrier()[source]

If the implementation returns without raising an exception, then every player entered the barrier. If an exception is raised then there are no guarantees about whether every player entered.

broadcast(*, src, value)[source]

One-to-all communication.

The src player broadcasts a single object to all players.

Note

This method is a collective operation that must be called by all players that are members of the communicator.

Parameters:
  • src (int, required) – Rank of the player who is broadcasting.

  • value (Any picklable object or None, required) – Value to be broadcast by src. Ignored for all other players.

Returns:

value – The broadcast value.

Return type:

object

static connect(*, world_size=None, rank=None, address=None, root_address=None, identity=None, trusted=None, name='world', timeout=5, startup_timeout=5)[source]

High level function to create a SocketCommunicator.

This is a high level convenience function that can be used to create a communicator, given just the calling player’s address and the address of the root player. By default, the parameters will be read from environment variables that can be set permanently by the user, or temporarily using the cicada command.

Parameters:
  • world_size (int, optional) – Number of players. Defaults to the value of the CICADA_WORLD_SIZE environment variable, which is automatically set by the cicada command.

  • rank (int, optional) – Rank of the caller. Defaults to the value of the CICADA_RANK environment variable, which is automatically set by the cicada command.

  • address (str, optional) – Listening address of the caller. This must be a URL of the form “tcp://{host}:{port}” for TCP sockets, or “file:///path/to/foo” for Unix domain sockets. Defaults to the value of the CICADA_ADDRESS environment variable, which is automatically set by the cicada command.

  • root_address (str, optional) – Listening address of the root (rank 0) player. This must be a URL of the form “tcp://{host}:{port}” for TCP sockets, or “file:///path/to/foo” for Unix domain sockets. Defaults to the value of the CICADA_ROOT_ADDRESS environment variable, which is automatically set by the cicada command.

  • identity (str, optional) – Path to a private key and certificate in PEM format. Defaults to the value of the CICADA_IDENTITY environment variable, which is automatically set by the cicada command.

  • trusted (sequence of str, optional) – Path to certificates in PEM format. Defaults to the value of the CICADA_TRUSTED environment variable, which is automatically set by the cicada command.

  • name (str, optional) – Human-readable name for the new communicator. Defaults to “world”.

  • timeout (numbers.Number) – Communication timeout for the new communicator, in seconds. Defaults to five.

  • startup_timeout (numbers.Number) – Maximum time to wait for communicator setup, in seconds. Defaults to five.

Raises:
  • ValueError – If there are problems with input parameters.

  • Timeout – If timeout seconds elapses before all connections are established.

  • TokenMismatch – If every player doesn’t provide the same token during startup.

Returns:

communicator – A fully-initialized communicator, ready for use.

Return type:

SocketCommunicator

free()[source]

Free the communicator.

This should be called if the communicator is no longer needed so that resources can be freed. Note that communicators cannot be reused after they have been freed, a new communicator must be created instead.

gather(*, value, dst)[source]

All-to-one communication.

Every player sends a value to dst.

Note

This method is a collective operation that must be called by all players that are members of the communicator.

Parameters:
  • value (Any picklable object, required) – Value to be sent to dst.

  • dst (int, required) – Rank of the player who will receive all of the values.

Returns:

values – For the destination player, a sequence of world_size objects received from every player in rank order. For all other players, None.

Return type:

sequence of object or None

gatherv(*, src, value, dst)[source]

Many-to-one communication.

A subset of players each sends a value to dst.

Note

This method is a collective operation that must be called by all players that are members of the communicator.

Parameters:
  • src (sequence of int, required) – Rank of each player sending a value.

  • value (Any picklable object, or None, required) – Value to be sent to dst.

  • dst (int, required) – Rank of the player who will receive all of the values.

Returns:

values – For the destination player, the sequence of values received from the src players in the same order as src. For all other players, None.

Return type:

sequence of object or None

irecv(*, src, tag)[source]

Non-blocking one-to-one communication.

One player (the sender) sends an object to one player (the destination).

Note

Unlike collective operations, this method is only called by the receiver. It must be matched by a call to send() by the sender.

See also

recv

Blocking one-to-one communication.

Parameters:
  • src (int, required) – Rank of the sending player.

  • tag (int or Tag, required) – User- or library-defined tag identifying the message type to match.

Returns:

result – A special result object that can be used to wait for and access the value sent by the sender. The result object will have a property is_completed which returns a boolean value indicating whether the result has been received; method wait, which will block indefinitely until the result is received; and property value which returns the received value or raises an exception if the value has not been received yet.

Return type:

object

isend(*, value, dst, tag)[source]

Non-blocking one-to-one communication.

One player (the sender) sends an object to one player (the destination).

Note

Unlike collective operations, this method is only called by the sender. It must be matched by a call to recv() by the destination.

See also

send

Blocking one-to-one communication.

Parameters:
  • value (Picklable object, required) – Value to be sent.

  • dst (int, required) – Rank of the destination player.

  • tag (int or Tag, required) – User- or library-defined tag identifying the message type.

Returns:

result – A special result object that can be used to wait until the message has been sent. The result object will have a property is_completed which returns a boolean value indicating whether the result has been sent; and a method wait which will block until the message is sent.

Return type:

object

property name

The name of this communicator, which can be used for logging / debugging.

Returns:

name

Return type:

str

override(*, timeout=None)[source]

Temporarily change communicator properties.

Use override() to temporarily modify communicator behavior in a with statement:

with communicator.override(timeout=10):
    # Do stuff with the new timeout here.
# The timeout will return to its previous value here.
Parameters:

timeout (numbers.Number, optional) – If specified, override the maximum time for communications to complete, in seconds.

Returns:

context – A context manager object that will restore the communicator state when exited.

Return type:

object

property rank

Rank of the local player.

Returns:

rank – Player rank, in the range \([0, \text{world_size})\).

Return type:

int

recv(*, src, tag)[source]

Blocking one-to-one communication.

One player (the sender) sends an object to one player (the destination).

Note

Unlike collective operations, this method is only called by the receiver. It must be matched by a call to send() by the sender.

See also

irecv

Non-blocking one-to-one communication.

Parameters:
  • src (int, required) – Rank of the sending player.

  • tag (int or Tag, required) – User- or library-defined tag identifying the message type to match.

Returns:

value – The value sent by the sender.

Return type:

object

revoke()[source]

Revoke the current communicator.

Revokes the communicator for this player, and any players able to receive messages. A revoked communicator cannot be used to perform any operation other than shrink(). Typically, revoke should be called by any player that detects a communication failure, to initiate a recovery phase.

static run(*, world_size, fn, identities=None, trusted=None, args=(), kwargs={}, family='tcp', name='world', timeout=5, startup_timeout=5, show_traceback=False)[source]

Run a function in parallel using sub-processes on the local host.

This method returns when the callback functions finish, returning a list of results from each, in rank order. Special sentinel classes are used to indicate whether a process raised an exception or terminated unexpectedly. This is extremely useful for running examples and regression tests on a single machine.

The given function fn must accept a communicator as its first argument. Additional caller-provided positional and keyword arguments are passed to the function following the communicator.

To perform computation using multiple hosts, you should use connect() and the cicada command line executable instead.

Parameters:
  • world_size (int, required) – The number of players that will run the function.

  • fn (callable(), required) – The function to execute in parallel.

  • identities (sequence of str, optional) – Path to files in PEM format each containing a private key and a certificate, one per player.

  • trusted (sequence of str, optional) – Path to files in PEM format containing certificates.

  • args (tuple, optional) – Positional arguments to pass to fn when it is executed.

  • kwargs (dict, optional) – Keyword arguments to pass to fn when it is executed.

  • family (str, optional) – Address family that matches the scheme used in address URLs elsewhere in the API. Allowed values are “tcp” and “file”.

  • name (str, optional) – Human-readable name for the communicator created by this function. Defaults to “world”.

  • timeout (numbers.Number, optional) – Maximum time to wait for normal communication to complete in seconds. Defaults to five seconds.

  • startup_timeout (numbers.Number, optional) – Maximum time allowed to setup the communicator in seconds. Defaults to five seconds.

  • show_traceback (bool, optional) – If True, a traceback will be printed for every player that fails.

Returns:

results – A value returned from fn for each player, in rank order. If a player process terminates unexpectedly, its value will be an instance of Terminated, which can be used to access the process exit code. If the player process raises a Python exception, its value will be an instance of Failed, which can be used to access the Python exception and a traceback for the failing code.

Return type:

list

static run_forever(*, world_size, fn, identities=None, trusted=None, args=(), kwargs={}, family='tcp', name='world', timeout=5, startup_timeout=5)[source]

Execute a long-running function in parallel using sub-processes on the local host.

This method returns immediately after networking has been setup and the callback function begins executing, returning a list of network addresses and a list of corresponding processes. This is particularly useful for running “MPC-as-a-service” applications on the local machine - the caller can use the addresses to communicate with the individual processes.

The given function must accept a listening socket and a communicator as its first two arguments. Additional caller-provided positional and keyword arguments are passed to the function after the socket and communicator.

To run a service with multiple hosts, you should use connect() and the cicada command line executable instead.

Parameters:
  • world_size (int, required) – The number of players that will run the function.

  • fn (callable(), required) – The function to execute in parallel.

  • identities (sequence of str, optional) – Path to files in PEM format each containing a private key and a certificate, one per player.

  • trusted (sequence of str, optional) – Path to files in PEM format containing certificates.

  • args (tuple, optional) – Positional arguments to pass to fn when it is executed.

  • kwargs (dict, optional) – Keyword arguments to pass to fn when it is executed.

  • family (str, optional) – Address family that matches the scheme used in address URLs elsewhere in the API. Allowed values are “tcp” and “file”.

  • name (str, optional) – Human-readable name for the communicator created by this function. Defaults to “world”.

  • timeout (numbers.Number, optional) – Maximum time to wait for normal communication to complete in seconds. Defaults to five seconds.

  • startup_timeout (numbers.Number, optional) – Maximum time allowed to setup the communicator in seconds. Defaults to five seconds.

Returns:

scatter(*, src, values)[source]

One-to-all communication.

One player (the sender) sends a different object to every player.

Note

This method is a collective operation that must be called by all players that are members of the communicator.

Parameters:
  • src (int, required) – Rank of the sending player.

  • values (sequence of picklable object, or None, required) – Collection of objects to be sent, one per player, in rank order.

Returns:

value – The object received by this player.

Return type:

object

scatterv(*, src, values, dst)[source]

One-to-many communication.

One player (the sender) sends a different object to each in a subset of players.

Note

This method is a collective operation that must be called by all players that are members of the communicator.

Parameters:
  • src (int, required) – Rank of the sending player.

  • values (sequence of picklable object, or None, required) – Collection of objects to be sent, one per recipient.

  • dst (sequence of int, required) – Rank of each player receiving an object, in the same order as values.

Returns:

value – The object received by this player, or None if this player wasn’t in the list of recipients.

Return type:

object or None

send(*, value, dst, tag)[source]

Blocking one-to-one communication.

One player (the sender) sends an object to one player (the destination).

Note

Unlike collective operations, this method is only called by the sender. It must be matched by a call to recv() by the destination.

See also

isend

Non-blocking one-to-one communication.

Parameters:
  • value (Picklable object, required) – Value to be sent.

  • dst (int, required) – Rank of the destination player.

  • tag (int or Tag, required) – User- or library-defined tag identifying the message type.

shrink(*, name, identity=None, trusted=None, shrink_timeout=5, startup_timeout=5, timeout=5)[source]

Create a new communicator containing surviving players.

This method should be called as part of a failure-recovery phase by as many players as possible (ideally, every player still running). It will attempt to rendezvous with the other players and return a new communicator, but the process could fail and raise an exception instead. In that case it is up to the application to decide how to proceed.

Parameters:
  • name (str, required) – New communicator name.

  • identity (str, optional) – Path to a private key and certificate in PEM format that will identify the current player.

  • trusted (sequence of str, optional) – Path to certificates in PEM format that will identify the other players in the new communicator.

  • shrink_timeout (numbers.Number, optional) – Maximum amount of time to spend identifying remaining members.

  • startup_timeout (numbers.Number, optional) – Maximum time to wait for communicator_setup, in seconds.

  • timeout (numbers.Number, optional) – Maximum time to wait for communication, in seconds.

Returns:

  • communicator (SocketCommunicator) – New communicator containing the remaining players.

  • oldranks (sequence of int) – Previous ranks of the remaining players, in rank order. Use this if you need to know the original rank of any member of communicator.

split(*, name, identity=None, trusted=None, timeout=5, startup_timeout=5)[source]

Return a new communicator with the given name.

If players specify different names - which can be any str - then a new communicator will be created for each unique name, with those players as members. If a player supplies a name of None, they will not be a part of any communicator, and this method will return None.

Note

This is a collective operation that must be called by every member of the communicator, even if they aren’t going to be a member of any of the resulting groups!

Parameters:
  • name (str or None, required) – Communicator name, or None.

  • identity (str, optional) – Path to a private key and certificate in PEM format that will identify the current player.

  • trusted (sequence of str, optional) – Path to certificates in PEM format that will identify the other players in the new communicator.

  • timeout (numbers.Number, optional) – Maximum time to wait for communication, in seconds.

  • startup_timeout (numbers.Number, optional) – Maximum time to wait for communicator setup, in seconds.

Returns:

communicator

Return type:

a new SocketCommunicator instance, or None

property stats

Nested dict containing communication statistics for logging / debugging.

property timeout

Amount of time allowed for communications to complete, in seconds.

Returns:

timeout – The timeout in seconds.

Return type:

numbers.Number.

property world_size

Number of players sharing this communicator.

Returns:

world_size – The number of players sharing this communicator.

Return type:

int

exception cicada.communicator.socket.Terminated(exitcode)[source]

Bases: Exception

Used to indicate that a player process terminated unexpectedly without output.

exception cicada.communicator.socket.TryAgain[source]

Bases: Exception

Raised when a non-blocking operation would block.