Source code for someip.utils
from __future__ import annotations
import asyncio
import functools
import platform
import socket
import typing
def log_exceptions(msg="unhandled exception in {__func__}"):
    """
    decorator that will catch all exceptions in methods and coroutine methods
    and log them with self.log

    msg will be formatted with __func__ as the called function's __qualname__ plus any
    passed arguments (positional arguments after ``self`` by index, keyword
    arguments by name).

    The decorated callable must be a method: its first argument needs a ``log``
    attribute offering ``exception(text)``. When the wrapped method raises an
    :class:`Exception`, the error is logged and the wrapper returns ``None``.
    Exceptions that do not derive from :class:`Exception` are not intercepted.

    If *msg* itself cannot be formatted with the actual call arguments (for
    example it names a keyword argument the caller did not pass), a generic
    message is logged instead, so the original error is never replaced by a
    formatting error.
    """

    def decorator(f):
        def describe(args, kwargs):
            # Runs inside the wrapper's ``except`` block. A failure here must not
            # escape, otherwise it would propagate to the caller instead of the
            # original exception being logged.
            try:
                # ``__func__`` is merged last so a caller kwarg of the same name
                # cannot trigger "multiple values for keyword argument".
                return msg.format(*args, **{**kwargs, "__func__": f.__qualname__})
            except Exception:
                return (
                    f"unhandled exception in {f.__qualname__} "
                    f"(could not format log message {msg!r})"
                )

        if asyncio.iscoroutinefunction(f):

            @functools.wraps(f)
            async def wrapper(self, *args, **kwargs):
                try:
                    return await f(self, *args, **kwargs)
                except Exception:
                    # describe() handles its own errors, so the exception being
                    # logged here is still the one raised by f.
                    self.log.exception(describe(args, kwargs))

        else:

            @functools.wraps(f)
            def wrapper(self, *args, **kwargs):
                try:
                    return f(self, *args, **kwargs)
                except Exception:
                    self.log.exception(describe(args, kwargs))

        return wrapper

    return decorator
async def getfirstaddrinfo(
    host, port, family=0, type=0, proto=0, sock=None, flags=0, loop=None
):
    """
    resolve a host/port pair with the given family, type, proto settings and
    return the first entry of the event loop's ``getaddrinfo`` result.

    With the standard asyncio event loop that entry is a
    ``(family, type, proto, canonname, sockaddr)`` tuple.
    NOTE(review): the earlier docstring said "return first sockaddr", but the
    code has always returned the whole first entry; confirm which one callers
    expect before changing the return shape.

    :param host: host name or address passed to ``getaddrinfo``
    :param port: port passed to ``getaddrinfo`` (injected afterwards on QNX)
    :param family: address family; must stay 0 when *sock* is given
    :param type: socket type; must stay 0 when *sock* is given
    :param proto: protocol; must stay 0 when *sock* is given
    :param sock: optional object whose ``family``, ``type`` and ``proto``
        attributes are used for the lookup
    :param flags: flags passed through to ``getaddrinfo``
    :param loop: event loop to use; defaults to ``asyncio.get_event_loop()``
    :raises ValueError: if *sock* is combined with family/type/proto
    :raises socket.gaierror: if no result was returned
    """
    if sock is not None:
        if family != 0 or type != 0 or proto != 0:
            raise ValueError(
                "family/type/proto and sock cannot be specified at the same time"
            )
        family = sock.family
        type = sock.type
        proto = sock.proto

    if loop is None:  # pragma: nobranch
        loop = asyncio.get_event_loop()

    # QNX fails when supplying a numeric port to getaddrinfo
    # workaround: supply no port and inject it in the results instead
    # see https://github.com/afflux/pysomeip/issues/13
    no_port_in_gai = platform.system() == "QNX"
    lookup_port = None if no_port_in_gai else port

    result = await loop.getaddrinfo(
        host, lookup_port, family=family, type=type, proto=proto, flags=flags
    )
    if not result:  # pragma: nocover
        raise socket.gaierror(
            socket.EAI_NODATA, f"no address info found for {host}:{port}"
        )

    first = result[0]
    if no_port_in_gai and port is not None:  # pragma: nocover
        # The port is element 1 of the sockaddr tuple, which is the last element
        # of the entry. Patch it there; slicing the result *list* and adding a
        # tuple (as was done before) always raised TypeError.
        sockaddr = tuple(first[4])
        first = tuple(first[:4]) + (sockaddr[:1] + (port,) + sockaddr[2:],)
    return first
T = typing.TypeVar("T")


async def wait_cancelled(task: asyncio.Task[T]) -> typing.Optional[T]:
    """
    wait until *task* is done and hand back its outcome.

    :param task: the task to wait for
    :return: ``None`` if *task* ended up cancelled, otherwise ``task.result()``
        (which re-raises the exception of a failed task)
    """
    # A plain ``try: await task`` / ``except asyncio.CancelledError`` is not used
    # here: it could not tell apart the awaited task raising CancelledError from
    # the current task being cancelled. gather() with return_exceptions=True
    # collects the task's outcome instead of raising it at this point.
    await asyncio.gather(task, return_exceptions=True)
    assert task.done()
    return None if task.cancelled() else task.result()