from __future__ import annotations
import asyncio
import collections
import dataclasses
import ipaddress
import itertools
import logging
import os
import platform
import random
import socket
import struct
import threading
import typing
import someip.header
import someip.config
from someip.config import _T_SOCKNAME as _T_SOCKADDR
from someip.utils import log_exceptions, wait_cancelled
# Module-level logger; ServiceDiscoveryProtocol.sd_message_received logs through it.
LOG = logging.getLogger("someip.sd")
# Either flavour of address object produced by the ipaddress module.
_T_IPADDR = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
# A socket address, or None where "no explicit address" is a valid value.
_T_OPT_SOCKADDR = typing.Optional[_T_SOCKADDR]
# TTL value for which TimedStore.refresh() does not schedule an expiry timer.
TTL_FOREVER = 0xFFFFFF
def ip_address(s: str) -> _T_IPADDR:
    """
    Parse a textual IPv4/IPv6 address into an ``ipaddress`` object.

    Everything from the first ``%`` onwards (e.g. an interface/zone suffix such
    as ``fe80::1%eth0``) is dropped before parsing.

    :param s: address string, optionally followed by ``%<suffix>``
    :return: the parsed address object
    :raises ValueError: if the part before ``%`` is not a valid IP address
    """
    host, _sep, _suffix = s.partition("%")
    return ipaddress.ip_address(host)
def pack_addr_v4(a):
    """
    Convert a textual IPv4 address into its packed network-order bytes.

    Anything from the first ``%`` onwards is ignored.

    :param a: IPv4 address string, optionally followed by ``%<suffix>``
    :return: the packed address as returned by ``socket.inet_pton``
    :raises OSError: if the address is not a valid IPv4 address
    """
    host, _sep, _suffix = a.partition("%")
    return socket.inet_pton(socket.AF_INET, host)
def pack_addr_v6(a):
    """
    Convert a textual IPv6 address into its packed network-order bytes.

    Anything from the first ``%`` onwards (e.g. a zone such as ``%eth0``) is
    ignored.

    :param a: IPv6 address string, optionally followed by ``%<suffix>``
    :return: the packed address as returned by ``socket.inet_pton``
    :raises OSError: if the address is not a valid IPv6 address
    """
    host, _sep, _suffix = a.partition("%")
    return socket.inet_pton(socket.AF_INET6, host)
class SOMEIPDatagramProtocol:
    """
    Base class for objects that receive and send SOME/IP messages via datagrams.

    is actually not a subclass of asyncio.BaseProtocol or asyncio.DatagramProtocol,
    because datagram_received() has an additional parameter `multicast: bool`.
    Instances are attached to an asyncio transport through
    DatagramProtocolAdapter, which supplies that extra parameter.

    TODO: fix misleading name
    """

    @classmethod
    async def create_unicast_endpoint(
        cls,
        *args,
        local_addr: _T_OPT_SOCKADDR = None,
        remote_addr: _T_OPT_SOCKADDR = None,
        loop=None,
        **kwargs,
    ):
        """
        Instantiate the protocol and attach it to a new datagram endpoint.

        :param args: positional arguments forwarded to the class constructor
        :param local_addr: forwarded to ``loop.create_datagram_endpoint``
        :param remote_addr: forwarded to ``loop.create_datagram_endpoint``
        :param loop: event loop to use; defaults to ``asyncio.get_event_loop()``
        :param kwargs: keyword arguments forwarded to the class constructor
        :return: tuple ``(transport, protocol)``
        """
        if loop is None:  # pragma: nobranch
            loop = asyncio.get_event_loop()
        protocol = cls(*args, **kwargs)
        transport, _ = await loop.create_datagram_endpoint(
            lambda: DatagramProtocolAdapter(protocol, is_multicast=False),
            local_addr=local_addr,
            remote_addr=remote_addr,
        )
        protocol.transport = transport
        return transport, protocol

    def __init__(self, logger: str = "someip"):
        """
        :param logger: name of the logger this instance logs to
        """
        self.log = logging.getLogger(logger)
        # only declared here; assigned by the create_* factory once the endpoint
        # exists. send() copes with the attribute still being absent.
        self.transport: asyncio.DatagramTransport
        self.session_storage = _SessionStorage()

        # default_addr=None means use connected address from socket
        self.default_addr: _T_OPT_SOCKADDR = None

    def datagram_received(self, data, addr: _T_SOCKADDR, multicast: bool) -> None:
        """
        Split a received datagram into SOME/IP messages and dispatch each one.

        Parse errors are logged together with the data that was not consumed;
        messages parsed before the error have already been dispatched.

        :param data: raw datagram payload
        :param addr: address of the sender
        :param multicast: passed on unchanged to :meth:`message_received`
        """
        try:
            while data:
                # 4.2.1, TR_SOMEIP_00140 more than one SOMEIP message per UDP frame
                # allowed
                parsed, data = someip.header.SOMEIPHeader.parse(data)
                self.message_received(parsed, addr, multicast)
        except someip.header.ParseError as exc:
            self.log.error(
                "failed to parse SOME/IP datagram from %s: %r",
                format_address(addr),
                data,
                exc_info=exc,
            )

    def error_received(self, exc: typing.Optional[Exception]):  # pragma: nocover
        """Log an error reported for the endpoint."""
        self.log.exception("someip event listener protocol failed", exc_info=exc)

    def connection_lost(
        self, exc: typing.Optional[Exception]
    ) -> None:  # pragma: nocover
        """Log that the endpoint was closed, as an error if *exc* is set."""
        log = self.log.exception if exc else self.log.info
        log("someip closed", exc_info=exc)

    def message_received(
        self,
        someip_message: someip.header.SOMEIPHeader,
        addr: _T_SOCKADDR,
        multicast: bool,
    ) -> None:  # pragma: nocover
        """
        called when a well-formed SOME/IP datagram was received

        The base implementation only logs the message.
        """
        self.log.info("received from %s\n%s", format_address(addr), someip_message)

    def send(self, buf: bytes, remote: _T_OPT_SOCKADDR = None):
        """
        Send *buf* to *remote*, falling back to ``self.default_addr``.

        If no transport has been attached yet, an error is logged and the data
        is dropped.

        :param buf: serialized data to send
        :param remote: destination address; a falsy value selects
            ``self.default_addr``
        """
        # ideally, we'd use transport.write() and have the DGRAM socket connected to the
        # default_addr. However, after connect() the socket will not be bound to
        # INADDR_ANY anymore. so we store the multicast address as a default destination
        # address on the instance and wrap the send calls with self.send

        # __init__ only annotates `transport`, so the attribute may not exist yet:
        # look it up defensively instead of raising AttributeError
        transport = getattr(self, "transport", None)
        if not transport:  # pragma: nocover
            self.log.error(
                "no transport set on %r but tried to send to %r: %r", self, remote, buf
            )
            return
        if not remote:
            remote = self.default_addr
        transport.sendto(buf, remote)
class DatagramProtocolAdapter(asyncio.DatagramProtocol):
    """
    asyncio protocol that forwards every callback to a SOMEIPDatagramProtocol.

    The adapter remembers whether it was created for a multicast endpoint and
    passes that flag along with each received datagram.
    """

    def __init__(self, protocol: SOMEIPDatagramProtocol, is_multicast: bool):
        """
        :param protocol: object that receives the forwarded callbacks
        :param is_multicast: value handed to ``protocol.datagram_received`` as
            its ``multicast`` argument
        """
        self.is_multicast = is_multicast
        self.protocol = protocol

    def datagram_received(self, data, addr: _T_SOCKADDR) -> None:
        """Forward a datagram together with the multicast flag."""
        target = self.protocol
        target.datagram_received(data, addr, multicast=self.is_multicast)

    def error_received(
        self, exc: typing.Optional[Exception]
    ) -> None:  # pragma: nocover
        """Forward an endpoint error to the wrapped protocol."""
        target = self.protocol
        target.error_received(exc)

    def connection_lost(
        self, exc: typing.Optional[Exception]
    ) -> None:  # pragma: nocover
        """Forward the connection-lost notification to the wrapped protocol."""
        target = self.protocol
        target.connection_lost(exc)
