Finanzgesundheit aus Bundesanzeiger-Daten: Ein simples ML‑Modell lokal trainieren & deployen (mit Python)
Von Datenabruf bis Deployment: Lerne Schritt für Schritt, wie du aus Bundesanzeiger-Jahresabschlüssen ein lokales Machine-Learning-Modell für Financial-Health-Scoring baust – mit Python, scikit-learn und der handelsregister.ai API.
Jahresabschlüsse im Bundesanzeiger liefern harte Fakten zur Vermögens‑, Finanz‑ und Ertragslage deutscher Unternehmen. Für Entwickler, Data Scientists und FinTech-Teams, die auf Basis dieser Daten automatisiert Scorings berechnen möchten, sind sie eine hervorragende Grundlage. In einem früheren Artikel haben wir bereits beschrieben, welche Bilanzkennzahlen typischerweise für ein Finanz- und Risikoscoring herangezogen werden – allerdings blieb das damals konzeptionell. Die Lücke zwischen „Theorie der Kennzahlenanalyse" und „tatsächlich laufendes Modell" war noch offen.
Genau diese Lücke schließen wir jetzt. In diesem Tutorial baust du ein einfaches, reproduzierbares Machine-Learning-Modell, das eine „Finanzgesundheits-Wahrscheinlichkeit" aus Bundesanzeiger-Daten schätzt. Du trainierst es lokal auf deinem Rechner und betreibst es lokal – ohne Cloud-Training, ohne externes Hosting, ohne Blackbox.
Die Daten beziehen wir über die handelsregister.ai API. Der Vorteil: Statt unstrukturierter PDFs bekommst du saubere, maschinenlesbare JSON-Antworten, die du direkt in deine Pipeline einspeisen kannst. Die Originaldaten stammen aus den offiziellen Quellen handelsregister.de und bundesanzeiger.de.
Hinweis: Dieses Tutorial dient ausschließlich als technisches Lernbeispiel. Das resultierende Modell ist weder ein offizielles Rating noch eine Anlage- oder Kreditberatung. Es arbeitet mit sogenannten „Weak Labels" – vereinfachten Regeln, die ein echtes Rating nicht ersetzen.
Was du am Ende dieses Tutorials hast
Am Ende dieses Artikels steht ein vollständiges, lauffähiges Python-Projekt auf deinem Rechner. Konkret besteht es aus vier Bausteinen, die aufeinander aufbauen:
Zunächst ein Datenabruf-Script, das Bundesanzeiger-Finanzdaten (Bilanz, GuV und KPIs) über die handelsregister.ai API als JSON herunterlädt und lokal speichert. Dann ein Feature-Engineering-Script, das diese rohen JSON-Dateien in einen tabellarischen Trainingsdatensatz umwandelt – mit einer Zeile pro Unternehmen und Geschäftsjahr, angereichert um abgeleitete Kennzahlen wie Eigenkapitalquote, ROA oder Personalintensität. Darauf aufbauend ein Trainings-Script, das eine logistische Regression auf diesem Datensatz trainiert und als einzelne Datei abspeichert. Und schließlich ein Scoring-CLI, mit dem du jederzeit eine Firma abfragen und in Echtzeit einen Gesundheitsscore berechnen kannst – optional auch als lokaler REST-Service mit FastAPI.
Voraussetzungen
Um dem Tutorial zu folgen, brauchst du Python 3.10 oder neuer und einen API-Key von handelsregister.ai (den du als Umgebungsvariable HANDELSREGISTER_API_KEY setzen kannst oder direkt im Code hinterlegen). Außerdem ist ein grundlegendes Verständnis von Bilanz- und GuV-Kennzahlen hilfreich – der Scoring-Artikel im Blog liefert dir dafür einen guten Einstieg.
Wichtige API-Basics für die Praxis:
Die handelsregister.ai API arbeitet als REST-Schnittstelle mit der Basis-URL https://handelsregister.ai/api. Die Authentifizierung erfolgt bevorzugt über den HTTP-Header x-api-key: YOUR_API_KEY (alternativ als Query-Parameter api_key, was aber als Legacy-Methode gilt). Für die Finanzdaten aktivierst du einzelne Features als Query-Parameter: feature=financial_kpi, feature=balance_sheet_accounts und feature=profit_and_loss_account. Jedes aktivierte Feature verbraucht Credits. Das Rate-Limit liegt bei 60 Anfragen pro Minute, sodass du bei sequenziellen Abrufen ein kurzes Pacing einbauen solltest – genau das machen wir im Code weiter unten.
Datenüberblick: Was kommt aus der API zurück?
Bevor wir in den Code einsteigen, lohnt es sich, kurz zu verstehen, wie die Finanzdaten strukturiert sind. Für unser Modell nutzen wir drei Features der API, die jeweils unterschiedliche Perspektiven auf die Finanzlage liefern:
Das Feature financial_kpi liefert aggregierte Jahreskennzahlen wie Umsatz, Jahresüberschuss/-fehlbetrag, Bilanzsumme und Mitarbeiterzahl. Es ist der kompakteste Einstieg in die Finanzdaten und enthält die Werte, die handelsregister.ai bereits aus den Jahresabschlüssen extrahiert hat. Das Feature balance_sheet_accounts enthält die vollständige Bilanz als hierarchische Baumstruktur – Aktiva und Passiva mit allen Unterpositionen (Anlagevermögen, Umlaufvermögen, Eigenkapital, Verbindlichkeiten usw.). Und schließlich gibt uns profit_and_loss_account die Gewinn- und Verlustrechnung in derselben hierarchischen Struktur: Umsatzerlöse, Materialaufwand, Personalaufwand bis hin zum Jahresergebnis.
Die Daten sind pro Geschäftsjahr gruppiert. Ein typischer Eintrag sieht vereinfacht so aus:
{
"entity_id": "…",
"balance_sheet_accounts": [
{
"year": 2024,
"balance_sheet_accounts": [
{"name": {"de": "Aktivseite"}, "value": 24966659.45, "children": [...]},
{"name": {"de": "Passivseite"}, "value": 24966659.45, "children": [...]}
]
}
],
"financial_kpi": [
{
"year": 2024,
"active_total": 24966659.45,
"revenue": 9502936.17,
"net_income": -14369731.62,
"employees": 111
}
]
}
Dieses Beispiel ist absichtlich stark gekürzt und anonymisiert. In der Praxis enthält jeder Bilanz-Knoten weitere children-Knoten, die die Unterpositionen aufschlüsseln – genau diese Baumstruktur nutzen wir später, um gezielt Eigenkapital, Verbindlichkeiten, Rückstellungen und andere Posten zu extrahieren.
Unser Ziel: „Financial Health" als Modellproblem definieren
Bevor wir Features berechnen und ein Modell trainieren, müssen wir eine grundlegende Frage klären: Wofür genau soll das Modell eine Vorhersage treffen – und woher kommen die Labels?
In einem idealen Szenario hättest du echte Ausfalldaten (z. B. „Firma X hat im Jahr Y Insolvenz angemeldet") oder extern vergebene Ratings als Zielvariable. In der Praxis fehlen solche Labels aber häufig – insbesondere wenn du als einzelner Entwickler oder kleines Team startest.
Die Lösung heißt Weak Supervision: Statt perfekter Labels definierst du eine einfache, regelbasierte Heuristik, die „finanziell kritisch" von „unauffällig" trennt. Das Modell lernt dann, diese Entscheidung aus den Kennzahlen zu generalisieren – idealerweise besser und robuster als die einzelne Regel allein.
Konkret definieren wir das Label so: Für jedes Unternehmen und Jahr t berechnen wir die Features und schauen dann auf den Zustand in t+1 (dem Folgejahr). Das Label distress_next_year wird auf 1 gesetzt, wenn im Folgejahr entweder die Eigenkapitalquote negativ ist (das Unternehmen also bilanziell überschuldet wäre) oder die Eigenkapitalquote unter 5 % liegt und gleichzeitig ein Verlust (negativer ROA) vorliegt. Alles andere wird als 0 (kein Distress) gelabelt.
Diese Definition ist bewusst simpel gehalten. Sie fängt die gravierendsten finanziellen Schieflagen ab, ist aber kein Ersatz für ein professionelles Rating. Der Punkt ist ein anderer: Sie gibt uns eine reproduzierbare, automatisierbare Grundwahrheit, auf der wir ein erstes Modell aufsetzen können, das du dann iterativ mit besseren Labels oder manuell kuratierten Beispielen verfeinerst.
Schritt 1: Projekt-Setup
Lege zunächst eine einfache Projektstruktur an. Du brauchst keine komplizierte Architektur – vier Python-Dateien in einem src/-Ordner und ein data/-Verzeichnis für Ein- und Ausgabedaten reichen vollkommen:
financial-health-model/
data/
companies.csv ← deine Firmenliste als Input
raw/ ← hier landen die abgerufenen JSON-Dateien
dataset.parquet ← der fertige Trainingsdatensatz
models/ ← trainiertes Modell + Metadaten
src/
fetch_data.py ← Datenabruf von der API
build_dataset.py ← Feature-Engineering + Labeling
train_model.py ← Modelltraining + Evaluation
score_company.py ← Live-Scoring einzelner Firmen
Erstelle eine virtuelle Umgebung und installiere die benötigten Pakete:
python -m venv .venv
source .venv/bin/activate # macOS/Linux
# .venv\Scripts\activate # Windows PowerShell
pip install -U pip
pip install requests pandas numpy scikit-learn joblib pyarrow
Setze deinen API-Key als Umgebungsvariable. Das ist die empfohlene Methode – analog zu den SDK-Beispielen im Blog:
export HANDELSREGISTER_API_KEY="YOUR_API_KEY"
Schritt 2: Daten abrufen und lokal speichern
Der erste Baustein unserer Pipeline liest eine Firmenliste aus einer CSV-Datei und ruft pro Unternehmen die drei Finanz-Features von der handelsregister.ai API ab. Für ein erstes Modell reichen oft ein paar hundert bis einige tausend Unternehmen – je nachdem, wie viele davon tatsächlich Jahresabschlüsse im Bundesanzeiger veröffentlicht haben.
Die Input-Datei: data/companies.csv
Die CSV-Datei unterstützt zwei Formate. Am einfachsten ist eine einzelne Spalte q, die direkt als Suchquery an die API weitergereicht wird:
q
Muster GmbH Berlin
Beispiel AG Hamburg
Beispiel GmbH München
Alternativ kannst du auch zwei Spalten company_name und city verwenden, die das Script dann automatisch zu einer Query zusammensetzt:
company_name,city
Muster GmbH,Berlin
Beispiel AG,Hamburg
Beispiel GmbH,München
Das Abruf-Script: src/fetch_data.py
Das folgende Script ist absichtlich ausführlich kommentiert, damit du jeden Schritt nachvollziehen kannst. Die Kernlogik ist simpel: CSV einlesen, pro Zeile einen API-Call machen, die Antwort auf die Finanzdaten reduzieren und als JSON speichern.
Zwei Design-Entscheidungen sind dabei wichtig: Erstens speichern wir bewusst nur den Finanzteil der API-Antwort (KPIs, Bilanz, GuV) plus die entity_id. Firmennamen und Adressen werden nicht in die Rohdaten geschrieben – so kannst du das Trainingsmaterial weitergeben, ohne identifizierende Daten preiszugeben. Zweitens bauen wir ein einfaches Rate-Limiting ein: Die API erlaubt 60 Anfragen pro Minute, also setzen wir ein Minimum von ca. 1 Sekunde zwischen aufeinanderfolgenden Requests.
import csv
import json
import os
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests
# ──────────────────────────────────────────────
# Konfiguration
# ──────────────────────────────────────────────
API_BASE = "https://handelsregister.ai/api"
FETCH_ENDPOINT = f"{API_BASE}/v1/fetch-organization"
# Die drei Finanz-Features, die wir für unser Modell brauchen.
# Jedes Feature verbraucht Credits – wir holen also gezielt nur das,
# was wir auch verarbeiten.
FEATURES = ["financial_kpi", "balance_sheet_accounts", "profit_and_loss_account"]
def _get_api_key() -> str:
"""Liest den API-Key aus der Umgebungsvariable."""
api_key = os.getenv("HANDELSREGISTER_API_KEY")
if not api_key:
raise RuntimeError(
"Bitte setze die Umgebungsvariable HANDELSREGISTER_API_KEY "
"(oder passe den Code an)."
)
return api_key
def _slugify(text: str) -> str:
"""
Erzeugt aus einem beliebigen Text einen dateisystemfreundlichen
Dateinamen (Kleinbuchstaben, Bindestriche statt Sonderzeichen).
"""
text = text.strip().lower()
text = re.sub(r"[^a-z0-9äöüß]+", "-", text, flags=re.IGNORECASE)
text = re.sub(r"-+", "-", text).strip("-")
return text[:80] or "company"
def _read_queries(csv_path: Path) -> List[str]:
"""
Liest die Firmenliste aus der CSV-Datei und gibt eine Liste
von Such-Queries zurück.
Unterstützt zwei CSV-Formate:
- Variante A: Eine Spalte 'q' mit der fertigen Query.
- Variante B: Spalten 'company_name' (oder 'name') und optional
'city' (oder 'location'/'ort'), die zusammengesetzt werden.
"""
with csv_path.open("r", encoding="utf-8") as f:
reader = csv.DictReader(f)
fieldnames = reader.fieldnames or []
fieldnames_lower = [c.lower() for c in fieldnames]
queries: List[str] = []
for row in reader:
# Variante A: Spalte "q"
if "q" in fieldnames_lower:
q = row.get(fieldnames[fieldnames_lower.index("q")], "").strip()
if q:
queries.append(q)
continue
# Variante B: "company_name" + "city"
name_key = None
city_key = None
for cand in ["company_name", "name"]:
if cand in fieldnames_lower:
name_key = fieldnames[fieldnames_lower.index(cand)]
break
for cand in ["city", "location", "ort"]:
if cand in fieldnames_lower:
city_key = fieldnames[fieldnames_lower.index(cand)]
break
if not name_key:
raise ValueError(
"CSV muss entweder 'q' oder 'company_name'/'name' enthalten."
)
name = (row.get(name_key) or "").strip()
city = (row.get(city_key) or "").strip() if city_key else ""
q = f"{name} {city}".strip()
if q:
queries.append(q)
return queries
def fetch_organization(
session: requests.Session,
api_key: str,
q: str,
features: Optional[List[str]] = None,
ai_search: Optional[str] = None,
timeout_s: int = 30,
) -> Dict[str, Any]:
"""
Ruft ein Firmenprofil mit den angegebenen Features von der
handelsregister.ai API ab.
Die Authentifizierung erfolgt per x-api-key Header (empfohlen).
Features werden als wiederholte Query-Parameter übergeben
(z. B. ?feature=financial_kpi&feature=balance_sheet_accounts).
Die requests-Library rendert Python-Listen automatisch korrekt
als mehrfache Parameter.
"""
headers = {"x-api-key": api_key}
params: Dict[str, Any] = {"q": q}
if features:
params["feature"] = features
if ai_search:
params["ai_search"] = ai_search
resp = session.get(
FETCH_ENDPOINT, headers=headers, params=params, timeout=timeout_s
)
resp.raise_for_status()
return resp.json()
def sanitize_financial_payload(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Reduziert die API-Antwort auf den Finanzteil.
Das ist aus zwei Gründen wichtig:
1. Privacy: Wir speichern keine Namen/Adressen in Trainingsartefakten.
2. Fokus: Weniger Daten = schneller zu laden und zu verarbeiten.
"""
return {
"entity_id": data.get("entity_id"),
"financial_kpi": data.get("financial_kpi") or [],
"balance_sheet_accounts": data.get("balance_sheet_accounts") or [],
"profit_and_loss_account": data.get("profit_and_loss_account") or [],
"meta": data.get("meta") or {},
}
def main() -> None:
base_dir = Path(__file__).resolve().parents[1]
csv_path = base_dir / "data" / "companies.csv"
out_dir = base_dir / "data" / "raw"
out_dir.mkdir(parents=True, exist_ok=True)
api_key = _get_api_key()
queries = _read_queries(csv_path)
session = requests.Session()
# Einfaches Pacing: 60 Requests/min = ca. 1 Request/Sekunde.
# Wir nehmen etwas Puffer und warten 1,05 s zwischen den Calls.
min_interval_s = 1.05
last_call = 0.0
for i, q in enumerate(queries, start=1):
now = time.time()
elapsed = now - last_call
if elapsed < min_interval_s:
time.sleep(min_interval_s - elapsed)
try:
data = fetch_organization(
session=session,
api_key=api_key,
q=q,
features=FEATURES,
ai_search=None, # oder "on-default" für fuzzy matching
)
last_call = time.time()
except requests.HTTPError as e:
status = getattr(e.response, "status_code", None)
print(f"[{i}/{len(queries)}] FEHLER ({status}) bei Query: {q!r}")
continue
except Exception as e:
print(f"[{i}/{len(queries)}] FEHLER bei Query: {q!r} -> {e}")
continue
sanitized = sanitize_financial_payload(data)
entity_id = sanitized.get("entity_id") or f"unknown-{i}"
filename = f"{entity_id}_{_slugify(q)}.json"
out_path = out_dir / filename
with out_path.open("w", encoding="utf-8") as f:
json.dump(sanitized, f, ensure_ascii=False)
print(f"[{i}/{len(queries)}] OK -> {out_path.name}")
print("Fertig.")
if __name__ == "__main__":
main()
Nach dem Lauf findest du im Ordner data/raw/ pro Firma eine JSON-Datei mit der entity_id, den KPIs, der Bilanz und der GuV – bereit für den nächsten Schritt.
Schritt 3: Trainingsdatensatz bauen (Features + Labels)
Das Herzstück jedes ML-Projekts ist das Feature-Engineering. Hier verwandeln wir die rohen JSON-Strukturen in eine flache Tabelle, in der jede Zeile ein Tupel (entity_id, Jahr) repräsentiert – mit allen berechneten Kennzahlen und dem zugehörigen Label.
Welche Features berechnen wir?
Die Features, die wir ableiten, sind die Klassiker der Bilanzanalyse – viele davon haben wir bereits im konzeptionellen Scoring-Artikel beschrieben. Hier eine Übersicht, was unser Script berechnet:
Kapitalstruktur: Die Eigenkapitalquote (Eigenkapital ÷ Bilanzsumme) ist der wichtigste Einzelindikator für finanzielle Stabilität. Ergänzt wird sie durch die Fremdkapitalquote (Verbindlichkeiten ÷ Bilanzsumme) und die Umlaufvermögensquote, die zeigt, wie liquide das Vermögen aufgestellt ist.
Profitabilität: Der Return on Assets (ROA = Jahresergebnis ÷ Bilanzsumme) misst, wie effizient das Unternehmen sein Gesamtvermögen einsetzt. Die Nettomarge (Jahresergebnis ÷ Umsatz) zeigt, was vom Umsatz als Gewinn übrig bleibt.
Kostenstruktur: Materialaufwandsquote und Personalaufwandsquote (jeweils bezogen auf den Umsatz) geben Aufschluss über die Kostenstruktur. Sind diese Quoten sehr hoch, bleibt wenig Spielraum für Gewinne.
Größe und Effizienz: Die logarithmierte Bilanzsumme und Mitarbeiterzahl fangen die Unternehmensgröße ein (logarithmiert, weil Finanzwerte typischerweise schief verteilt sind). Der Umsatz pro Mitarbeiter ist ein gängiger Effizienzindikator.
Wachstumsdynamik: Year-over-Year-Veränderungen von Bilanzsumme, Umsatz und Ergebnis zeigen Trends. Ein schrumpfender Umsatz bei gleichzeitig steigenden Verbindlichkeiten ist ein klassisches Warnsignal.
Das Feature-Engineering-Script: src/build_dataset.py
Das Script iteriert über alle JSON-Dateien in data/raw/, extrahiert pro Jahr die relevanten Posten aus der Baumstruktur der Bilanz und GuV, berechnet die Kennzahlen und erzeugt das Label. Die zentrale Herausforderung dabei: Die Bilanz- und GuV-Daten sind hierarchisch strukturiert. Um z. B. das Eigenkapital zu finden, müssen wir den Baum traversieren und den richtigen Knoten anhand seines deutschen Namens identifizieren.
Ein HGB-spezifisches Detail, das leicht übersehen wird: Wenn ein Unternehmen bilanziell überschuldet ist (negatives Eigenkapital), wird das in HGB-Bilanzen nicht einfach als negativer Eigenkapitalposten auf der Passivseite ausgewiesen. Stattdessen erscheint ein Posten „Nicht durch Eigenkapital gedeckter Fehlbetrag" auf der Aktivseite. Unser Code fängt diesen Fall explizit ab und rechnet den Wert in ein negatives ökonomisches Eigenkapital um – ohne diesen Schritt würden wir überschuldete Unternehmen möglicherweise gar nicht als solche erkennen.
import json
import math
import re
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional
import numpy as np
import pandas as pd
# ──────────────────────────────────────────────
# Hilfsfunktionen für die Baumnavigation
# ──────────────────────────────────────────────
def _norm(s: Optional[str]) -> str:
"""Normalisiert einen String für Vergleiche (lowercase, Whitespace reduzieren)."""
if not s:
return ""
s = s.strip().lower()
s = re.sub(r"\s+", " ", s)
return s
def iter_nodes(node: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
"""Traversiert rekursiv einen Bilanz-/GuV-Knoten und alle seine Kinder."""
yield node
for child in node.get("children") or []:
yield from iter_nodes(child)
def iter_roots(roots: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
"""Traversiert eine Liste von Wurzelknoten (z. B. Aktiva + Passiva)."""
for r in roots:
yield from iter_nodes(r)
def find_value(
roots: List[Dict[str, Any]],
*,
exact_de: Optional[str] = None,
contains: Optional[str] = None,
) -> Optional[float]:
"""
Sucht in der Baumstruktur nach einem Posten.
- exact_de: Exakte Übereinstimmung mit dem deutschen Namen
(z. B. 'Eigenkapital')
- contains: Teilstring-Suche im Namen und/oder im 'in_report'-Feld
(z. B. 'Kassenbestand' findet auch 'Kassenbestand, Bundesbankguthaben...')
Gibt den 'value' des ersten Treffers zurück oder None.
"""
target_exact = _norm(exact_de) if exact_de else None
target_contains = _norm(contains) if contains else None
for n in iter_roots(roots):
name = n.get("name") or {}
de = name.get("de")
in_report = name.get("in_report")
hay = _norm(f"{de or ''} {in_report or ''}")
if target_exact and _norm(de) == target_exact:
return n.get("value")
if target_contains and target_contains in hay:
return n.get("value")
return None
def safe_div(num: Optional[float], den: Optional[float]) -> Optional[float]:
"""Division mit None- und Zero-Safety. Gibt None zurück statt zu crashen."""
if num is None or den is None:
return None
den = float(den)
if den == 0.0:
return None
return float(num) / den
# ──────────────────────────────────────────────
# Extraktion: Bilanz
# ──────────────────────────────────────────────
def extract_balance_sheet_year(year_entry: Dict[str, Any]) -> Dict[str, Any]:
"""
Extrahiert relevante Bilanzposten für ein einzelnes Geschäftsjahr.
Besonders zu beachten: Der HGB-Spezialfall bei negativem Eigenkapital.
Laut HGB wird ein 'Nicht durch Eigenkapital gedeckter Fehlbetrag' auf
der Aktivseite ausgewiesen (statt negativem Eigenkapital auf der
Passivseite). Diesen fangen wir hier ab und berechnen ein
'ökonomisches Eigenkapital', das negativ sein kann.
"""
roots = year_entry.get("balance_sheet_accounts") or []
assets_total = (
find_value(roots, exact_de="Aktivseite")
or find_value(roots, contains="Aktiva")
)
fixed_assets = find_value(roots, exact_de="Anlagevermögen")
current_assets = find_value(roots, exact_de="Umlaufvermögen")
inventories = find_value(roots, exact_de="Vorräte")
receivables = find_value(roots, contains="Forderungen")
cash = find_value(roots, contains="Kassenbestand")
equity_reported = find_value(roots, exact_de="Eigenkapital")
liabilities = find_value(roots, exact_de="Verbindlichkeiten")
provisions = find_value(roots, exact_de="Rückstellungen")
# HGB-Spezialfall: negatives Eigenkapital
neg_equity_abs = find_value(
roots, contains="nicht durch eigenkapital gedeckter fehlbetrag"
)
equity_economic = (
-neg_equity_abs
if (neg_equity_abs is not None and neg_equity_abs > 0)
else equity_reported
)
return {
"assets_total_bs": assets_total,
"fixed_assets": fixed_assets,
"current_assets": current_assets,
"inventories": inventories,
"receivables": receivables,
"cash": cash,
"equity_economic": equity_economic,
"liabilities": liabilities,
"provisions": provisions,
"neg_equity_abs": neg_equity_abs,
}
# ──────────────────────────────────────────────
# Extraktion: GuV
# ──────────────────────────────────────────────
def extract_profit_loss_year(year_entry: Dict[str, Any]) -> Dict[str, Any]:
"""
Extrahiert relevante GuV-Posten für ein einzelnes Geschäftsjahr.
Hinweis: Der Schlüssel für die GuV-Knoten innerhalb eines Jahreseintrags
kann je nach API-Antwort 'profit_and_loss_accounts' (Plural) heißen.
Prüfe bei abweichenden Ergebnissen die tatsächliche JSON-Struktur.
"""
# Die API gibt die GuV-Knoten innerhalb eines Jahreseintrags
# unter dem Schlüssel 'profit_and_loss_accounts' (Plural) zurück.
roots = year_entry.get("profit_and_loss_accounts") or []
revenue = find_value(roots, exact_de="Umsatzerlöse")
material = find_value(roots, exact_de="Materialaufwand")
personnel = find_value(roots, exact_de="Personalaufwand")
net_income = (
find_value(roots, contains="Jahresüberschuss")
or find_value(roots, contains="Jahresfehlbetrag")
or find_value(roots, contains="Ergebnis nach Steuern")
)
def abs_or_none(x: Any) -> Optional[float]:
return abs(float(x)) if isinstance(x, (int, float)) else None
return {
"revenue_pl": abs_or_none(revenue),
"material_expenses_pl": abs_or_none(material),
"personnel_expenses_pl": abs_or_none(personnel),
"net_income_pl": (
float(net_income) if isinstance(net_income, (int, float)) else None
),
}
# ──────────────────────────────────────────────
# Feature-Berechnung: Alles zusammenführen
# ──────────────────────────────────────────────
def build_features_for_entity(entity: Dict[str, Any]) -> Dict[int, Dict[str, Any]]:
"""
Berechnet für eine Firma und alle verfügbaren Jahre die ML-Features.
Die Funktion kombiniert drei Datenquellen:
1. financial_kpi (vorab extrahierte Kennzahlen)
2. balance_sheet_accounts (detaillierte Bilanz)
3. profit_and_loss_account (detaillierte GuV)
Wo möglich, werden Werte aus den KPIs bevorzugt (da diese bereits
geprüft und normalisiert sind). Fehlen sie dort, greifen wir auf
die detaillierten Bilanz-/GuV-Daten zurück.
"""
kpi_by_year = {
int(d["year"]): d
for d in (entity.get("financial_kpi") or [])
if d.get("year") is not None
}
bs_by_year = {
int(d["year"]): extract_balance_sheet_year(d)
for d in (entity.get("balance_sheet_accounts") or [])
if d.get("year") is not None
}
pl_by_year = {
int(d["year"]): extract_profit_loss_year(d)
for d in (entity.get("profit_and_loss_account") or [])
if d.get("year") is not None
}
years = sorted(set(kpi_by_year) | set(bs_by_year) | set(pl_by_year))
out: Dict[int, Dict[str, Any]] = {}
prev: Optional[Dict[str, Any]] = None
for y in years:
kpi = kpi_by_year.get(y, {})
bs = bs_by_year.get(y, {})
pl = pl_by_year.get(y, {})
# ── Basiskennzahlen (KPIs als Primärquelle, Bilanz/GuV als Fallback) ──
total_assets = kpi.get("active_total") or bs.get("assets_total_bs")
revenue = kpi.get("revenue") or pl.get("revenue_pl")
net_income = kpi.get("net_income")
if net_income is None:
net_income = pl.get("net_income_pl")
# Aufwandsposten: Wir nehmen den Betrag (abs), da je nach Darstellung
# die Vorzeichen variieren können.
material_expenses = kpi.get("material_expenses") or pl.get("material_expenses_pl")
material_expenses = (
abs(float(material_expenses))
if isinstance(material_expenses, (int, float))
else None
)
personnel_expenses = kpi.get("personnel_expenses") or pl.get("personnel_expenses_pl")
personnel_expenses = (
abs(float(personnel_expenses))
if isinstance(personnel_expenses, (int, float))
else None
)
employees = kpi.get("employees")
employees = (
int(employees)
if isinstance(employees, (int, float)) and not math.isnan(employees)
else None
)
equity = bs.get("equity_economic")
liabilities = bs.get("liabilities")
current_assets = bs.get("current_assets")
cash = bs.get("cash")
inventories = bs.get("inventories")
receivables = bs.get("receivables")
# ── Abgeleitete Ratios ──
equity_ratio = safe_div(equity, total_assets)
liabilities_ratio = safe_div(liabilities, total_assets)
current_assets_ratio = safe_div(current_assets, total_assets)
cash_assets_ratio = safe_div(cash, total_assets)
roa = safe_div(net_income, total_assets)
net_margin = safe_div(net_income, revenue)
inventory_share = safe_div(inventories, current_assets)
receivables_share = safe_div(receivables, current_assets)
material_ratio = safe_div(material_expenses, revenue)
personnel_ratio = safe_div(personnel_expenses, revenue)
revenue_per_employee = safe_div(revenue, employees)
personnel_per_employee = safe_div(personnel_expenses, employees)
# ── Log-transformierte Größen-Features ──
# Finanzdaten sind typischerweise rechtsschief verteilt.
# log1p(x) = ln(1+x) komprimiert die Verteilung und macht
# sie für lineare Modelle besser handhabbar.
log_assets = (
math.log1p(total_assets)
if isinstance(total_assets, (int, float)) and total_assets > 0
else None
)
log_employees = (
math.log1p(employees)
if isinstance(employees, int) and employees > 0
else None
)
log_rev_per_emp = (
math.log1p(revenue_per_employee)
if isinstance(revenue_per_employee, (int, float))
and revenue_per_employee > 0
else None
)
# ── Year-over-Year Growth ──
# Wachstumsraten zeigen Trends und sind oft aussagekräftiger
# als absolute Werte.
asset_growth = None
revenue_growth = None
income_growth = None
if prev is not None:
if total_assets is not None and prev["total_assets"]:
asset_growth = safe_div(
total_assets - prev["total_assets"], prev["total_assets"]
)
if revenue is not None and prev["revenue"]:
revenue_growth = safe_div(
revenue - prev["revenue"], prev["revenue"]
)
# Beim Ergebnis dividieren wir durch |Vorjahreswert|,
# damit ein Vorzeichenwechsel nicht zu explodierenden Werten führt.
if net_income is not None and prev["net_income"] not in (None, 0):
income_growth = safe_div(
net_income - prev["net_income"], abs(prev["net_income"])
)
out[y] = {
"year": y,
"total_assets": total_assets,
"revenue": revenue,
"net_income": net_income,
"employees": employees,
"equity_ratio": equity_ratio,
"liabilities_ratio": liabilities_ratio,
"current_assets_ratio": current_assets_ratio,
"cash_assets_ratio": cash_assets_ratio,
"roa": roa,
"net_margin": net_margin,
"inventory_share": inventory_share,
"receivables_share": receivables_share,
"material_ratio": material_ratio,
"personnel_ratio": personnel_ratio,
"log_assets": log_assets,
"log_employees": log_employees,
"log_revenue_per_employee": log_rev_per_emp,
"personnel_per_employee": personnel_per_employee,
"asset_growth": asset_growth,
"revenue_growth": revenue_growth,
"income_growth": income_growth,
}
prev = {
"total_assets": total_assets,
"revenue": revenue,
"net_income": net_income,
}
return out
# ──────────────────────────────────────────────
# Label-Funktion: Weak Supervision
# ──────────────────────────────────────────────
def distress_rule(next_year_features: Dict[str, Any]) -> Optional[int]:
"""
Regelbasiertes Label für finanziellen Distress im Folgejahr.
Bedingungen:
- Eigenkapitalquote < 0 → bilanzielle Überschuldung → distress = 1
- Eigenkapitalquote < 5 % UND ROA < 0 → kritisch dünn kapitalisiert
bei gleichzeitigem Verlust → distress = 1
- Sonst → distress = 0
Gibt None zurück, wenn die Daten für eine Entscheidung nicht
ausreichen (z. B. fehlende EK-Quote oder ROA).
"""
eqr = next_year_features.get("equity_ratio")
roa = next_year_features.get("roa")
if eqr is None or roa is None:
return None
if eqr < 0:
return 1
if eqr < 0.05 and roa < 0:
return 1
return 0
FEATURE_COLUMNS = [
"equity_ratio",
"liabilities_ratio",
"current_assets_ratio",
"cash_assets_ratio",
"roa",
"net_margin",
"inventory_share",
"receivables_share",
"material_ratio",
"personnel_ratio",
"log_assets",
"log_employees",
"log_revenue_per_employee",
"personnel_per_employee",
"asset_growth",
"revenue_growth",
"income_growth",
]
def main() -> None:
base_dir = Path(__file__).resolve().parents[1]
raw_dir = base_dir / "data" / "raw"
out_path = base_dir / "data" / "dataset.parquet"
out_csv = base_dir / "data" / "dataset.csv"
rows: List[Dict[str, Any]] = []
for json_path in raw_dir.glob("*.json"):
with json_path.open("r", encoding="utf-8") as f:
entity = json.load(f)
entity_id = entity.get("entity_id")
if not entity_id:
continue
feat_by_year = build_features_for_entity(entity)
years = sorted(feat_by_year.keys())
# Für das Label brauchen wir das Folgejahr.
# Wir iterieren also bis zum vorletzten Jahr und schauen
# jeweils auf year+1 für die Label-Entscheidung.
for idx, y in enumerate(years[:-1]):
x = feat_by_year[y]
y_next = years[idx + 1]
label = distress_rule(feat_by_year[y_next])
if label is None:
continue
row = {
"entity_id": entity_id,
"year": y,
"label_distress_next_year": int(label),
}
for col in FEATURE_COLUMNS:
row[col] = x.get(col)
rows.append(row)
df = pd.DataFrame(rows)
if df.empty:
raise RuntimeError(
"Kein Trainingsdatensatz erzeugt. "
"Prüfe Rohdaten/CSV-Liste/Feature-Verfügbarkeit."
)
# Spalten entfernen, die für alle Samples leer sind
all_nan_cols = [c for c in FEATURE_COLUMNS if c in df.columns and df[c].isna().all()]
if all_nan_cols:
df = df.drop(columns=all_nan_cols)
print(f"Hinweis: Spalten komplett leer, entfernt: {all_nan_cols}")
print("Samples:", len(df))
print("Entities:", df["entity_id"].nunique())
print(
"Label-Verteilung:\n",
df["label_distress_next_year"].value_counts(normalize=True),
)
df.to_parquet(out_path, index=False)
df.to_csv(out_csv, index=False)
print("Gespeichert:", out_path)
print("Gespeichert:", out_csv)
if __name__ == "__main__":
main()
Wenn du das Script laufen lässt, bekommst du einen Datensatz als Parquet- und CSV-Datei. Der Output zeigt dir direkt, wie viele Samples und Entitäten du hast und wie die Label-Verteilung aussieht. Bei stark unausgewogenen Labels (z. B. nur 3 % Distress) ist das normal – Firmen in Schieflage sind glücklicherweise die Ausnahme.
Schritt 4: Modell trainieren
Jetzt wird's spannend. Wir trainieren eine logistische Regression – bewusst simpel, bewusst erklärbar. Logistische Regression hat mehrere Vorteile als Startmodell: Sie ist schnell zu trainieren, liefert kalibrierte Wahrscheinlichkeiten, und du kannst über die Koeffizienten direkt ablesen, welche Features in welche Richtung wirken. Für ein Deployment brauchst du nur eine einzige joblib-Datei.
Ein wichtiges technisches Detail beim Train/Test-Split: Wir verwenden einen Group-Shuffle-Split, bei dem die Gruppierung nach entity_id erfolgt. Das bedeutet, dass alle Jahre derselben Firma entweder komplett im Trainings- oder komplett im Testset landen. Ohne diese Gruppierung würde das Modell lernen, firmenspezifische Muster zu erkennen (z. B. „Bilanzsumme X bei Eigenkapitalquote Y gehört zu Firma Z"), was die Test-Metriken künstlich aufbläht. Mit dem Group-Split testen wir wirklich auf Firmen, die das Modell noch nie gesehen hat.
Außerdem nutzen wir class_weight="balanced", damit die logistische Regression die (typischerweise unterrepräsentierte) Distress-Klasse stärker gewichtet. Ohne diesen Parameter würde das Modell bei stark unausgewogenen Daten einfach immer „kein Distress" vorhersagen und trotzdem eine hohe Accuracy erreichen – was natürlich nutzlos wäre.
Das Training-Script: src/train_model.py
import json
from pathlib import Path
from typing import List, Tuple
import joblib
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
average_precision_score,
classification_report,
confusion_matrix,
roc_auc_score,
)
from sklearn.model_selection import GroupShuffleSplit
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
def find_working_split(
X: pd.DataFrame,
y: pd.Series,
groups: pd.Series,
test_size: float = 0.2,
max_tries: int = 50,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Erzeugt einen gruppenbasierten Train/Test-Split.
Warum GroupShuffleSplit? Wir wollen sicherstellen, dass keine Firma
gleichzeitig in Train und Test auftaucht (Data Leakage vermeiden).
Die Schleife probiert verschiedene Random Seeds, bis ein Split
gefunden wird, der beide Klassen in Train UND Test enthält.
Bei kleinen Datensätzen mit wenigen positiven Labels kann es sein,
dass ein zufälliger Split alle positiven Fälle in ein Set packt –
das fangen wir hier ab.
"""
for seed in range(max_tries):
splitter = GroupShuffleSplit(
n_splits=1, test_size=test_size, random_state=seed
)
train_idx, test_idx = next(splitter.split(X, y, groups=groups))
y_train = y.iloc[train_idx]
y_test = y.iloc[test_idx]
if y_train.nunique() == 2 and y_test.nunique() == 2:
return train_idx, test_idx
raise RuntimeError(
"Konnte keinen Split finden, der beide Klassen in Train & Test "
"enthält. Du brauchst mehr Daten oder eine andere Label-Definition."
)
def main() -> None:
base_dir = Path(__file__).resolve().parents[1]
data_path = base_dir / "data" / "dataset.parquet"
model_dir = base_dir / "models"
model_dir.mkdir(parents=True, exist_ok=True)
df = pd.read_parquet(data_path)
target = "label_distress_next_year"
group_col = "entity_id"
# Alle Spalten außer Target, Gruppen-ID und Jahr sind Features
feature_cols: List[str] = [
c for c in df.columns if c not in [target, group_col, "year"]
]
X = df[feature_cols]
y = df[target].astype(int)
groups = df[group_col].astype(str)
train_idx, test_idx = find_working_split(X, y, groups, test_size=0.2)
X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
# Die Pipeline führt drei Schritte in Reihe aus:
# 1. Imputer: Fehlende Werte durch den Median ersetzen
# (robuster als der Mittelwert bei schiefen Verteilungen)
# 2. Scaler: Features auf Mittelwert 0 und Standardabweichung 1
# normieren (wichtig für Logistic Regression)
# 3. Model: Logistische Regression mit balancierter Klassengewichtung
pipe = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler()),
("model", LogisticRegression(max_iter=2000, class_weight="balanced")),
]
)
pipe.fit(X_train, y_train)
# ── Evaluation ──
proba = pipe.predict_proba(X_test)[:, 1]
pred = (proba >= 0.5).astype(int)
print("ROC-AUC:", roc_auc_score(y_test, proba))
print("Average Precision:", average_precision_score(y_test, proba))
print("Confusion Matrix:\n", confusion_matrix(y_test, pred))
print(classification_report(y_test, pred, digits=3))
# ── Modell und Metadaten speichern ──
model_path = model_dir / "financial_health_model.joblib"
joblib.dump(pipe, model_path)
meta = {
"feature_cols": feature_cols,
"target": target,
"label_definition": (
"distress_next_year = (equity_ratio<0) OR "
"(equity_ratio<0.05 AND roa<0) im Folgejahr"
),
"threshold": 0.5,
}
meta_path = model_dir / "financial_health_model.meta.json"
meta_path.write_text(
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
)
print("Modell gespeichert:", model_path)
print("Meta gespeichert:", meta_path)
if __name__ == "__main__":
main()
Neben dem Modell speichern wir eine Meta-Datei im JSON-Format, die festhält, welche Feature-Spalten das Modell erwartet und wie das Label definiert ist. Das klingt nach einem kleinen Detail, wird aber wichtig, sobald du das Modell aktualisierst: So stellst du sicher, dass Training und Inferenz immer mit derselben Feature-Definition arbeiten.
Schritt 5: Firma lokal scoren (CLI)
Jetzt bauen wir den letzten Baustein: Ein CLI-Tool, das in Echtzeit eine Firma über die API abfragt, die Features berechnet und den trainierten Score ausgibt. Das Tool importiert die Feature-Logik direkt aus build_dataset.py – so ist garantiert, dass Training und Inferenz identische Berechnungen verwenden.
Ein kleines Bonus-Feature: Da wir eine logistische Regression nutzen, können wir eine einfache Feature-Erklärung liefern. Die Idee: Wir schauen uns an, welche Features (nach Standardisierung und Gewichtung durch die Koeffizienten) am stärksten zum Score beitragen. Das gibt dir sofort eine intuitive Antwort auf die Frage „Warum hat diese Firma diesen Score?" – bei komplexeren Modellen bräuchtest du dafür SHAP oder ähnliche Methoden.
Das Scoring-Script: src/score_company.py
import json
import os
import sys
from typing import Any, Dict, List, Optional, Tuple
import joblib
import numpy as np
import pandas as pd
import requests
# Wir importieren die Feature-Logik 1:1 aus build_dataset.py,
# damit Training und Inferenz identisch sind.
from build_dataset import build_features_for_entity, FEATURE_COLUMNS
API_BASE = "https://handelsregister.ai/api"
FETCH_ENDPOINT = f"{API_BASE}/v1/fetch-organization"
FEATURES = ["financial_kpi", "balance_sheet_accounts", "profit_and_loss_account"]
def get_api_key() -> str:
api_key = os.getenv("HANDELSREGISTER_API_KEY")
if not api_key:
raise RuntimeError("Bitte setze HANDELSREGISTER_API_KEY.")
return api_key
def fetch_financial_data(q: str) -> Dict[str, Any]:
"""
Ruft die Finanzdaten einer Firma ab und reduziert
die Antwort auf den relevanten Teil (wie beim Training).
"""
headers = {"x-api-key": get_api_key()}
params = {"q": q, "feature": FEATURES}
resp = requests.get(FETCH_ENDPOINT, headers=headers, params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
return {
"entity_id": data.get("entity_id"),
"financial_kpi": data.get("financial_kpi") or [],
"balance_sheet_accounts": data.get("balance_sheet_accounts") or [],
"profit_and_loss_account": data.get("profit_and_loss_account") or [],
}
def explain_linear_model(
pipeline, x_row: pd.DataFrame, feature_cols: List[str], top_k: int = 8
) -> List[Dict[str, Any]]:
"""
Erzeugt eine einfache, lokale Erklärung für die Vorhersage.
Bei logistischer Regression ist die Erklärung besonders intuitiv:
Der Beitrag jedes Features zum Log-Odds-Score ergibt sich aus
coef[i] * z[i], wobei z[i] der standardisierte Feature-Wert ist.
Positive Beiträge erhöhen die vorhergesagte Distress-Wahrscheinlichkeit,
negative senken sie. Wir sortieren nach absolutem Beitrag und zeigen
die top_k einflussreichsten Features.
"""
imputer = pipeline.named_steps["imputer"]
scaler = pipeline.named_steps["scaler"]
model = pipeline.named_steps["model"]
x_imp = imputer.transform(x_row[feature_cols])
x_z = scaler.transform(x_imp)
coefs = model.coef_.reshape(-1)
contrib = x_z.reshape(-1) * coefs
order = np.argsort(np.abs(contrib))[::-1][:top_k]
out = []
for idx in order:
out.append(
{
"feature": feature_cols[idx],
"contribution_log_odds": float(contrib[idx]),
"direction": "erhöht Risiko" if contrib[idx] > 0 else "senkt Risiko",
"value": (
None
if pd.isna(
x_row.iloc[0, x_row.columns.get_loc(feature_cols[idx])]
)
else float(x_row.iloc[0][feature_cols[idx]])
),
}
)
return out
def risk_to_health_score(prob_distress: float) -> float:
"""Wandelt die Distress-Wahrscheinlichkeit in einen 0-100 Score um."""
return round((1.0 - prob_distress) * 100.0, 1)
def bucket(score: float) -> str:
"""Ordnet den Score in eine menschenlesbare Kategorie ein."""
if score >= 70:
return "gesund"
if score >= 40:
return "beobachten"
return "kritisch"
def main() -> None:
if len(sys.argv) < 2:
print('Usage: python src/score_company.py "Firmenname Ort"')
sys.exit(1)
q = " ".join(sys.argv[1:]).strip()
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
model_path = os.path.join(base_dir, "models", "financial_health_model.joblib")
meta_path = os.path.join(base_dir, "models", "financial_health_model.meta.json")
pipe = joblib.load(model_path)
meta = json.loads(open(meta_path, "r", encoding="utf-8").read())
feature_cols = meta["feature_cols"]
entity = fetch_financial_data(q)
feat_by_year = build_features_for_entity(entity)
if not feat_by_year:
raise RuntimeError("Keine Finanzjahre gefunden. Prüfe Query/Features.")
latest_year = max(feat_by_year.keys())
x = feat_by_year[latest_year]
row = {c: x.get(c) for c in feature_cols}
x_row = pd.DataFrame([row])
prob = float(pipe.predict_proba(x_row)[:, 1][0])
score = risk_to_health_score(prob)
explanation = explain_linear_model(pipe, x_row, feature_cols, top_k=8)
result = {
"query": q,
"entity_id": entity.get("entity_id"),
"year": latest_year,
"prob_distress_next_year": round(prob, 4),
"financial_health_score_0_100": score,
"bucket": bucket(score),
"top_drivers": explanation,
"disclaimer": (
"Nur ein technisches Beispielmodell (Weak Labels). "
"Kein offizielles Rating, keine Beratung."
),
}
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()
Alles zusammen ausführen
Die komplette Pipeline lässt sich in vier Befehlen starten:
python src/fetch_data.py # Daten abrufen
python src/build_dataset.py # Features + Labels erzeugen
python src/train_model.py # Modell trainieren
python src/score_company.py "Beispiel GmbH Berlin" # Live-Scoring
Der Output des Scoring-Scripts sieht ungefähr so aus (Werte sind exemplarisch):
{
"query": "Beispiel GmbH Berlin",
"entity_id": "abc123...",
"year": 2023,
"prob_distress_next_year": 0.1842,
"financial_health_score_0_100": 81.6,
"bucket": "gesund",
"top_drivers": [
{"feature": "equity_ratio", "contribution_log_odds": -1.23, "direction": "senkt Risiko", "value": 0.42},
{"feature": "roa", "contribution_log_odds": -0.87, "direction": "senkt Risiko", "value": 0.05}
],
"disclaimer": "Nur ein technisches Beispielmodell (Weak Labels). Kein offizielles Rating, keine Beratung."
}
Optional: Lokaler REST-Service (FastAPI)
Wenn du den Score nicht nur über die Kommandozeile abrufen, sondern z. B. in ein internes Tool oder Dashboard einbinden möchtest, kannst du das Ganze mit wenig Aufwand als lokalen HTTP-Endpunkt bereitstellen. FastAPI eignet sich dafür hervorragend – es ist schnell, typsicher und generiert automatisch eine interaktive API-Dokumentation unter /docs.
pip install fastapi uvicorn
Erstelle die Datei src/app.py:
import json
import os
from typing import Any, Dict
import joblib
import pandas as pd
from fastapi import FastAPI, HTTPException
import requests
from build_dataset import build_features_for_entity
API_BASE = "https://handelsregister.ai/api"
FETCH_ENDPOINT = f"{API_BASE}/v1/fetch-organization"
FEATURES = ["financial_kpi", "balance_sheet_accounts", "profit_and_loss_account"]
app = FastAPI(title="Financial Health Scoring (local)")
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
MODEL_PATH = os.path.join(BASE_DIR, "models", "financial_health_model.joblib")
META_PATH = os.path.join(BASE_DIR, "models", "financial_health_model.meta.json")
# Modell und Metadaten einmalig beim Start laden
pipe = joblib.load(MODEL_PATH)
meta = json.loads(open(META_PATH, "r", encoding="utf-8").read())
feature_cols = meta["feature_cols"]
def api_key() -> str:
k = os.getenv("HANDELSREGISTER_API_KEY")
if not k:
raise RuntimeError("HANDELSREGISTER_API_KEY fehlt.")
return k
def fetch_financial(q: str) -> Dict[str, Any]:
headers = {"x-api-key": api_key()}
params = {"q": q, "feature": FEATURES}
r = requests.get(FETCH_ENDPOINT, headers=headers, params=params, timeout=30)
if r.status_code != 200:
raise HTTPException(
status_code=502, detail=f"Upstream error: {r.status_code}"
)
data = r.json()
return {
"entity_id": data.get("entity_id"),
"financial_kpi": data.get("financial_kpi") or [],
"balance_sheet_accounts": data.get("balance_sheet_accounts") or [],
"profit_and_loss_account": data.get("profit_and_loss_account") or [],
}
@app.get("/health")
def health():
return {"ok": True}
@app.post("/score")
def score(payload: Dict[str, Any]):
q = (payload.get("q") or "").strip()
if not q:
raise HTTPException(
status_code=400, detail="payload must include non-empty 'q'"
)
entity = fetch_financial(q)
feat_by_year = build_features_for_entity(entity)
if not feat_by_year:
raise HTTPException(status_code=404, detail="no financial years found")
latest_year = max(feat_by_year.keys())
x = feat_by_year[latest_year]
row = {c: x.get(c) for c in feature_cols}
x_row = pd.DataFrame([row])
prob = float(pipe.predict_proba(x_row)[:, 1][0])
score_0_100 = round((1.0 - prob) * 100.0, 1)
return {
"q": q,
"entity_id": entity.get("entity_id"),
"year": latest_year,
"prob_distress_next_year": round(prob, 4),
"financial_health_score_0_100": score_0_100,
}
Starten:
uvicorn src.app:app --reload --port 8000
Danach erreichst du die interaktive Doku unter http://localhost:8000/docs und kannst den Score per POST-Request abrufen.
Was du als Nächstes verbessern kannst
Dieses Tutorial liefert dir ein funktionierendes Grundgerüst. Wenn du es produktiv einsetzen oder weiter ausbauen möchtest, gibt es mehrere natürliche nächste Schritte:
Bessere Labels sind der größte Hebel. Weak Labels sind ein pragmatischer Start, aber ein Modell kann nicht besser werden als seine Labels. Wenn du Zugang zu echten Ausfalldaten hast (z. B. aus Insolvenzbekanntmachungen oder über die handelsregister.ai API mit dem Feature insolvency_publications), kannst du damit deutlich validere Zielvariablen definieren. Auch eine manuelle Kuratierung – also das Durchschauen und Korrigieren von Labels für einige hundert Fälle – verbessert die Modellqualität spürbar.
Mehr und reichere Features bieten weiteres Potenzial. Beispielsweise könntest du Trends über 3–5 Jahre berechnen (z. B. durchschnittliche Eigenkapitalquote der letzten drei Jahre oder die Standardabweichung des ROA als Volatilitätsmaß). Auch die Branche als kategoriales Feature kann helfen, da „normale" Kennzahlenwerte stark branchenabhängig sind.
Modellkalibrierung ist wichtig, wenn du die Ausgabewahrscheinlichkeiten als tatsächliche Wahrscheinlichkeiten interpretieren willst. Methoden wie Platt Scaling oder Isotonic Regression stellen sicher, dass ein Output von 0.8 wirklich „80 % Distress-Wahrscheinlichkeit" bedeutet.
Alternative Modelle: Logistische Regression ist ein hervorragender Startpunkt, aber Gradient-Boosted Trees (z. B. LightGBM oder XGBoost) können nicht-lineare Zusammenhänge besser abbilden. Der Trade-off: etwas weniger Interpretierbarkeit, dafür oft bessere Vorhersageleistung.
Fazit
Du hast jetzt eine vollständige End-to-End-Pipeline: vom strukturierten Abruf der Bundesanzeiger-Finanzdaten über die handelsregister.ai API (JSON statt PDF), über Feature-Engineering und Modelltraining bis hin zum lokalen Deployment als CLI oder REST-Service. Alles läuft auf deinem Rechner, ohne externe Abhängigkeiten.
Wenn du das in größerem Maßstab nutzen willst (mehr Firmen, regelmäßige Updates), ist der nächste Schritt typischerweise: Batch-Anreicherung mit wiederholbarem Training und Monitoring – genau die Richtung, die viele der technischen Blogposts bereits abdecken.