# Copyright 2021 National Technology & Engineering Solutions
# of Sandia, LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS,
# the U.S. Government retains certain rights in this software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functionality for communicating using the builtin :mod:`socket` module.
"""
import json
import logging
import numbers
import os
import pickle
import pprint
import select
import socket
import ssl
import time
import urllib.parse
import pynetstring
[docs]
class CommunicatorEvents(object):
    """Logging filter that tags every record with a communicator name and rank.

    Parameters
    ----------
    name: required
        Value stored on each record as the ``comm`` attribute.
    rank: required
        Value stored on each record as the ``rank`` attribute.
    """

    def __init__(self, name, rank):
        self.name, self.rank = name, rank

    def filter(self, record):
        """Annotate `record` in place and always let it through."""
        setattr(record, "comm", self.name)
        setattr(record, "rank", self.rank)
        return True
[docs]
class EncryptionFailed(Exception):
    """Raised if an encrypted connection can't be established with another player."""
[docs]
class LoggerAdapter(logging.LoggerAdapter):
    """Wraps a Python logger for consistent formatting of communicator log entries.

    Parameters
    ----------
    logger: :class:`logging.Logger`, required
        Python logger to wrap.
    name: :class:`str`, required
        Communicator name.
    rank: :class:`int`, required
        Communicator rank.
    """

    def __init__(self, logger, name, rank):
        context = {"name": name, "rank": rank}
        super().__init__(logger, extra=context)

    def process(self, msg, kwargs):
        """Prefix `msg` with the communicator name and rank; `kwargs` pass through unchanged."""
        prefixed = message(self.extra["name"], self.extra["rank"], msg)
        return prefixed, kwargs
[docs]
class NetstringSocket(object):
    """Message-oriented socket that uses the Netstrings protocol.

    Parameters
    ----------
    sock: :class:`socket.socket` or compatible object, required
        Connected socket used to send and receive the encoded messages.
    """

    def __init__(self, sock):
        self._socket = sock
        self._decoder = pynetstring.Decoder()
        # Decoded messages that haven't been handed to the caller yet.
        self._messages = []
        self._sent_bytes = 0
        self._sent_messages = 0
        self._received_bytes = 0
        self._received_messages = 0

    def close(self):
        """Close the underlying socket."""
        self._socket.close()

    def feed(self):
        """Read data from the underlying socket, decoding whatever is available."""
        raw = self._socket.recv(4096)

        # TLS sockets may hold more already-decrypted bytes than we just read.
        # Those bytes live in the SSL layer, so select() on the file descriptor
        # will never report them; drain them now or a large message could
        # stall until unrelated data arrives.  Plain sockets have no
        # `pending()` method and skip this step.
        pending = getattr(self._socket, "pending", None)
        if pending is not None:
            remaining = pending()
            while remaining:
                chunk = self._socket.recv(remaining)
                if not chunk:
                    break
                raw += chunk
                remaining = pending()

        messages = self._decoder.feed(raw)
        self._received_bytes += len(raw)
        self._received_messages += len(messages)
        self._messages += messages

    @property
    def family(self):
        """Return the underlying socket's address family.

        See :attr:`socket.socket.family`.
        """
        return self._socket.family

    def fileno(self):
        """Return the file descriptor for the underlying socket.

        This allows :class:`NetstringSocket` to be used with :func:`select.select`.
        """
        return self._socket.fileno()

    def getsockname(self):
        """Return the underlying socket's address.

        See :meth:`socket.socket.getsockname`.
        """
        return self._socket.getsockname()

    def messages(self):
        """Return every message that has been received, if any.

        The internal queue is emptied, so each message is returned only once.
        """
        result = self._messages
        self._messages = []
        return result

    def next_message(self, *, timeout):
        """Return the next available message, if any.

        Parameters
        ----------
        timeout: number or :any:`None`, required
            Passed to :func:`select.select` as the maximum time to wait for
            the socket to become readable when no message is already queued.

        Returns
        -------
        message: the oldest queued message, or :any:`None` if there isn't one.
        """
        if not self._messages:
            ready, _, _ = select.select([self], [], [], timeout)
            if ready:
                self.feed()
        return self._messages.pop(0) if self._messages else None

    def send(self, msg):
        """Send a message, blocking until every encoded byte has been written."""
        raw = pynetstring.encode(msg)
        self._sent_bytes += len(raw)
        self._sent_messages += 1
        while raw:
            _, ready, _ = select.select([], [self], [])
            if ready:
                # send() may accept only part of the buffer; keep the rest.
                sent = self._socket.send(raw)
                raw = raw[sent:]

    @property
    def stats(self):
        """Return a :class:`dict` containing statistics.

        Returns the number of bytes and messages that have been sent and received.
        """
        return {
            "sent": {"bytes": self._sent_bytes, "messages": self._sent_messages},
            "received": {"bytes": self._received_bytes, "messages": self._received_messages},
        }