class _SessionStorage:
    """
    Tracks reboot flag and session id per peer for both directions.

    ``incoming`` maps ``(sender, multicast)`` to the last ``(flag, session_id)``
    seen; ``outgoing`` maps a remote address to the next ``(flag, session_id)``
    to hand out.
    """

    def __init__(self):
        self.incoming = {}
        # unknown remotes start with reboot flag set and session id 1
        self.outgoing: typing.DefaultDict[
            _T_OPT_SOCKADDR, typing.Tuple[bool, int]
        ] = collections.defaultdict(lambda: (True, 1))
        self.outgoing_lock = threading.Lock()

    def check_received(
        self, sender: _T_SOCKADDR, multicast: bool, flag: bool, session_id: int
    ) -> bool:
        """
        Record the received values and report whether they indicate a reboot.

        return true if a reboot was detected: the reboot flag is set now and
        either was not set on the previous message, or the previous session id
        was positive and not smaller than the new one. The first message from a
        sender never counts as a reboot.
        """
        key = (sender, multicast)
        previous = self.incoming.get(key)
        # the latest values are remembered regardless of the outcome
        self.incoming[key] = (flag, session_id)

        if previous is None:
            # sender not yet known
            return False

        old_flag, old_session_id = previous
        if not flag:
            return False
        if not old_flag:
            return True
        return old_session_id > 0 and old_session_id >= session_id

    def assign_outgoing(self, remote: _T_OPT_SOCKADDR):
        """
        Return ``(reboot_flag, session_id)`` for the next message to *remote*.

        The stored session id is advanced; once it has reached 0xFFFF the
        counter restarts at 1 with the reboot flag cleared.
        """
        # need a lock for outgoing messages if they may be sent from separate threads
        # eg. when an application logic runs in a separate thread from the SOMEIP stack
        # event loop
        with self.outgoing_lock:
            reboot_flag, current_id = self.outgoing[remote]
            # 4.2.1, TR_SOMEIP_00521
            # 4.2.1, TR_SOMEIP_00255
            wrapped = current_id >= 0xFFFF
            self.outgoing[remote] = (
                (False, 1) if wrapped else (reboot_flag, current_id + 1)
            )
        return reboot_flag, current_id
@dataclasses.dataclass
class Timings:
    """
    Timing parameters shared by the service discovery components.

    All delays, intervals and TTLs are given in seconds.
    """

    # bounds of the random delay slept before the first Find/Offer is sent
    INITIAL_DELAY_MIN: float = 0.0
    INITIAL_DELAY_MAX: float = 3

    REQUEST_RESPONSE_DELAY_MIN: float = 0.01
    REQUEST_RESPONSE_DELAY_MAX: float = 0.05

    # number of repetitions; repetition i waits (2 ** i) * REPETITIONS_BASE_DELAY
    REPETITIONS_MAX: int = 3
    REPETITIONS_BASE_DELAY: float = 0.01

    # pause between cyclic offers; a falsy value disables cyclic offers
    CYCLIC_OFFER_DELAY: float = 1

    # TTLs put into FindService, OfferService and Subscribe entries
    FIND_TTL: int = 3
    ANNOUNCE_TTL: int = 3
    SUBSCRIBE_TTL: int = 5

    # pause between subscription refreshes; None sends subscriptions only once
    SUBSCRIBE_REFRESH_INTERVAL: typing.Optional[float] = 3

    SEND_COLLECTION_TIMEOUT: float = 0.005
