| from fastapi import FastAPI, HTTPException, UploadFile, File, Form |
| from pydantic import BaseModel |
| import numpy as np |
| from PIL import Image |
| import io, uuid, os, shutil, timeit |
| from datetime import datetime |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import FileResponse |
|
|
| |
| from app import ( |
| predict_full_paper, |
| ReferenceBoxNotDetectedError, |
| FingerCutOverlapError, |
| MultipleObjectsError, |
| NoObjectDetectedError, |
| PaperNotDetectedError |
| ) |
|
|
| app = FastAPI() |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| BASE_URL = "https://app.us-central1.run.app" |
|
|
| OUTPUT_DIR = os.path.abspath("./outputs") |
| os.makedirs(OUTPUT_DIR, exist_ok=True) |
|
|
| UPDATES_DIR = os.path.abspath("./updates") |
| os.makedirs(UPDATES_DIR, exist_ok=True) |
|
|
| |
| app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs") |
| app.mount("/updates", StaticFiles(directory=UPDATES_DIR), name="updates") |
|
|
|
|
| def save_and_build_urls( |
| session_id: str, |
| dxf_path: str, |
| output_image: np.ndarray = None, |
| outlines: np.ndarray = None, |
| mask: np.ndarray = None, |
| endpoint_type: str = "predict", |
| paper_size: str = None, |
| offset_value: float = None, |
| offset_unit: str = "mm", |
| finger_cut: str = "Off" |
| ): |
| """Helper to save all artifacts and return public URLs.""" |
| request_dir = os.path.join(OUTPUT_DIR, session_id) |
| os.makedirs(request_dir, exist_ok=True) |
|
|
| |
| current_date = datetime.utcnow().strftime("%d-%m-%Y") |
| |
| |
| offset_str = f"{offset_value:.3f}".replace(".", "_") if offset_value is not None else "0_000" |
| |
| |
| if paper_size and offset_value is not None: |
| dxf_fn = f"DXF_{current_date}_{paper_size}_{offset_str}{offset_unit}" |
| if finger_cut == "On": |
| dxf_fn += "_fingercut" |
| dxf_fn += ".dxf" |
| else: |
| dxf_fn = f"DXF_{current_date}.dxf" |
|
|
| |
| new_dxf_path = os.path.join(request_dir, dxf_fn) |
|
|
| |
| if os.path.exists(dxf_path): |
| shutil.copy(dxf_path, new_dxf_path) |
| else: |
| |
| with open(new_dxf_path, "wb") as f: |
| if isinstance(dxf_path, (bytes, bytearray)): |
| f.write(dxf_path) |
| else: |
| f.write(str(dxf_path).encode("utf-8")) |
|
|
| urls = { |
| "dxf_url": f"{BASE_URL}/download/{session_id}/{dxf_fn}", |
| } |
|
|
| |
| if output_image is not None: |
| out_fn = "annotated_image.jpg" |
| out_path = os.path.join(request_dir, out_fn) |
| Image.fromarray(output_image).save(out_path) |
| urls["output_image_url"] = f"{BASE_URL}/outputs/{session_id}/{out_fn}" |
|
|
| if outlines is not None: |
| outlines_fn = "outlines.jpg" |
| outlines_path = os.path.join(request_dir, outlines_fn) |
| Image.fromarray(outlines).save(outlines_path) |
| urls["outlines_url"] = f"{BASE_URL}/outputs/{session_id}/{outlines_fn}" |
|
|
| if mask is not None: |
| mask_fn = "mask.jpg" |
| mask_path = os.path.join(request_dir, mask_fn) |
| Image.fromarray(mask).save(mask_path) |
| urls["mask_url"] = f"{BASE_URL}/outputs/{session_id}/{mask_fn}" |
|
|
| return urls |
|
|
|
|
| |
| @app.get("/download/{session_id}/{filename}") |
| async def download_file(session_id: str, filename: str): |
| file_path = os.path.join(OUTPUT_DIR, session_id, filename) |
| if not os.path.exists(file_path): |
| raise HTTPException(status_code=404, detail="File not found") |
| |
| return FileResponse( |
| path=file_path, |
| filename=filename, |
| media_type="application/x-dxf", |
| headers={"Content-Disposition": f"attachment; filename={filename}"} |
| ) |
|
|
|
|
| @app.post("/predict_paper_simple") |
| async def predict_paper_simple_api( |
| file: UploadFile = File(...), |
| paper_size: str = Form(..., regex="^(A4|A3|US Letter)$"), |
| ): |
| """ |
| Simple paper-based predict: image + paper size → DXF only |
| Default: 0mm offset, no finger cuts |
| """ |
| session_id = str(uuid.uuid4()) |
| try: |
| img_bytes = await file.read() |
| image = np.array(Image.open(io.BytesIO(img_bytes)).convert("RGB")) |
| except Exception: |
| raise HTTPException(400, "Invalid image upload") |
|
|
| try: |
| start = timeit.default_timer() |
| |
| |
| dxf_path, ann_img, outlines_img, mask_img, scale_info = predict_full_paper( |
| image=image, |
| paper_size=paper_size, |
| offset_value_mm=0.0, |
| offset_unit="mm", |
| enable_finger_cut="Off", |
| selected_outputs=[] |
| ) |
| |
| elapsed = timeit.default_timer() - start |
| print(f"[{session_id}] predict_paper_simple in {elapsed:.2f}s - {scale_info}") |
|
|
| urls = save_and_build_urls( |
| session_id=session_id, |
| dxf_path=dxf_path, |
| endpoint_type="predict_paper_simple", |
| paper_size=paper_size, |
| offset_value=0.0, |
| offset_unit="mm", |
| finger_cut="Off" |
| ) |
| |
| |
| urls["scale_info"] = scale_info |
| return urls |
|
|
| except (ReferenceBoxNotDetectedError, PaperNotDetectedError): |
| raise HTTPException(status_code=400, detail="Error detecting paper! Please ensure the paper is clearly visible and try again.") |
| except (MultipleObjectsError): |
| raise HTTPException(status_code=400, detail="Multiple objects detected! Please place only a single object on the paper.") |
| except (NoObjectDetectedError): |
| raise HTTPException(status_code=400, detail="No object detected! Please ensure an object is placed on the paper.") |
| except FingerCutOverlapError: |
| raise HTTPException(status_code=400, detail="There was an overlap with fingercuts! Please try again to generate dxf.") |
| except Exception as e: |
| print(f"Error in predict_paper_simple: {str(e)}") |
| raise HTTPException(status_code=500, detail="Error processing image! Please try again with a clearer image.") |
|
|
|
|
| @app.post("/predict_paper_with_offset") |
| async def predict_paper_with_offset_api( |
| file: UploadFile = File(...), |
| paper_size: str = Form(..., regex="^(A4|A3|US Letter)$"), |
| offset_value: float = Form(...), |
| offset_unit: str = Form(..., regex="^(mm|inches)$"), |
| include_images: bool = Form(False) |
| ): |
| """ |
| Paper-based predict with offset: image + paper size + offset → DXF + optional images |
| """ |
| session_id = str(uuid.uuid4()) |
| try: |
| img_bytes = await file.read() |
| image = np.array(Image.open(io.BytesIO(img_bytes)).convert("RGB")) |
| except Exception: |
| raise HTTPException(400, "Invalid image upload") |
|
|
| |
| if offset_value < 0: |
| raise HTTPException(400, "Offset value cannot be negative") |
| if offset_value > 50: |
| raise HTTPException(400, "Offset value too large (max 50)") |
|
|
| try: |
| start = timeit.default_timer() |
| |
| |
| selected_outputs = ["Annotated Image", "Outlines", "Mask"] if include_images else [] |
| |
| dxf_path, ann_img, outlines_img, mask_img, scale_info = predict_full_paper( |
| image=image, |
| paper_size=paper_size, |
| offset_value_mm=offset_value, |
| offset_unit=offset_unit, |
| enable_finger_cut="Off", |
| selected_outputs=selected_outputs |
| ) |
| |
| elapsed = timeit.default_timer() - start |
| print(f"[{session_id}] predict_paper_with_offset in {elapsed:.2f}s - {scale_info}") |
|
|
| urls = save_and_build_urls( |
| session_id=session_id, |
| dxf_path=dxf_path, |
| output_image=ann_img if include_images else None, |
| outlines=outlines_img if include_images else None, |
| mask=mask_img if include_images else None, |
| endpoint_type="predict_paper_with_offset", |
| paper_size=paper_size, |
| offset_value=offset_value, |
| offset_unit=offset_unit, |
| finger_cut="Off" |
| ) |
| |
| urls["scale_info"] = scale_info |
| return urls |
|
|
| except (ReferenceBoxNotDetectedError, PaperNotDetectedError): |
| raise HTTPException(status_code=400, detail="Error detecting paper! Please ensure the paper is clearly visible and try again.") |
| except (MultipleObjectsError): |
| raise HTTPException(status_code=400, detail="Multiple objects detected! Please place only a single object on the paper.") |
| except (NoObjectDetectedError): |
| raise HTTPException(status_code=400, detail="No object detected! Please ensure an object is placed on the paper.") |
| except FingerCutOverlapError: |
| raise HTTPException(status_code=400, detail="There was an overlap with fingercuts! Please try again to generate dxf.") |
| except Exception as e: |
| print(f"Error in predict_paper_with_offset: {str(e)}") |
| raise HTTPException(status_code=500, detail="Error processing image! Please try again with a clearer image.") |
|
|
|
|
| @app.post("/predict_paper_full") |
| async def predict_paper_full_api( |
| file: UploadFile = File(...), |
| paper_size: str = Form(..., regex="^(A4|A3|US Letter)$"), |
| offset_value: float = Form(...), |
| offset_unit: str = Form(..., regex="^(mm|inches)$"), |
| enable_finger_cut: str = Form(..., regex="^(On|Off)$"), |
| include_images: bool = Form(False) |
| ): |
| """ |
| Full paper-based predict: image + paper size + offset + finger cuts → DXF + optional images |
| """ |
| session_id = str(uuid.uuid4()) |
| try: |
| img_bytes = await file.read() |
| image = np.array(Image.open(io.BytesIO(img_bytes)).convert("RGB")) |
| except Exception: |
| raise HTTPException(400, "Invalid image upload") |
|
|
| |
| if offset_value < 0: |
| raise HTTPException(400, "Offset value cannot be negative") |
| if offset_value > 50: |
| raise HTTPException(400, "Offset value too large (max 50)") |
|
|
| try: |
| start = timeit.default_timer() |
| |
| |
| selected_outputs = ["Annotated Image", "Outlines", "Mask"] if include_images else [] |
| |
| dxf_path, ann_img, outlines_img, mask_img, scale_info = predict_full_paper( |
| image=image, |
| paper_size=paper_size, |
| offset_value_mm=offset_value, |
| offset_unit=offset_unit, |
| enable_finger_cut=enable_finger_cut, |
| selected_outputs=selected_outputs |
| ) |
| |
| elapsed = timeit.default_timer() - start |
| print(f"[{session_id}] predict_paper_full in {elapsed:.2f}s - {scale_info}") |
|
|
| urls = save_and_build_urls( |
| session_id=session_id, |
| dxf_path=dxf_path, |
| output_image=ann_img if include_images else None, |
| outlines=outlines_img if include_images else None, |
| mask=mask_img if include_images else None, |
| endpoint_type="predict_paper_full", |
| paper_size=paper_size, |
| offset_value=offset_value, |
| offset_unit=offset_unit, |
| finger_cut=enable_finger_cut |
| ) |
| |
| urls["scale_info"] = scale_info |
| return urls |
|
|
| except (ReferenceBoxNotDetectedError, PaperNotDetectedError): |
| raise HTTPException(status_code=400, detail="Error detecting paper! Please ensure the paper is clearly visible and try again.") |
| except (MultipleObjectsError): |
| raise HTTPException(status_code=400, detail="Multiple objects detected! Please place only a single object on the paper.") |
| except (NoObjectDetectedError): |
| raise HTTPException(status_code=400, detail="No object detected! Please ensure an object is placed on the paper.") |
| except FingerCutOverlapError: |
| raise HTTPException(status_code=400, detail="There was an overlap with fingercuts! Please try again to generate dxf.") |
| except Exception as e: |
| print(f"Error in predict_paper_full: {str(e)}") |
| raise HTTPException(status_code=500, detail="Error processing image! Please try again with a clearer image.") |
|
|
|
|
| |
| @app.post("/predict1") |
| async def predict1_api( |
| file: UploadFile = File(...) |
| ): |
| """ |
| Legacy endpoint - redirects to simple paper-based prediction with A4 default |
| """ |
| return await predict_paper_simple_api(file=file, paper_size="A4") |
|
|
|
|
| @app.post("/predict2") |
| async def predict2_api( |
| file: UploadFile = File(...), |
| enable_fillet: str = Form(..., regex="^(On|Off)$"), |
| fillet_value_mm: float = Form(...) |
| ): |
| """ |
| Legacy endpoint - redirects to paper-based prediction with offset |
| Note: Fillet functionality mapped to offset for compatibility |
| """ |
| |
| offset_value = fillet_value_mm if enable_fillet == "On" else 0.0 |
| |
| return await predict_paper_with_offset_api( |
| file=file, |
| paper_size="A4", |
| offset_value=offset_value, |
| offset_unit="mm", |
| include_images=True |
| ) |
|
|
|
|
| @app.post("/predict3") |
| async def predict3_api( |
| file: UploadFile = File(...), |
| enable_fillet: str = Form(..., regex="^(On|Off)$"), |
| fillet_value_mm: float = Form(...), |
| enable_finger_cut: str = Form(..., regex="^(On|Off)$") |
| ): |
| """ |
| Legacy endpoint - redirects to full paper-based prediction |
| """ |
| offset_value = fillet_value_mm if enable_fillet == "On" else 0.0 |
| |
| return await predict_paper_full_api( |
| file=file, |
| paper_size="A4", |
| offset_value=offset_value, |
| offset_unit="mm", |
| enable_finger_cut=enable_finger_cut, |
| include_images=True |
| ) |
|
|
|
|
| @app.post("/update") |
| async def update_files( |
| output_image: UploadFile = File(...), |
| outlines_image: UploadFile = File(...), |
| mask_image: UploadFile = File(...), |
| dxf_file: UploadFile = File(...) |
| ): |
| session_id = str(uuid.uuid4()) |
| update_dir = os.path.join(UPDATES_DIR, session_id) |
| os.makedirs(update_dir, exist_ok=True) |
|
|
| try: |
| upload_map = { |
| "output_image": output_image, |
| "outlines_image": outlines_image, |
| "mask_image": mask_image, |
| "dxf_file": dxf_file, |
| } |
| urls = {} |
| for key, up in upload_map.items(): |
| fn = up.filename |
| path = os.path.join(update_dir, fn) |
| with open(path, "wb") as f: |
| shutil.copyfileobj(up.file, f) |
| urls[key] = f"{BASE_URL}/updates/{session_id}/{fn}" |
|
|
| return {"session_id": session_id, "uploaded": urls} |
|
|
| except Exception as e: |
| raise HTTPException(500, f"Update failed: {e}") |
|
|
|
|
| from fastapi import Response |
|
|
| @app.get("/health") |
| def health(): |
| return Response(content="OK", status_code=200) |
|
|
|
|
| @app.get("/") |
| def root(): |
| return { |
| "message": "Paper-based DXF Generator API", |
| "endpoints": [ |
| "/predict_paper_simple - Simple DXF generation with paper reference", |
| "/predict_paper_with_offset - DXF generation with contour offset", |
| "/predict_paper_full - Full DXF generation with all features", |
| "/predict1, /predict2, /predict3 - Legacy endpoints (backward compatibility)" |
| ], |
| "paper_sizes": ["A4", "A3", "US Letter"], |
| "units": ["mm", "inches"] |
| } |
|
|
|
|
| if __name__ == "__main__": |
| import uvicorn |
| port = int(os.environ.get("PORT", 8080)) |
| print(f"Starting FastAPI server on 0.0.0.0:{port}...") |
| uvicorn.run(app, host="0.0.0.0", port=port) |