[docs]
class Timeout(Exception):
    """Raised when an operation has timed-out."""
[docs]
class Timer(object):
    """Tracks elapsed time.

    Parameters
    ----------
    threshold: :class:`numbers.Number`, required
        The maximum number of elapsed seconds before this timer will be
        expired.  A falsy threshold (:any:`None` or zero) never expires.
    """

    def __init__(self, threshold):
        # Use the monotonic clock so that adjustments to the system time
        # can't make the timer expire early or run forever.
        self._start = time.monotonic()
        self._threshold = threshold

    @property
    def elapsed(self):
        """Return the number of seconds since the :class:`Timer` was created."""
        return time.monotonic() - self._start

    @property
    def expired(self):
        """Returns :any:`True` if the elapsed time has exceeded a threshold."""
        return bool(self._threshold) and self.elapsed > self._threshold
[docs]
class TokenMismatch(Exception):
    """Raised when players can't agree on a token for communicator creation."""
[docs]
def connect(*, address, rank, other_rank, name="world", tls=None):
    """Given an address, create a socket and make a connection.

    Parameters
    ----------
    address: :class:`urllib.parse.ParseResult`, required
        The address URL.
    rank: :class:`int`, required
        Rank of the calling player.
    other_rank: :class:`int`, required
        Rank of the other player (the one we're connecting to).
    name: :class:`str`, optional
        Human readable label used for logging and error messages.  Typically,
        this should be the same name that will be eventually used by a
        communicator instance.  Defaults to "world".
    tls: pair of :class:`ssl.SSLContext`, optional
        If provided, the returned sockets will implement transport layer
        security.  Callers must provide a sequence containing one context
        configured for server connections, and one configured for client
        connections, in that order.

    Raises
    ------
    :class:`ValueError`
        If the address scheme is neither ``file`` nor ``tcp``.

    Returns
    -------
    socket: :class:`NetstringSocket`
        The newly-connected socket, ready for use.
    """
    log = getLogger(__name__, name, rank)

    if address.scheme == "file":
        family, target = socket.AF_UNIX, address.path
    elif address.scheme == "tcp":
        family, target = socket.AF_INET, (address.hostname, address.port)
    else:
        raise ValueError(f"address.scheme must be file or tcp, got {address.scheme} instead.") # pragma: no cover

    sock = socket.socket(family, socket.SOCK_STREAM)
    try:
        sock.connect(target)
        sock.setblocking(True)

        # Optionally setup TLS.
        if tls is not None:
            sock = tls[1].wrap_socket(sock, server_side=False)
            log.info(f"{geturl(sock)} connected to player {other_rank} {getpeerurl(sock)}, received certificate:\n{certformat(sock.getpeercert(), indent='  ')}")
        else:
            log.info(f"{geturl(sock)} connected to player {other_rank} {getpeerurl(sock)}")
    except BaseException:
        # Callers retry this function in a loop, so release the descriptor
        # instead of leaking one socket per failed attempt.
        sock.close()
        raise

    return NetstringSocket(sock)
