timestamp use your local django timezone

This commit is contained in:
RNL
2026-03-29 15:44:27 +11:00
parent d167073fde
commit e551dcc5fd
3 changed files with 374 additions and 9 deletions

View File

@@ -0,0 +1,355 @@
"""
Management command to run the Bambu-Run MCP server.
Supports SSE (network) and stdio (local) transports.
Usage:
python manage.py bambu_mcp_server
python manage.py bambu_mcp_server --transport sse --host 0.0.0.0 --port 8808
python manage.py bambu_mcp_server --transport stdio
"""
import logging
from django.core.management.base import BaseCommand, CommandError
logger = logging.getLogger("bambu_run.mcp")
class Command(BaseCommand):
help = "Run the Bambu-Run MCP server for AI agent access"
def add_arguments(self, parser):
from bambu_run.conf import app_settings
parser.add_argument(
"--transport",
choices=["sse", "stdio"],
default="sse",
help="Transport mode (default: sse)",
)
parser.add_argument(
"--host",
default=app_settings.MCP_HOST,
help=f"Host to bind to (default: {app_settings.MCP_HOST})",
)
parser.add_argument(
"--port",
type=int,
default=app_settings.MCP_PORT,
help=f"Port to listen on (default: {app_settings.MCP_PORT})",
)
def handle(self, *args, **options):
try:
from mcp.server.fastmcp import FastMCP
except ImportError:
raise CommandError(
"The 'mcp' package is required. Install it with: pip install 'bambu-run[mcp]'"
)
from asgiref.sync import sync_to_async
from bambu_run.conf import app_settings
from bambu_run import mcp_tools
transport = options["transport"]
host = options["host"]
port = options["port"]
mcp = FastMCP(
"Bambu-Run",
instructions=(
"Bambu-Run MCP server provides read-only access to 3D printer data "
"including live printer status, filament inventory, print history, "
"temperature trends, and diagnostics. All data comes from Bambu Lab "
"printers monitored via MQTT."
),
)
# ── Register Tools ───────────────────────────────────────────────
@mcp.tool()
async def get_printer_status(printer_id: int | None = None) -> str:
"""Get current live status of printer(s) including temperatures, progress, AMS slots, and errors.
Args:
printer_id: Optional printer ID to filter. Omit for all printers.
"""
return await sync_to_async(mcp_tools.get_printer_status)(printer_id=printer_id)
@mcp.tool()
async def list_printers() -> str:
"""List all registered printers with their model, serial, IP, and active status."""
return await sync_to_async(mcp_tools.list_printers)()
@mcp.tool()
async def get_print_history(
status: str | None = None,
days: int | None = None,
project_name: str | None = None,
limit: int = 20,
) -> str:
"""Get print job history with optional filters.
Args:
status: Filter by status (FINISH, FAILED, CANCELLED).
days: Only show jobs from the last N days.
project_name: Filter by project name (partial match).
limit: Maximum number of results (default 20).
"""
return await sync_to_async(mcp_tools.get_print_history)(
status=status, days=days, project_name=project_name, limit=limit
)
@mcp.tool()
async def get_print_job_detail(job_id: int) -> str:
"""Get detailed information about a single print job including filament usage.
Args:
job_id: The print job ID.
"""
return await sync_to_async(mcp_tools.get_print_job_detail)(job_id=job_id)
@mcp.tool()
async def list_filaments(
type: str | None = None,
brand: str | None = None,
color: str | None = None,
loaded_in_ams: bool | None = None,
low_filament: bool | None = None,
) -> str:
"""List filament inventory with optional filters.
Args:
type: Filter by material type (PLA, PETG, ABS, etc.).
brand: Filter by brand name (partial match).
color: Filter by color name (partial match).
loaded_in_ams: Filter by whether spool is currently in AMS.
low_filament: If true, only show spools with <=20% remaining.
"""
return await sync_to_async(mcp_tools.list_filaments)(
type=type, brand=brand, color=color,
loaded_in_ams=loaded_in_ams, low_filament=low_filament,
)
@mcp.tool()
async def get_filament_detail(filament_id: int) -> str:
"""Get detailed information about a single filament spool including usage history.
Args:
filament_id: The filament spool ID.
"""
return await sync_to_async(mcp_tools.get_filament_detail)(filament_id=filament_id)
@mcp.tool()
async def get_temperature_history(
printer_id: int | None = None,
hours: int = 6,
metric: str = "all",
) -> str:
"""Get temperature trends (avg/min/max) over recent hours.
Args:
printer_id: Optional printer ID to filter.
hours: Number of hours to look back (default 6).
metric: Which sensor to show: 'all', 'nozzle', 'bed', or 'chamber'.
"""
return await sync_to_async(mcp_tools.get_temperature_history)(
printer_id=printer_id, hours=hours, metric=metric
)
@mcp.tool()
async def get_filament_usage_stats(days: int = 30, group_by: str = "type") -> str:
"""Get aggregate filament consumption statistics.
Args:
days: Number of days to look back (default 30).
group_by: Group results by 'type', 'color', or 'spool'.
"""
return await sync_to_async(mcp_tools.get_filament_usage_stats)(days=days, group_by=group_by)
@mcp.tool()
async def get_printer_health(printer_id: int | None = None) -> str:
"""Get printer diagnostics including errors, humidity, WiFi signal, and recent failures.
Args:
printer_id: Optional printer ID to filter. Omit for all printers.
"""
return await sync_to_async(mcp_tools.get_printer_health)(printer_id=printer_id)
@mcp.tool()
async def search_print_jobs(query: str) -> str:
"""Search print jobs by project name or gcode filename.
Args:
query: Search text (partial match on project name or gcode file).
"""
return await sync_to_async(mcp_tools.search_print_jobs)(query=query)
@mcp.tool()
async def get_printing_summary(days: int = 7) -> str:
"""Get high-level printing activity summary including job counts, success rate, and top projects.
Args:
days: Number of days to summarize (default 7).
"""
return await sync_to_async(mcp_tools.get_printing_summary)(days=days)
@mcp.tool()
async def find_compatible_filament(
type: str,
min_remaining_percent: int = 10,
color: str | None = None,
) -> str:
"""Find filament spools matching material type and optional criteria.
Args:
type: Material type to search for (PLA, PETG, ABS, etc.).
min_remaining_percent: Minimum remaining percentage (default 10).
color: Optional color filter (partial match).
"""
return await sync_to_async(mcp_tools.find_compatible_filament)(
type=type, min_remaining_percent=min_remaining_percent, color=color
)
# ── Register Resources ───────────────────────────────────────────
@mcp.resource("bambu://printers")
async def res_printers() -> str:
"""List all registered printers."""
return await sync_to_async(mcp_tools.resource_printers)()
@mcp.resource("bambu://printers/{printer_id}/status")
async def res_printer_status(printer_id: int) -> str:
"""Get latest status for a specific printer."""
return await sync_to_async(mcp_tools.resource_printer_status)(printer_id)
@mcp.resource("bambu://filaments")
async def res_filaments() -> str:
"""Full filament inventory."""
return await sync_to_async(mcp_tools.resource_filaments)()
@mcp.resource("bambu://filaments/{filament_id}")
async def res_filament_detail(filament_id: int) -> str:
"""Single filament spool with usage history."""
return await sync_to_async(mcp_tools.resource_filament_detail)(filament_id)
@mcp.resource("bambu://print-jobs/recent")
async def res_recent_jobs() -> str:
"""Last 20 print jobs."""
return await sync_to_async(mcp_tools.resource_recent_print_jobs)()
@mcp.resource("bambu://filament-types")
async def res_filament_types() -> str:
"""Filament type registry."""
return await sync_to_async(mcp_tools.resource_filament_types)()
@mcp.resource("bambu://filament-colors")
async def res_filament_colors() -> str:
"""Filament color database."""
return await sync_to_async(mcp_tools.resource_filament_colors)()
# ── Register Prompts ─────────────────────────────────────────────
@mcp.prompt()
async def printer_check_in(printer_id: int | None = None) -> str:
"""Full printer status briefing with health check and recent prints.
Args:
printer_id: Optional printer ID. Omit for all printers.
"""
return await sync_to_async(mcp_tools.prompt_printer_check_in)(printer_id=printer_id)
@mcp.prompt()
async def filament_inventory_report() -> str:
"""Comprehensive filament inventory report with low-stock warnings."""
return await sync_to_async(mcp_tools.prompt_filament_inventory_report)()
@mcp.prompt()
async def print_job_review(job_id: int) -> str:
"""Detailed review of a completed print job.
Args:
job_id: The print job ID to review.
"""
return await sync_to_async(mcp_tools.prompt_print_job_review)(job_id)
@mcp.prompt()
async def weekly_printing_digest() -> str:
"""Weekly printing activity summary with filament usage breakdown."""
return await sync_to_async(mcp_tools.prompt_weekly_digest)()
@mcp.prompt()
async def troubleshoot_printer(printer_id: int | None = None) -> str:
"""Diagnose printer issues using recent health data, status, and temperatures.
Args:
printer_id: Optional printer ID. Omit for all printers.
"""
return await sync_to_async(mcp_tools.prompt_troubleshoot_printer)(printer_id=printer_id)
# ── Auth middleware for SSE ───────────────────────────────────────
api_key = app_settings.MCP_API_KEY
auth_backend = app_settings.MCP_AUTH_BACKEND
if api_key or auth_backend:
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
class AuthMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
# Custom auth backend takes priority
if auth_backend:
if not auth_backend(request):
return JSONResponse(
{"error": "Unauthorized"}, status_code=401
)
return await call_next(request)
# API key auth
if api_key:
auth_header = request.headers.get("Authorization", "")
if auth_header == f"Bearer {api_key}":
return await call_next(request)
return JSONResponse(
{"error": "Invalid or missing API key"}, status_code=401
)
return await call_next(request)
# Attach middleware — FastMCP's SSE app is a Starlette app
original_sse_app = mcp.sse_app
def patched_sse_app():
app = original_sse_app()
app.add_middleware(AuthMiddleware)
return app
mcp.sse_app = patched_sse_app
# ── Run ──────────────────────────────────────────────────────────
if transport == "sse":
try:
import uvicorn
except ImportError:
raise CommandError(
"uvicorn is required for SSE transport. Install it with: pip install uvicorn"
)
self.stdout.write(
self.style.SUCCESS(
f"Starting Bambu-Run MCP server (SSE) on {host}:{port}"
)
)
self.stdout.write(
f"Connect with: http://{host}:{port}/sse"
)
app = mcp.sse_app()
uvicorn.run(app, host=host, port=port)
else:
self.stdout.write(
self.style.SUCCESS("Starting Bambu-Run MCP server (stdio)")
)
mcp.run(transport="stdio")

