#!/usr/bin/env python3
"""
LLM Benchmark v2 — Qwen3.5 9B/27B vs GPT-OSS 20B/120B

Blocks:  Code (A1–A3) · Business German (B1–B3) · Agentic Coding (C1)
Metrics: TTFT · thinking time · tok/s · total time

Usage:
    python benchmark_v2.py Qwen3.5-9B Qwen3.5-27B GPT-OSS-20B GPT-OSS-120B
    python benchmark_v2.py model1 model2
    python benchmark_v2.py model1 --results-dir /tmp/bench
"""

import argparse
import asyncio
import json
import re
import sys
import time
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

import httpx
from rich import box
from rich.console import Console
from rich.progress import (
    BarColumn,
    MofNCompleteColumn,
    Progress,
    SpinnerColumn,
    TaskID,
    TextColumn,
    TimeElapsedColumn,
)
from rich.table import Table

console = Console()

# ─────────────────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────────────────

BACKENDS: dict[str, str] = {
    "vllm": "http://localhost:8000/v1",
    "ollama": "http://localhost:11434/v1",
    "lmstudio": "http://localhost:1234/v1",
}
BASE_URL = BACKENDS["vllm"]  # overwritten in main()
DEFAULT_TIMEOUT = 300.0
MAX_RETRIES = 3

PROMPTS: dict[str, dict] = {
    "A1": {
        "block": "Code",
        "label": "Sortierfunktion mit fehlenden Schlüsseln",
        "text": (
            "Schreibe eine Python-Funktion, die eine Liste von Wörterbüchern nach einem beliebigen "
            "Schlüssel sortiert – aufsteigend und absteigend – und dabei fehlende Schlüssel "
            "graceful behandelt. Füge Typ-Annotationen und einen kurzen Docstring auf Deutsch hinzu."
        ),
    },
    "A2": {
        "block": "Code",
        "label": "CSV-Debugging",
        "text": (
            "Der folgende Python-Code soll eine CSV-Datei einlesen und den Durchschnitt einer "
            "Spalte berechnen, hat aber mehrere Fehler. Finde und erkläre alle Fehler auf Deutsch, "
            "dann liefere den korrigierten Code:\n\n"
            "import csv\n"
            "def berechne_durchschnitt(datei, spalte):\n"
            " werte = []\n"
            " with open(datei) as f:\n"
            " reader = csv.reader(f)\n"
            " for zeile in reader:\n"
            " werte.append(zeile[spalte])\n"
            " return sum(werte) / len(werte)"
        ),
    },
    "A3": {
        "block": "Code",
        "label": "HTTP-API-Client",
        "text": (
            "Schreibe eine Python-Klasse für einen einfachen HTTP-API-Client mit:\n"
            "- GET und POST Methoden\n"
            "- automatischem Retry bei 429 und 5xx Fehlern (max. 3 Versuche, exponential backoff)\n"
            "- Logging auf Deutsch\n"
            "- Typ-Annotationen\n"
            "Nutze nur die Standardbibliothek + requests."
        ),
    },
    "B1": {
        "block": "Business",
        "label": "MoE-Erklärung für Geschäftskunden",
        "text": (
            'Erkläre einem nicht-technischen Geschäftskunden in 3–4 Sätzen, was "Mixture of '
            'Experts" bei KI-Modellen bedeutet und warum das für ihn als Anwender relevant sein '
            "könnte."
        ),
    },
    "B2": {
        "block": "Business",
        "label": "E-Mail-Absage",
        "text": (
            "Formuliere eine professionelle E-Mail-Absage (ca. 80 Wörter) an einen Dienstleister, "
            "der ein zu teures Angebot für eine KI-Implementierung eingereicht hat. "
            "Ton: höflich, klar, Tür offen lassen für die Zukunft."
        ),
    },
    "B3": {
        "block": "Business",
        "label": "revDSG-Argumente",
        "text": (
            "Nenne drei konkrete Argumente, warum ein Schweizer KMU seine Kundendaten NICHT in "
            "eine US-amerikanische Cloud-KI-Lösung geben sollte – aus Sicht des revDSG. "
            "Antworte prägnant und fachlich korrekt."
        ),
    },
}

# ─────────────────────────────────────────────────────────────────────────────
# Data structures
# ─────────────────────────────────────────────────────────────────────────────