[docs]
def direct(*, listen_socket, addresses, rank, name="world", timer=None, tls=None):
    """Create socket connections given a list of player addresses.

    Players connect pairwise in rank order: each player accepts connections
    from every higher-ranked player, and connects to every lower-ranked one.

    Parameters
    ----------
    listen_socket: :class:`socket.socket`, required
        A nonblocking Python socket or compatible object, already bound and
        listening for connections.  Typically created with :func:`listen`.
    addresses: :class:`list` of :class:`str`, required
        List of address URLs for every player in rank order.
    rank: :class:`int`, required
        Rank of the calling player.
    name: :class:`str`, optional
        Human readable label used for logging and error messages.  Typically,
        this should be the same name assigned to the communicator that will use
        the :func:`direct` outputs.  Defaults to "world".
    timer: :class:`Timer`, optional
        Determines the maximum time to wait for player connections.  Defaults
        to five seconds.
    tls: pair of :class:`ssl.SSLContext`, optional
        If provided, the returned sockets will implement transport layer
        security.  Callers must provide a sequence containing one context
        configured for server connections, and one configured for client
        connections, in that order.

    Raises
    ------
    :class:`EncryptionFailed`
        If there are problems establishing an encrypted connection with another
        player.
    :class:`Timeout`
        If `timer` expires before all connections are established.
    :class:`ValueError`
        If there are problems with input parameters.

    Returns
    -------
    sockets: :class:`dict` of :class:`NetstringSocket`
        Dictionary mapping player ranks to connected sockets.
    """
    for address in addresses:
        if not isinstance(address, str):
            raise ValueError(message(name, rank, f"address must be a string, got {address} instead.")) # pragma: no cover

    addresses = [urllib.parse.urlparse(address) for address in addresses]
    for address in addresses:
        if address.scheme not in ["file", "tcp"]:
            raise ValueError(message(name, rank, f"address scheme must be file or tcp, got {address.scheme} instead.")) # pragma: no cover
        if address.scheme == "tcp" and not address.port:
            raise ValueError(message(name, rank, f"port must be specified for tcp addresses, got {address} instead.")) # pragma: no cover
        if address.scheme != addresses[0].scheme:
            raise ValueError(message(name, rank, "address schemes must match.")) # pragma: no cover

    world_size = len(addresses)

    if not isinstance(rank, int):
        raise ValueError(message(name, rank, f"rank must be an integer, got {rank} instead.")) # pragma: no cover
    if rank < 0 or rank >= world_size:
        raise ValueError(message(name, rank, f"rank must be in the range [0, {world_size}), got {rank} instead.")) # pragma: no cover

    if not isinstance(name, str):
        raise ValueError(message(name, rank, f"name must be a string, got {name} instead.")) # pragma: no cover

    if timer is None:
        timer = Timer(threshold=5)
    if not isinstance(timer, Timer):
        raise ValueError(message(name, rank, f"timer must be an instance of Timer, got {timer} instead.")) # pragma: no cover

    # Setup logging
    log = getLogger(__name__, name, rank)
    log.info(f"direct connect with {[address.geturl() for address in addresses]}.")

    # Set aside storage for connections to the other players.
    players = {}

    # Players setup connections with each other, in rank order.
    for listener in range(world_size - 1):
        if rank == listener:
            # Accept connections from higher-rank players.
            expected = world_size - rank - 1 # We don't connect to ourself.
            other_players = []
            while len(other_players) < expected:
                if timer.expired:
                    raise Timeout(message(name, rank, "timeout waiting for player connections.")) # pragma: no cover
                try:
                    # select() returns three lists; only the first tells us
                    # whether a connection is waiting to be accepted.
                    ready, _, _ = select.select([listen_socket], [], [], 0.1)
                    if not ready:
                        continue
                    sock, _ = listen_socket.accept()
                    if tls is not None:
                        sock = tls[0].wrap_socket(sock, server_side=True)
                        log.info(f"{geturl(sock)} accepted connection from {getpeerurl(sock)}, received certificate:\n{certformat(sock.getpeercert(), indent='  ')}")
                    else:
                        log.info(f"{geturl(sock)} accepted connection from {getpeerurl(sock)}")
                    other_players.append(NetstringSocket(sock))
                except ssl.SSLError as e:
                    # There was a problem setting up an encrypted connection with
                    # the other player, so there's no point in continuing.
                    raise EncryptionFailed(message(name, rank, f"remote player failed to connect: {e}")) from e
                except BlockingIOError: # pragma: no cover
                    # The pending connection went away between select() and
                    # accept(); this is routine, so just try again.
                    time.sleep(0.1)
                except Exception as e: # pragma: no cover
                    log.warning(f"exception listening for other players: {e}")
                    time.sleep(0.1)

            # Collect ranks from the other players.
            for other_player in other_players:
                while not timer.expired:
                    try:
                        raw_message = other_player.next_message(timeout=0.1)
                        if raw_message is not None:
                            # SECURITY: pickle.loads() on data received from the
                            # network can execute arbitrary code; only safe if
                            # every peer is trusted.
                            other_rank = pickle.loads(raw_message)
                            players[other_rank] = other_player
                            log.debug(f"received rank from player {other_rank}.")
                            other_player.send(pickle.dumps("OK"))
                            break
                    except Exception as e: # pragma: no cover
                        log.warning(f"exception receiving player rank: {e}")
                        time.sleep(0.1)
                else: # pragma: no cover
                    raise Timeout(message(name, rank, "timeout waiting for player rank."))

        elif rank > listener:
            # Make a connection to the listener.
            while not timer.expired:
                try:
                    players[listener] = connect(address=addresses[listener], rank=rank, other_rank=listener, name=name, tls=tls)
                    break
                except ssl.SSLCertVerificationError as e:
                    # If this happens it means we couldn't verify the other
                    # player's certificate, so there's no point in continuing.
                    raise EncryptionFailed(message(name, rank, f"received invalid certificate from player {listener}.")) from e
                except Exception as e: # pragma: no cover
                    log.warning(f"exception connecting to player {listener}: {e}")
                    time.sleep(0.5)
            else: # pragma: no cover
                raise Timeout(message(name, rank, f"timeout connecting to player {listener}."))

            # Send our rank to the listener.
            while not timer.expired:
                try:
                    players[listener].send(pickle.dumps(rank))
                    break
                except Exception as e: # pragma: no cover
                    log.warning(f"exception sending rank to player {listener}: {e}")
                    time.sleep(0.5)
            else: # pragma: no cover
                raise Timeout(message(name, rank, f"timeout sending rank to player {listener}."))

            # Receive a response from the listener.
            while not timer.expired:
                try:
                    raw_message = players[listener].next_message(timeout=0.1)
                    if raw_message is not None:
                        # Decode the acknowledgement so a corrupt reply is
                        # reported; its content isn't otherwise used.
                        # SECURITY: see the pickle note above.
                        pickle.loads(raw_message)
                        log.debug(f"received response from player {listener}.")
                        break
                except Exception as e: # pragma: no cover
                    log.warning(f"exception waiting for response from player {listener}: {e}")
                    time.sleep(0.1)
            else: # pragma: no cover
                raise Timeout(message(name, rank, f"timeout waiting for response from player {listener}."))

    return players
