#!/usr/bin/env python3
"""
Agent DXF - Dorafort.

Upload a DXF drawing, get an SVG preview of its modelspace, and chat with an
LLM about the analysis extracted from the drawing.

The chat backend is an OpenAI-compatible ``/v1/chat/completions`` endpoint
reached through LiteLLM (see ``LITELLM_URL`` and ``MODEL``).
"""
import json
import logging
import math
import os
import uuid
from pathlib import Path
from typing import Optional

import ezdxf
import httpx
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

logger = logging.getLogger(__name__)

app = FastAPI(title="Agent DXF - Dorafort")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

LITELLM_URL = "http://10.0.12.4:4000"
# NOTE(review): a bearer token is committed in source. It is kept only as a
# fallback so existing deployments keep working; set LITELLM_KEY in the
# environment and rotate/remove this value.
LITELLM_KEY = os.environ.get("LITELLM_KEY", "DHoa2l5FBCrhFd8DZIV1LIpn7AVyNHss")
MODEL = "llama-70b"
UPLOAD_DIR = Path("/tmp/dxf_uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# session_id -> {context, svg, analysis, filename}
sessions: dict[str, dict] = {}

# ── LAYER STYLE MAP ───────────────────────────────────────────────────────────
LAYER_STYLE = {
    "CADRU": {"color": "#e2e8f0", "width": 2.5, "dash": ""},
    "STICLA": {"color": "#38bdf8", "width": 1.0, "dash": "6,3"},
    "PROFIL": {"color": "#4ade80", "width": 2.0, "dash": ""},
    "FERONERIE": {"color": "#fb923c", "width": 1.5, "dash": ""},
    "COTE": {"color": "#fbbf24", "width": 0.8, "dash": "2,2"},
    "TEXT": {"color": "#94a3b8", "width": 0.5, "dash": ""},
    "0": {"color": "#64748b", "width": 0.8, "dash": ""},
}


def layer_style(name: str) -> dict:
    """Return the stroke style for a layer.

    The first ``LAYER_STYLE`` key that occurs as a case-insensitive substring
    of ``name`` wins (dict order); a neutral default is returned otherwise.
    """
    upper_name = name.upper()
    for key, style in LAYER_STYLE.items():
        if key.upper() in upper_name:
            return style
    return {"color": "#94a3b8", "width": 1.0, "dash": ""}


def _xml_escape(text: str) -> str:
    """Escape the characters that are significant in XML text and attributes."""
    return (
        text.replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
    )


# ── SVG RENDERER ──────────────────────────────────────────────────────────────
def dxf_to_svg(filepath: str) -> str:
    """Render the modelspace of a DXF file as an SVG document.

    Handles LWPOLYLINE, LINE, CIRCLE, ARC, TEXT and MTEXT entities; every
    other entity type is ignored. Elements are grouped per layer and styled
    through ``layer_style``. Entities that raise while being converted are
    skipped so one bad entity does not abort the whole preview.

    Args:
        filepath: Path of the DXF file to read.

    Returns:
        SVG markup as a string. A small placeholder SVG is returned when no
        supported geometry was found.

    Raises:
        Whatever ``ezdxf.readfile`` raises for unreadable or invalid files.
    """
    doc = ezdxf.readfile(filepath)
    msp = doc.modelspace()
    all_x: list[float] = []
    all_y: list[float] = []
    by_layer: dict[str, list[str]] = {}

    def reg(layer, el):
        by_layer.setdefault(layer, []).append(el)

    def fy(y):
        return -y  # flip Y axis (DXF up → SVG down)

    def arc_path(cx, cy, r, start_deg, end_deg):
        s = math.radians(start_deg)
        e = math.radians(end_deg)
        if end_deg <= start_deg:
            e += 2 * math.pi  # arc wraps past 0°
        large = 1 if (e - s) > math.pi else 0
        x1 = cx + r * math.cos(s)
        y1 = fy(cy + r * math.sin(s))
        x2 = cx + r * math.cos(e)
        y2 = fy(cy + r * math.sin(e))
        # sweep-flag 0: increasing DXF angles run the other way once Y is flipped
        return f"M {x1} {y1} A {r} {r} 0 {large} 0 {x2} {y2}"

    for entity in msp:
        etype = entity.dxftype()
        layer = entity.dxf.layer
        st = layer_style(layer)
        col = st["color"]
        sw = st["width"]
        dash = f'stroke-dasharray="{st["dash"]}"' if st["dash"] else ""
        attrs = f'stroke="{col}" stroke-width="{sw}" fill="none" {dash}'
        try:
            if etype == "LWPOLYLINE":
                pts = [(p[0], p[1]) for p in entity.get_points()]
                if len(pts) < 2:
                    continue
                all_x += [p[0] for p in pts]
                all_y += [p[1] for p in pts]
                pts_s = " ".join(f"{p[0]:.3f},{fy(p[1]):.3f}" for p in pts)
                tag = "polygon" if entity.closed else "polyline"
                reg(layer, f'<{tag} points="{pts_s}" {attrs}/>')
            elif etype == "LINE":
                s, e = entity.dxf.start, entity.dxf.end
                all_x += [s.x, e.x]
                all_y += [s.y, e.y]
                reg(
                    layer,
                    f'<line x1="{s.x:.3f}" y1="{fy(s.y):.3f}" '
                    f'x2="{e.x:.3f}" y2="{fy(e.y):.3f}" {attrs}/>',
                )
            elif etype == "CIRCLE":
                c, r = entity.dxf.center, entity.dxf.radius
                all_x += [c.x - r, c.x + r]
                all_y += [c.y - r, c.y + r]
                reg(
                    layer,
                    f'<circle cx="{c.x:.3f}" cy="{fy(c.y):.3f}" r="{r:.3f}" {attrs}/>',
                )
            elif etype == "ARC":
                c, r = entity.dxf.center, entity.dxf.radius
                sa, ea = entity.dxf.start_angle, entity.dxf.end_angle
                # The full circle is used for the bounding box (over-estimate).
                all_x += [c.x - r, c.x + r]
                all_y += [c.y - r, c.y + r]
                d = arc_path(c.x, c.y, r, sa, ea)
                reg(layer, f'<path d="{d}" {attrs}/>')
            elif etype in ("TEXT", "MTEXT"):
                txt = (entity.dxf.text if etype == "TEXT" else entity.text).strip()
                if not txt:
                    continue
                ins = entity.dxf.insert
                # TEXT stores its size in `height`, MTEXT in `char_height`.
                raw_h = entity.dxf.height if etype == "TEXT" else entity.dxf.char_height
                h = max(raw_h, 5)
                all_x.append(ins.x)
                all_y.append(ins.y)
                esc = _xml_escape(txt)
                ty_svg = fy(ins.y)
                # Coordinates are already in SVG space (Y flipped by fy), so
                # the glyphs need no extra transform to appear upright.
                reg(
                    layer,
                    f'<text x="{ins.x:.3f}" y="{ty_svg:.3f}" font-size="{h}" '
                    f'fill="{col}" font-family="monospace">'
                    f'{esc}</text>',
                )
        except Exception:
            # Best effort: skip entities that cannot be converted.
            logger.debug("Skipping %s on layer %r", etype, layer, exc_info=True)
            continue

    if not all_x:
        # Must still be a valid SVG document: it is served as image/svg+xml.
        return (
            '<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 300 60">'
            '<text x="10" y="35" font-size="14" fill="#94a3b8" '
            'font-family="monospace">No geometry</text></svg>'
        )

    mn_x, mx_x = min(all_x), max(all_x)
    mn_y, mx_y = min(all_y), max(all_y)
    pad = max(mx_x - mn_x, mx_y - mn_y) * 0.07 + 10
    vbx = mn_x - pad
    vby = fy(mx_y) - pad  # top edge in SVG space is the flipped max Y
    vbw = (mx_x - mn_x) + 2 * pad
    vbh = (mx_y - mn_y) + 2 * pad

    parts = [
        f'<svg xmlns="http://www.w3.org/2000/svg" '
        f'viewBox="{vbx:.3f} {vby:.3f} {vbw:.3f} {vbh:.3f}" '
        f'preserveAspectRatio="xMidYMid meet" style="background:#0f172a">'
    ]

    # grid: roughly ten cells across, snapped to multiples of 100 units
    grid_step = max(100, round((vbw + vbh) / 2 / 10 / 100) * 100)
    parts.append(
        f'<defs><pattern id="grid" width="{grid_step}" height="{grid_step}" '
        f'patternUnits="userSpaceOnUse">'
        f'<path d="M {grid_step} 0 L 0 0 0 {grid_step}" fill="none" '
        f'stroke="#1e293b" stroke-width="0.5"/></pattern></defs>'
    )
    parts.append(
        f'<rect x="{vbx:.3f}" y="{vby:.3f}" width="{vbw:.3f}" '
        f'height="{vbh:.3f}" fill="url(#grid)"/>'
    )

    skip = {"Defpoints", "defpoints"}
    for layer, els in by_layer.items():
        if layer in skip:
            continue
        safe = (
            layer.replace(" ", "_").replace("/", "_").replace("(", "").replace(")", "")
        )
        col = layer_style(layer)["color"]
        parts.append(f'<g id="layer-{_xml_escape(safe)}" data-color="{col}">')
        parts += els
        parts.append('</g>')

    parts.append('</svg>')
    return "\n".join(parts)


# ── DXF ANALYSIS ─────────────────────────────────────────────────────────────
def analyze_dxf(filepath: str) -> dict:
    """Extract layers, entity counts, texts and basic geometry from a DXF file.

    The ``summary`` entry is filled from the largest polyline on a layer whose
    name contains ``CADRU`` (falling back to the largest polyline overall),
    the largest polyline on a ``STICLA`` layer, and from texts containing the
    markers ``PROFIL:``, ``STICLA:``, ``TIP ``, ``FEREASTRA`` or ``USA``.

    Polyline areas are bounding-box areas (width x height), not true polygon
    areas. The ``*_mm`` / ``*_m2`` key names assume drawing units are
    millimetres — nothing in the file is checked to confirm that.

    Args:
        filepath: Path of the DXF file to read.

    Returns:
        Dict with keys ``file``, ``layers``, ``entities_count``, ``texts``,
        ``geometry`` and ``summary``.

    Raises:
        Whatever ``ezdxf.readfile`` raises for unreadable or invalid files.
    """
    doc = ezdxf.readfile(filepath)
    msp = doc.modelspace()
    res = {
        "file": Path(filepath).name,
        "layers": [],
        "entities_count": {},
        "texts": [],
        "geometry": {"polylines": [], "circles": []},
        "summary": {},
    }

    for layer in doc.layers:
        res["layers"].append(layer.dxf.name)

    for entity in msp:
        et = entity.dxftype()
        res["entities_count"][et] = res["entities_count"].get(et, 0) + 1
        try:
            if et == "TEXT":
                v = entity.dxf.text.strip()
                if v:
                    res["texts"].append({"text": v, "layer": entity.dxf.layer})
            elif et == "MTEXT":
                v = entity.text.strip()
                if v:
                    res["texts"].append({"text": v, "layer": entity.dxf.layer})
            elif et == "LWPOLYLINE":
                pts = list(entity.get_points())
                if pts:
                    xs = [p[0] for p in pts]
                    ys = [p[1] for p in pts]
                    w, h = max(xs) - min(xs), max(ys) - min(ys)
                    res["geometry"]["polylines"].append({
                        "layer": entity.dxf.layer,
                        "bbox_width_mm": round(w, 2),
                        "bbox_height_mm": round(h, 2),
                        "area_m2": round(w * h / 1e6, 4),
                    })
            elif et == "CIRCLE":
                res["geometry"]["circles"].append({
                    "layer": entity.dxf.layer,
                    "radius_mm": round(entity.dxf.radius, 2),
                })
        except Exception:
            # Best effort: the entity is still counted, only its details are lost.
            logger.debug("Skipping details of %s", et, exc_info=True)
            continue

    polylines = res["geometry"]["polylines"]
    cadru = [p for p in polylines if "CADRU" in p["layer"].upper()]
    sticla = [p for p in polylines if "STICLA" in p["layer"].upper()]

    if not cadru:
        # No frame layer: fall back to the largest polyline in the drawing.
        cadru = sorted(polylines, key=lambda p: p["area_m2"], reverse=True)[:1]

    summary = res["summary"]
    if cadru:
        f = max(cadru, key=lambda p: p["area_m2"])
        summary["latime_mm"] = f["bbox_width_mm"]
        summary["inaltime_mm"] = f["bbox_height_mm"]
        summary["suprafata_bruta_m2"] = f["area_m2"]
        summary["perimetru_m"] = round(
            2 * (f["bbox_width_mm"] + f["bbox_height_mm"]) / 1000, 3
        )
    if sticla:
        g = max(sticla, key=lambda p: p["area_m2"])
        summary["sticla_w_mm"] = g["bbox_width_mm"]
        summary["sticla_h_mm"] = g["bbox_height_mm"]
        summary["sticla_m2"] = g["area_m2"]

    # Later matching texts overwrite earlier ones.
    for t in res["texts"]:
        tx = t["text"]
        tx_upper = tx.upper()
        if "PROFIL:" in tx_upper:
            summary["profil"] = tx
        if "STICLA:" in tx_upper:
            summary["tip_sticla"] = tx
        if any(k in tx_upper for k in ["TIP ", "FEREASTRA", "USA"]):
            summary["tip"] = tx

    return res


def fmt_context(a: dict) -> str:
    """Format an ``analyze_dxf`` result as the text block given to the LLM.

    Missing summary values are rendered as ``?``.
    """
    s = a.get("summary", {})
    return "\n".join([
        f"=== DXF: {a['file']} ===",
        f"Tip: {s.get('tip','?')}",
        f"Dimensiuni: {s.get('latime_mm','?')} x {s.get('inaltime_mm','?')} mm",
        f"Suprafata bruta: {s.get('suprafata_bruta_m2','?')} m2",
        f"Perimetru: {s.get('perimetru_m','?')} m",
        f"Sticla: {s.get('sticla_w_mm','?')} x {s.get('sticla_h_mm','?')} mm = {s.get('sticla_m2','?')} m2",
        f"Profil: {s.get('profil','?')}",
        f"Tip sticla: {s.get('tip_sticla','?')}",
        "",
        "Texte din desen: " + " | ".join(t["text"] for t in a.get("texts", [])),
        f"Layere: {', '.join(a.get('layers',[]))}",
    ])


# ── LLM (via LiteLLM) ─────────────────────────────────────────────────────────
async def ask_ollama(context: str, message: str) -> str:
    """Send one chat turn to the LiteLLM endpoint and return the reply text.

    The function name is historical; the request goes to
    ``{LITELLM_URL}/v1/chat/completions`` using ``MODEL``.

    Args:
        context: Drawing analysis text embedded in the system prompt.
        message: The user's question.

    Returns:
        Content of the first choice in the response.

    Raises:
        httpx.HTTPError: On transport failure or a non-2xx response.
        KeyError, IndexError: If the response JSON lacks the expected fields.
    """
    system = f"""Esti Agent DXF pentru Dorafort, producator de ferestre si usi din lemn stratificat.
Ai urmatoarea analiza a unui desen DXF:
{context}
Raspunde in romana, concis si precis.
Poti calcula cantitati de profil, suprafete sticla, liste materiale."""
    async with httpx.AsyncClient(timeout=60) as c:
        r = await c.post(
            f"{LITELLM_URL}/v1/chat/completions",
            headers={"Authorization": f"Bearer {LITELLM_KEY}"},
            json={
                "model": MODEL,
                "messages": [
                    {"role": "system", "content": system},
                    {"role": "user", "content": message},
                ],
                "temperature": 0.1,
                "max_tokens": 800,
            },
        )
        r.raise_for_status()
        return r.json()["choices"][0]["message"]["content"]


# ── ENDPOINTS ─────────────────────────────────────────────────────────────────
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
    """Accept a ``.dxf`` upload, analyse and render it, and open a session.

    Returns the new session id, the analysis summary, the layer names
    (without ``Defpoints`` and ``0``) and the original filename. Responds
    with 400 when the file is not a ``.dxf`` or cannot be parsed.
    """
    filename = file.filename or ""  # the client may omit the filename
    if not filename.lower().endswith(".dxf"):
        raise HTTPException(400, "Fisierul trebuie sa fie .dxf")

    sid = str(uuid.uuid4())
    path = UPLOAD_DIR / f"{sid}.dxf"
    try:
        path.write_bytes(await file.read())
        try:
            analysis = analyze_dxf(str(path))
            svg = dxf_to_svg(str(path))
        except Exception as e:
            raise HTTPException(400, f"Eroare DXF: {e}") from e
    finally:
        # The session keeps only derived data, so the upload is never read
        # again; remove it on success as well as on failure.
        path.unlink(missing_ok=True)

    sessions[sid] = {
        "context": fmt_context(analysis),
        "svg": svg,
        "analysis": analysis,
        "filename": filename,
    }
    return {
        "session_id": sid,
        "analysis": analysis["summary"],
        "layers": [l for l in analysis["layers"] if l not in ("Defpoints", "0")],
        "filename": filename,
    }


@app.get("/preview/{sid}")
async def preview(sid: str):
    """Return the stored SVG preview of a session, or 404 if it is unknown."""
    s = sessions.get(sid)
    if not s:
        raise HTTPException(404, "Sesiune invalida")
    return Response(content=s["svg"], media_type="image/svg+xml")


class ChatReq(BaseModel):
    """Request body of ``POST /chat``."""

    # Id returned by /upload.
    session_id: str
    # The user's question about the drawing.
    message: str


@app.post("/chat")
async def chat(req: ChatReq):
    """Answer a question about the drawing of an existing session.

    Responds with 404 for an unknown session and 502 when the LLM backend
    fails or returns an unexpected payload.
    """
    s = sessions.get(req.session_id)
    if not s:
        raise HTTPException(404, "Sesiune invalida — re-uploadati DXF-ul")
    try:
        answer = await ask_ollama(s["context"], req.message)
    except (httpx.HTTPError, KeyError, IndexError, ValueError) as e:
        logger.exception("LLM request failed")
        raise HTTPException(502, "Serviciul LLM nu este disponibil") from e
    return {"answer": answer}


@app.get("/health")
async def health():
    """Report liveness, the configured model and the number of open sessions."""
    return {
        "status": "ok",
        "model": MODEL,
        "provider": "nvidia-nim",
        "sessions": len(sessions),
    }


# ── UI ────────────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def ui():
    """Serve the single-page UI."""
    # NOTE(review): the page below is plain text with no markup or script;
    # presumably the HTML template was stripped — restore it from version
    # control before relying on this endpoint.
    return r"""
Agent DXF — Dorafort

Agent DXF

Dorafort — Ferestre & Uși
🗂️

Incarca un fisier DXF
pentru previzualizare

"""