Skip to content
Snippets Groups Projects
shared.py 2.13 KiB
Newer Older
Julian Behringer's avatar
Julian Behringer committed
import json
from pathlib import Path

import requests

# Base URL that every request in this module is built on. It points at one
# specific "datastocks" resource on oekobaudat.de, selected by the trailing UUID.
# NOTE(review): which ÖKOBAUDAT release this UUID corresponds to is not visible
# here — confirm before changing it.
OKOBAU_URL = "https://oekobaudat.de/OEKOBAU.DAT/resource/datastocks/cd2bda71-760b-4fcc-8a0b-3877c10000a8"


def get_epds_limit(limit=10) -> dict:
    """Fetch a single page of the EPD process listing from ÖKOBAUDAT.

    Args:
        limit: Value sent as the ``pageSize`` query parameter.

    Returns:
        The decoded JSON body of the listing response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    listing_url = f"{OKOBAU_URL}/processes?format=json&pageSize={limit}"

    reply = requests.get(listing_url)
    reply.raise_for_status()
    payload = reply.json()

    # Report how many entries the server says it returned out of the total.
    fetched = payload.get('pageSize')
    available = payload.get('totalCount')
    print(f"{fetched} EPDs von {available} aus der ÖKOBAUDAT abgerufen")

    return payload
    
def get_all_epds() -> dict:
    """Fetch the whole EPD process listing from ÖKOBAUDAT in one page.

    Two requests are made: a probe with ``pageSize=1`` whose only purpose is
    to read ``totalCount``, then the real request using that count as the
    page size.

    Returns:
        The decoded JSON body of the second (full) listing response.

    Raises:
        requests.HTTPError: If either request answers with an error status.
    """
    listing_url = f"{OKOBAU_URL}/processes?format=json"

    # Probe request: the smallest page is enough to learn the total count.
    probe = requests.get(f"{listing_url}&pageSize=1")
    probe.raise_for_status()
    total_count = probe.json().get('totalCount')

    # Actual request: ask for everything at once.
    reply = requests.get(f"{listing_url}&pageSize={total_count}")
    reply.raise_for_status()
    payload = reply.json()

    fetched = payload.get('pageSize')
    available = payload.get('totalCount')
    print(f"{fetched} EPDs von {available} aus der ÖKOBAUDAT abgerufen")

    return payload
    
    
def get_epd_pdfs(uuid, version) -> dict:
    """Request the ``/epd`` resource of a single process data set.

    Args:
        uuid: Identifier of the process data set; inserted into the URL path.
        version: Data set version, sent as the ``version`` query parameter.
            ``None`` is dropped by ``requests`` and sends no parameter.

    Returns:
        The decoded JSON body when the server answers with HTTP 200,
        otherwise an empty dict (the error is printed, not raised).
    """
    base_url = f"{OKOBAU_URL}/processes/{uuid}/epd"
    params = {'version': version}

    # Pass the query parameters along; without them the requested version
    # would be ignored and the server would pick one itself.
    response = requests.get(base_url, params=params)

    # Failed request: report it and hand back an empty result so the caller
    # always receives a dict instead of hitting an unbound local.
    if response.status_code != 200:
        print(f"Fehler: {response.status_code} - {response.reason}")
        return {}

    # Successful request.
    # NOTE(review): assumes this endpoint answers with JSON — confirm.
    data = response.json()
    print(data)  # further processing of the data can happen here

    return data


def get_full_epd(uid: str) -> dict:
    """Fetch the extended JSON view of one EPD and tag it with its URL.

    Args:
        uid: Identifier of the process data set; inserted into the URL path.

    Returns:
        The decoded JSON body, with an added ``"source"`` key holding the
        data set URL (without the query string).

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    dataset_url = f"{OKOBAU_URL}/processes/{uid}"

    reply = requests.get(f"{dataset_url}?format=json&view=extended")
    reply.raise_for_status()

    dataset = reply.json()
    # Record where the data came from so it stays traceable downstream.
    dataset["source"] = dataset_url
    return dataset


def get_full_epd_str(uid: str) -> str:
    """Fetch the full data set of one EPD and serialize it to a JSON string.

    Args:
        uid: Identifier passed through to ``get_full_epd``.

    Returns:
        The data set encoded with ``json.dumps`` using its default settings.
    """
    dataset = get_full_epd(uid)
    serialized = json.dumps(dataset)
    return serialized


def get_folder(source, name: str) -> Path:
    """Return the directory ``name`` next to ``source``, creating it if needed.

    Args:
        source: Path (string or ``Path``) of a file; the folder is placed in
            this file's parent directory.
        name: Name of the folder to return.

    Returns:
        The path of the folder, which exists as a directory afterwards.

    Raises:
        FileExistsError: If the target path exists but is not a directory.
    """
    folder = Path(source).parent / name
    # Create in one step rather than check-then-create: this avoids a race
    # with other processes and also works when the parent is missing.
    folder.mkdir(parents=True, exist_ok=True)

    return folder