[docs]
def getLogger(name, comm, rank):
    """Return a logger adapter that labels entries with a communicator and rank.

    Parameters
    ----------
    name: :class:`str`, required
        Name of the Python logger to wrap, passed to :func:`logging.getLogger`.
    comm: :class:`str`, required
        Communicator name.
    rank: :class:`int`, required
        Communicator rank.

    Returns
    -------
    adapter: :class:`LoggerAdapter`
    """
    logger = logging.getLogger(name)

    # logging.getLogger() returns the same shared object on every call, so
    # blindly adding a filter each time grows the filter list without bound
    # (this function runs on every connection attempt).  Only the most
    # recently added filter determines the final record attributes, so
    # replacing earlier ones yields identical records without the leak.
    for existing in list(logger.filters):
        if isinstance(existing, CommunicatorEvents):
            logger.removeFilter(existing)
    logger.addFilter(CommunicatorEvents(comm, rank))

    return LoggerAdapter(logger, comm, rank)
[docs]
def getpeerurl(sock):
    """Return a socket's peer address as a URL.

    Parameters
    ----------
    sock: :class:`socket.socket`, required

    Returns
    -------
    url: :class:`str`
        The address of the remote end of the socket as a URL.  For example:
        `"tcp://127.0.0.1:59678"` for TCP sockets, or `"file:///path/to/foo"` for
        Unix domain sockets.
    """
    family = sock.family

    if family == socket.AF_UNIX:
        return f"file://{sock.getpeername()}"

    if family == socket.AF_INET:
        peer_host, peer_port = sock.getpeername()
        return f"tcp://{peer_host}:{peer_port}"

    raise ValueError(f"Unknown address family: {sock.family}") # pragma: no cover