@dataclass
class RunResult:
    """Metrics and outputs of one prompt run against one model."""

    prompt_id: str
    model: str
    ttft_s: float  # time to first token, seconds
    thinking_time_s: float  # duration of the <think> block, seconds (0 if none)
    total_time_s: float
    total_tokens: int
    tokens_per_sec: float
    raw_response: str  # full response, including any <think> block
    visible_response: str  # response with <think> blocks stripped
    error: Optional[str] = None


# ─────────────────────────────────────────────────────────────────────────────
# Stream processor with <think> detection
# ─────────────────────────────────────────────────────────────────────────────


class StreamProcessor:
    """Processes streaming output in real time and detects <think> blocks."""

    _OPEN = "<think>"
    _CLOSE = "</think>"

    def __init__(self) -> None:
        self._chunks: list[str] = []
        self._buffer: str = ""
        self._state: str = "init"  # init | in_think | visible
        self.first_token_time: Optional[float] = None
        self.first_visible_time: Optional[float] = None
        self.think_start_time: Optional[float] = None
        self.think_end_time: Optional[float] = None

    def feed(self, chunk: str, ts: float) -> None:
        """Accumulate one streamed chunk (timestamped) and advance the state machine."""
        if not chunk:
            return
        self._chunks.append(chunk)
        if self.first_token_time is None:
            self.first_token_time = ts
        self._buffer += chunk
        self._advance(ts)

    def _advance(self, ts: float) -> None:
        """State machine: detects <think> and </think> boundaries in the buffer."""
        if self._state == "init":
            if self._OPEN in self._buffer:
                # Anything before the opening tag is already visible output.
                pre = self._buffer[: self._buffer.index(self._OPEN)]
                if pre.strip() and self.first_visible_time is None:
                    self.first_visible_time = self.first_token_time
                self.think_start_time = self.first_token_time
                self._buffer = self._buffer[
                    self._buffer.index(self._OPEN) + len(self._OPEN) :
                ]
                self._state = "in_think"
            elif len(self._buffer) > len(self._OPEN) + 3:
                # No <think> tag incoming → output is directly visible
                self._state = "visible"
                if self.first_visible_time is None:
                    self.first_visible_time = self.first_token_time

        if self._state == "in_think":
            if self._CLOSE in self._buffer:
                self.think_end_time = ts
                rest = self._buffer[
                    self._buffer.index(self._CLOSE) + len(self._CLOSE) :
                ]
                self._buffer = rest
                self._state = "visible"
                if rest.strip() and self.first_visible_time is None:
                    self.first_visible_time = ts

        if self._state == "visible":
            if self.first_visible_time is None and self._buffer.strip():
                self.first_visible_time = ts

    @property
    def full_response(self) -> str:
        """Complete raw response, including any <think> block."""
        return "".join(self._chunks)

    @property
    def visible_response(self) -> str:
        """Complete response without <think> blocks."""
        return re.sub(
            r"<think>.*?</think>", "", self.full_response, flags=re.DOTALL
        ).strip()

    @property
    def thinking_time(self) -> float:
        """Seconds spent inside the <think> block; 0.0 when no block was seen."""
        if self.think_start_time is not None and self.think_end_time is not None:
            return self.think_end_time - self.think_start_time
        return 0.0


# ─────────────────────────────────────────────────────────────────────────────
# Inference (streaming + retry)
# ─────────────────────────────────────────────────────────────────────────────