class ServiceDiscoveryProtocol(SOMEIPDatagramProtocol):
    """
    SOME/IP service discovery endpoint.

    Parses incoming SD messages, detects peer reboots and dispatches the
    contained entries to its ``discovery``, ``subscriber`` and ``announcer``
    helpers. Outgoing SD messages are built by :meth:`send_sd`.
    """

    @classmethod
    async def _create_endpoint(
        cls,
        loop: asyncio.BaseEventLoop,
        prot: SOMEIPDatagramProtocol,
        family: socket.AddressFamily,
        local_addr: str,
        port: int,
        multicast_addr: typing.Optional[str] = None,
        multicast_interface: typing.Optional[str] = None,
        ttl: int = 1,
    ):
        """
        Create one datagram endpoint and configure its multicast socket options.

        :param loop: event loop that owns the endpoint
        :param prot: protocol that receives the datagrams
        :param family: ``socket.AF_INET`` or ``socket.AF_INET6``
        :param local_addr: local unicast address
        :param port: UDP port to bind to
        :param multicast_addr: if set, the endpoint joins this group and is
            flagged as the multicast endpoint
        :param multicast_interface: interface name; required for IPv6
        :param ttl: multicast TTL (IPv4) / hop limit (IPv6) for sent packets
        :return: the created transport
        :raises ValueError: for an unsupported family, or IPv6 without interface
        :raises NotImplementedError: on platforms other than posix and Windows
        """
        if family not in (socket.AF_INET, socket.AF_INET6):
            raise ValueError(f"only IPv4 and IPv6 supported, got {family!r}")

        if os.name == "posix":  # pragma: nocover
            # multicast binding:
            # - BSD: will only receive packets destined for multicast addr,
            #        but will send with address from bind()
            # - Linux: will receive all multicast traffic destined for this port,
            #          can be filtered using bind()
            bind_addr: typing.Optional[str] = local_addr
            if multicast_addr:
                bind_addr = None
                if platform.system() == "Linux":  # pragma: nocover
                    if family == socket.AF_INET or "%" in multicast_addr:
                        bind_addr = multicast_addr
                    else:
                        bind_addr = f"{multicast_addr}%{multicast_interface}"
            # wrong type in asyncio typeshed, should be optional
            bind_addr = typing.cast(str, bind_addr)

            trsp, _ = await loop.create_datagram_endpoint(
                lambda: DatagramProtocolAdapter(
                    prot, is_multicast=bool(multicast_addr)
                ),
                local_addr=(bind_addr, port),
                reuse_port=True,
                family=family,
                proto=socket.IPPROTO_UDP,
                flags=socket.AI_PASSIVE,
            )
        elif platform.system() == "Windows":  # pragma: nocover
            import sys

            sock = socket.socket(
                family=family, type=socket.SOCK_DGRAM, proto=socket.IPPROTO_UDP
            )
            try:
                # compare numerically: the string tuple from
                # platform.python_version_tuple() sorts "10" before "8"
                if (
                    family == socket.AF_INET6
                    and sys.version_info < (3, 8, 4)
                    and isinstance(loop, getattr(asyncio, "ProactorEventLoop", ()))
                ):
                    prot.log.warning(
                        "ProactorEventLoop has issues with ipv6 datagram sockets!"
                        " https://bugs.python.org/issue39148. Update to Python>=3.8.4, or"
                        " workaround with asyncio.set_event_loop_policy("
                        "asyncio.WindowsSelectorEventLoopPolicy())",
                    )

                # python disallowed SO_REUSEADDR on create_datagram_endpoint.
                # https://bugs.python.org/issue37228
                # Windows doesnt have SO_REUSEPORT and the problem apparently does not
                # exist for multicast, so we need to set SO_REUSEADDR on the socket
                # manually
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                addrinfos = await loop.getaddrinfo(
                    local_addr,
                    port,
                    family=sock.family,
                    type=sock.type,
                    proto=sock.proto,
                    flags=socket.AI_PASSIVE,
                )
                if not addrinfos:
                    raise RuntimeError(
                        f"could not resolve local_addr={local_addr!r} port={port!r}"
                    )

                ai = addrinfos[0]

                sock.bind(ai[4])
                trsp, _ = await loop.create_datagram_endpoint(
                    lambda: DatagramProtocolAdapter(
                        prot, is_multicast=bool(multicast_addr)
                    ),
                    sock=sock,
                )
            except BaseException:
                # no transport took ownership of the socket: release it here
                sock.close()
                raise
        else:  # pragma: nocover
            raise NotImplementedError(
                f"unsupported platform {os.name} {platform.system()}"
            )

        sock = trsp.get_extra_info("socket")

        try:
            if family == socket.AF_INET:
                packed_local_addr = pack_addr_v4(local_addr)
                if multicast_addr:
                    packed_mcast_addr = pack_addr_v4(multicast_addr)
                    mreq = struct.pack("=4s4s", packed_mcast_addr, packed_local_addr)
                    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
                sock.setsockopt(
                    socket.IPPROTO_IP, socket.IP_MULTICAST_IF, packed_local_addr
                )
                sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)

            else:  # AF_INET6
                if multicast_interface is None:
                    raise ValueError("ipv6 requires interface name")
                ifindex = socket.if_nametoindex(multicast_interface)
                if multicast_addr:
                    packed_mcast_addr = pack_addr_v6(multicast_addr)
                    mreq = struct.pack("=16sl", packed_mcast_addr, ifindex)
                    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
                sock.setsockopt(
                    socket.IPPROTO_IPV6,
                    socket.IPV6_MULTICAST_IF,
                    struct.pack("=i", ifindex),
                )
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl)
        except BaseException:
            trsp.close()
            raise

        return trsp

    @classmethod
    async def create_endpoints(
        cls,
        family: socket.AddressFamily,
        local_addr: str,
        multicast_addr: str,
        multicast_interface: typing.Optional[str] = None,
        port: int = 30490,
        ttl=1,
        loop=None,
    ):
        """
        Create the unicast and multicast endpoints sharing one protocol instance.

        :param family: ``socket.AF_INET`` or ``socket.AF_INET6``
        :param local_addr: local unicast address
        :param multicast_addr: SD multicast group; also the default destination
        :param multicast_interface: interface name; required for IPv6
        :param port: UDP port used by both endpoints
        :param ttl: multicast TTL / hop limit for sent packets
        :param loop: event loop to use; defaults to ``asyncio.get_event_loop()``
        :return: tuple ``(unicast_transport, multicast_transport, protocol)``
        :raises ValueError: if *multicast_addr* is not a multicast address
        """
        if loop is None:  # pragma: nobranch
            loop = asyncio.get_event_loop()
        if not ip_address(multicast_addr).is_multicast:
            raise ValueError("multicast_addr is not multicast")

        # since posix does not provide a portable interface to figure out what address a
        # datagram was received on, we need one unicast and one multicast socket
        prot = cls((str(multicast_addr), port))

        # order matters, at least for Windows. If the multicast socket was created
        # first, both unicast and multicast packets would go to the multicast socket
        trsp_u = await cls._create_endpoint(
            loop,
            prot,
            family,
            local_addr,
            port,
            multicast_interface=multicast_interface,
            ttl=ttl,
        )

        try:
            trsp_m = await cls._create_endpoint(
                loop,
                prot,
                family,
                local_addr,
                port,
                multicast_addr=multicast_addr,
                multicast_interface=multicast_interface,
                ttl=ttl,
            )
        except BaseException:
            # do not leave the unicast endpoint open when the pair is incomplete
            trsp_u.close()
            raise

        # outgoing messages are sent through the unicast endpoint
        prot.transport = trsp_u

        return trsp_u, trsp_m, prot

    def __init__(
        self,
        multicast_addr: _T_SOCKADDR,
        timings: typing.Optional[Timings] = None,
        logger: str = "someip.sd",
    ):
        """
        :param multicast_addr: socket address used as default destination
        :param timings: timing parameters; defaults to ``Timings()``
        :param logger: name of the logger this instance logs to
        """
        super().__init__(logger=logger)
        self.timings = timings or Timings()
        self.default_addr = multicast_addr
        self.discovery = ServiceDiscover(self)
        self.subscriber = ServiceSubscriber(self)
        self.announcer = ServiceAnnouncer(self)

    def message_received(
        self,
        someip_message: someip.header.SOMEIPHeader,
        addr: _T_SOCKADDR,
        multicast: bool,
    ) -> None:
        """
        Validate and parse a received SOME/IP message as an SD message.

        Messages that do not carry the SD header values, or whose payload does
        not parse, are logged and dropped. Otherwise reboot detection runs
        first, then the message is passed to :meth:`sd_message_received`.
        """
        if (
            someip_message.service_id != someip.header.SD_SERVICE
            or someip_message.method_id != someip.header.SD_METHOD
            or someip_message.interface_version != someip.header.SD_INTERFACE_VERSION
            or someip_message.return_code != someip.header.SOMEIPReturnCode.E_OK
            or someip_message.message_type
            != someip.header.SOMEIPMessageType.NOTIFICATION
        ):
            self.log.error("SD protocol received non-SD message: %s", someip_message)
            return

        try:
            sdhdr, rest = someip.header.SOMEIPSDHeader.parse(someip_message.payload)
        except someip.header.ParseError as exc:
            self.log.error("SD-message did not parse: %r", exc)
            return

        if self.session_storage.check_received(
            addr, multicast, sdhdr.flag_reboot, someip_message.session_id
        ):
            self.reboot_detected(addr)

        # FIXME this will drop the SD Endpoint options, since they are not referenced by
        # entries. see 4.2.1 TR_SOMEIP_00548
        sdhdr_resolved = sdhdr.resolve_options()
        self.sd_message_received(sdhdr_resolved, addr, multicast)

        if rest:  # pragma: nocover
            self.log.warning(
                "unparsed data after SD from %s: %r", format_address(addr), rest
            )

    def send_sd(
        self,
        entries: typing.Collection[someip.header.SOMEIPSDEntry],
        remote: _T_OPT_SOCKADDR = None,
    ) -> None:
        """
        Wrap *entries* into one SD message and send it.

        Nothing is sent (and no session id is consumed) for an empty collection.

        :param entries: SD entries to send
        :param remote: destination; ``None`` uses the default address
        """
        if not entries:
            return
        flag_reboot, session_id = self.session_storage.assign_outgoing(remote)

        msg = someip.header.SOMEIPSDHeader(
            flag_reboot=flag_reboot,
            flag_unicast=True,  # 4.2.1, TR_SOMEIP_00540 receiving unicast is supported
            entries=tuple(entries),
        )
        msg_assigned = msg.assign_option_indexes()

        hdr = someip.header.SOMEIPHeader(
            service_id=someip.header.SD_SERVICE,
            method_id=someip.header.SD_METHOD,
            client_id=0,
            session_id=session_id,
            interface_version=1,
            message_type=someip.header.SOMEIPMessageType.NOTIFICATION,
            payload=msg_assigned.build(),
        )

        self.send(hdr.build(), remote)

    def start(self) -> None:
        """Start subscriber, announcer and discovery, in that order."""
        self.subscriber.start()
        self.announcer.start()
        self.discovery.start()

    def stop(self) -> None:
        """Stop discovery, announcer and subscriber, in that order."""
        self.discovery.stop()
        self.announcer.stop()
        self.subscriber.stop()

    def connection_lost(self, exc: typing.Optional[Exception]) -> None:
        """Log the loss and schedule ``connection_lost`` on all three helpers."""
        log = self.log.exception if exc else self.log.info
        log("connection lost. stopping all child tasks", exc_info=exc)
        asyncio.get_event_loop().call_soon(self.subscriber.connection_lost, exc)
        asyncio.get_event_loop().call_soon(self.discovery.connection_lost, exc)
        asyncio.get_event_loop().call_soon(self.announcer.connection_lost, exc)

    def reboot_detected(self, addr: _T_SOCKADDR) -> None:
        """Schedule ``reboot_detected(addr)`` on all three helpers."""
        asyncio.get_event_loop().call_soon(self.subscriber.reboot_detected, addr)
        asyncio.get_event_loop().call_soon(self.discovery.reboot_detected, addr)
        asyncio.get_event_loop().call_soon(self.announcer.reboot_detected, addr)

    def sd_message_received(
        self, sdhdr: someip.header.SOMEIPSDHeader, addr: _T_SOCKADDR, multicast: bool
    ) -> None:
        """
        called when a well-formed SOME/IP SD message was received

        Dispatches each entry by type: offers go to ``discovery`` (deferred via
        the event loop), FindService and Subscribe entries go to ``announcer``,
        Subscribe (N)ACKs are only logged. Messages without the unicast flag
        and Subscribe entries received over multicast are discarded.
        """
        LOG.debug(
            "sd_message_received received from %s (multicast=%r): %s",
            format_address(addr),
            multicast,
            sdhdr,
        )

        if not sdhdr.flag_unicast:
            # R21-11 PRS_SOMEIPSD_00843 ignoring multicast-only SD messages
            LOG.warning(
                "discarding multicast-only SD message from %s",
                format_address(addr),
            )
            return

        for entry in sdhdr.entries:
            if entry.sd_type == someip.header.SOMEIPSDEntryType.OfferService:
                asyncio.get_event_loop().call_soon(
                    self.discovery.handle_offer, entry, addr
                )
                continue

            if entry.sd_type == someip.header.SOMEIPSDEntryType.SubscribeAck:
                # TODO raise to application
                # TODO figure out what to do when not receiving an ACK after X?
                if entry.ttl == 0:
                    self.log.info("received Subscribe NACK from %s: %s", addr, entry)
                else:
                    self.log.info("received Subscribe ACK from %s: %s", addr, entry)
                continue

            if entry.sd_type == someip.header.SOMEIPSDEntryType.FindService:
                self.announcer.handle_findservice(
                    entry, addr, multicast
                )
                continue

            if (  # pragma: nobranch
                entry.sd_type == someip.header.SOMEIPSDEntryType.Subscribe
            ):
                if multicast:
                    self.log.warning(
                        "discarding subscribe received over multicast from %s: %s",
                        format_address(addr),
                        entry,
                    )
                    continue
                self.announcer.handle_subscribe(entry, addr)
                continue
