""" Snowflake ID Generator for distributed unique ID generation. Structure (64 bits total): - 1 bit: sign (always 0) - 41 bits: timestamp in milliseconds (69 years from epoch) - 10 bits: machine/worker ID (1024 unique machines) - 12 bits: sequence number (4096 IDs per millisecond per machine) Benefits: - No coordination needed between machines - Time-sortable (IDs are roughly ordered by creation time) - Guaranteed unique across distributed system - High throughput: 4096 IDs/ms/machine = 4M IDs/second/machine Used by: Twitter, Discord, Instagram (with variations) """ import time import threading class SnowflakeGenerator: # Custom epoch: Jan 1, 2024 00:00:00 UTC (extends usable time range) EPOCH = 1704067200000 # milliseconds # Bit lengths TIMESTAMP_BITS = 41 MACHINE_ID_BITS = 10 SEQUENCE_BITS = 12 # Max values MAX_MACHINE_ID = (1 << MACHINE_ID_BITS) - 1 # 1023 MAX_SEQUENCE = (1 << SEQUENCE_BITS) - 1 # 4095 # Bit shifts TIMESTAMP_SHIFT = MACHINE_ID_BITS + SEQUENCE_BITS # 22 MACHINE_ID_SHIFT = SEQUENCE_BITS # 12 def __init__(self, machine_id: int): """ Initialize generator with a unique machine ID. Args: machine_id: Unique identifier for this machine/worker (0-1023) """ if not 0 <= machine_id <= self.MAX_MACHINE_ID: raise ValueError(f"machine_id must be between 0 and {self.MAX_MACHINE_ID}") self.machine_id = machine_id self.sequence = 0 self.last_timestamp = -1 self._lock = threading.Lock() def _current_timestamp(self) -> int: """Get current time in milliseconds since our epoch.""" return int(time.time() * 1000) - self.EPOCH def _wait_next_millis(self, last_timestamp: int) -> int: """Block until next millisecond if we've exhausted sequence.""" timestamp = self._current_timestamp() while timestamp <= last_timestamp: time.sleep(0.0001) # 0.1ms timestamp = self._current_timestamp() return timestamp def generate(self) -> int: """ Generate a unique Snowflake ID. Thread-safe: Can be called from multiple threads. Returns: 64-bit unique ID """ with self._lock: timestamp = self._current_timestamp() if timestamp < self.last_timestamp: # Clock moved backwards - this is problematic raise RuntimeError( f"Clock moved backwards. Refusing to generate ID. " f"Last: {self.last_timestamp}, Current: {timestamp}" ) if timestamp == self.last_timestamp: # Same millisecond - increment sequence self.sequence = (self.sequence + 1) & self.MAX_SEQUENCE if self.sequence == 0: # Sequence exhausted - wait for next millisecond timestamp = self._wait_next_millis(self.last_timestamp) else: # New millisecond - reset sequence self.sequence = 0 self.last_timestamp = timestamp # Compose the ID snowflake_id = ( (timestamp << self.TIMESTAMP_SHIFT) | (self.machine_id << self.MACHINE_ID_SHIFT) | self.sequence ) return snowflake_id def parse(self, snowflake_id: int) -> dict: """ Parse a Snowflake ID into its components. Useful for debugging and understanding ID generation. """ timestamp = (snowflake_id >> self.TIMESTAMP_SHIFT) + self.EPOCH machine_id = (snowflake_id >> self.MACHINE_ID_SHIFT) & self.MAX_MACHINE_ID sequence = snowflake_id & self.MAX_SEQUENCE return { "timestamp_ms": timestamp, "timestamp_iso": time.strftime( "%Y-%m-%d %H:%M:%S", time.gmtime(timestamp / 1000) ), "machine_id": machine_id, "sequence": sequence, } # Module-level generator (initialized in main.py) _generator: SnowflakeGenerator | None = None def init_generator(machine_id: int) -> None: """Initialize the global Snowflake generator.""" global _generator _generator = SnowflakeGenerator(machine_id) def generate_id() -> int: """Generate a unique Snowflake ID using the global generator.""" if _generator is None: raise RuntimeError("Snowflake generator not initialized. Call init_generator() first.") return _generator.generate()