Timeouts

Timeouts play an important role in Cicada networking, because they’re the primary mechanism for detecting failures among players. Consider the following example of a player waiting for a message that never arrives:

[1]:
import logging

from cicada.communicator import SocketCommunicator
from cicada.logging import Logger

logging.basicConfig(level=logging.INFO)

def main(communicator):
    log = Logger(logging.getLogger(), communicator)

    log.info(f"Player {communicator.rank} timeout: {communicator.timeout}")
    if communicator.rank == 0:
        try:
            communicator.recv(src=1, tag=42)
        except Exception as e:
            logging.error(f"Player {communicator.rank} exception: {e}")

SocketCommunicator.run(world_size=2, fn=main);
INFO:root:Player 0 timeout: 5
INFO:root:Player 1 timeout: 5
ERROR:root:Player 0 exception: Tag 42 from player 1 timed-out after 5s

Each player prints their default timeout, which is five seconds, then player 0 waits for a message from player 1, who has already exited. After five seconds, recv() raises an exception that reports the timeout.
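To see the same failure-detection idea without Cicada, the following is a minimal sketch that uses only the Python standard library. It is an analogy, not a description of how SocketCommunicator works internally: the `recv_with_timeout` helper and the use of `socket.socketpair()` to stand in for two players are assumptions made for illustration.

```python
import socket

def recv_with_timeout(sock, timeout):
    """Wait up to `timeout` seconds for data; return None if nothing arrives.

    Hypothetical helper for illustration only; it is not part of Cicada.
    """
    sock.settimeout(timeout)
    try:
        return sock.recv(1024)
    except socket.timeout:  # alias of TimeoutError on Python 3.10+
        return None

# A connected pair of sockets stands in for two players.
player0, player1 = socket.socketpair()

# Player 1 never sends, so player 0's wait ends with a timeout.
silent = recv_with_timeout(player0, timeout=0.2)

# Once player 1 does send, the same call returns the message.
player1.sendall(b"hello")
received = recv_with_timeout(player0, timeout=0.2)

player0.close()
player1.close()
```

The receiver cannot distinguish a slow peer from a dead one; the timeout is simply the point at which it stops waiting and treats the silence as a failure.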

If you need a different default timeout, you can specify it at the point where your communicator is created (run() in this case):

[2]:
def main(communicator):
    log = Logger(logging.getLogger(), communicator)

    log.info(f"Player {communicator.rank} timeout: {communicator.timeout}")
    if communicator.rank == 0:
        try:
            communicator.recv(src=1, tag=42)
        except Exception as e:
            logging.error(f"Player {communicator.rank} exception: {e}")

SocketCommunicator.run(world_size=2, fn=main, timeout=10);
INFO:root:Player 0 timeout: 10
INFO:root:Player 1 timeout: 10
ERROR:root:Player 0 exception: Tag 42 from player 1 timed-out after 10s

Here, we set the timeout to ten seconds, and the log output confirms that both players use the new value.

You can also change the timeout at any time after the communicator has been created:

[3]:
def main(communicator):
    log = Logger(logging.getLogger(), communicator)

    log.info(f"Player {communicator.rank} original timeout: {communicator.timeout}")
    communicator.timeout = 8
    log.info(f"Player {communicator.rank} new timeout: {communicator.timeout}")

    if communicator.rank == 0:
        try:
            communicator.recv(src=1, tag=42)
        except Exception as e:
            logging.error(f"Player {communicator.rank} exception: {e}")

SocketCommunicator.run(world_size=2, fn=main);
INFO:root:Player 0 original timeout: 5
INFO:root:Player 1 original timeout: 5
INFO:root:Player 0 new timeout: 8
INFO:root:Player 1 new timeout: 8
ERROR:root:Player 0 exception: Tag 42 from player 1 timed-out after 8s
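One way to picture why a later assignment takes effect is that the timeout is read each time a blocking call is made. The sketch below illustrates that pattern with the standard library only. `ToyMailbox` and `timed_recv` are hypothetical names, and nothing here is taken from Cicada's implementation.

```python
import queue
import time

class ToyMailbox:
    """Hypothetical stand-in for a communicator; not part of Cicada."""

    def __init__(self, timeout=0.05):
        self.timeout = timeout
        self._messages = queue.Queue()

    def recv(self):
        # The timeout is looked up when the call is made, so later changes apply.
        try:
            return self._messages.get(timeout=self.timeout)
        except queue.Empty:
            raise TimeoutError(f"timed out after {self.timeout}s") from None

def timed_recv(mailbox):
    """Return how long a recv() with no incoming message waited before giving up."""
    start = time.monotonic()
    try:
        mailbox.recv()
    except TimeoutError:
        pass
    return time.monotonic() - start

mailbox = ToyMailbox()
short_wait = timed_recv(mailbox)   # waits roughly 0.05 seconds

mailbox.timeout = 0.2              # change the timeout after creation
long_wait = timed_recv(mailbox)    # now waits roughly 0.2 seconds
```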

You can also change timeouts temporarily using a context manager, which restores the original timeout when the block exits:

[4]:
def main(communicator):
    log = Logger(logging.getLogger(), communicator)

    log.info(f"Player {communicator.rank} original timeout: {communicator.timeout}")
    with communicator.override(timeout=8):
        log.info(f"Player {communicator.rank} new timeout: {communicator.timeout}")

        if communicator.rank == 0:
            try:
                communicator.recv(src=1, tag=42)
            except Exception as e:
                logging.error(f"Player {communicator.rank} exception: {e}")
    logging.info(f"Player {communicator.rank} restored timeout: {communicator.timeout}")

SocketCommunicator.run(world_size=2, fn=main);
INFO:root:Player 0 original timeout: 5
INFO:root:Player 1 original timeout: 5
INFO:root:Player 0 new timeout: 8
INFO:root:Player 1 new timeout: 8
INFO:root:Player 1 restored timeout: 5
ERROR:root:Player 0 exception: Tag 42 from player 1 timed-out after 8s
INFO:root:Player 0 restored timeout: 5
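A temporary override like this is commonly built from a context manager that saves the old value and restores it in a `finally` clause. The following is a sketch of that general pattern, assuming nothing about how Cicada implements `override()`: `ToyCommunicator` is a hypothetical class with only a `timeout` attribute.

```python
import contextlib

class ToyCommunicator:
    """Hypothetical stand-in with a `timeout` attribute; not Cicada's class."""

    def __init__(self, timeout=5):
        self.timeout = timeout

    @contextlib.contextmanager
    def override(self, *, timeout):
        # Remember the current value, apply the temporary one, and always
        # restore the original, even if the block raises.
        original = self.timeout
        self.timeout = timeout
        try:
            yield self
        finally:
            self.timeout = original

comm = ToyCommunicator()
observed = []

try:
    with comm.override(timeout=8):
        observed.append(comm.timeout)   # temporary value inside the block
        raise TimeoutError("simulated timeout inside the block")
except TimeoutError:
    pass

observed.append(comm.timeout)           # original value after the block
```

In this sketch the `finally` clause restores the timeout even when the block exits through an exception, so a failed receive cannot leave the temporary value in place.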