[docs]
def gettls(*, identity=None, trusted=None):
    """Construct a pair of :class:`ssl.SSLContext` instances.

    Parameters
    ----------
    identity: :class:`str`, optional
        Path to a private key and certificate in PEM format that will
        identify the local player.
    trusted: sequence of :class:`str`, optional
        Path to certificates in PEM format that will identify the other
        players.

    Returns
    -------
    tls: (server, client) tuple, or :any:`None` unless both `identity` and
        `trusted` are provided.
    """
    if not (identity and trusted):
        return None

    def build(protocol):
        # Both contexts get the same identity, trust store and verification
        # settings; only the protocol role differs.
        context = ssl.SSLContext(protocol)
        context.load_cert_chain(certfile=identity)
        for certificate in trusted:
            context.load_verify_locations(certificate)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_REQUIRED
        return context

    server_context = build(ssl.PROTOCOL_TLS_SERVER)
    client_context = build(ssl.PROTOCOL_TLS_CLIENT)
    return (server_context, client_context)
[docs]
def geturl(sock):
    """Return a socket's local address as a URL.

    Parameters
    ----------
    sock: :class:`socket.socket`, required

    Returns
    -------
    url: :class:`str`
        The socket's local address as a URL.  For example:
        `"tcp://127.0.0.1:59678"` for TCP sockets, or `"file:///path/to/foo"` for
        Unix domain sockets.
    """
    family = sock.family

    if family == socket.AF_UNIX:
        return f"file://{sock.getsockname()}"

    if family == socket.AF_INET:
        local_host, local_port = sock.getsockname()
        return f"tcp://{local_host}:{local_port}"

    raise ValueError(f"Unknown address family: {sock.family}") # pragma: no cover
[docs]
def listen(*, address, rank, name="world", timer=None):
    """Create a listening socket from a URL.

    Typically, callers should use this function to create a listening socket
    for use with either :func:`direct` or :func:`rendezvous`.

    Parameters
    ----------
    address: :class:`str`, required
        Address to use for listening, in the form of a URL.  For example:
        `"tcp://127.0.0.1:59478"` to create a TCP socket, or
        `"file:///path/to/foo"` to create a Unix domain socket.
    rank: :class:`int`, required
        Integer rank of the caller.  Used for logging and error messages.
    name: :class:`str`, optional
        Human readable label.  Used for logging and error messages.  Typically,
        this should match the name used elsewhere to create a communicator.
    timer: :class:`Timer`, optional
        Determines the maximum time to wait for socket creation.  Defaults to
        five seconds.

    Raises
    ------
    :class:`ValueError`
        If there are problems with input parameters.
    :class:`Timeout`
        If `timer` expires before the socket could be created.

    Returns
    -------
    socket: :class:`socket.socket`
        A non-blocking socket, bound to `address`, listening for connections.
    """
    if not isinstance(address, str):
        raise ValueError(message(name, rank, f"address must be a string, got {address} instead.")) # pragma: no cover
    address = urllib.parse.urlparse(address)
    if address.scheme not in ["file", "tcp"]:
        raise ValueError(message(name, rank, f"address scheme must be file or tcp, got {address.scheme} instead.")) # pragma: no cover

    if not isinstance(rank, int):
        raise ValueError(message(name, rank, f"rank must be an integer, got {rank} instead.")) # pragma: no cover

    if not isinstance(name, str):
        raise ValueError(message(name, rank, f"name must be a string, got {name} instead.")) # pragma: no cover

    if timer is None:
        timer = Timer(threshold=5)
    if not isinstance(timer, Timer):
        raise ValueError(message(name, rank, f"timer must be an instance of Timer, got {timer} instead.")) # pragma: no cover

    # Setup logging
    log = getLogger(__name__, name, rank)

    # Create the socket, retrying until the timer expires.
    while not timer.expired:
        listen_socket = None
        try:
            if address.scheme == "tcp":
                listen_socket = socket.create_server((address.hostname, address.port or 0))
                address = urllib.parse.urlparse(geturl(listen_socket)) # Recreate the address in case the port was assigned at random.
            elif address.scheme == "file":
                # Remove any stale socket file; attempting the removal
                # directly avoids a race between checking and unlinking.
                try:
                    os.unlink(address.path)
                except FileNotFoundError:
                    pass
                listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                listen_socket.bind(address.path)

            listen_socket.setblocking(False)
            listen_socket.listen()

            log.info(f"listening to {address.geturl()} for connections.")
            break
        except Exception as e: # pragma: no cover
            # Don't leak a half-configured socket on every failed attempt.
            if listen_socket is not None:
                listen_socket.close()
            log.warning(f"exception creating listening socket: {e}")
            time.sleep(0.1)
    else: # pragma: no cover
        raise Timeout(message(name, rank, "timeout creating listening socket."))

    return listen_socket