class ServiceSubscriber:
    """
    Maintains eventgroup subscriptions and announces them via SOME/IP SD.

    While started, a background task sends Subscribe entries for all registered
    eventgroups and repeats that every ``SUBSCRIBE_REFRESH_INTERVAL`` seconds.

    example:
        TODO
    """

    def __init__(self, sd: ServiceDiscoveryProtocol):
        """
        :param sd: protocol used for sending; also provides timings and logger
        """
        self.sd = sd
        self.timings = sd.timings
        self.log = sd.log.getChild("subscribe")

        # warn about timing combinations that let subscriptions lapse
        subscribe_ttl = self.timings.SUBSCRIBE_TTL
        refresh = self.timings.SUBSCRIBE_REFRESH_INTERVAL
        if refresh:
            if refresh >= subscribe_ttl:  # pragma: nocover
                self.log.warning(
                    "refresh_interval=%r too high for ttl=%r. expect dropped updates.",
                    refresh,
                    subscribe_ttl,
                )
        elif subscribe_ttl < TTL_FOREVER:  # pragma: nocover
            self.log.warning(
                "no refresh, but ttl=%r set. expect lost connection after ttl",
                subscribe_ttl,
            )

        self.task: typing.Optional[asyncio.Task[None]] = None
        # separate alive tracking (instead of using task.done()) as task will only run
        # for one iteration when ttl=None
        self.alive = False

        self.subscribeentries: typing.List[
            typing.Tuple[someip.config.Eventgroup, _T_SOCKADDR],
        ] = []

    def subscribe_eventgroup(
        self, eventgroup: someip.config.Eventgroup, endpoint: _T_SOCKADDR
    ) -> None:
        """
        Register a subscription; it is sent right away if the subscriber runs.

        eventgroup:
          someip.config.Eventgroup that describes the eventgroup to subscribe to and the
          local endpoint that accepts the notifications
        endpoint:
          remote SD endpoint that will receive the subscription messages
        """
        # relies on _subscribe() to send out the Subscribe messages in the next cycle.
        self.subscribeentries.append((eventgroup, endpoint))

        if not self.alive:
            return
        asyncio.get_event_loop().call_soon(
            self._send_start_subscribe, endpoint, [eventgroup]
        )

    def stop_subscribe_eventgroup(
        self,
        eventgroup: someip.config.Eventgroup,
        endpoint: _T_SOCKADDR,
        send: bool = True,
    ) -> None:
        """
        Remove a subscription; unknown subscriptions are ignored silently.

        eventgroup:
          someip.config.Eventgroup that describes the eventgroup to unsubscribe from
        endpoint:
          remote SD endpoint that will receive the subscription messages
        send:
          whether a StopSubscribe (ttl=0) entry is scheduled for sending
        """
        try:
            self.subscribeentries.remove((eventgroup, endpoint))
        except ValueError:
            return
        else:
            if not send:
                return

        asyncio.get_event_loop().call_soon(
            self._send_stop_subscribe, endpoint, [eventgroup]
        )

    def _send_stop_subscribe(
        self, remote: _T_SOCKADDR, entries: typing.Collection[someip.config.Eventgroup]
    ) -> None:
        """Send Subscribe entries with ttl=0 for *entries* to *remote*."""
        self._send_subscribe(0, remote, entries)

    def _send_start_subscribe(
        self, remote: _T_SOCKADDR, entries: typing.Collection[someip.config.Eventgroup]
    ) -> None:
        """Send Subscribe entries with the configured TTL to *remote*."""
        self._send_subscribe(self.timings.SUBSCRIBE_TTL, remote, entries)

    def _send_subscribe(
        self,
        ttl: int,
        remote: _T_SOCKADDR,
        entries: typing.Collection[someip.config.Eventgroup],
    ) -> None:
        """Build one subscribe entry per eventgroup and send them together."""
        sd_entries = [
            eventgroup.create_subscribe_entry(ttl=ttl) for eventgroup in entries
        ]
        self.sd.send_sd(sd_entries, remote=remote)

    def start(self, loop=None) -> None:
        """
        Start the refresh task; does nothing if already started.

        :param loop: event loop to run on; defaults to ``asyncio.get_event_loop()``
        """
        if self.alive:  # pragma: nocover
            return
        event_loop = loop if loop is not None else asyncio.get_event_loop()

        self.alive = True
        self.task = event_loop.create_task(self._subscribe())

    def stop(self, send_stop_subscribe=True) -> None:
        """
        Stop the refresh task; does nothing if not started.

        :param send_stop_subscribe: whether StopSubscribe entries are scheduled
            for all registered subscriptions
        """
        if not self.alive:
            return

        self.alive = False

        running = self.task
        if running:  # pragma: nobranch
            running.cancel()
            asyncio.create_task(wait_cancelled(running))
            self.task = None

        if not send_stop_subscribe:
            return
        for endpoint, eventgroups in self._group_entries().items():
            asyncio.get_event_loop().call_soon(
                self._send_stop_subscribe, endpoint, eventgroups
            )

    def _group_entries(
        self,
    ) -> typing.Mapping[_T_SOCKADDR, typing.Collection[someip.config.Eventgroup]]:
        """Return the registered eventgroups grouped by remote endpoint."""
        grouped: typing.DefaultDict[
            _T_SOCKADDR, typing.List[someip.config.Eventgroup]
        ] = collections.defaultdict(list)
        for pair in self.subscribeentries:
            grouped[pair[1]].append(pair[0])
        return grouped

    @log_exceptions()
    async def _subscribe(self) -> None:
        """
        Send all subscriptions, then repeat after each refresh interval.

        Ends after the first round if no refresh interval is configured, or
        when cancelled while waiting.
        """
        while True:
            for endpoint, eventgroups in self._group_entries().items():
                self._send_start_subscribe(endpoint, eventgroups)

            interval = self.timings.SUBSCRIBE_REFRESH_INTERVAL
            if interval is None:
                return

            try:
                await asyncio.sleep(interval)
            except asyncio.CancelledError:
                return

    def reboot_detected(self, addr: _T_SOCKADDR) -> None:
        """Called when a reboot of *addr* was detected. Currently a no-op."""
        # TODO
        pass

    def connection_lost(self, exc: typing.Optional[Exception]) -> None:
        """Stop without sending StopSubscribe entries."""
        self.stop(send_stop_subscribe=False)
