Files
llm-benchmark/benchmark_v2.py
2026-04-06 17:44:30 +02:00

642 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
LLM Benchmark v2 — Qwen3.5 9B/27B vs GPT-OSS 20B/120B

Blocks: Code (A1–A3) · Business German (B1–B3)
Metrics: TTFT · thinking time · tok/s · total time

Usage (positional argument is the NUMBER of models to test):
    python benchmark_v2.py 4
    python benchmark_v2.py 2 --backend ollama
    python benchmark_v2.py 1 --results-dir /tmp/bench
"""
import argparse
import asyncio
import json
import re
import sys
import time
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

import httpx
from rich.console import Console
from rich.progress import (
    BarColumn,
    MofNCompleteColumn,
    Progress,
    SpinnerColumn,
    TaskID,
    TextColumn,
    TimeElapsedColumn,
)
from rich import box
from rich.table import Table

console = Console()

# ─────────────────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────────────────
# Preset base URLs for the supported OpenAI-compatible backends.
BACKENDS: dict[str, str] = {
    "vllm": "http://localhost:8000/v1",
    "ollama": "http://localhost:11434/v1",
    "lmstudio": "http://localhost:1234/v1",
}
BASE_URL = BACKENDS["vllm"]  # overridden in main() via --backend / --url
DEFAULT_TIMEOUT = 300.0  # per-request streaming timeout, seconds
MAX_RETRIES = 3  # attempts per prompt before recording a failure
# Benchmark prompts. Keys are prompt ids; each entry carries the thematic
# block name, a short human-readable label, and the German prompt text that
# is sent verbatim to the model.
PROMPTS: dict[str, dict] = {
    "A1": {
        "block": "Code",
        "label": "Sortierfunktion mit fehlenden Schlüsseln",
        "text": (
            "Schreibe eine Python-Funktion, die eine Liste von Wörterbüchern nach einem beliebigen "
            "Schlüssel sortiert aufsteigend und absteigend und dabei fehlende Schlüssel "
            "graceful behandelt. Füge Typ-Annotationen und einen kurzen Docstring auf Deutsch hinzu."
        ),
    },
    "A2": {
        "block": "Code",
        "label": "CSV-Debugging",
        "text": (
            "Der folgende Python-Code soll eine CSV-Datei einlesen und den Durchschnitt einer "
            "Spalte berechnen, hat aber mehrere Fehler. Finde und erkläre alle Fehler auf Deutsch, "
            "dann liefere den korrigierten Code:\n\n"
            "import csv\n"
            "def berechne_durchschnitt(datei, spalte):\n"
            " werte = []\n"
            " with open(datei) as f:\n"
            " reader = csv.reader(f)\n"
            " for zeile in reader:\n"
            " werte.append(zeile[spalte])\n"
            " return sum(werte) / len(werte)"
        ),
    },
    "A3": {
        "block": "Code",
        "label": "HTTP-API-Client",
        "text": (
            "Schreibe eine Python-Klasse für einen einfachen HTTP-API-Client mit:\n"
            "- GET und POST Methoden\n"
            "- automatischem Retry bei 429 und 5xx Fehlern (max. 3 Versuche, exponential backoff)\n"
            "- Logging auf Deutsch\n"
            "- Typ-Annotationen\n"
            "Nutze nur die Standardbibliothek + requests."
        ),
    },
    "B1": {
        "block": "Business",
        "label": "MoE-Erklärung für Geschäftskunden",
        "text": (
            # "3–4" restored: a lost en dash had turned the intended
            # "3–4 Sätzen" (3-4 sentences) into the nonsensical "34 Sätzen".
            'Erkläre einem nicht-technischen Geschäftskunden in 3–4 Sätzen, was "Mixture of '
            "Experts\" bei KI-Modellen bedeutet und warum das für ihn als Anwender relevant sein "
            "könnte."
        ),
    },
    "B2": {
        "block": "Business",
        "label": "E-Mail-Absage",
        "text": (
            "Formuliere eine professionelle E-Mail-Absage (ca. 80 Wörter) an einen Dienstleister, "
            "der ein zu teures Angebot für eine KI-Implementierung eingereicht hat. "
            "Ton: höflich, klar, Tür offen lassen für die Zukunft."
        ),
    },
    "B3": {
        "block": "Business",
        "label": "revDSG-Argumente",
        "text": (
            "Nenne drei konkrete Argumente, warum ein Schweizer KMU seine Kundendaten NICHT in "
            "eine US-amerikanische Cloud-KI-Lösung geben sollte aus Sicht des revDSG. "
            "Antworte prägnant und fachlich korrekt."
        ),
    },
}
# ─────────────────────────────────────────────────────────────────────────────
# Datenstrukturen
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class RunResult:
    """Metrics and outputs of one prompt run against one model."""

    prompt_id: str  # key into PROMPTS (e.g. "A1")
    model: str  # model name as sent to the server
    ttft_s: float  # time to first streamed token, seconds
    thinking_time_s: float  # duration of the <think>…</think> block; 0.0 if none
    total_time_s: float  # wall-clock time for the whole request
    total_tokens: int  # completion tokens (server usage, or chunk-count fallback)
    tokens_per_sec: float  # total_tokens divided by post-first-token time
    raw_response: str  # full streamed text, including <think> blocks
    visible_response: str  # response with <think>…</think> blocks stripped
    error: Optional[str] = None  # last exception message if all retries failed
# ─────────────────────────────────────────────────────────────────────────────
# Stream-Prozessor mit <think>-Erkennung
# ─────────────────────────────────────────────────────────────────────────────
class StreamProcessor:
    """Processes streaming output in real time and detects <think> blocks.

    Chunks arrive with arbitrary boundaries, so tags may be split across
    chunks; a small state machine (init -> in_think -> visible) accumulates
    text in a buffer and records timestamps for the first token, the first
    visible token, and the start/end of the <think> block.
    """

    _OPEN = "<think>"
    _CLOSE = "</think>"

    def __init__(self) -> None:
        self._chunks: list[str] = []  # every raw chunk, in arrival order
        self._buffer: str = ""  # unconsumed text awaiting tag detection
        self._state: str = "init"  # init | in_think | visible
        self.first_token_time: Optional[float] = None
        self.first_visible_time: Optional[float] = None
        self.think_start_time: Optional[float] = None
        self.think_end_time: Optional[float] = None

    def feed(self, chunk: str, ts: float) -> None:
        """Record one streamed chunk received at perf-counter time *ts*."""
        if not chunk:
            return
        self._chunks.append(chunk)
        if self.first_token_time is None:
            self.first_token_time = ts
        self._buffer += chunk
        self._advance(ts)

    def _advance(self, ts: float) -> None:
        """State machine: detects <think> and </think> boundaries in the buffer."""
        if self._state == "init":
            if self._OPEN in self._buffer:
                # Any text before the opening tag counts as visible output.
                pre = self._buffer[: self._buffer.index(self._OPEN)]
                if pre.strip() and self.first_visible_time is None:
                    self.first_visible_time = self.first_token_time
                self.think_start_time = self.first_token_time
                self._buffer = self._buffer[
                    self._buffer.index(self._OPEN) + len(self._OPEN) :
                ]
                self._state = "in_think"
            elif len(self._buffer) > len(self._OPEN) + 3:
                # Buffer is already longer than a possible partial <think>
                # prefix -> no tag is coming; output is directly visible.
                self._state = "visible"
                if self.first_visible_time is None:
                    self.first_visible_time = self.first_token_time
        if self._state == "in_think":
            if self._CLOSE in self._buffer:
                self.think_end_time = ts
                rest = self._buffer[
                    self._buffer.index(self._CLOSE) + len(self._CLOSE) :
                ]
                self._buffer = rest
                self._state = "visible"
                if rest.strip() and self.first_visible_time is None:
                    self.first_visible_time = ts
        if self._state == "visible":
            if self.first_visible_time is None and self._buffer.strip():
                self.first_visible_time = ts

    @property
    def full_response(self) -> str:
        """The complete raw response, including any <think> blocks."""
        return "".join(self._chunks)

    @property
    def visible_response(self) -> str:
        """The complete response without <think>…</think> blocks."""
        return re.sub(
            r"<think>.*?</think>", "", self.full_response, flags=re.DOTALL
        ).strip()

    @property
    def thinking_time(self) -> float:
        """Seconds spent inside the <think> block; 0.0 if it never closed."""
        if self.think_start_time and self.think_end_time:
            return self.think_end_time - self.think_start_time
        return 0.0
# ─────────────────────────────────────────────────────────────────────────────
# Inference (Streaming + Retry)
# ─────────────────────────────────────────────────────────────────────────────
async def infer(
    client: httpx.AsyncClient,
    model: str,
    prompt_id: str,
    prompt_text: str,
) -> RunResult:
    """Send one prompt to the model and measure all metrics via streaming.

    Retries up to MAX_RETRIES times with exponential backoff. On total
    failure a RunResult with zeroed metrics and the last error message is
    returned instead of raising, so one bad prompt cannot abort the run.
    """
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt_text}],
        "stream": True,
        # Ask the server to append a usage record to the stream.
        "stream_options": {"include_usage": True},
    }
    last_exc: Optional[Exception] = None
    for attempt in range(MAX_RETRIES):
        proc = StreamProcessor()
        total_tokens = 0
        start_time = time.perf_counter()
        try:
            async with client.stream(
                "POST",
                f"{BASE_URL}/chat/completions",
                json=payload,
                timeout=DEFAULT_TIMEOUT,
            ) as resp:
                resp.raise_for_status()
                async for line in resp.aiter_lines():
                    # SSE frames: payload lines are prefixed with "data: ".
                    if not line.startswith("data: "):
                        continue
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    try:
                        chunk = json.loads(data)
                    except json.JSONDecodeError:
                        continue
                    ts = time.perf_counter()
                    if usage := chunk.get("usage"):
                        total_tokens = usage.get("completion_tokens", 0)
                    for choice in chunk.get("choices", []):
                        delta_content = (choice.get("delta") or {}).get("content") or ""
                        if delta_content:
                            proc.feed(delta_content, ts)
            end_time = time.perf_counter()
            ttft = (proc.first_token_time - start_time) if proc.first_token_time else 0.0
            # Fallback: approximate with chunk count when usage is missing.
            if total_tokens == 0:
                total_tokens = len(proc._chunks)
            # Generation time excludes the wait for the first token.
            gen_time = end_time - (proc.first_token_time or start_time)
            tps = total_tokens / gen_time if gen_time > 0 else 0.0
            return RunResult(
                prompt_id=prompt_id,
                model=model,
                ttft_s=round(ttft, 3),
                thinking_time_s=round(proc.thinking_time, 3),
                total_time_s=round(end_time - start_time, 3),
                total_tokens=total_tokens,
                tokens_per_sec=round(tps, 2),
                raw_response=proc.full_response,
                visible_response=proc.visible_response,
            )
        except httpx.HTTPError as exc:
            # Broadened from (Timeout, Connect, HTTPStatus): httpx.HTTPError is
            # their common base and also covers mid-stream failures such as
            # ReadError/RemoteProtocolError, which previously escaped the
            # retry loop and crashed the whole benchmark.
            last_exc = exc
            if attempt < MAX_RETRIES - 1:
                wait = 2**attempt  # exponential backoff: 1s, 2s, …
                console.print(
                    f" [yellow]⚠ Versuch {attempt + 1}/{MAX_RETRIES} fehlgeschlagen "
                    f"({exc.__class__.__name__}), warte {wait}s …[/yellow]"
                )
                await asyncio.sleep(wait)
    # All attempts failed: return a zeroed result carrying the error message.
    return RunResult(
        prompt_id=prompt_id,
        model=model,
        ttft_s=0.0,
        thinking_time_s=0.0,
        total_time_s=0.0,
        total_tokens=0,
        tokens_per_sec=0.0,
        raw_response="",
        visible_response="",
        error=str(last_exc),
    )
# ─────────────────────────────────────────────────────────────────────────────
# Modell-Benchmark
# ─────────────────────────────────────────────────────────────────────────────
async def benchmark_model(
    model: str,
    results_dir: Path,
    progress: Progress,
    task_id: TaskID,
) -> list[RunResult]:
    """Run every prompt against one model and save raw outputs.

    Writes each raw response to <results_dir>/<model>/<prompt_id>.txt and a
    per-model JSON summary (metrics only, no response text) to
    <results_dir>/<model>.json. Returns the collected RunResults, or an
    empty list when the endpoint is unreachable.
    """
    # Sanitize the model name so it is safe as a directory/file name.
    safe_name = re.sub(r"[^\w\-.]", "_", model.split("/")[-1])
    model_dir = results_dir / safe_name
    model_dir.mkdir(parents=True, exist_ok=True)
    results: list[RunResult] = []
    async with httpx.AsyncClient() as client:
        # Check reachability up front so a dead server fails fast.
        try:
            r = await client.get(f"{BASE_URL}/models", timeout=5.0)
            r.raise_for_status()
        except Exception as exc:
            console.print(f" [red]✗ Endpunkt nicht erreichbar: {exc}[/red]")
            return []
        for prompt_id, prompt_data in PROMPTS.items():
            progress.update(
                task_id,
                description=(
                    f"[cyan]{safe_name[:25]}[/cyan] — "
                    f"[bold]{prompt_id}[/bold] {prompt_data['label'][:35]}"
                ),
            )
            result = await infer(client, model, prompt_id, prompt_data["text"])
            results.append(result)
            # Persist the raw response (or an error marker) per prompt.
            (model_dir / f"{prompt_id}.txt").write_text(
                result.raw_response or f"[FEHLER: {result.error}]",
                encoding="utf-8",
            )
            # One-line console status per prompt.
            if result.error:
                console.print(f" {prompt_id} [red]✗ {result.error[:60]}[/red]")
            else:
                think_str = (
                    f" Thinking=[cyan]{result.thinking_time_s:.1f}s[/cyan]"
                    if result.thinking_time_s > 0
                    else ""
                )
                console.print(
                    f" {prompt_id} [green]✓[/green] "
                    f"TTFT=[cyan]{result.ttft_s:.2f}s[/cyan]{think_str} "
                    f"[cyan]{result.tokens_per_sec:.1f}[/cyan] tok/s "
                    f"Gesamt=[cyan]{result.total_time_s:.1f}s[/cyan] "
                    f"Tokens=[cyan]{result.total_tokens}[/cyan]"
                )
            progress.advance(task_id)
    # JSON export (raw responses already live in the .txt files).
    json_path = results_dir / f"{safe_name}.json"
    export = {
        "model": model,
        "timestamp": datetime.now().isoformat(),
        "base_url": BASE_URL,
        "results": [
            {k: v for k, v in asdict(r).items() if k not in ("raw_response", "visible_response")}
            for r in results
        ],
    }
    json_path.write_text(
        json.dumps(export, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    console.print(f" [dim]→ JSON: {json_path}[/dim]")
    console.print(f" [dim]→ Rohantworten: {model_dir}/[/dim]")
    return results
# ─────────────────────────────────────────────────────────────────────────────
# Markdown-Ausgabe
# ─────────────────────────────────────────────────────────────────────────────
def build_markdown(all_results: dict[str, list[RunResult]]) -> str:
    """Render all benchmark results as a Markdown report.

    Produces a summary table (per-model averages over successful runs)
    followed by one detail table per prompt.
    """

    def _avg(rows: list[RunResult], key: str) -> float:
        # Mean of one metric attribute over the given (non-error) runs.
        # Named helper replaces the former PEP8-discouraged lambda assignment.
        return sum(getattr(r, key) for r in rows) / len(rows)

    ts = datetime.now().strftime("%Y-%m-%d %H:%M")
    lines: list[str] = [
        "# LLM Benchmark v2 — Ergebnisse\n",
        f"**Datum:** {ts} | **Server:** {BASE_URL}\n",
    ]
    # ── Summary ─────────────────────────────────────────
    lines.append("## Zusammenfassung (Ø über alle Prompts)\n")
    lines.append("| Modell | TTFT (s) | Thinking (s) | tok/s | Gesamt (s) |")
    lines.append("|--------|----------|--------------|-------|------------|")
    for model, results in all_results.items():
        ok = [r for r in results if not r.error]
        if not ok:
            lines.append(f"| `{model}` | — | — | — | — |")
            continue
        lines.append(
            f"| `{model}` "
            f"| {_avg(ok, 'ttft_s'):.2f} "
            f"| {_avg(ok, 'thinking_time_s'):.1f} "
            f"| {_avg(ok, 'tokens_per_sec'):.1f} "
            f"| {_avg(ok, 'total_time_s'):.1f} |"
        )
    lines.append("")
    # ── Details per prompt ──────────────────────────────
    lines.append("## Details pro Prompt\n")
    for prompt_id, meta in PROMPTS.items():
        # " — " separator restored: the heading previously ran id and label
        # together (e.g. "### A1Sortierfunktion…") after a dash was lost.
        lines.append(f"### {prompt_id} — {meta['label']} `[{meta['block']}]`\n")
        lines.append("| Modell | TTFT (s) | Thinking (s) | tok/s | Tokens | Gesamt (s) |")
        lines.append("|--------|----------|--------------|-------|--------|------------|")
        for model, results in all_results.items():
            r = next((x for x in results if x.prompt_id == prompt_id), None)
            if r is None or r.error:
                err = (r.error or "")[:50] if r else ""
                lines.append(f"| `{model}` | ✗ | ✗ | ✗ | ✗ | {err} |")
                continue
            lines.append(
                f"| `{model}` "
                f"| {r.ttft_s:.2f} "
                f"| {r.thinking_time_s:.1f} "
                f"| {r.tokens_per_sec:.1f} "
                f"| {r.total_tokens} "
                f"| {r.total_time_s:.1f} |"
            )
        lines.append("")
    return "\n".join(lines)
def print_rich_summary(all_results: dict[str, list[RunResult]]) -> None:
    """Render the overall benchmark overview as a Rich table on the console."""
    table = Table(
        title="\n[bold]Benchmark v2 — Zusammenfassung[/bold]",
        box=box.ROUNDED,
        header_style="bold magenta",
        show_lines=True,
    )
    table.add_column("Prompt", style="bold white", width=7, no_wrap=True)
    table.add_column("Block", width=9)
    # One right-aligned column per model, header shortened to 18 chars.
    for model_name in all_results:
        table.add_column(model_name.split("/")[-1][:18], justify="right", width=22)

    def _cell(run: Optional[RunResult]) -> str:
        # Format one (prompt, model) cell: metrics, or a failure marker.
        if run is None or run.error:
            return "[red]✗[/red]"
        think_note = ""
        if run.thinking_time_s > 0:
            think_note = f"\n[dim]think={run.thinking_time_s:.1f}s[/dim]"
        return (
            f"TTFT [cyan]{run.ttft_s:.2f}s[/cyan]\n"
            f"[cyan]{run.tokens_per_sec:.1f}[/cyan] tok/s{think_note}"
        )

    for prompt_id, meta in PROMPTS.items():
        per_model = [
            _cell(next((x for x in results if x.prompt_id == prompt_id), None))
            for results in all_results.values()
        ]
        table.add_row(prompt_id, meta["block"], *per_model)
    console.print(table)
async def detect_model() -> Optional[str]:
    """Ask the configured OpenAI-compatible endpoint for its loaded model.

    Returns the id of the first model listed by GET /models, or None when
    the server is unreachable or reports no models.
    """
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{BASE_URL}/models", timeout=5.0)
            resp.raise_for_status()
            entries = resp.json().get("data", [])
            if not entries:
                return None
            return entries[0]["id"]
    except Exception as exc:  # boundary: report and let the caller abort
        console.print(f"[red]✗ Server nicht erreichbar: {exc}[/red]")
        return None
# ─────────────────────────────────────────────────────────────────────────────
# Einstiegspunkt
# ─────────────────────────────────────────────────────────────────────────────
async def main() -> None:
    """CLI entry point: parse arguments, benchmark each model, write reports."""
    global BASE_URL
    parser = argparse.ArgumentParser(
        description="LLM Benchmark v2 — Qwen3.5 9B/27B vs GPT-OSS 20B/120B",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Beispiele:\n"
            " python benchmark_v2.py 4\n"
            " python benchmark_v2.py 2 --backend ollama\n"
            " python benchmark_v2.py 1 --backend lmstudio\n"
            " python benchmark_v2.py 2 --url http://localhost:9000/v1\n"
        ),
    )
    parser.add_argument(
        "count", type=int, metavar="ANZAHL",
        help="Anzahl Modelle die getestet werden (z.B. 4)",
    )
    parser.add_argument(
        "--backend", choices=list(BACKENDS.keys()), default="vllm",
        help=f"Backend-Preset: {', '.join(f'{k}={v}' for k, v in BACKENDS.items())}",
    )
    parser.add_argument(
        "--url", default=None, metavar="URL",
        help="Eigene Base-URL (überschreibt --backend), z.B. http://localhost:9000/v1",
    )
    parser.add_argument(
        "--model", default=None, metavar="MODELL",
        help="Modellname explizit angeben (überspringt Auto-Detect), z.B. gemma4:31b",
    )
    parser.add_argument(
        "--results-dir", default="results", metavar="DIR",
        help="Ausgabeverzeichnis (Standard: results/)",
    )
    args = parser.parse_args()
    # --url wins over the --backend preset.
    BASE_URL = args.url if args.url else BACKENDS[args.backend]
    results_dir = Path(args.results_dir)
    # parents=True added: a nested --results-dir (e.g. /tmp/bench/run1) used
    # to raise FileNotFoundError; benchmark_model() already uses parents=True.
    results_dir.mkdir(parents=True, exist_ok=True)
    backend_label = args.url if args.url else args.backend
    console.rule("[bold magenta]LLM Benchmark v2[/bold magenta]")
    model_info = f"[cyan]{args.model}[/cyan] (fest)" if args.model else "[cyan]auto-detect[/cyan]"
    console.print(
        f"Backend: [cyan]{backend_label}[/cyan] → {BASE_URL}\n"
        f"Modell: {model_info}\n"
        f"Modelle: [cyan]{args.count}x[/cyan]\n"
        f"Prompts: [cyan]{len(PROMPTS)}[/cyan] "
        # En dashes restored ("A1A3" / "B1B3" were garbled ranges).
        f"(A1–A3 Code · B1–B3 Business)\n"
        f"Output: [cyan]{results_dir}/[/cyan]\n"
    )
    all_results: dict[str, list[RunResult]] = {}
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        console=console,
        transient=False,
    ) as progress:
        overall = progress.add_task("[bold]Gesamt[/bold]", total=args.count * len(PROMPTS))
        for idx in range(args.count):
            # Before every model after the first: pause the progress display
            # so the operator can swap the served model, then wait for Enter.
            if idx > 0:
                progress.stop()
                console.print()
                console.rule("[bold yellow]Modellwechsel[/bold yellow]")
                console.print(
                    f"\n Modell {idx + 1}/{args.count}\n\n"
                    f" 1. Starte vllm neu: [dim]~/scripts/vllm/start_model.sh[/dim]\n"
                    f" 2. Wähle das nächste Modell\n"
                    f" 3. Warte bis der Server bereit ist\n"
                )
                console.print(" Dann hier [bold]Enter[/bold] drücken …")
                input()
                progress.start()
            # Model name: explicit via --model, otherwise queried from server.
            model = args.model or await detect_model()
            if not model:
                # "Abbruch" capitalized (German noun) in the abort message.
                console.print(f"[red]✗ Kein Modell auf {BASE_URL} gefunden — Abbruch.[/red]")
                break
            console.rule(f"[bold cyan]{model}[/bold cyan]")
            task = progress.add_task("", total=len(PROMPTS))
            results = await benchmark_model(
                model, results_dir, progress=progress, task_id=task
            )
            all_results[model] = results
            progress.advance(overall, advance=len(PROMPTS))
    # Rich summary table in the terminal.
    console.print()
    print_rich_summary(all_results)
    # Persist the Markdown report.
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    md_path = results_dir / f"benchmark_v2_{ts}.md"
    md_path.write_text(build_markdown(all_results), encoding="utf-8")
    console.print()
    console.rule("[bold green]Fertig[/bold green]")
    console.print(f"[green]✓[/green] Markdown: [bold]{md_path}[/bold]")
    # Message corrected: per-model JSON lives directly in results_dir; only
    # the raw .txt responses are in the per-model subdirectory.
    console.print(
        f"[green]✓[/green] JSON: [bold]{results_dir}/<modell>.json[/bold] · "
        f".txt: [bold]{results_dir}/<modell>/[/bold]"
    )
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C: exit cleanly instead of dumping an asyncio traceback.
        console.print("\n[yellow]Abgebrochen.[/yellow]")
        sys.exit(0)