import re from datetime import datetime from typing import Optional import requests from pydantic import BaseModel, Field, model_validator from model.settings import settings # we are assuming, that at the time of loading the app is initializes and the config is loaded OUI_DB = dict() # TODO: redis-cache if not settings.oui_lookup_disabled: for oui in ['http://standards-oui.ieee.org/oui.txt', 'https://standards-oui.ieee.org/oui28/mam.txt', 'https://standards-oui.ieee.org/oui36/oui36.txt']: ouis = requests.get(oui).text.split('\n') # normal request, not with fastapi Request Object last_hex = '' if len(ouis) == 0: raise ValueError(f'Empty OUI-DB from {oui}!') for line in ouis: if '(hex)' not in line: if '(base 16)' in line: p16 = line.split('(base 16)') p16[0] = p16[0].strip() if '-' in p16[0]: r = [int(p, base=16) for p in p16[0].split('-')] OUI_DB[last_hex].append((r[0], r[1], p16[1].strip())) continue parts = line.split('(hex)') parts[0] = parts[0].strip().lower().replace('-', ':') last_hex = parts[0] if parts[0] not in OUI_DB: OUI_DB[parts[0]] = [(None, None, parts[1].strip())] else: OUI_DB[parts[0]].append((None, None, parts[1].strip())) class MacFinderJobCreationRequest(BaseModel): mac: str = Field() bcd: str = Field() class MacFinderJobCreationResponse(BaseModel): job_ids: list[str] = Field(default=[]) class MacFinderJob(BaseModel): # kein DB Object mehr -> ok? job_id: str = Field() finished: bool = Field(default=False) failed: bool = Field(default=False) status: Optional[dict] = Field(default=None) mac: str = Field() vendor: Optional[str] = Field(default=None) vlan: int = Field(description='VLAN-Id') bcd: str = Field(description='Name of the bcd') started: datetime = Field(default_factory=datetime.now) @model_validator(mode='after') def validate_data(self): self.mac = format_mac(self.mac) if self.vendor is None: tmp_vendor = OUI_DB.get(self.mac[:8], None) if tmp_vendor is not None: self.vendor = tmp_vendor[0][2] return self def poll(self, session): if self.finished: return self.status self.status = session.get(settings.macfinder_server_url + '/status/{}'.format(self.job_id), cert=(settings.macfinder_cert, settings.macfinder_key), verify=settings.macfinder_ca).json() self.finished = self.status['status'] == 'finished' self.failed = self.status['status'] == 'failed' return self.status def format_mac(mac): mac = mac.lower().strip() mac = re.sub(r'(.*?)([0-9a-f]+|$|^)', lambda m: ('' * len(m.groups()[0]) + m.groups()[1]), mac) return ''.join(':' + c if i % 2 == 0 and not i == 0 else c for i, c in enumerate(mac, 0))