View File

@@ -7,6 +7,7 @@ RAE can reuse these directly.
from datetime import timedelta
from decimal import Decimal
from zoneinfo import ZoneInfo
from django.db.models import Avg, Count, Max, Min, Q, Sum
from django.utils import timezone
@@ -14,6 +15,14 @@ from django.utils import timezone
from .conf import app_settings
def _local_dt(dt, fmt="%Y-%m-%d %H:%M %Z"):
"""Convert a UTC-aware datetime to the configured local timezone for display."""
if dt is None:
return ""
tz = ZoneInfo(app_settings.TIMEZONE)
return dt.astimezone(tz).strftime(fmt)
def _redact(value, label="[redacted]"):
"""Redact sensitive values if MCP_HIDE_SENSITIVE is enabled."""
if app_settings.MCP_HIDE_SENSITIVE:
@@ -108,7 +117,7 @@ def get_printer_status(printer_id=None):
for msg in metric.hms[:5]:
lines.append(f"- HMS: {msg}")
lines.append(f"\n*Last updated: {metric.timestamp.strftime('%Y-%m-%d %H:%M:%S')}*")
lines.append(f"\n*Last updated: {_local_dt(metric.timestamp, '%Y-%m-%d %H:%M:%S %Z')}*")
parts.append("\n".join(lines))
return "\n\n---\n\n".join(parts)
@@ -161,7 +170,7 @@ def get_print_history(status=None, days=None, project_name=None, limit=20):
f"| {j.id} | {j.project_name} | {j.device.name} | "
f"{j.final_status or 'In Progress'} | {j.completion_percent}% | "
f"{_format_duration(j.duration_minutes)} | "
f"{j.start_time.strftime('%Y-%m-%d %H:%M')} |"
f"{_local_dt(j.start_time, '%Y-%m-%d %H:%M')} |"
)
return "\n".join(lines)
@@ -181,9 +190,9 @@ def get_print_job_detail(job_id):
lines.append(f"**Progress**: {job.completion_percent}%")
if job.gcode_file:
lines.append(f"**G-code**: {job.gcode_file}")
lines.append(f"**Started**: {job.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f"**Started**: {_local_dt(job.start_time, '%Y-%m-%d %H:%M:%S %Z')}")
if job.end_time:
lines.append(f"**Ended**: {job.end_time.strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f"**Ended**: {_local_dt(job.end_time, '%Y-%m-%d %H:%M:%S %Z')}")
lines.append(f"**Duration**: {_format_duration(job.duration_minutes)}")
if job.total_layers:
lines.append(f"**Total Layers**: {job.total_layers}")
@@ -234,7 +243,7 @@ def list_filaments(type=None, brand=None, color=None, loaded_in_ams=None, low_fi
color_display = f"{f.color}"
if f.color_hex:
color_display += f" ({f.color_hex})"
last_used = f.last_used.strftime("%Y-%m-%d") if f.last_used else "-"
last_used = _local_dt(f.last_used, "%Y-%m-%d") if f.last_used else "-"
lines.append(
f"| {f.id} | {f.brand} | {f.sub_type or f.type} | "
f"{color_display} | {f.remaining_percent}% | "
@@ -278,7 +287,7 @@ def get_filament_detail(filament_id):
for u in usages:
lines.append(
f"| {u.print_job.project_name} | "
f"{u.print_job.start_time.strftime('%Y-%m-%d')} | "
f"{_local_dt(u.print_job.start_time, '%Y-%m-%d')} | "
f"{u.consumed_percent or 0}% | {u.consumed_grams or '-'}g |"
)
@@ -431,7 +440,7 @@ def get_printer_health(printer_id=None):
signal = latest.wifi_signal_dbm
quality = "Excellent" if signal > -50 else "Good" if signal > -60 else "Fair" if signal > -70 else "Poor"
parts.append(f"- WiFi: {signal} dBm ({quality})")
parts.append(f"- Last seen: {latest.timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
parts.append(f"- Last seen: {_local_dt(latest.timestamp, '%Y-%m-%d %H:%M:%S %Z')}")
age = (timezone.now() - latest.timestamp).total_seconds()
if age > 300:
parts.append(f"- **Warning**: No data for {_format_duration(age / 60)}")
@@ -461,7 +470,7 @@ def get_printer_health(printer_id=None):
if failed.exists():
parts.append(f"### Recent Failures (7d): {failed.count()}")
for job in failed[:5]:
parts.append(f"- {job.project_name} ({job.final_status}) — {job.start_time.strftime('%m-%d %H:%M')}")
parts.append(f"- {job.project_name} ({job.final_status}) — {_local_dt(job.start_time, '%m-%d %H:%M')}")
# Success rate
week_jobs = PrintJob.objects.filter(device=printer, start_time__gte=week_ago)
@@ -496,7 +505,7 @@ def search_print_jobs(query):
for j in jobs:
lines.append(
f"| {j.id} | {j.project_name} | {j.device.name} | "
f"{j.final_status or 'In Progress'} | {j.start_time.strftime('%Y-%m-%d')} |"
f"{j.final_status or 'In Progress'} | {_local_dt(j.start_time, '%Y-%m-%d')} |"
)
return "\n".join(lines)

View File

@@ -36,6 +36,7 @@ stderr_logfile_maxbytes=0
autorestart=true
startretries=10
startsecs=5
priority=10
[program:migrate]
command=python standalone/manage.py migrate --noinput