mirror of
https://github.com/harivansh-afk/nix.git
synced 2026-04-15 04:03:30 +00:00
408 lines
14 KiB
Python
408 lines
14 KiB
Python
# /// script
|
|
# dependencies = ["pillow"]
|
|
# ///
|
|
"""Generate topographic contour wallpapers from real-world elevation data."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import math
|
|
import os
|
|
import random
|
|
import subprocess
|
|
import sys
|
|
import urllib.request
|
|
from datetime import datetime
|
|
from io import BytesIO
|
|
from typing import Iterator
|
|
|
|
from PIL import Image, ImageChops, ImageDraw, ImageFilter, ImageFont
|
|
|
|
Coord = tuple[float, float]
|
|
Candidate = tuple[float, float, str]
|
|
Rgb = tuple[int, int, int]
|
|
|
|
HOME: str = os.environ["HOME"]
|
|
DIR: str = os.path.join(HOME, "Pictures", "Screensavers")
|
|
STATE: str = os.path.join(
|
|
os.environ.get("XDG_STATE_HOME", os.path.join(HOME, ".local", "state")),
|
|
"theme",
|
|
"current",
|
|
)
|
|
LOG_DIR: str = os.path.join(
|
|
os.environ.get("XDG_STATE_HOME", os.path.join(HOME, ".local", "state")),
|
|
"wallpaper",
|
|
)
|
|
LOG_FILE: str = os.path.join(LOG_DIR, "gen.log")
|
|
HISTORY_FILE: str = os.path.join(LOG_DIR, "history.json")
|
|
CACHE: str = os.path.join(
|
|
os.environ.get("XDG_CACHE_HOME", os.path.join(HOME, ".cache")),
|
|
"wallpaper",
|
|
)
|
|
|
|
os.makedirs(DIR, exist_ok=True)
|
|
os.makedirs(LOG_DIR, exist_ok=True)
|
|
os.makedirs(CACHE, exist_ok=True)
|
|
|
|
TILE_URL: str = "https://s3.amazonaws.com/elevation-tiles-prod/terrarium/{z}/{x}/{y}.png"
|
|
TILE_SIZE: int = 256
|
|
ZOOM: int = 11
|
|
CONTOUR_LEVELS: int = 20
|
|
|
|
# cozybox palette - matches lib/theme.nix
|
|
THEMES: dict[str, dict[str, Rgb]] = {
|
|
"dark": {
|
|
"bg": (0x18, 0x18, 0x18),
|
|
"line": (0x2d, 0x2d, 0x2d),
|
|
"label": (0xFF, 0xFF, 0xFF),
|
|
},
|
|
"light": {
|
|
"bg": (0xE7, 0xE7, 0xE7),
|
|
"line": (0xCE, 0xCE, 0xCE),
|
|
"label": (0x00, 0x00, 0x00),
|
|
},
|
|
}
|
|
|
|
# curated mountain/terrain locations with interesting topography
|
|
LOCATIONS: list[Coord] = [
|
|
(46.56, 7.69), # Swiss Alps - Bernese Oberland
|
|
(36.10, -112.09), # Grand Canyon
|
|
(27.99, 86.93), # Everest region
|
|
(61.22, 6.78), # Norwegian fjords
|
|
(-50.94, -73.15), # Patagonia
|
|
(36.25, 137.60), # Japanese Alps
|
|
(39.11, -106.45), # Colorado Rockies
|
|
(46.42, 11.84), # Dolomites
|
|
(44.07, 6.87), # French Alps
|
|
(-43.59, 170.14), # New Zealand Alps
|
|
(64.07, -16.20), # Iceland highlands
|
|
(28.60, 83.82), # Annapurna
|
|
(42.50, 44.50), # Caucasus
|
|
(47.07, 12.69), # Austrian Alps
|
|
(45.83, 6.86), # Mont Blanc
|
|
]
|
|
|
|
MIN_RELIEF: int = 400
|
|
MAX_RETRIES: int = 20
|
|
SEA_LEVEL: int = 32768
|
|
MIN_LAND_FRACTION: float = 0.1
|
|
PREVIEW_W: int = 384
|
|
PREVIEW_H: int = 240
|
|
GRID_COLS: int = 6
|
|
GRID_ROWS: int = 4
|
|
MIN_CONTOUR_COVERAGE: float = 0.15
|
|
MIN_OCCUPIED_CELLS: int = 12
|
|
MAX_CELL_SHARE: float = 0.15
|
|
MAX_CACHED_CANDIDATES: int = 24
|
|
HISTORY_SIZE: int = 10 # remember this many recent locations to avoid repeats
|
|
RUN_ID: str = f"{datetime.now().strftime('%Y%m%dT%H%M%S')}-pid{os.getpid()}"
|
|
|
|
|
|
def log(message: str) -> None:
|
|
line = f"{datetime.now().astimezone().isoformat(timespec='seconds')} [{RUN_ID}] {message}"
|
|
print(line, file=sys.stderr, flush=True)
|
|
with open(LOG_FILE, "a", encoding="utf-8") as f:
|
|
print(line, file=f)
|
|
|
|
|
|
def load_history() -> list[Coord]:
|
|
try:
|
|
with open(HISTORY_FILE, encoding="utf-8") as f:
|
|
entries = json.load(f)
|
|
return [(e[0], e[1]) for e in entries if isinstance(e, list) and len(e) == 2]
|
|
except (FileNotFoundError, json.JSONDecodeError, KeyError):
|
|
return []
|
|
|
|
|
|
def save_history(lat: float, lon: float) -> None:
|
|
history = load_history()
|
|
history.append((round(lat, 2), round(lon, 2)))
|
|
history = history[-HISTORY_SIZE:]
|
|
with open(HISTORY_FILE, "w", encoding="utf-8") as f:
|
|
json.dump(history, f)
|
|
|
|
|
|
def random_location() -> Coord:
|
|
return random.uniform(-60, 70), random.uniform(-180, 180)
|
|
|
|
|
|
def cached_locations(tiles_x: int, tiles_y: int) -> list[Coord]:
|
|
suffix = f"_z{ZOOM}_{tiles_x}x{tiles_y}.png"
|
|
locations: list[Coord] = []
|
|
for name in os.listdir(CACHE):
|
|
if not name.startswith("terrain_") or not name.endswith(suffix):
|
|
continue
|
|
coords = name[len("terrain_") : -len(suffix)]
|
|
try:
|
|
lat, lon = coords.split("_", 1)
|
|
locations.append((float(lat), float(lon)))
|
|
except ValueError:
|
|
continue
|
|
random.shuffle(locations)
|
|
return locations[:MAX_CACHED_CANDIDATES]
|
|
|
|
|
|
def candidate_locations(tiles_x: int, tiles_y: int) -> Iterator[Candidate]:
|
|
seen: set[Coord] = set()
|
|
recent = {(round(lat, 2), round(lon, 2)) for lat, lon in load_history()}
|
|
cached: list[Coord] = cached_locations(tiles_x, tiles_y)
|
|
fallback: list[Coord] = list(LOCATIONS)
|
|
random.shuffle(fallback)
|
|
log(f"candidate_pools cached={len(cached)} curated={len(fallback)} random={MAX_RETRIES} recent_skip={len(recent)}")
|
|
for lat, lon in cached:
|
|
key = (round(lat, 2), round(lon, 2))
|
|
if key in seen or key in recent:
|
|
continue
|
|
seen.add(key)
|
|
yield lat, lon, "cached"
|
|
for lat, lon in fallback:
|
|
key = (round(lat, 2), round(lon, 2))
|
|
if key in seen or key in recent:
|
|
continue
|
|
seen.add(key)
|
|
yield lat, lon, "curated"
|
|
for _ in range(MAX_RETRIES):
|
|
lat, lon = random_location()
|
|
key = (round(lat, 2), round(lon, 2))
|
|
if key in seen or key in recent:
|
|
continue
|
|
seen.add(key)
|
|
yield lat, lon, "random"
|
|
|
|
|
|
def reverse_geocode(lat: float, lon: float) -> str | None:
|
|
try:
|
|
url = f"https://nominatim.openstreetmap.org/reverse?lat={lat}&lon={lon}&format=json&zoom=6&accept-language=en"
|
|
req = urllib.request.Request(url, headers={"User-Agent": "wallpaper-gen/1.0"})
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
data = json.loads(resp.read())
|
|
addr = data.get("address", {})
|
|
name = (
|
|
addr.get("region")
|
|
or addr.get("state_district")
|
|
or addr.get("state")
|
|
or addr.get("county")
|
|
or addr.get("country")
|
|
)
|
|
return name
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def get_resolution() -> tuple[int, int]:
|
|
"""Get primary display resolution on macOS."""
|
|
try:
|
|
out = subprocess.check_output(
|
|
["system_profiler", "SPDisplaysDataType", "-json"],
|
|
timeout=5,
|
|
).decode()
|
|
displays = json.loads(out)
|
|
for gpu in displays.get("SPDisplaysDataType", []):
|
|
for disp in gpu.get("spdisplays_ndrvs", []):
|
|
res = disp.get("_spdisplays_resolution", "")
|
|
if " x " in res:
|
|
parts = res.split(" x ")
|
|
w = int(parts[0].strip())
|
|
h = int(parts[1].split()[0].strip())
|
|
return w, h
|
|
except Exception:
|
|
pass
|
|
return 3024, 1964 # MacBook Pro 14" default
|
|
|
|
|
|
def lat_lon_to_tile(lat: float, lon: float, zoom: int) -> tuple[int, int]:
|
|
n = 2**zoom
|
|
x = int(n * ((lon + 180) / 360))
|
|
lat_rad = math.radians(lat)
|
|
y = int(n * (1 - (math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi)) / 2)
|
|
return x, y
|
|
|
|
|
|
def fetch_tile(z: int, x: int, y: int) -> Image.Image:
|
|
url = TILE_URL.format(z=z, x=x, y=y)
|
|
req = urllib.request.Request(url, headers={"User-Agent": "wallpaper-gen/1.0"})
|
|
with urllib.request.urlopen(req, timeout=15) as resp:
|
|
return Image.open(BytesIO(resp.read()))
|
|
|
|
|
|
def fetch_terrain(
|
|
lat: float, lon: float, tiles_x: int, tiles_y: int, zoom: int
|
|
) -> tuple[Image.Image, bool]:
|
|
tag = f"{lat:.2f}_{lon:.2f}_z{zoom}_{tiles_x}x{tiles_y}"
|
|
cache_file = os.path.join(CACHE, f"terrain_{tag}.png")
|
|
if os.path.exists(cache_file):
|
|
return Image.open(cache_file), True
|
|
cx, cy = lat_lon_to_tile(lat, lon, zoom)
|
|
sx, sy = cx - tiles_x // 2, cy - tiles_y // 2
|
|
full = Image.new("RGB", (tiles_x * TILE_SIZE, tiles_y * TILE_SIZE))
|
|
for ty in range(tiles_y):
|
|
for tx in range(tiles_x):
|
|
tile = fetch_tile(zoom, sx + tx, sy + ty)
|
|
full.paste(tile, (tx * TILE_SIZE, ty * TILE_SIZE))
|
|
full.save(cache_file)
|
|
return full, False
|
|
|
|
|
|
def decode_terrarium(img: Image.Image) -> tuple[Image.Image, int, float]:
|
|
raw_bytes = img.tobytes()
|
|
px_count = len(raw_bytes) // 3
|
|
raw: list[int] = [0] * px_count
|
|
land: int = 0
|
|
for i in range(px_count):
|
|
off = i * 3
|
|
raw[i] = (raw_bytes[off] << 8) | raw_bytes[off + 1]
|
|
if raw[i] > SEA_LEVEL:
|
|
land += 1
|
|
lo, hi = min(raw), max(raw)
|
|
rng = hi - lo or 1
|
|
norm = bytearray((e - lo) * 255 // rng for e in raw)
|
|
return Image.frombytes("L", img.size, bytes(norm)), hi - lo, land / px_count
|
|
|
|
|
|
def build_contour_mask(elevation: Image.Image, W: int, H: int) -> Image.Image:
|
|
S: int = 2
|
|
iW, iH = W * S, H * S
|
|
terrain = elevation.resize((iW, iH), Image.BICUBIC)
|
|
terrain = terrain.filter(ImageFilter.GaussianBlur(radius=15 * S))
|
|
step = max(1, 256 // CONTOUR_LEVELS)
|
|
terrain = terrain.point(lambda p: (p // step) * step)
|
|
eroded = terrain.filter(ImageFilter.MinFilter(3))
|
|
edges = ImageChops.subtract(terrain, eroded)
|
|
edges = edges.point(lambda p: 255 if p > 0 else 0)
|
|
edges = edges.filter(ImageFilter.MaxFilter(3))
|
|
edges = edges.filter(ImageFilter.GaussianBlur(radius=1.5 * S))
|
|
return edges.point(lambda p: 255 if p > 80 else 0)
|
|
|
|
|
|
def contour_stats(elevation: Image.Image) -> tuple[float, int, float]:
|
|
mask = build_contour_mask(elevation, PREVIEW_W, PREVIEW_H).resize(
|
|
(PREVIEW_W, PREVIEW_H), Image.NEAREST
|
|
)
|
|
mask_bytes = mask.tobytes()
|
|
total: int = len(mask_bytes)
|
|
filled: int = 0
|
|
cell_counts: list[int] = [0] * (GRID_COLS * GRID_ROWS)
|
|
for y in range(PREVIEW_H):
|
|
row_off = y * PREVIEW_W
|
|
cy = y * GRID_ROWS // PREVIEW_H
|
|
for x in range(PREVIEW_W):
|
|
if mask_bytes[row_off + x]:
|
|
filled += 1
|
|
cx = x * GRID_COLS // PREVIEW_W
|
|
cell_counts[cy * GRID_COLS + cx] += 1
|
|
coverage = filled / total
|
|
occupied = sum(1 for count in cell_counts if count)
|
|
largest_share = max(cell_counts) / filled if filled else 1
|
|
return coverage, occupied, largest_share
|
|
|
|
|
|
def candidate_summary(
|
|
elevation: Image.Image, relief: int, land_fraction: float
|
|
) -> tuple[bool, str]:
|
|
if relief < MIN_RELIEF or land_fraction < MIN_LAND_FRACTION:
|
|
return False, (
|
|
f"relief={relief} land_fraction={land_fraction:.3f} "
|
|
f"thresholds=({MIN_RELIEF},{MIN_LAND_FRACTION:.3f})"
|
|
)
|
|
coverage, occupied, largest_share = contour_stats(elevation)
|
|
ok = (
|
|
coverage >= MIN_CONTOUR_COVERAGE
|
|
and occupied >= MIN_OCCUPIED_CELLS
|
|
and largest_share <= MAX_CELL_SHARE
|
|
)
|
|
return ok, (
|
|
f"relief={relief} land_fraction={land_fraction:.3f} "
|
|
f"coverage={coverage:.3f} occupied={occupied} largest_cell_share={largest_share:.3f} "
|
|
f"thresholds=({MIN_CONTOUR_COVERAGE:.3f},{MIN_OCCUPIED_CELLS},{MAX_CELL_SHARE:.3f})"
|
|
)
|
|
|
|
|
|
def render_contours(
|
|
elevation: Image.Image, W: int, H: int, bg: Rgb, line_color: Rgb
|
|
) -> Image.Image:
|
|
S: int = 2
|
|
iW, iH = W * S, H * S
|
|
edges = build_contour_mask(elevation, W, H)
|
|
bg_img = Image.new("RGB", (iW, iH), bg)
|
|
fg_img = Image.new("RGB", (iW, iH), line_color)
|
|
img = Image.composite(fg_img, bg_img, edges)
|
|
return img.resize((W, H), Image.LANCZOS)
|
|
|
|
|
|
def find_font() -> str | None:
|
|
candidates = [
|
|
os.path.join(HOME, "Library/Fonts/BerkeleyMono-Regular.otf"),
|
|
os.path.join(HOME, "Library/Fonts/BerkeleyMono-Regular.ttf"),
|
|
os.path.join(HOME, ".local/share/fonts/berkeley-mono/BerkeleyMono-Regular.ttf"),
|
|
"/Library/Fonts/BerkeleyMono-Regular.otf",
|
|
]
|
|
for path in candidates:
|
|
if os.path.exists(path):
|
|
return path
|
|
return None
|
|
|
|
|
|
def gen() -> None:
|
|
W, H = get_resolution()
|
|
tiles_x = math.ceil(W / TILE_SIZE)
|
|
tiles_y = math.ceil(H / TILE_SIZE)
|
|
|
|
theme = "dark"
|
|
try:
|
|
with open(STATE) as f:
|
|
t = f.read().strip()
|
|
if t in THEMES:
|
|
theme = t
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
log(f"start theme={theme} resolution={W}x{H} tiles={tiles_x}x{tiles_y}")
|
|
|
|
for index, (lat, lon, source) in enumerate(candidate_locations(tiles_x, tiles_y), start=1):
|
|
log(f"candidate[{index}] source={source} lat={lat:.2f} lon={lon:.2f} begin")
|
|
try:
|
|
terrain_img, cache_hit = fetch_terrain(lat, lon, tiles_x, tiles_y, ZOOM)
|
|
except Exception as err:
|
|
log(f"candidate[{index}] source={source} lat={lat:.2f} lon={lon:.2f} fetch_error={err!r}")
|
|
continue
|
|
elevation, relief, land_fraction = decode_terrarium(terrain_img)
|
|
ok, summary = candidate_summary(elevation, relief, land_fraction)
|
|
status = "accept" if ok else "reject"
|
|
cache = "hit" if cache_hit else "miss"
|
|
log(f"candidate[{index}] source={source} lat={lat:.2f} lon={lon:.2f} cache={cache} {status} {summary}")
|
|
if ok:
|
|
break
|
|
else:
|
|
log("finish status=no_candidate")
|
|
return
|
|
|
|
elevation = elevation.crop((0, 0, min(elevation.width, W), min(elevation.height, H)))
|
|
save_history(lat, lon)
|
|
place = reverse_geocode(lat, lon)
|
|
log(f"selected lat={lat:.2f} lon={lon:.2f} place={place or '<none>'}")
|
|
|
|
coords = f"{lat:.2f}, {lon:.2f}"
|
|
label = f"{place} ({coords})" if place else coords
|
|
font_path = find_font()
|
|
|
|
for theme_name, colors in THEMES.items():
|
|
img = render_contours(elevation, W, H, colors["bg"], colors["line"])
|
|
draw = ImageDraw.Draw(img)
|
|
if font_path:
|
|
font = ImageFont.truetype(font_path, 14)
|
|
else:
|
|
font = ImageFont.load_default()
|
|
draw.text((24, H - 30), label, fill=colors["label"], font=font)
|
|
out_path = os.path.join(DIR, f"wallpaper-{theme_name}.jpg")
|
|
img.save(out_path, quality=95)
|
|
log(f"wrote theme={theme_name} path={out_path}")
|
|
|
|
link = os.path.join(DIR, "wallpaper.jpg")
|
|
if os.path.lexists(link):
|
|
os.unlink(link)
|
|
target = os.path.join(DIR, f"wallpaper-{theme}.jpg")
|
|
os.symlink(target, link)
|
|
log(f"finish status=ok target={target} link={link}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
gen()
|