# Copyright 2021 National Technology & Engineering Solutions
# of Sandia, LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS,
# the U.S. Government retains certain rights in this software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functionality to make logging from multiple processes easier."""
import contextlib
import logging
import numbers
from cicada.communicator.interface import Communicator, Tag
[docs]
class Logger(object):
"""Wrap a normal Python logger with a Cicada communicator to synchronize player output.
Note
----
Because :class:`Logger` communicates among players for synchronization, it
can have a significant effect on performance, even when log messages are
discarded by log levels or other filtering.
Furthermore, :class:`Logger` should not be used in error recovery code, since it will
fail attempting to communicate with players that are (presumably) dead.
You can pass `sync=False` when creating :class:`Logger` to disable synchronization,
e.g. if you're running your code on separate hosts or in separate terminal sessions.
Parameters
----------
logger: :class:`logging.Logger`, required.
The Python logger to be used for output.
communicator: :class:`cicada.communicator.interface.Communicator`, required
The communicator that will be used to synchronize output among players.
sync: :class:`bool`, optional
Used to control synchronization, which is enabled by default.
"""
def __init__(self, logger, communicator, sync=True):
if not isinstance(communicator, Communicator):
raise ValueError("A Cicada communicator is required.") # pragma: no cover
self._logger = logger
self._communicator = communicator
self._sync = sync
@property
def communicator(self):
"""Returns the underlying communicator."""
return self._communicator
[docs]
def critical(self, msg, *args, src=None, **kwargs):
"""Log a critical message, synchronized among players.
.. note::
This is a collective operation that *must* be called by all players that are members of the communicator.
The arguments match those of :meth:`logging.Logger.critical`, with the addition of the following:
Parameters
----------
src: :class:`int` or sequence of :class:`int`, optional
If specified, only the given player(s) will produce log output.
"""
self.log(logging.CRITICAL, msg, *args, src=src, **kwargs)
[docs]
def debug(self, msg, *args, src=None, **kwargs):
"""Log a debug message, synchronized among players.
.. note::
This is a collective operation that *must* be called by all players that are members of the communicator.
The arguments match those of :meth:`logging.Logger.debug`, with the addition of the following:
Parameters
----------
src: :class:`int` or sequence of :class:`int`, optional
If specified, only the given player(s) will produce log output.
"""
self.log(logging.DEBUG, msg, *args, src=src, **kwargs)
[docs]
def error(self, msg, *args, src=None, **kwargs):
"""Log an error message, synchronized among players.
.. note::
This is a collective operation that *must* be called by all players that are members of the communicator.
The arguments match those of :meth:`logging.Logger.error`, with the addition of the following:
Parameters
----------
src: :class:`int` or sequence of :class:`int`, optional
If specified, only the given player(s) will produce log output.
"""
self.log(logging.ERROR, msg, *args, src=src, **kwargs)
[docs]
def info(self, msg, *args, src=None, **kwargs):
"""Log an info message, synchronized among players.
.. note::
This is a collective operation that *must* be called by all players that are members of the communicator.
The arguments match those of :meth:`logging.Logger.info`, with the addition of the following:
Parameters
----------
src: :class:`int` or sequence of :class:`int`, optional
If specified, only the given player(s) will produce log output.
"""
self.log(logging.INFO, msg, *args, src=src, **kwargs)
[docs]
def log(self, level, msg, *args, src=None, **kwargs):
"""Log a message, synchronized among players.
.. note::
This is a collective operation that *must* be called by all players that are members of the communicator.
The arguments match those of :meth:`logging.Logger.log`, with the addition of the following:
Parameters
----------
src: :class:`int` or sequence of :class:`int`, optional
If specified, only the given player(s) will produce log output.
"""
communicator = self._communicator
if src is None:
src = communicator.ranks
elif isinstance(src, numbers.Integral):
src = [src]
# Wait for our turn to generate output.
if self._sync and communicator.rank:
communicator.recv(src=communicator.rank-1, tag=Tag.LOGSYNC)
# Generate output.
if communicator.rank in src:
self._logger.log(level, msg, *args, **kwargs)
# Notify the next player that it's their turn.
if self._sync and communicator.rank < communicator.world_size-1:
communicator.send(dst=communicator.rank+1, value=None, tag=Tag.LOGSYNC)
# The last player notifies the group that the output is complete.
if self._sync and communicator.rank == communicator.world_size-1:
for rank in communicator.ranks:
communicator.send(dst=rank, value=None, tag=Tag.LOGSYNC)
# Wait until output is complete before we return.
if self._sync:
communicator.recv(src=communicator.world_size-1, tag=Tag.LOGSYNC)
@property
def logger(self):
"""Returns the underlying Python :class:`logging.Logger`."""
return self._logger
[docs]
@contextlib.contextmanager
def override(self, *, sync=None):
"""Temporarily change logging behavior.
Use :meth:`override` to temporarily modify logger behavior in a with statement::
with log.override(sync=False):
# Do uncoordinated logging here.
# Go back to coordinated logging here.
.. note::
Changes to logging behavior *must* be consistent for *all* players that are members of the communicator.
Parameters
----------
sync: :class:`bool`, optional
If specified, override the logger sync property.
Returns
-------
context: :class:`object`
A context manager object that will restore the loger state when exited.
"""
original_context = {
"sync": self._sync,
}
try:
if sync is not None:
self._sync = sync
yield original_context
finally:
if sync is not None:
self._sync = original_context["sync"]
@property
def sync(self):
"""Controls whether coordinated logging is enabled or not.
.. note::
Changes to `sync` *must* be consistent for *all* players that are members of the communicator.
"""
return self._sync
@sync.setter
def sync(self, value):
self._sync = bool(value)
[docs]
def warning(self, msg, *args, src=None, **kwargs):
"""Log a warning message, synchronized among players.
.. note::
This is a collective operation that *must* be called by all players that are members of the communicator.
The arguments match those of :meth:`logging.Logger.warning`, with the addition of the following:
Parameters
----------
src: :class:`int` or sequence of :class:`int`, optional
If specified, only the given player(s) will produce log output.
"""
self.log(logging.WARNING, msg, *args, src=src, **kwargs)