class ClientServiceListener:
    """
    Callback interface for clients interested in discovered services.

    Both callbacks do nothing by default; subclasses override what they need.
    """

    def service_offered(
        self, service: someip.config.Service, source: _T_SOCKADDR
    ) -> None:
        """Called when *service* is newly offered by *source*."""

    def service_stopped(
        self, service: someip.config.Service, source: _T_SOCKADDR
    ) -> None:
        """Called when *service* is no longer offered by *source*."""
@dataclasses.dataclass(frozen=True)
class AutoSubscribeServiceListener(ClientServiceListener):
    """
    Listener that (un)subscribes an eventgroup when its service comes and goes.
    """

    # subscriber that manages the resulting subscription
    subscriber: ServiceSubscriber
    # eventgroup template; resolved per offered service via for_service()
    eventgroup: someip.config.Eventgroup

    def service_offered(
        self, service: someip.config.Service, source: _T_SOCKADDR
    ) -> None:
        """Subscribe at *source* if the eventgroup applies to *service*."""
        resolved = self.eventgroup.for_service(service)
        if resolved:  # pragma: nocover
            # TODO support TCP event groups: application (or lib?) needs to open
            # connection before subscribe
            self.subscriber.subscribe_eventgroup(resolved, source)

    def service_stopped(
        self, service: someip.config.Service, source: _T_SOCKADDR
    ) -> None:
        """Drop the subscription at *source* if the eventgroup applies."""
        resolved = self.eventgroup.for_service(service)
        if resolved:  # pragma: nocover
            # TODO support TCP event groups: application (or lib?) needs to close
            # connection
            self.subscriber.stop_subscribe_eventgroup(resolved, source)
# Type of the entries tracked by a TimedStore.
KT = typing.TypeVar("KT")
# Signature of TimedStore notification callbacks: called as callback(entry, address).
_T_CALLBACK = typing.Callable[[KT, _T_SOCKADDR], None]
class TimedStore(typing.Generic[KT]):
    """
    Per-address store of entries that expire after a TTL.

    ``store`` maps an address to a dict of
    ``entry -> (expired_callback, timeout_handle)``. The handle is ``None`` for
    entries stored with ``TTL_FOREVER``.
    """

    def __init__(self, log):
        """
        :param log: logger used for warnings
        """
        self.log = log
        self.store: typing.Dict[
            _T_SOCKADDR,
            typing.Dict[
                KT,
                typing.Tuple[
                    typing.Callable[[KT, _T_SOCKADDR], None],
                    typing.Optional[asyncio.Handle],
                ],
            ],
        ] = collections.defaultdict(dict)

    def refresh(
        self,
        ttl,
        address: _T_SOCKADDR,
        entry: KT,
        callback_new: _T_CALLBACK[KT],
        callback_expired: _T_CALLBACK[KT],
    ) -> None:
        """
        Insert *entry* for *address* or restart the timeout of an existing one.

        :param ttl: seconds until expiry; ``TTL_FOREVER`` disables expiry
        :param address: address the entry belongs to
        :param entry: the entry to track
        :param callback_new: called immediately if the entry was not known; an
            exception raised by it propagates and the entry is not stored
        :param callback_expired: stored and called when the entry is removed
        """
        previous = self.store[address].pop(entry, None)
        if previous is None:
            # nothing stored so far => new entry
            callback_new(entry, address)
        else:
            stale_handle = previous[1]
            if stale_handle:
                stale_handle.cancel()

        if ttl == TTL_FOREVER:
            timeout_handle = None
        else:
            timeout_handle = asyncio.get_event_loop().call_later(
                ttl, self._expired, address, entry
            )

        self.store[address][entry] = (callback_expired, timeout_handle)

    def stop(self, address: _T_SOCKADDR, entry: KT) -> None:
        """
        Remove *entry* for *address* and call its expired-callback right away.

        Unknown entries are ignored.
        """
        removed = self.store[address].pop(entry, None)
        if removed is None:
            # race-condition: service was already stopped. don't notify again
            return

        expired_callback, pending = removed
        if pending:
            pending.cancel()

        # this must be called immediately - otherwise pairs of StopSubscribe/Subscribe
        # would not be handled correctly. If this were deferred to the event loop,
        # callback_new would also need to run deferred, but then NakSubscription errors
        # would not propagate to the sender
        expired_callback(entry, address)

    def stop_all_for_address(self, address: _T_SOCKADDR) -> None:
        """
        Remove all entries of *address*, scheduling their expired-callbacks.
        """
        loop_entries = self.store[address]
        for entry, stored in loop_entries.items():
            expired_callback, pending = stored
            if pending:
                pending.cancel()
            asyncio.get_event_loop().call_soon(expired_callback, entry, address)
        self.store[address].clear()

    def stop_all(self) -> None:
        """Remove all entries of all addresses, scheduling their callbacks."""
        for address in self.store:
            self.stop_all_for_address(address)
        self.store.clear()

    def stop_all_matching(self, match: typing.Callable[[KT], bool]) -> None:
        """
        Stop every stored entry for which ``match(entry)`` is true.

        The matching entries are collected first, then stopped one by one.
        """
        to_stop = []
        for address, per_address in self.store.items():
            for entry in per_address:
                if match(entry):
                    to_stop.append((address, entry))

        for address, entry in to_stop:
            self.stop(address, entry)

    def _expired(self, address: _T_SOCKADDR, entry: KT) -> None:
        """Timeout handler: remove the entry and schedule its callback."""
        removed = self.store[address].pop(entry, None)
        if removed is None:  # pragma: nocover
            self.log.warning(
                "race-condition: entry %r timeout was not in store but triggered"
                " anyway. forgot to cancel?",
                entry,
            )
            return

        asyncio.get_event_loop().call_soon(removed[0], entry, address)

    def entries(self) -> typing.Iterator[KT]:
        """Return an iterator over the entries of all addresses."""
        # iterating a per-address dict yields its keys, i.e. the entries
        return itertools.chain.from_iterable(self.store.values())