async def infer(
    client: httpx.AsyncClient,
    model: str,
    prompt_id: str,
    prompt_text: str,
) -> RunResult:
    """Send one prompt to the model and measure all metrics via streaming.

    Retries up to MAX_RETRIES times with exponential backoff on timeout,
    connection, and HTTP status errors. When every attempt fails, returns
    a RunResult with the ``error`` field set instead of raising.
    """
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt_text}],
        "stream": True,
        "stream_options": {"include_usage": True},
    }
    last_exc: Optional[Exception] = None
    for attempt in range(MAX_RETRIES):
        proc = StreamProcessor()
        total_tokens = 0
        start_time = time.perf_counter()
        try:
            async with client.stream(
                "POST",
                f"{BASE_URL}/chat/completions",
                json=payload,
                timeout=DEFAULT_TIMEOUT,
            ) as resp:
                resp.raise_for_status()
                async for line in resp.aiter_lines():
                    # SSE frames: "data: {...}" payloads, terminated by "data: [DONE]"
                    if not line.startswith("data: "):
                        continue
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    try:
                        chunk = json.loads(data)
                    except json.JSONDecodeError:
                        continue
                    ts = time.perf_counter()
                    if usage := chunk.get("usage"):
                        total_tokens = usage.get("completion_tokens", 0)
                    for choice in chunk.get("choices", []):
                        delta_content = (choice.get("delta") or {}).get("content") or ""
                        if delta_content:
                            proc.feed(delta_content, ts)
            end_time = time.perf_counter()
            ttft = (
                (proc.first_token_time - start_time)
                if proc.first_token_time is not None
                else 0.0
            )
            # Fallback: chunk count when the server sends no usage info
            if total_tokens == 0:
                total_tokens = len(proc._chunks)
            gen_time = end_time - (proc.first_token_time or start_time)
            tps = total_tokens / gen_time if gen_time > 0 else 0.0
            return RunResult(
                prompt_id=prompt_id,
                model=model,
                ttft_s=round(ttft, 3),
                thinking_time_s=round(proc.thinking_time, 3),
                total_time_s=round(end_time - start_time, 3),
                total_tokens=total_tokens,
                tokens_per_sec=round(tps, 2),
                raw_response=proc.full_response,
                visible_response=proc.visible_response,
            )
        except (httpx.TimeoutException, httpx.ConnectError, httpx.HTTPStatusError) as exc:
            last_exc = exc
            if attempt < MAX_RETRIES - 1:
                wait = 2**attempt  # exponential backoff: 1s, 2s, 4s, ...
                console.print(
                    f" [yellow]⚠ Versuch {attempt + 1}/{MAX_RETRIES} fehlgeschlagen "
                    f"({exc.__class__.__name__}), warte {wait}s …[/yellow]"
                )
                await asyncio.sleep(wait)
    return RunResult(
        prompt_id=prompt_id,
        model=model,
        ttft_s=0.0,
        thinking_time_s=0.0,
        total_time_s=0.0,
        total_tokens=0,
        tokens_per_sec=0.0,
        raw_response="",
        visible_response="",
        error=str(last_exc),
    )


# ─────────────────────────────────────────────────────────────────────────────
# Per-model benchmark
# ─────────────────────────────────────────────────────────────────────────────


