Timeouts
Timeouts play an important role in Cicada networking, because they’re the primary mechanism for detecting failures among players. Consider the following example of a player waiting for a message that never arrives:
import logging
from cicada.communicator import SocketCommunicator
from cicada.logging import Logger
logging.basicConfig(level=logging.INFO)
def main(communicator):
    log = Logger(logging.getLogger(), communicator)
    log.info(f"Player {communicator.rank} timeout: {communicator.timeout}")
    if communicator.rank == 0:
        try:
            # Player 1 never sends tag 42, so this call eventually times out.
            communicator.recv(src=1, tag=42)
        except Exception as e:
            logging.error(f"Player {communicator.rank} exception: {e}")
SocketCommunicator.run(world_size=2, fn=main);
INFO:root:Player 0 timeout: 5
INFO:root:Player 1 timeout: 5
ERROR:root:Player 0 exception: Tag 42 from player 1 timed-out after 5s
Each player logs its default timeout of five seconds. Player 0 then waits for a message with tag 42 from player 1, but player 1 has already exited without sending anything. After five seconds, recv() gives up and raises an exception that reports the timeout.
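The idea behind this kind of failure detection can be illustrated without Cicada: a receive waits at most timeout seconds, and silence for that long is treated as a failure of the sender. The following is a minimal single-process sketch under stated assumptions: ToyCommunicator, its deliver() method, and the per-(src, tag) queues are hypothetical and are not how Cicada is implemented; the sketch only mimics the wording of the error shown above.

```python
import queue


class ToyCommunicator:
    """Hypothetical stand-in for a communicator; not Cicada's implementation."""

    def __init__(self, rank, timeout=5):
        self.rank = rank
        self.timeout = timeout
        # One queue per (src, tag) pair: a simplifying assumption.
        self._queues = {}

    def _queue(self, src, tag):
        return self._queues.setdefault((src, tag), queue.Queue())

    def deliver(self, src, tag, payload):
        """Simulate a message arriving from player src."""
        self._queue(src, tag).put(payload)

    def recv(self, src, tag):
        try:
            # Block for at most self.timeout seconds.
            return self._queue(src, tag).get(timeout=self.timeout)
        except queue.Empty:
            # Nothing arrived in time, so assume player src has failed.
            raise TimeoutError(
                f"Tag {tag} from player {src} timed-out after {self.timeout}s"
            ) from None


comm = ToyCommunicator(rank=0, timeout=0.1)
comm.deliver(src=1, tag=7, payload="hello")
print(comm.recv(src=1, tag=7))  # hello

try:
    comm.recv(src=1, tag=42)  # nothing was ever delivered with tag 42
except TimeoutError as e:
    print(e)  # Tag 42 from player 1 timed-out after 0.1s
```

The short timeout in the usage lines only keeps the demonstration quick; the waiting logic is the same for five or ten seconds.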
If you need a different default timeout, you can specify it at the point where your communicator is created (run() in this case):
def main(communicator):
    log = Logger(logging.getLogger(), communicator)
    log.info(f"Player {communicator.rank} timeout: {communicator.timeout}")
    if communicator.rank == 0:
        try:
            communicator.recv(src=1, tag=42)
        except Exception as e:
            logging.error(f"Player {communicator.rank} exception: {e}")
SocketCommunicator.run(world_size=2, fn=main, timeout=10);
INFO:root:Player 0 timeout: 10
INFO:root:Player 1 timeout: 10
ERROR:root:Player 0 exception: Tag 42 from player 1 timed-out after 10s
Here, we set the timeout to ten seconds for both players, so player 0 gives up after ten seconds instead of five.
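Conceptually, the timeout argument is simply handed to each player's communicator when that communicator is constructed. The toy launcher below sketches that flow with threads; toy_run and ToyCommunicator are hypothetical names, and the real run() also has to start the players and connect them to one another, which is omitted here.

```python
import threading


class ToyCommunicator:
    """Hypothetical per-player communicator that only records its settings."""

    def __init__(self, rank, world_size, timeout):
        self.rank = rank
        self.world_size = world_size
        self.timeout = timeout


def toy_run(world_size, fn, timeout=5):
    """Hypothetical launcher loosely modeled on SocketCommunicator.run().

    Creates one communicator per player, all with the same timeout,
    calls fn in a separate thread for each player, and returns the
    per-player results in rank order.
    """
    results = [None] * world_size

    def player(rank):
        communicator = ToyCommunicator(rank, world_size, timeout)
        results[rank] = fn(communicator)

    threads = [threading.Thread(target=player, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


print(toy_run(2, lambda c: c.timeout))              # [5, 5]
print(toy_run(2, lambda c: c.timeout, timeout=10))  # [10, 10]
```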
You can also change the timeout at any time after the communicator has been created:
def main(communicator):
    log = Logger(logging.getLogger(), communicator)
    log.info(f"Player {communicator.rank} original timeout: {communicator.timeout}")
    # The new value applies to every operation that follows.
    communicator.timeout = 8
    log.info(f"Player {communicator.rank} new timeout: {communicator.timeout}")
    if communicator.rank == 0:
        try:
            communicator.recv(src=1, tag=42)
        except Exception as e:
            logging.error(f"Player {communicator.rank} exception: {e}")
SocketCommunicator.run(world_size=2, fn=main);
INFO:root:Player 0 original timeout: 5
INFO:root:Player 1 original timeout: 5
INFO:root:Player 0 new timeout: 8
INFO:root:Player 1 new timeout: 8
ERROR:root:Player 0 exception: Tag 42 from player 1 timed-out after 8s
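Because the timeout behaves like an ordinary attribute, one natural way to implement it is as a Python property whose setter checks the new value before storing it. The sketch below is hypothetical: the positive-value check in particular is an assumption for illustration, not documented Cicada behavior.

```python
class ToyCommunicator:
    """Hypothetical communicator whose timeout can be changed at any time."""

    def __init__(self, timeout=5):
        self.timeout = timeout  # goes through the setter below

    @property
    def timeout(self):
        return self._timeout

    @timeout.setter
    def timeout(self, value):
        # Assumed validation rule; the real library may behave differently.
        if value <= 0:
            raise ValueError(f"timeout must be positive, got {value}")
        self._timeout = value


comm = ToyCommunicator()
print(comm.timeout)  # 5
comm.timeout = 8
print(comm.timeout)  # 8
```

Any later receive that reads self.timeout would then use the new value, matching the eight-second timeout in the output above.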
To change the timeout only temporarily, use the override() context manager; the original timeout is restored when the with block exits:
def main(communicator):
    log = Logger(logging.getLogger(), communicator)
    log.info(f"Player {communicator.rank} original timeout: {communicator.timeout}")
    with communicator.override(timeout=8):
        log.info(f"Player {communicator.rank} new timeout: {communicator.timeout}")
        if communicator.rank == 0:
            try:
                communicator.recv(src=1, tag=42)
            except Exception as e:
                logging.error(f"Player {communicator.rank} exception: {e}")
    # Leaving the with block restores the previous timeout.
    logging.info(f"Player {communicator.rank} restored timeout: {communicator.timeout}")
SocketCommunicator.run(world_size=2, fn=main);
INFO:root:Player 0 original timeout: 5
INFO:root:Player 1 original timeout: 5
INFO:root:Player 0 new timeout: 8
INFO:root:Player 1 new timeout: 8
INFO:root:Player 1 restored timeout: 5
ERROR:root:Player 0 exception: Tag 42 from player 1 timed-out after 8s
INFO:root:Player 0 restored timeout: 5
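In this output, player 1 reports its restored timeout before player 0's error, because player 1 never waits on a receive. A temporary override like this can be built with contextlib.contextmanager: save the old value, set the new one, and restore the old value in a finally clause so that it comes back even if an exception escapes the block. The sketch below is hypothetical and is not Cicada's source; the try/finally choice is a design assumption.

```python
import contextlib


class ToyCommunicator:
    """Hypothetical communicator with a temporary-override helper."""

    def __init__(self, timeout=5):
        self.timeout = timeout

    @contextlib.contextmanager
    def override(self, *, timeout):
        original = self.timeout
        self.timeout = timeout
        try:
            yield self
        finally:
            # Runs on normal exit and when an exception leaves the block.
            self.timeout = original


comm = ToyCommunicator()
with comm.override(timeout=8):
    print(comm.timeout)  # 8
print(comm.timeout)  # 5

try:
    with comm.override(timeout=8):
        raise TimeoutError("simulated failure")
except TimeoutError:
    pass
print(comm.timeout)  # 5
```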