class ServiceDiscover:
    """
    Client side of service discovery: finds services and notifies listeners.

    Listeners are registered per service filter (``watched_services``) or for
    all services (``watcher_all_services``). Offers of watched services are
    tracked in ``found_services`` until they are stopped or their TTL expires.
    """

    def __init__(self, sd: ServiceDiscoveryProtocol):
        """
        :param sd: protocol used for sending; also provides timings and logger
        """
        self.sd = sd
        self.timings = sd.timings
        self.log = sd.log.getChild("discover")

        self.watched_services: typing.Dict[
            someip.config.Service,
            typing.Set[ClientServiceListener],
        ] = collections.defaultdict(set)
        self.watcher_all_services: typing.Set[ClientServiceListener] = set()

        self.found_services: TimedStore[someip.config.Service] = TimedStore(self.log)

        self.task: typing.Optional[asyncio.Task[None]] = None

    def start(self):
        """Start the FindService task unless one is still running."""
        if self.task is not None and not self.task.done():  # pragma: nocover
            return

        loop = asyncio.get_running_loop()
        self.task = loop.create_task(self.send_find_services())

    def stop(self):
        """Cancel the FindService task, if any."""
        if self.task:  # pragma: nobranch
            self.task.cancel()
            asyncio.create_task(wait_cancelled(self.task))
            self.task = None

    def handle_offer(
        self, entry: someip.header.SOMEIPSDEntry, addr: _T_SOCKADDR
    ) -> None:
        """
        Process an OfferService entry; ``ttl == 0`` means the offer stopped.

        Offers of services nobody watches are ignored.
        """
        if not self.is_watching_service(entry):
            return
        if entry.ttl == 0:
            self.service_offer_stopped(addr, entry)
        else:
            self.service_offered(addr, entry)

    def is_watching_service(self, entry: someip.header.SOMEIPSDEntry):
        """Return whether any listener is interested in the offered service."""
        if self.watcher_all_services:
            return True
        return any(s.matches_offer(entry) for s in self.watched_services.keys())

    def watch_service(
        self, service: someip.config.Service, listener: ClientServiceListener
    ) -> None:
        """
        Register *listener* for services matching *service*.

        Already found matching services are reported to the listener via the
        event loop.
        """
        self.watched_services[service].add(listener)

        for addr, services in self.found_services.store.items():
            for s in services:
                if service.matches_service(s):
                    asyncio.get_event_loop().call_soon(
                        listener.service_offered, s, addr
                    )

    def stop_watch_service(
        self, service: someip.config.Service, listener: ClientServiceListener
    ) -> None:
        """
        Unregister *listener* for *service*.

        Currently found matching services are reported as stopped to the
        listener via the event loop.

        :raises KeyError: if *listener* is not registered for *service*
        """
        # .get() instead of indexing: indexing the defaultdict would register an
        # empty listener set for a service that was never watched
        listeners = self.watched_services.get(service)
        if listeners is None:
            raise KeyError(listener)
        listeners.remove(listener)
        if not listeners:
            # without listeners the service must not count as watched anymore,
            # otherwise offers keep matching and FindService keeps being sent
            del self.watched_services[service]

        # TODO verify if this makes sense
        for addr, services in self.found_services.store.items():
            for s in services:
                if service.matches_service(s):
                    asyncio.get_event_loop().call_soon(
                        listener.service_stopped, s, addr
                    )

    def watch_all_services(self, listener: ClientServiceListener) -> None:
        """Register *listener* for all services and report the found ones."""
        self.watcher_all_services.add(listener)

        for addr, services in self.found_services.store.items():
            for s in services:
                asyncio.get_event_loop().call_soon(listener.service_offered, s, addr)

    def stop_watch_all_services(self, listener: ClientServiceListener) -> None:
        """
        Unregister an all-services *listener* and report found ones as stopped.

        :raises KeyError: if *listener* is not registered
        """
        self.watcher_all_services.remove(listener)

        # TODO verify if this makes sense
        for addr, services in self.found_services.store.items():
            for s in services:
                asyncio.get_event_loop().call_soon(listener.service_stopped, s, addr)

    def find_subscribe_eventgroup(self, eventgroup: someip.config.Eventgroup):
        """Watch the eventgroup's service and subscribe whenever it is offered."""
        self.watch_service(
            eventgroup.as_service(),
            AutoSubscribeServiceListener(self.sd.subscriber, eventgroup),
        )

    def stop_find_subscribe_eventgroup(self, eventgroup: someip.config.Eventgroup):
        """Undo :meth:`find_subscribe_eventgroup` for *eventgroup*."""
        self.stop_watch_service(
            eventgroup.as_service(),
            AutoSubscribeServiceListener(self.sd.subscriber, eventgroup),
        )

    def _service_found(self, service: someip.config.Service) -> bool:
        """Return whether a currently found service matches *service*."""
        return any(service.matches_service(s) for s in self.found_services.entries())

    async def send_find_services(self):
        """
        Send FindService entries for watched services that are not found yet.

        After a random initial delay one message is sent, followed by up to
        ``REPETITIONS_MAX`` repetitions with doubling delays. The task ends
        early as soon as no watched service is missing anymore.
        """
        if not self.watched_services:
            return

        def _build_entries():
            return [
                service.create_find_entry(self.timings.FIND_TTL)
                for service in self.watched_services.keys()
                if not self._service_found(service)  # 4.2.1: SWS_SD_00365
            ]

        await asyncio.sleep(
            random.uniform(
                self.timings.INITIAL_DELAY_MIN, self.timings.INITIAL_DELAY_MAX
            )
        )
        find_entries = _build_entries()
        if not find_entries:
            return
        self.sd.send_sd(find_entries)  # 4.2.1: SWS_SD_00353

        for i in range(self.timings.REPETITIONS_MAX):
            await asyncio.sleep(
                (2 ** i) * self.timings.REPETITIONS_BASE_DELAY
            )  # 4.2.1: SWS_SD_00363

            find_entries = _build_entries()
            if not find_entries:
                return
            self.sd.send_sd(find_entries)  # 4.2.1: SWS_SD_00457

    def service_offered(self, addr: _T_SOCKADDR, entry: someip.header.SOMEIPSDEntry):
        """Track (or refresh) the offered service for the entry's TTL."""
        service = someip.config.Service.from_offer_entry(entry)

        self.found_services.refresh(
            entry.ttl,
            addr,
            service,
            self._notify_service_offered,
            self._notify_service_stopped,
        )

    def connection_lost(self, exc: typing.Optional[Exception]) -> None:
        """Drop all found services, notifying listeners that they stopped."""
        self.found_services.stop_all()

    def service_offer_stopped(
        self, addr: _T_SOCKADDR, entry: someip.header.SOMEIPSDEntry
    ) -> None:
        """Stop tracking the service described by a stop-offer entry."""
        service = someip.config.Service.from_offer_entry(entry)

        self.found_services.stop(addr, service)

    def reboot_detected(self, addr: _T_SOCKADDR) -> None:
        """Drop all services found at *addr*."""
        # notify stop for each service of rebooted instance.
        # reboot_detected() is called before sd_message_received(), so any offered
        # service in this message will cause a new notify
        self.found_services.stop_all_for_address(addr)

    def _notify_service_offered(
        self, service: someip.config.Service, source: _T_SOCKADDR
    ) -> None:
        """Call ``service_offered`` on every listener interested in *service*."""
        # iterate over snapshots: listeners may (un)watch services from within
        # their callback, which would otherwise break the iteration
        for service_filter, listeners in list(self.watched_services.items()):
            if service_filter.matches_service(service):
                for listener in tuple(listeners):
                    listener.service_offered(service, source)
        for listener in tuple(self.watcher_all_services):
            listener.service_offered(service, source)

    def _notify_service_stopped(
        self, service: someip.config.Service, source: _T_SOCKADDR
    ) -> None:
        """Call ``service_stopped`` on every listener interested in *service*."""
        # iterate over snapshots: listeners may (un)watch services from within
        # their callback, which would otherwise break the iteration
        for service_filter, listeners in list(self.watched_services.items()):
            if service_filter.matches_service(service):
                for listener in tuple(listeners):
                    listener.service_stopped(service, source)
        for listener in tuple(self.watcher_all_services):
            listener.service_stopped(service, source)