async def benchmark_model(
    model: str,
    results_dir: Path,
    progress: Progress,
    task_id: TaskID,
) -> list[RunResult]:
    """Run all prompts for one model and persist raw outputs.

    Writes one ``<prompt_id>.txt`` per prompt under ``results_dir/<model>/``
    and a metrics-only JSON next to it. Returns the list of RunResults
    (empty when the endpoint is unreachable).
    """
    safe_name = re.sub(r"[^\w\-.]", "_", model.split("/")[-1])
    model_dir = results_dir / safe_name
    model_dir.mkdir(parents=True, exist_ok=True)
    results: list[RunResult] = []

    async with httpx.AsyncClient() as client:
        # Check reachability before spending time on prompts
        try:
            r = await client.get(f"{BASE_URL}/models", timeout=5.0)
            r.raise_for_status()
        except Exception as exc:
            console.print(f" [red]✗ Endpunkt nicht erreichbar: {exc}[/red]")
            return []

        for prompt_id, prompt_data in PROMPTS.items():
            progress.update(
                task_id,
                description=(
                    f"[cyan]{safe_name[:25]}[/cyan] — "
                    f"[bold]{prompt_id}[/bold] {prompt_data['label'][:35]}"
                ),
            )
            result = await infer(client, model, prompt_id, prompt_data["text"])
            results.append(result)

            # Save raw output
            (model_dir / f"{prompt_id}.txt").write_text(
                result.raw_response or f"[FEHLER: {result.error}]",
                encoding="utf-8",
            )

            # Console status
            if result.error:
                console.print(f" {prompt_id} [red]✗ {result.error[:60]}[/red]")
            else:
                think_str = (
                    f" Thinking=[cyan]{result.thinking_time_s:.1f}s[/cyan]"
                    if result.thinking_time_s > 0
                    else ""
                )
                console.print(
                    f" {prompt_id} [green]✓[/green] "
                    f"TTFT=[cyan]{result.ttft_s:.2f}s[/cyan]{think_str} "
                    f"[cyan]{result.tokens_per_sec:.1f}[/cyan] tok/s "
                    f"Gesamt=[cyan]{result.total_time_s:.1f}s[/cyan] "
                    f"Tokens=[cyan]{result.total_tokens}[/cyan]"
                )
            progress.advance(task_id)

    # JSON export (raw responses live in the .txt files)
    json_path = results_dir / f"{safe_name}.json"
    export = {
        "model": model,
        "timestamp": datetime.now().isoformat(),
        "base_url": BASE_URL,
        "results": [
            {k: v for k, v in asdict(r).items() if k not in ("raw_response", "visible_response")}
            for r in results
        ],
    }
    json_path.write_text(
        json.dumps(export, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    console.print(f" [dim]→ JSON: {json_path}[/dim]")
    console.print(f" [dim]→ Rohantworten: {model_dir}/[/dim]")
    return results


# ─────────────────────────────────────────────────────────────────────────────
# Markdown output
# ─────────────────────────────────────────────────────────────────────────────


def build_markdown(all_results: dict[str, list[RunResult]]) -> str:
    """Render all results as a Markdown report (summary + per-prompt tables)."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M")
    lines: list[str] = [
        "# LLM Benchmark v2 — Ergebnisse\n",
        f"**Datum:** {ts} | **Server:** {BASE_URL}\n",
    ]

    # ── Summary ──────────────────────────────────────────
    lines.append("## Zusammenfassung (Ø über alle Prompts)\n")
    lines.append("| Modell | TTFT (s) | Thinking (s) | tok/s | Gesamt (s) |")
    lines.append("|--------|----------|--------------|-------|------------|")
    for model, results in all_results.items():
        ok = [r for r in results if not r.error]
        if not ok:
            lines.append(f"| `{model}` | — | — | — | — |")
            continue

        def avg(key: str) -> float:
            # Mean of one metric over the successful runs only
            return sum(getattr(r, key) for r in ok) / len(ok)

        lines.append(
            f"| `{model}` "
            f"| {avg('ttft_s'):.2f} "
            f"| {avg('thinking_time_s'):.1f} "
            f"| {avg('tokens_per_sec'):.1f} "
            f"| {avg('total_time_s'):.1f} |"
        )
    lines.append("")

    # ── Details per prompt ───────────────────────────────
    lines.append("## Details pro Prompt\n")
    for prompt_id, meta in PROMPTS.items():
        lines.append(f"### {prompt_id} — {meta['label']} `[{meta['block']}]`\n")
        lines.append("| Modell | TTFT (s) | Thinking (s) | tok/s | Tokens | Gesamt (s) |")
        lines.append("|--------|----------|--------------|-------|--------|------------|")
        for model, results in all_results.items():
            r = next((x for x in results if x.prompt_id == prompt_id), None)
            if r is None or r.error:
                err = (r.error or "—")[:50] if r else "—"
                lines.append(f"| `{model}` | ✗ | ✗ | ✗ | ✗ | {err} |")
                continue
            lines.append(
                f"| `{model}` "
                f"| {r.ttft_s:.2f} "
                f"| {r.thinking_time_s:.1f} "
                f"| {r.tokens_per_sec:.1f} "
                f"| {r.total_tokens} "
                f"| {r.total_time_s:.1f} |"
            )
        lines.append("")

    return "\n".join(lines)


def print_rich_summary(all_results: dict[str, list[RunResult]]) -> None:
    """Print a Rich table with the overall summary."""
    table = Table(
        title="\n[bold]Benchmark v2 — Zusammenfassung[/bold]",
        box=box.ROUNDED,
        header_style="bold magenta",
        show_lines=True,
    )
    table.add_column("Prompt", style="bold white", width=7, no_wrap=True)
    table.add_column("Block", width=9)
    models = list(all_results.keys())
    for m in models:
        short = m.split("/")[-1][:18]
        table.add_column(short, justify="right", width=22)

    for prompt_id, meta in PROMPTS.items():
        row = [prompt_id, meta["block"]]
        for model, results in all_results.items():
            r = next((x for x in results if x.prompt_id == prompt_id), None)
            if r is None or r.error:
                row.append("[red]✗[/red]")
            else:
                think = (
                    f"\n[dim]think={r.thinking_time_s:.1f}s[/dim]"
                    if r.thinking_time_s > 0
                    else ""
                )
                row.append(
                    f"TTFT [cyan]{r.ttft_s:.2f}s[/cyan]\n"
                    f"[cyan]{r.tokens_per_sec:.1f}[/cyan] tok/s{think}"
                )
        table.add_row(*row)

    console.print(table)


async def detect_model() -> Optional[str]:
    """Ask the vllm server for the name of the loaded model (None on failure)."""
    try:
        async with httpx.AsyncClient() as client:
            r = await client.get(f"{BASE_URL}/models", timeout=5.0)
            r.raise_for_status()
            models = r.json().get("data", [])
            return models[0]["id"] if models else None
    except Exception as exc:
        console.print(f"[red]✗ Server nicht erreichbar: {exc}[/red]")
        return None


# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────


async def main() -> None:
    """Parse CLI args, benchmark each model in turn, and write reports."""
    global BASE_URL

    parser = argparse.ArgumentParser(
        description="LLM Benchmark v2 — Qwen3.5 9B/27B vs GPT-OSS 20B/120B",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Beispiele:\n"
            " python benchmark_v2.py 4\n"
            " python benchmark_v2.py 2 --backend ollama\n"
            " python benchmark_v2.py 1 --backend lmstudio\n"
            " python benchmark_v2.py 2 --url http://localhost:9000/v1\n"
        ),
    )
    parser.add_argument(
        "count",
        type=int,
        metavar="ANZAHL",
        help="Anzahl Modelle die getestet werden (z.B. 4)",
    )
    parser.add_argument(
        "--backend",
        choices=list(BACKENDS.keys()),
        default="vllm",
        help=f"Backend-Preset: {', '.join(f'{k}={v}' for k, v in BACKENDS.items())}",
    )
    parser.add_argument(
        "--url",
        default=None,
        metavar="URL",
        help="Eigene Base-URL (überschreibt --backend), z.B. http://localhost:9000/v1",
    )
    parser.add_argument(
        "--model",
        default=None,
        metavar="MODELL",
        help="Modellname explizit angeben (überspringt Auto-Detect), z.B. gemma4:31b",
    )
    parser.add_argument(
        "--results-dir",
        default="results",
        metavar="DIR",
        help="Ausgabeverzeichnis (Standard: results/)",
    )
    args = parser.parse_args()

    BASE_URL = args.url if args.url else BACKENDS[args.backend]
    results_dir = Path(args.results_dir)
    # parents=True so nested --results-dir paths (e.g. /tmp/a/b) work too
    results_dir.mkdir(parents=True, exist_ok=True)
    backend_label = args.url if args.url else args.backend

    console.rule("[bold magenta]LLM Benchmark v2[/bold magenta]")
    model_info = f"[cyan]{args.model}[/cyan] (fest)" if args.model else "[cyan]auto-detect[/cyan]"
    console.print(
        f"Backend: [cyan]{backend_label}[/cyan] → {BASE_URL}\n"
        f"Modell: {model_info}\n"
        f"Modelle: [cyan]{args.count}x[/cyan]\n"
        f"Prompts: [cyan]{len(PROMPTS)}[/cyan] "
        f"(A1–A3 Code · B1–B3 Business)\n"
        f"Output: [cyan]{results_dir}/[/cyan]\n"
    )

    all_results: dict[str, list[RunResult]] = {}

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        console=console,
        transient=False,
    ) as progress:
        overall = progress.add_task("[bold]Gesamt[/bold]", total=args.count * len(PROMPTS))

        for idx in range(args.count):
            # Before each model: pause and let the operator swap the served model
            if idx > 0:
                progress.stop()
                console.print()
                console.rule("[bold yellow]Modellwechsel[/bold yellow]")
                console.print(
                    f"\n Modell {idx + 1}/{args.count}\n\n"
                    f" 1. Starte vllm neu: [dim]~/scripts/vllm/start_model.sh[/dim]\n"
                    f" 2. Wähle das nächste Modell\n"
                    f" 3. Warte bis der Server bereit ist\n"
                )
                console.print(" Dann hier [bold]Enter[/bold] drücken …")
                input()
                progress.start()

            # Model name: explicit via --model, or auto-detected from the server
            model = args.model or await detect_model()
            if not model:
                console.print(f"[red]✗ Kein Modell auf {BASE_URL} gefunden — Abbruch.[/red]")
                break

            console.rule(f"[bold cyan]{model}[/bold cyan]")
            task = progress.add_task("", total=len(PROMPTS))
            results = await benchmark_model(
                model, results_dir, progress=progress, task_id=task
            )
            all_results[model] = results
            progress.advance(overall, advance=len(PROMPTS))

    # Rich table in the terminal
    console.print()
    print_rich_summary(all_results)

    # Save Markdown report
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    md_path = results_dir / f"benchmark_v2_{ts}.md"
    md_path.write_text(build_markdown(all_results), encoding="utf-8")

    console.print()
    console.rule("[bold green]Fertig[/bold green]")
    console.print(f"[green]✓[/green] Markdown: [bold]{md_path}[/bold]")
    console.print(f"[green]✓[/green] JSON + .txt: [bold]{results_dir}/[/bold]")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        console.print("\n[yellow]Abgebrochen.[/yellow]")
        sys.exit(0)