[docs]
def message(name, rank, msg):
    """Format a message for logging and error handling.

    The result identifies the communicator `name` and player `rank`,
    followed by `msg`.
    """
    prefix = f"Comm {name} player {rank}"
    return f"{prefix} {msg}"
[docs]
def rendezvous(*, listen_socket, root_address, world_size, rank, name="world", token=0, timer=None, tls=None):
    """Create socket connections given just the address of the root player.

    Every player first connects to the root (rank 0) player and reports its
    own listening address; root then broadcasts the full address list, after
    which the remaining players connect to one another in rank order.

    Parameters
    ----------
    listen_socket: :class:`socket.socket`, required
        A nonblocking Python socket or compatible object, already bound and
        listening for connections.  Typically created with :func:`listen`.
    root_address: :class:`str`, required
        URL address of the root (rank 0) player.  The address must be reachable
        by all of the other players.
    world_size: :class:`int`, required
        The number of players that will be members of this communicator.
    rank: :class:`int`, required
        The rank of the caller, in the range [0, world_size).
    name: :class:`str`, optional
        Human readable label used for logging and debugging.  Typically, this
        should be the same name assigned to the communicator that will use
        the :func:`rendezvous` outputs.  Defaults to "world".
    token: :class:`object`, optional
        All players provide an arbitrary token at startup; every player token
        must match, or a :class:`TokenMismatch` exception will be raised.
    timer: :class:`Timer`, optional
        Determines the maximum time to wait for socket creation.  Defaults to
        five seconds.
    tls: pair of :class:`ssl.SSLContext`, optional
        If provided, the returned sockets will implement transport layer
        security.  Callers must provide a sequence containing one context
        configured for server connections, and one configured for client
        connections, in that order.

    Raises
    ------
    :class:`EncryptionFailed`
        If there are problems establishing an encrypted connection with another
        player.
    :class:`Timeout`
        If `timer` expires before all connections are established.
    :class:`TokenMismatch`
        If every player doesn't call this function with the same token.
    :class:`ValueError`
        If there are problems with input parameters.

    Returns
    -------
    sockets: :class:`dict` of :class:`NetstringSocket`
        Dictionary mapping player ranks to connected sockets.
    """
    if not isinstance(root_address, str):
        raise ValueError(message(name, rank, f"root_address must be a string, got {root_address} instead.")) # pragma: no cover
    root_address = urllib.parse.urlparse(root_address)
    if root_address.scheme not in ["file", "tcp"]:
        raise ValueError(message(name, rank, f"root_address scheme must be file or tcp, got {root_address.scheme} instead.")) # pragma: no cover
    if rank == 0 and root_address.geturl() != geturl(listen_socket):
        raise ValueError(message(name, rank, f"Player 0 root_address {root_address} must match listen_socket.")) # pragma: no cover

    if not isinstance(world_size, int):
        raise ValueError(message(name, rank, f"world_size must be an integer, got {world_size} instead.")) # pragma: no cover
    if not world_size > 0:
        raise ValueError(message(name, rank, f"world_size must be greater than zero, got {world_size} instead.")) # pragma: no cover

    if not isinstance(rank, int):
        raise ValueError(message(name, rank, f"rank must be an integer, got {rank} instead.")) # pragma: no cover
    if rank < 0 or rank >= world_size:
        raise ValueError(message(name, rank, f"rank must be in the range [0, {world_size}), got {rank} instead.")) # pragma: no cover

    if not isinstance(name, str):
        raise ValueError(message(name, rank, f"name must be a string, got {name} instead.")) # pragma: no cover

    if timer is None:
        timer = Timer(threshold=5)
    if not isinstance(timer, Timer):
        raise ValueError(message(name, rank, f"timer must be an instance of Timer, got {timer} instead.")) # pragma: no cover

    # Setup logging
    log = getLogger(__name__, name, rank)
    log.info(f"rendezvous with {root_address.geturl()} from {geturl(listen_socket)}")

    # Set aside storage for connections to the other players.
    players = {}

    ###########################################################################
    # Phase 1: Every player (except root) makes a connection to root.

    if rank != 0:
        while not timer.expired:
            try:
                players[0] = connect(address=root_address, rank=rank, other_rank=0, name=name, tls=tls)
                break
            except ConnectionRefusedError: # pragma: no cover
                # This happens regularly, particularly when starting on
                # separate hosts, so no need to log a warning.
                time.sleep(0.1)
            except ssl.SSLCertVerificationError as e:
                # If this happens it means we couldn't verify the other
                # player's certificate, so there's no point in continuing.
                raise EncryptionFailed(message(name, rank, "received invalid certificate from player 0.")) from e
            except Exception as e: # pragma: no cover
                log.warning(f"exception connecting to player 0: {e}")
                time.sleep(0.1)
        else: # pragma: no cover
            raise Timeout(message(name, rank, "timeout connecting to player 0."))

    ###########################################################################
    # Phase 2: Every player sends their listening address to root.

    if rank != 0:
        while not timer.expired:
            try:
                players[0].send(pickle.dumps((rank, geturl(listen_socket), token)))
                break
            except Exception as e: # pragma: no cover
                log.warning(f"exception sending address to player 0: {e}")
                time.sleep(0.1)
        else: # pragma: no cover
            raise Timeout(message(name, rank, "timeout sending address to player 0."))

    ###########################################################################
    # Phase 3: Root gathers addresses from every player.

    if rank == 0:
        # Accept a connection from every player.
        expected = world_size - 1 # We don't connect to ourself.
        other_players = []
        while len(other_players) < expected:
            if timer.expired:
                raise Timeout(message(name, rank, "timeout waiting for player connections.")) # pragma: no cover
            try:
                # select() returns three lists; only the first tells us
                # whether a connection is waiting to be accepted.
                ready, _, _ = select.select([listen_socket], [], [], 0.1)
                if not ready:
                    continue
                sock, _ = listen_socket.accept()
                if tls is not None:
                    sock = tls[0].wrap_socket(sock, server_side=True)
                    log.info(f"{geturl(sock)} accepted connection from {getpeerurl(sock)}, received certificate:\n{certformat(sock.getpeercert(), indent='  ')}")
                else:
                    log.debug(f"{geturl(sock)} accepted connection from {getpeerurl(sock)}.")
                other_players.append(NetstringSocket(sock))
            except ssl.SSLError as e:
                # There was a problem setting up an encrypted connection with
                # the other player, so there's no point in continuing.
                raise EncryptionFailed(message(name, rank, f"remote player failed to connect: {e}")) from e
            except BlockingIOError: # pragma: no cover
                # This happens regularly, particularly when starting on
                # separate hosts, so no need to log a warning.
                time.sleep(0.1)
            except Exception as e: # pragma: no cover
                log.warning(f"exception waiting for connections from other players: {type(e)} {e}")
                time.sleep(0.1)

        # Collect an address from every player.
        addresses = {rank: (geturl(listen_socket), token)}
        for other_player in other_players:
            while not timer.expired:
                try:
                    raw_message = other_player.next_message(timeout=0.1)
                    if raw_message is not None:
                        # SECURITY: pickle.loads() on data received from the
                        # network can execute arbitrary code; only safe if
                        # every peer is trusted.
                        other_rank, other_addr, other_token = pickle.loads(raw_message)
                        players[other_rank] = other_player
                        addresses[other_rank] = (other_addr, other_token)
                        log.debug(f"received address from player {other_rank}.")
                        break
                except Exception as e: # pragma: no cover
                    log.warning(f"exception receiving player address: {e}")
                    time.sleep(0.1)
            else: # pragma: no cover
                raise Timeout(message(name, rank, "timeout waiting for player address."))

    ###########################################################################
    # Phase 4: Root broadcasts the set of all addresses to every player.

    if rank == 0:
        for player in players.values():
            player.send(pickle.dumps(addresses))

    ###########################################################################
    # Phase 5: Every player receives the set of all addresses from root.

    if rank != 0:
        while not timer.expired:
            try:
                raw_message = players[0].next_message(timeout=0.1)
                if raw_message is not None:
                    # SECURITY: see the pickle note in phase 3.
                    addresses = pickle.loads(raw_message)
                    log.debug("received addresses from player 0.")
                    break
            except ssl.SSLError as e:
                # There was a problem setting up an encrypted connection with
                # the other player, so there's no point in continuing.
                raise EncryptionFailed(message(name, rank, f"failed getting addresses from player 0: {e}")) from e
            except Exception as e: # pragma: no cover
                log.warning(f"exception getting addresses from player 0: {e}")
                time.sleep(0.1)
        else: # pragma: no cover
            raise Timeout(message(name, rank, "timeout waiting for addresses from player 0."))

    # Parse every address; distinct loop names avoid shadowing `rank` and `token`.
    addresses = {
        player_rank: (urllib.parse.urlparse(player_url), player_token)
        for player_rank, (player_url, player_token) in addresses.items()
    }

    ###########################################################################
    # Phase 6: Every player verifies that all tokens match.

    for other_rank, (_, other_token) in addresses.items():
        if other_token != token:
            raise TokenMismatch(message(name, rank, f"expected token {token!r}, received {other_token!r} from player {other_rank}."))

    ###########################################################################
    # Phase 7: Players setup connections with one another.

    # Connections to root already exist, so listeners start at rank 1.
    for listener in range(1, world_size - 1):
        if rank == listener:
            # Accept connections from higher-rank players.
            expected = world_size - rank - 1 # We don't connect to ourself.
            other_players = []
            while len(other_players) < expected:
                if timer.expired:
                    raise Timeout(message(name, rank, "timeout waiting for player connections.")) # pragma: no cover
                try:
                    ready, _, _ = select.select([listen_socket], [], [], 0.1)
                    if not ready:
                        continue
                    sock, _ = listen_socket.accept()
                    if tls is not None:
                        sock = tls[0].wrap_socket(sock, server_side=True)
                        log.info(f"{geturl(sock)} accepted connection from {getpeerurl(sock)}:\n{certformat(sock.getpeercert(), indent='  ')}")
                    else:
                        log.debug(f"{geturl(sock)} accepted connection from {getpeerurl(sock)}.")
                    other_players.append(NetstringSocket(sock))
                except ssl.SSLError as e:
                    # There was a problem setting up an encrypted connection with
                    # the other player, so there's no point in continuing.
                    raise EncryptionFailed(message(name, rank, f"remote player failed to connect: {e}")) from e
                except BlockingIOError: # pragma: no cover
                    # The pending connection went away between select() and
                    # accept(); this is routine, so just try again.
                    time.sleep(0.1)
                except Exception as e: # pragma: no cover
                    log.warning(f"exception listening for other players: {type(e)} {e}")
                    time.sleep(0.1)

            # Collect ranks from the other players.
            for other_player in other_players:
                while not timer.expired:
                    try:
                        raw_message = other_player.next_message(timeout=0.1)
                        if raw_message is not None:
                            # SECURITY: see the pickle note in phase 3.
                            other_rank = pickle.loads(raw_message)
                            players[other_rank] = other_player
                            log.debug(f"received rank from player {other_rank}.")
                            break
                    except Exception as e: # pragma: no cover
                        log.warning(f"exception receiving player rank: {e}")
                        time.sleep(0.1)
                else: # pragma: no cover
                    raise Timeout(message(name, rank, "timeout waiting for player rank."))

        elif rank > listener:
            # Make a connection to the listener.
            while not timer.expired:
                try:
                    players[listener] = connect(address=addresses[listener][0], rank=rank, other_rank=listener, name=name, tls=tls)
                    break
                except ssl.SSLCertVerificationError as e:
                    # If this happens it means we couldn't verify the other
                    # player's certificate, so there's no point in continuing.
                    raise EncryptionFailed(message(name, rank, f"received invalid certificate from player {listener}.")) from e
                except Exception as e: # pragma: no cover
                    log.warning(f"exception connecting to player {listener}: {e}")
                    time.sleep(0.5)
            else: # pragma: no cover
                raise Timeout(message(name, rank, f"timeout connecting to player {listener}."))

            # Send our rank to the listener.
            while not timer.expired:
                try:
                    players[listener].send(pickle.dumps(rank))
                    break
                except Exception as e: # pragma: no cover
                    log.warning(f"exception sending rank to player {listener}: {e}")
                    time.sleep(0.5)
            else: # pragma: no cover
                raise Timeout(message(name, rank, f"timeout sending rank to player {listener}."))

    return players