@dataclasses.dataclass(frozen=True)
class EventgroupSubscription:
    """
    Immutable description of one client subscription to an eventgroup.

    ``ttl`` and ``options`` do not take part in comparisons, so a refreshed
    subscription compares equal to the one it replaces.
    """

    service_id: int
    instance_id: int
    major_version: int
    id: int
    counter: int
    ttl: int = dataclasses.field(compare=False)
    # endpoint options of the subscribe entry
    endpoints: typing.FrozenSet[
        someip.header.EndpointOption[typing.Any]
    ] = dataclasses.field(default_factory=frozenset)
    # all other options of the subscribe entry
    options: typing.Tuple[someip.header.SOMEIPSDOption, ...] = dataclasses.field(
        default_factory=tuple, compare=False
    )

    @classmethod
    def from_subscribe_entry(cls, entry: someip.header.SOMEIPSDEntry):
        """
        Build a subscription from a Subscribe entry.

        The entry's options are split into endpoint options and the rest.
        """
        endpoint_options = []
        other_options = []
        for option in entry.options:
            is_endpoint = isinstance(option, someip.header.EndpointOption)
            bucket = endpoint_options if is_endpoint else other_options
            bucket.append(option)

        fields = {
            "service_id": entry.service_id,
            "instance_id": entry.instance_id,
            "major_version": entry.major_version,
            "id": entry.eventgroup_id,
            "counter": entry.eventgroup_counter,
            "ttl": entry.ttl,
            "endpoints": frozenset(endpoint_options),
            "options": tuple(other_options),
        }
        return cls(**fields)

    def to_ack_entry(self):
        """Return the SubscribeAck entry answering this subscription."""
        # counter goes into the upper 16 bits, eventgroup id into the lower ones
        counter_and_id = (self.counter << 16) | self.id
        return someip.header.SOMEIPSDEntry(
            sd_type=someip.header.SOMEIPSDEntryType.SubscribeAck,
            service_id=self.service_id,
            instance_id=self.instance_id,
            major_version=self.major_version,
            ttl=self.ttl,
            minver_or_counter=counter_and_id,
        )

    def to_nack_entry(self):
        """Return the negative acknowledgement: an ack entry with ttl=0."""
        rejected = dataclasses.replace(self, ttl=0)
        return rejected.to_ack_entry()
class NakSubscription(Exception):  # noqa: N818
    """Raised by a server listener to reject a client's subscription."""
class ServerServiceListener:
    """
    Callback interface for servers interested in eventgroup subscriptions.

    Both callbacks do nothing by default; subclasses override what they need.
    """

    def client_subscribed(
        self, subscription: EventgroupSubscription, source: _T_SOCKADDR
    ) -> None:
        """
        Called when a client newly subscribed.

        should raise someip.sd.NakSubscription if subscription should be rejected
        """

    def client_unsubscribed(
        self, subscription: EventgroupSubscription, source: _T_SOCKADDR
    ) -> None:
        """Called when a client's subscription ended."""
# Pair of a service and a server listener.
# NOTE(review): no use of this alias is visible in this part of the file - confirm.
_T_SL = typing.Tuple[someip.config.Service, ServerServiceListener]
[docs]class ServiceInstance:
def __init__(
self,
service: someip.config.Service,
listener: ServerServiceListener,
announcer: ServiceAnnouncer,
timings: Timings,
):
self.service = service
self.listener = listener
self.announcer = announcer
self.timings = timings
self.log = announcer.log.getChild(
f"service_{service.service_id:04x}.instance_{service.instance_id:04x}"
)
self._can_answer_offers = False
self._task: typing.Optional[asyncio.Task[None]] = None
self.subscriptions: TimedStore[EventgroupSubscription] = TimedStore(self.log)
def __repr__(self):
return f"<ServiceInstance {self.service}>"
[docs] def start(self, loop=None):
if self._task is not None: # pragma: nocover
raise RuntimeError("task already started")
self._can_answer_offers = False
self._task = asyncio.create_task(self._offer_task())
[docs] def stop(self):
if self._task is None: # pragma: nocover
raise RuntimeError("task already stopped")
self._task.cancel()
asyncio.create_task(wait_cancelled(self._task))
self._task = None
# cyclic tasks send stop when they are cancelled
if not self.timings.CYCLIC_OFFER_DELAY:
self._send_offer(stop=True)
self.subscriptions.stop_all()
@log_exceptions()
async def _offer_task(self) -> None:
ttl = self.timings.ANNOUNCE_TTL
if ttl is not TTL_FOREVER and (
not self.timings.CYCLIC_OFFER_DELAY
or self.timings.CYCLIC_OFFER_DELAY >= ttl
):
self.log.warning(
"CYCLIC_OFFER_DELAY=%r too long for TTL=%r."
" expect connectivity issues",
self.timings.CYCLIC_OFFER_DELAY,
ttl,
)
await asyncio.sleep(
random.uniform(
self.timings.INITIAL_DELAY_MIN, self.timings.INITIAL_DELAY_MAX
)
)
self._send_offer()
try:
self._can_answer_offers = True
for i in range(self.timings.REPETITIONS_MAX):
await asyncio.sleep((2 ** i) * self.timings.REPETITIONS_BASE_DELAY)
self._send_offer()
if not self.timings.CYCLIC_OFFER_DELAY: # 4.2.1 SWS_SD_00451
return
while True:
# 4.2.1 SWS_SD_00450
await asyncio.sleep(self.timings.CYCLIC_OFFER_DELAY)
self._send_offer()
except asyncio.CancelledError:
self._can_answer_offers = False
raise
finally:
if self.timings.CYCLIC_OFFER_DELAY:
self._send_offer(stop=True)
def _send_offer(self, remote: _T_OPT_SOCKADDR = None, stop: bool = False) -> None:
    """
    Queue an offer entry for this service on the announcer.

    :param remote: destination handed to ``announcer.queue_send``
    :param stop: if true the entry is created with a TTL of 0 instead of
        ``ANNOUNCE_TTL``
    """
    ttl = 0 if stop else self.timings.ANNOUNCE_TTL
    offer = self.service.create_offer_entry(ttl)
    self.announcer.queue_send(offer, remote=remote)
def matches_find(
    self, entry: someip.header.SOMEIPSDEntry, addr: _T_SOCKADDR
) -> bool:
    """
    Tell whether a FindService entry should be answered by this instance.

    :param entry: the received FindService entry
    :param addr: sender of the entry, only used for logging
    :return: ``False`` while ``_can_answer_offers`` is unset, otherwise the
        result of ``service.matches_find(entry)``
    """
    if self._can_answer_offers:
        return self.service.matches_find(entry)

    # 4.2.1 SWS_SD_00319
    self.log.info(
        "ignoring FindService from %s during Initial Wait Phase: %s",
        format_address(addr),
        entry,
    )
    return False
def handle_subscribe(
    self,
    entry: someip.header.SOMEIPSDEntry,
    addr: _T_SOCKADDR,
) -> bool:
    """
    Process a Subscribe entry addressed to this instance.

    An entry with TTL 0 ends the subscription. Any other entry refreshes it
    in the subscription store and is answered with an ACK, or with a NACK if
    the refresh raises ``NakSubscription``.

    :param entry: the received Subscribe entry
    :param addr: sender of the entry
    :return: ``False`` if no offer task is present or the entry does not
        match this service, ``True`` in every other case
    """
    if self._task is None or not self.service.matches_subscribe(entry):
        return False

    subscription = EventgroupSubscription.from_subscribe_entry(entry)

    if entry.ttl == 0:
        self.eventgroup_subscribe_stopped(addr, subscription)
        return True

    try:
        self.subscriptions.refresh(
            subscription.ttl,
            addr,
            subscription,
            self.listener.client_subscribed,
            self.listener.client_unsubscribed,
        )
    except NakSubscription:
        self.announcer._send_subscribe_nack(subscription, addr)
        return True

    ack = subscription.to_ack_entry()
    self.announcer.queue_send(ack, remote=addr)
    return True
def eventgroup_subscribe_stopped(
    self, addr: _T_SOCKADDR, subscription: EventgroupSubscription
) -> None:
    """
    Stop the given subscription of *addr* in the subscription store.

    :param addr: address of the subscriber
    :param subscription: the subscription to stop
    """
    store = self.subscriptions
    store.stop(addr, subscription)
def reboot_detected(self, addr: _T_SOCKADDR) -> None:
    """
    Stop every subscription held for *addr* after a reboot of that peer.

    :param addr: address of the peer whose reboot was detected
    """
    store = self.subscriptions
    store.stop_all_for_address(addr)
