""" URL Shortener API Endpoints: - POST /shorten - Create a short URL - GET /{code} - Redirect to original URL - GET /stats/{code} - Get click statistics """ import os import asyncio from contextlib import asynccontextmanager from urllib.parse import urlparse import asyncpg import redis.asyncio as redis from fastapi import FastAPI, HTTPException, Request from fastapi.responses import RedirectResponse from pydantic import BaseModel, HttpUrl from app.encoding import base62_encode from app.snowflake import init_generator, generate_id # Configuration from environment DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://urlshortner:localdev@localhost:5432/urlshortner") REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") MACHINE_ID = int(os.getenv("MACHINE_ID", "1")) BASE_URL = os.getenv("BASE_URL", "http://localhost") # Cache TTL in seconds (1 hour) CACHE_TTL = 3600 # Global connections db_pool: asyncpg.Pool | None = None redis_client: redis.Redis | None = None @asynccontextmanager async def lifespan(app: FastAPI): """Manage database and Redis connections.""" global db_pool, redis_client # Startup init_generator(MACHINE_ID) db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=5, max_size=20) redis_client = redis.from_url(REDIS_URL, decode_responses=True) print(f"[Startup] Connected to PostgreSQL and Redis. Machine ID: {MACHINE_ID}") yield # Shutdown if db_pool: await db_pool.close() if redis_client: await redis_client.close() print("[Shutdown] Connections closed.") app = FastAPI( title="URL Shortener", description="Distributed URL shortening service", version="1.0.0", lifespan=lifespan, ) # Request/Response models class ShortenRequest(BaseModel): url: HttpUrl custom_code: str | None = None # Optional custom short code class ShortenResponse(BaseModel): short_url: str short_code: str original_url: str class StatsResponse(BaseModel): short_code: str original_url: str click_count: int created_at: str # Endpoints @app.get("/health") async def health_check(): """Health check for load balancer.""" return {"status": "healthy", "machine_id": MACHINE_ID} @app.post("/shorten", response_model=ShortenResponse) async def shorten_url(request: ShortenRequest, req: Request): """ Create a shortened URL. Process: 1. Generate unique ID using Snowflake 2. Encode as base62 for short code 3. Store in PostgreSQL 4. Cache in Redis """ original_url = str(request.url) # Validate URL has a valid domain parsed = urlparse(original_url) if not parsed.netloc: raise HTTPException(status_code=400, detail="Invalid URL") # Generate short code if request.custom_code: short_code = request.custom_code # Check if custom code already exists existing = await redis_client.get(f"url:{short_code}") if existing: raise HTTPException(status_code=409, detail="Custom code already in use") else: # Generate using Snowflake + base62 snowflake_id = generate_id() short_code = base62_encode(snowflake_id) # Get client info client_ip = req.headers.get("X-Real-IP", req.client.host if req.client else "unknown") user_agent = req.headers.get("User-Agent", "") # Store in database try: await db_pool.execute( """ INSERT INTO urls (short_code, original_url, ip_address, user_agent) VALUES ($1, $2, $3, $4) """, short_code, original_url, client_ip, user_agent, ) except asyncpg.UniqueViolationError: raise HTTPException(status_code=409, detail="Short code collision. Please retry.") # Cache in Redis await redis_client.setex(f"url:{short_code}", CACHE_TTL, original_url) return ShortenResponse( short_url=f"{BASE_URL}/{short_code}", short_code=short_code, original_url=original_url, ) @app.get("/{short_code}") async def redirect_to_url(short_code: str, req: Request): """ Redirect to the original URL. Process: 1. Check Redis cache first (fast path) 2. If cache miss, query PostgreSQL 3. Update cache on miss 4. Track click asynchronously (fire and forget) """ # Try cache first original_url = await redis_client.get(f"url:{short_code}") if not original_url: # Cache miss - query database row = await db_pool.fetchrow( "SELECT original_url FROM urls WHERE short_code = $1", short_code, ) if not row: raise HTTPException(status_code=404, detail="Short URL not found") original_url = row["original_url"] # Populate cache await redis_client.setex(f"url:{short_code}", CACHE_TTL, original_url) # Track click asynchronously (don't slow down redirect) asyncio.create_task( track_click( short_code, req.headers.get("X-Real-IP", req.client.host if req.client else None), req.headers.get("User-Agent"), req.headers.get("Referer"), ) ) # 301 = permanent redirect (cacheable by browsers) # 302 = temporary redirect (not cached, better for analytics) return RedirectResponse(url=original_url, status_code=302) @app.get("/stats/{short_code}", response_model=StatsResponse) async def get_stats(short_code: str): """Get statistics for a short URL.""" row = await db_pool.fetchrow( """ SELECT short_code, original_url, click_count, created_at FROM urls WHERE short_code = $1 """, short_code, ) if not row: raise HTTPException(status_code=404, detail="Short URL not found") return StatsResponse( short_code=row["short_code"], original_url=row["original_url"], click_count=row["click_count"], created_at=row["created_at"].isoformat(), ) async def track_click( short_code: str, ip_address: str | None, user_agent: str | None, referer: str | None, ): """ Track a click event asynchronously. This runs in the background after the redirect is sent, so it doesn't slow down the user experience. """ try: # Increment click count await db_pool.execute( "UPDATE urls SET click_count = click_count + 1 WHERE short_code = $1", short_code, ) # Store detailed click record await db_pool.execute( """ INSERT INTO clicks (short_code, ip_address, user_agent, referer) VALUES ($1, $2, $3, $4) """, short_code, ip_address, user_agent, referer, ) except Exception as e: # Log but don't fail - analytics shouldn't break redirects print(f"[Warning] Failed to track click: {e}")