Source code for someip.config

"""
Classes for defining a :class:`Service` or :class:`Eventgroup`.
These definitions will be used to match against, and to convert to SD service or
eventgroup entries as seen on the wire (see :class:`someip.header.SOMEIPSDEntry`).
"""
from __future__ import annotations

import dataclasses
import ipaddress
import socket
import typing

import someip.header


# An (IP address object, port number) pair.
# NOTE(review): not referenced in the visible part of this module - presumably
# used elsewhere; confirm before removing.
_T_ADDR = typing.Tuple[typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address], int]
# A socket address tuple as returned by socket.getsockname(): either a
# 2-tuple (str, int) or a 4-tuple (str, int, int, int).
# NOTE(review): presumably the AF_INET and AF_INET6 address forms - confirm.
_T_SOCKNAME = typing.Union[typing.Tuple[str, int], typing.Tuple[str, int, int, int]]


@dataclasses.dataclass(frozen=True)
class Eventgroup:
    """
    Defines an Eventgroup that can be subscribed to.

    :param service_id: service ID, copied into generated SD entries and into the
        :class:`Service` returned by :meth:`as_service`
    :param instance_id: instance ID; may hold a wildcard in a generic definition
        (see :meth:`for_service`)
    :param major_version: major version; may hold a wildcard in a generic
        definition (see :meth:`for_service`)
    :param eventgroup_id: eventgroup ID, encoded into the low bits of the
        Subscribe entry's ``minver_or_counter`` field
    :param sockname: the socket address as returned by :meth:`socket.getsockname`
    :type sockname: tuple
    :param protocol: selects the layer 4 protocol
    """

    service_id: int
    instance_id: int
    major_version: int
    eventgroup_id: int
    sockname: _T_SOCKNAME
    protocol: someip.header.L4Protocols

    def create_subscribe_entry(
        self, ttl: int = 3, counter: int = 0
    ) -> someip.header.SOMEIPSDEntry:
        """
        Create a SD Subscribe entry for this eventgroup.

        :param ttl: the TTL for this Subscribe entry
        :param counter: counter to identify this specific subscription in otherwise
            identical subscriptions
        :return: the Subscribe SD entry for this eventgroup
        """
        endpoint_option = self._sockaddr_to_endpoint(self.sockname, self.protocol)
        return someip.header.SOMEIPSDEntry(
            sd_type=someip.header.SOMEIPSDEntryType.Subscribe,
            service_id=self.service_id,
            instance_id=self.instance_id,
            major_version=self.major_version,
            ttl=ttl,
            # the counter is shifted above the low 16 bits, which carry the
            # eventgroup ID
            minver_or_counter=(counter << 16) | self.eventgroup_id,
            options_1=(endpoint_option,),
        )

    def for_service(self, service: Service) -> typing.Optional[Eventgroup]:
        """
        replace a generic definition (that may contain wildcards in
        :attr:`instance_id` and :attr:`major_version`) with actual values from a
        :class:`Service`.

        :param service: actual service
        :return: A new :class:`Eventgroup` with :attr:`instance_id` and
            :attr:`major_version` from service. None if this eventgroup does not
            match the given service.
        """
        # Matching goes through an OfferService entry built from the given
        # service, so wildcards are only honoured on this eventgroup's side.
        if not self.as_service().matches_offer(service.create_offer_entry()):
            return None
        return dataclasses.replace(
            self,
            instance_id=service.instance_id,
            major_version=service.major_version,
        )

    def as_service(self) -> Service:
        """
        returns a :class:`Service` for this event group, e.g. for use with
        :meth:`~someip.sd.ServiceDiscover.watch_service`.

        Only :attr:`service_id`, :attr:`instance_id` and :attr:`major_version`
        are carried over; all other :class:`Service` fields keep their defaults.
        """
        return Service(
            service_id=self.service_id,
            instance_id=self.instance_id,
            major_version=self.major_version,
        )

    @staticmethod
    def _sockaddr_to_endpoint(
        sockname: _T_SOCKNAME, protocol: someip.header.L4Protocols
    ) -> someip.header.SOMEIPSDOption:
        """
        Convert a socket address into the matching SD endpoint option.

        :param sockname: socket address tuple to convert
        :param protocol: layer 4 protocol to store in the option
        :return: an IPv4 or IPv6 endpoint option, depending on the address family
        :raises TypeError: if the address is neither IPv4 nor IPv6
        """
        # The NI_NUMERIC* flags request numeric host and port strings, which
        # are parsed with ipaddress.ip_address() and int() below.
        host, port = socket.getnameinfo(
            sockname, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV
        )
        nport = int(port)
        naddr = ipaddress.ip_address(host)

        if isinstance(naddr, ipaddress.IPv4Address):
            return someip.header.IPv4EndpointOption(
                address=naddr, l4proto=protocol, port=nport
            )
        elif isinstance(naddr, ipaddress.IPv6Address):
            return someip.header.IPv6EndpointOption(
                address=naddr, l4proto=protocol, port=nport
            )
        else:  # pragma: nocover
            raise TypeError("unsupported IP address family")

    def __str__(self) -> str:  # pragma: nocover
        return (
            f"eventgroup={self.eventgroup_id:04x} service=0x{self.service_id:04x},"
            f" instance=0x{self.instance_id:04x}, version={self.major_version}"
            f" addr={self.sockname!r} proto={self.protocol.name}"
        )