VT = typing.TypeVar("VT")


class SendCollector(typing.Generic[VT]):
    """
    Collects items for a fixed time and then passes them to a callback at once.

    A timer is armed on construction. When it fires, the collector is marked
    ``done`` and ``callback(data, *args, **kwargs)`` is called with the list of
    everything appended so far.
    """

    def __init__(
        self,
        timeout: float,
        callback: typing.Callable[[typing.List[VT]], None],
        *args,
        **kwargs,
    ):
        """
        :param timeout: delay in seconds before the callback is invoked
        :param callback: receives the collected list, followed by *args* and
            *kwargs*
        :param args: extra positional arguments for the callback
        :param kwargs: extra keyword arguments for the callback
        """
        self.data: typing.List[VT] = []
        self.args = args
        self.kwargs = kwargs
        self.callback = callback
        self.done = False

        self._handle = asyncio.get_event_loop().call_later(
            timeout, self._handle_timeout
        )

    def _handle_timeout(self) -> None:
        # mark as expired first, so the callback already sees a closed collector
        self.done = True
        self.callback(self.data, *self.args, **self.kwargs)

    def append(self, datum: VT) -> None:
        """
        Add *datum* to the pending batch.

        :raises RuntimeError: if the collector has already expired
        """
        if self.done:
            raise RuntimeError("tried to append data on an expired SendCollector")
        self.data.append(datum)

    def cancel(self) -> None:
        """
        Cancel the pending timer.

        This only cancels the timer handle; ``done`` is left unchanged.
        """
        self._handle.cancel()
class ServiceAnnouncer:
    """
    Announces local service instances on behalf of a service discovery protocol.

    Keeps the list of registered :class:`ServiceInstance` objects, starts and
    stops them together, dispatches received FindService and Subscribe entries
    to them and collects outgoing entries per destination before sending.
    """

    def __init__(self, sd: ServiceDiscoveryProtocol):
        """
        :param sd: protocol object providing ``timings``, ``log`` and
            ``send_sd``
        """
        self.sd = sd
        self.timings = sd.timings
        self.log = sd.log.getChild("announce")

        self.started = False

        self.announcing_services: typing.List[ServiceInstance] = []
        # one collector per destination address; None is a valid key
        self.send_queues: typing.Dict[
            _T_OPT_SOCKADDR, SendCollector[someip.header.SOMEIPSDEntry]
        ] = {}

    def queue_send(
        self, entry: someip.header.SOMEIPSDEntry, remote: _T_OPT_SOCKADDR = None
    ) -> None:
        """
        Send *entry* to *remote*, possibly batched with other entries.

        With ``SEND_COLLECTION_TIMEOUT == 0`` the entry is passed to
        ``sd.send_sd`` right away. Otherwise it is appended to the collector
        of that destination, which calls ``sd.send_sd`` when its timeout
        expires.

        :param entry: the SD entry to send
        :param remote: destination, handed to ``sd.send_sd`` unchanged
        """
        timeout = self.timings.SEND_COLLECTION_TIMEOUT
        if timeout == 0:
            self.sd.send_sd([entry], remote=remote)
            return

        queue = self.send_queues.get(remote)
        if queue is None or queue.done:
            # no collector yet, or the previous one has already fired
            queue = SendCollector(timeout, self.sd.send_sd, remote=remote)
            self.send_queues[remote] = queue

        # FIXME stops and starts for the same instance in the same queue make no sense
        # and should probably be cleaned out
        queue.append(entry)

    def announce_service(self, instance: ServiceInstance) -> None:
        """
        Register *instance*; it is started immediately if the announcer is
        already started.

        :param instance: service instance to be announced
        """
        if self.started:
            instance.start()
        self.announcing_services.append(instance)

    def stop_announce_service(self, instance: ServiceInstance, send_stop=True) -> None:
        """
        stops announcing previously started service

        :param instance: service instance to be stopped
        :param send_stop: ``instance.stop()`` is only called if this is true
            and the announcer is started
        :raises ValueError: if the service was not announcing
        """
        self.announcing_services.remove(instance)
        if send_stop and self.started:
            instance.stop()

    def handle_subscribe(
        self,
        entry: someip.header.SOMEIPSDEntry,
        addr: _T_SOCKADDR,
    ) -> None:
        """
        Offer a Subscribe entry to every registered instance.

        If no instance accepts it, a NACK is queued for the sender. If more
        than one instance accepts it, a warning is logged.

        :param entry: the received Subscribe entry
        :param addr: sender of the entry
        """
        matching_services = [
            instance
            for instance in self.announcing_services
            if instance.handle_subscribe(entry, addr)
        ]

        if not matching_services:
            self.log.warning(
                "discarding subscribe for unknown service from %s: %s",
                format_address(addr),
                entry,
            )
            subscription = EventgroupSubscription.from_subscribe_entry(entry)
            self._send_subscribe_nack(subscription, addr)
            return

        if len(matching_services) > 1:
            self.log.warning(
                "multiple configured services matched subscribe %s from %s: %s",
                entry,
                format_address(addr),
                matching_services,
            )

    def _send_subscribe_nack(
        self, subscription: EventgroupSubscription, addr: _T_SOCKADDR
    ) -> None:
        # queue the NACK entry derived from the subscription for the subscriber
        self.queue_send(subscription.to_nack_entry(), remote=addr)

    def handle_findservice(
        self,
        entry: someip.header.SOMEIPSDEntry,
        addr: _T_SOCKADDR,
        received_over_multicast: bool,
    ) -> None:
        """
        Answer a FindService entry with offers of all matching instances.

        Offers are scheduled on the event loop and sent to *addr*: after a
        random delay if the request arrived over multicast, otherwise as soon
        as possible.

        :param entry: the received FindService entry
        :param addr: sender of the entry
        :param received_over_multicast: whether the request arrived over
            multicast
        """
        self.log.info("received from %s: %s", format_address(addr), entry)

        matching_instances = [
            instance
            for instance in self.announcing_services
            if instance.matches_find(entry, addr)
        ]
        if not matching_instances:
            return

        # R21-11 PRS_SOMEIPSD_00423 not implemented because it's unclear how it should
        # behave for multiple services with different offer periods

        loop = asyncio.get_event_loop()

        # R21-11 PRS_SOMEIPSD_00417 and PRS_SOMEIPSD_00419
        if received_over_multicast:
            # R21-11 PRS_SOMEIPSD_00420 and PRS_SOMEIPSD_00421
            # one delay is drawn per request and shared by all matching instances
            delay = random.uniform(
                self.timings.REQUEST_RESPONSE_DELAY_MIN,
                self.timings.REQUEST_RESPONSE_DELAY_MAX,
            )
            for instance in matching_instances:
                loop.call_later(delay, instance._send_offer, addr)
        else:
            for instance in matching_instances:
                loop.call_soon(instance._send_offer, addr)

    def start(self, loop=None):
        """
        Start all registered instances and mark the announcer as started.

        :param loop: accepted but not used by this method
        """
        for instance in self.announcing_services:
            instance.start()
        self.started = True

    def stop(self):
        """
        Stop all registered instances and mark the announcer as stopped.
        """
        for instance in self.announcing_services:
            instance.stop()
        self.started = False

    def connection_lost(self, exc: typing.Optional[Exception]) -> None:
        """
        Stop announcing when the connection is lost.

        Does nothing unless the announcer is started: instances that are not
        running have nothing to stop, and calling :meth:`stop` for them would
        stop instances that were never started.

        :param exc: the exception that caused the loss, if any; not used
        """
        if self.started:
            self.stop()

    def reboot_detected(self, addr: _T_SOCKADDR) -> None:
        """
        Forward a detected reboot of the peer at *addr* to every instance.

        :param addr: address of the peer whose reboot was detected
        """
        for instance in self.announcing_services:
            instance.reboot_detected(addr)