@dataclasses.dataclass(frozen=True)
class Service:
    """
    Defines a Service that can be found and offered.

    :param service_id: the service ID; every ``matches_*`` method requires it
        to be equal
    :param instance_id: may be 0xFFFF (default) to match any instance
    :param major_version: may be 0xFF (default) to match any major version
    :param minor_version: may be 0xFFFFFFFF (default) to match any minor version
    :param options_1: options that apply to this service (run 1)
    :param options_2: options that apply to this service (run 2)
    :param eventgroups: offered eventgroup ids.
    """

    service_id: int
    instance_id: int = 0xFFFF
    major_version: int = 0xFF
    minor_version: int = 0xFFFFFFFF
    # compare=False: options take no part in the generated equality comparison
    options_1: typing.Tuple[someip.header.SOMEIPSDOption, ...] = dataclasses.field(
        default=(), compare=False
    )
    options_2: typing.Tuple[someip.header.SOMEIPSDOption, ...] = dataclasses.field(
        default=(), compare=False
    )
    eventgroups: typing.FrozenSet[int] = frozenset()

    def matches_offer(self, entry: someip.header.SOMEIPSDEntry) -> bool:
        """
        Test if a received OfferService :class:`~someip.header.SOMEIPSDEntry`
        matches this service.

        This is the case if the `service_id` is identical, and `instance_id`,
        `major_version` and `minor_version` are either identical or set to
        wildcard values on this :class:`Service` instance.

        :param entry: the entry to match against
        :return: True if the given OfferService entry matches this service
        :raises ValueError: if the entry is no OfferService
        """
        if entry.sd_type != someip.header.SOMEIPSDEntryType.OfferService:
            raise ValueError("entry is no OfferService")

        if self.service_id != entry.service_id:
            return False

        if self.instance_id != 0xFFFF and self.instance_id != entry.instance_id:
            return False

        if self.major_version != 0xFF and self.major_version != entry.major_version:
            return False

        if (
            self.minor_version != 0xFFFFFFFF
            and self.minor_version != entry.service_minor_version
        ):
            return False

        return True

    def matches_find(self, entry: someip.header.SOMEIPSDEntry) -> bool:
        """
        Test if a received FindService :class:`~someip.header.SOMEIPSDEntry`
        matches this service.

        This is the case if the `service_id` fields are equal, and `instance_id`,
        `major_version` and `minor_version` fields are either equal or set to
        wildcard values on the FindService SD entry.

        :param entry: the entry to match against
        :return: True if the given FindService entry matches this service
        :raises ValueError: if the entry is no FindService
        """
        if entry.sd_type != someip.header.SOMEIPSDEntryType.FindService:
            raise ValueError("entry is no FindService")

        if self.service_id != entry.service_id:
            return False

        # Unlike matches_offer(), the wildcard is looked for on the entry.
        if entry.instance_id != 0xFFFF and self.instance_id != entry.instance_id:
            return False

        if entry.major_version != 0xFF and self.major_version != entry.major_version:
            return False

        if (
            entry.service_minor_version != 0xFFFFFFFF
            and self.minor_version != entry.service_minor_version
        ):
            return False

        return True

    def matches_subscribe(self, entry: someip.header.SOMEIPSDEntry) -> bool:
        """
        Test if a received Subscribe :class:`~someip.header.SOMEIPSDEntry`
        matches this service.

        This is the case if the `service_id` fields are equal, the `eventgroup_id`
        is in :attr:`eventgroups`, and `instance_id` and `major_version` fields
        are either equal or set to wildcard values on this :class:`Service`
        instance. The minor version is not checked.

        :param entry: the entry to match against
        :return: True if the given Subscribe entry matches this service
        :raises ValueError: if the entry is no Subscribe
        """
        if entry.sd_type != someip.header.SOMEIPSDEntryType.Subscribe:
            raise ValueError("entry is no Subscribe")

        if self.service_id != entry.service_id:
            return False

        if self.instance_id != 0xFFFF and self.instance_id != entry.instance_id:
            return False

        if self.major_version != 0xFF and self.major_version != entry.major_version:
            return False

        return entry.eventgroup_id in self.eventgroups

    def matches_service(self, other: Service) -> bool:
        """
        Test if a given service matches this service.

        This is the case if the :attr:`service_id` fields are equal, and
        :attr:`instance_id`, :attr:`major_version` and :attr:`minor_version` are
        either equal or set to wildcard values on either :class:`Service`
        instance.

        :param other: the service to match against
        :return: True if the given service matches this service
        """
        if self.service_id != other.service_id:
            return False

        if (
            self.instance_id != 0xFFFF
            and other.instance_id != 0xFFFF
            and self.instance_id != other.instance_id
        ):
            return False

        if (
            self.major_version != 0xFF
            and other.major_version != 0xFF
            and self.major_version != other.major_version
        ):
            return False

        if (
            self.minor_version != 0xFFFFFFFF
            and other.minor_version != 0xFFFFFFFF
            and self.minor_version != other.minor_version
        ):
            return False

        return True

    def create_find_entry(self, ttl: int = 3) -> someip.header.SOMEIPSDEntry:
        """
        Create a SD FindService entry for this service.

        :param ttl: the TTL for this FindService entry
        :return: the FindService SD entry for this service
        """
        return someip.header.SOMEIPSDEntry(
            sd_type=someip.header.SOMEIPSDEntryType.FindService,
            service_id=self.service_id,
            instance_id=self.instance_id,
            major_version=self.major_version,
            ttl=ttl,
            minver_or_counter=self.minor_version,
        )

    def create_offer_entry(self, ttl: int = 3) -> someip.header.SOMEIPSDEntry:
        """
        Create a SD OfferService entry for this service.

        :param ttl: the TTL for this OfferService entry
        :return: the OfferService SD entry for this service
        """
        return someip.header.SOMEIPSDEntry(
            sd_type=someip.header.SOMEIPSDEntryType.OfferService,
            service_id=self.service_id,
            instance_id=self.instance_id,
            major_version=self.major_version,
            ttl=ttl,
            minver_or_counter=self.minor_version,
            options_1=tuple(self.options_1),
            options_2=tuple(self.options_2),
        )

    def __str__(self) -> str:  # pragma: nocover
        version = f"{self.major_version}.{self.minor_version}"
        # "or ()" keeps the old behaviour of rendering any falsy options value
        # as an empty list
        s_options_1 = ", ".join(str(o) for o in self.options_1 or ())
        s_options_2 = ", ".join(str(o) for o in self.options_2 or ())
        return (
            f"service=0x{self.service_id:04x}, instance=0x{self.instance_id:04x},"
            f" version={version}, options_1=[{s_options_1}], options_2=[{s_options_2}]"
        )

    @classmethod
    def from_offer_entry(cls, entry: someip.header.SOMEIPSDEntry) -> Service:
        """
        Create a :class:`Service` from a given OfferService SD entry.

        :attr:`eventgroups` is not set from the entry and keeps its default.

        :param entry: the entry as seen on the wire
        :return: a new :class:`Service` instance with values set from the given
            OfferService entry
        :raises ValueError: if the entry is no OfferService, or the entry does not
            have resolved options
            (see :attr:`someip.header.SOMEIPSDEntry.options_resolved`)
        """
        if entry.sd_type != someip.header.SOMEIPSDEntryType.OfferService:
            raise ValueError("entry is no OfferService")
        if not entry.options_resolved:
            raise ValueError("entry must have resolved options")
        return cls(
            entry.service_id,
            entry.instance_id,
            entry.major_version,
            entry.service_minor_version,
            options_1=tuple(entry.options_1),
            options_2=tuple(entry.options_2),
        )