import argparse import base64 import hashlib import mimetypes import os import random import secrets import string import zipfile from glob import glob from io import BytesIO from pathlib import Path from fastapi import FastAPI, HTTPException, Request from fastapi.responses import ( HTMLResponse, RedirectResponse, StreamingResponse, ) app = FastAPI() file_mapping: dict[str, str] = {} indexers = [] sorted_hashes: list[str] = [] navigate_enabled = False root_path: str = "" class FileIndexer: def __init__(self, path: str, salt: str | None = None): self.path = Path(path) self._salt = salt self._file_mapping = self._index() @property def salt(self) -> str: """Generate a random salt for hashing""" if self._salt is None: self._salt = secrets.token_hex(16) return self._salt def _hash_path(self, filepath: str) -> str: """Generate a salted hash of the file path""" return hashlib.sha256((filepath + self.salt).encode()).hexdigest() def _index(self) -> dict[str, str]: """Index all files in the directory""" mapping = {} for root, _, files in os.walk(self.path): for file in files: filepath = os.path.join(root, file) file_hash = self._hash_path(filepath) mapping[file_hash] = filepath return mapping def get_file_by_hash(self, file_hash: str): """Get file content by hash""" if file_hash not in self._file_mapping: return None file_path = self._file_mapping[file_hash] with open(file_path, "rb") as f: yield from f def get_filename_by_hash(self, file_hash: str) -> str | None: """Get filename by hash""" if file_hash not in self._file_mapping: return None return self._file_mapping[file_hash] class ZipFileIndexer(FileIndexer): def __init__(self, path: str, salt: str | None = None): super().__init__(path, salt) self._password_cache: dict[str, bool] = {} def _open_zip(self): """Open the ZIP file for reading.""" return zipfile.ZipFile(self.path, "r") def _index(self) -> dict[str, str]: """Index all files in the zip file""" mapping = {} with self._open_zip() as zip_file: for file_info in zip_file.infolist(): if not file_info.is_dir(): file_hash = self._hash_path(file_info.filename) mapping[file_hash] = file_info.filename return mapping def _needs_password(self, filename: str) -> bool: """Check if a file inside the zip requires a password. Uses lazy detection: tries to read the file without a password. Results are cached so the ZIP is not reopened on subsequent calls. Args: filename: The internal filename inside the zip. Returns: True if the file is password-protected, False otherwise. """ if filename in self._password_cache: return self._password_cache[filename] try: with self._open_zip() as zip_file: zip_file.read(filename) self._password_cache[filename] = False return False except Exception: self._password_cache[filename] = True return True def is_file_encrypted(self, file_hash: str) -> bool: """Check if the file identified by hash is password-protected. Args: file_hash: The hash of the file. Returns: True if the file requires a password, False otherwise. """ filename = self._file_mapping.get(file_hash) if filename is None: return False return self._needs_password(filename) def get_file_by_hash(self, file_hash: str, password: bytes | None = None): """Get file content by hash. Args: file_hash: The hash of the file. password: Optional password for encrypted files. """ if file_hash not in self._file_mapping: return None filename = self._file_mapping[file_hash] with self._open_zip() as zip_file: yield from BytesIO(zip_file.read(filename, pwd=password)) def get_filename_by_hash(self, file_hash: str) -> str | None: """Get filename by hash""" if file_hash not in self._file_mapping: return None return self._file_mapping[file_hash] def get_hash_by_path(self, path: str) -> str | None: """Reverse-lookup: given an internal ZIP filename, find its hash. Args: path: The internal filename inside the zip. Returns: The hash if found, None otherwise. """ for file_hash, filename in self._file_mapping.items(): if filename == path: return file_hash return None INDEXER_MAP = {".zip": ZipFileIndexer} def initialize_server(args: argparse.Namespace): """Initialize the server with directory or glob indexing""" global file_mapping, indexers, sorted_hashes, navigate_enabled, root_path navigate_enabled = args.navigate root_path = getattr(args, "root_path", None) or "" if root_path: # Ensure root_path starts with / but not ends with / root_path = "/" + root_path.strip("/") app.root_path = root_path src_path = Path(args.source) shared_salt = args.salt if shared_salt is None: shared_salt = secrets.token_hex(16) if src_path.is_dir(): indexer = FileIndexer(str(src_path), shared_salt) indexers.append(indexer) file_mapping.update(indexer._file_mapping) else: pattern = args.source matching_files = glob(pattern) if not matching_files: raise SystemExit(f"No files match pattern {pattern}") for file_path in matching_files: file_ext = Path(file_path).suffix if file_ext in INDEXER_MAP: indexer = INDEXER_MAP[file_ext](file_path, shared_salt) indexers.append(indexer) file_mapping.update(indexer._file_mapping) print(f"Indexed {len(file_mapping)} files from {len(indexers)} source(s)") # Build a sorted index for filename-ordered navigation sorted_hashes = sorted( file_mapping.keys(), key=lambda h: ( (indexer.get_filename_by_hash(h) or "").lower() if (indexer := _find_indexer_for_hash(h)) else "" ), ) @app.get("/api/health") async def health_check(): return {"status": "healthy", "file_count": len(file_mapping)} @app.get("/api/{file_hash}/data") async def get_file_data(file_hash: str, request: Request): """Serve a specific file by its hash.""" if file_hash not in file_mapping: raise HTTPException(status_code=404, detail="File not found") indexer = _find_indexer_for_hash(file_hash) if not indexer: raise HTTPException(status_code=404, detail="File not found") password = _get_zip_password(file_hash, request) if ( password is None and isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted(file_hash) ): _raise_unauthorized() filename = indexer.get_filename_by_hash(file_hash) content_type, _ = mimetypes.guess_type(filename or "") if not content_type: content_type = "application/octet-stream" if isinstance(indexer, ZipFileIndexer): try: content = b"".join(indexer.get_file_by_hash(file_hash, password=password)) except RuntimeError: _raise_unauthorized() return StreamingResponse( iter([content]), media_type=content_type, headers={ "Content-Disposition": f"inline; filename={os.path.basename(filename or '')}", }, ) else: return StreamingResponse( indexer.get_file_by_hash(file_hash), media_type=content_type, headers={ "Content-Disposition": f"inline; filename={os.path.basename(filename or '')}", }, ) def _build_url( file_hash: str, order: str | None = None, delay: int | None = None ) -> str: """Build a URL with optional order/delay query parameters.""" base = f"{root_path}/{file_hash}" if order is not None and delay is not None: return f"{base}?order={order}&delay={delay}" return base @app.get("/") async def root( order: str | None = None, delay: int | None = None, request: Request = None ): """Redirect to a random file hash.""" random_hash = _get_random_hash() # Check if the random file is encrypted and requires auth if request is not None: password = _get_zip_password(random_hash, request) if password is None: indexer = _find_indexer_for_hash(random_hash) if isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted( random_hash ): _raise_unauthorized() return RedirectResponse(url=_build_url(random_hash, order, delay), status_code=307) def _get_navigation_data(file_hash: str, order: str | None = None): """Get navigation data for a file hash. Args: file_hash: The current file's hash. order: Navigation order - 'next' for sequential, 'random' for random, or None for default browse mode. Returns: Dictionary with navigation hashes and filename. """ keys = _get_sorted_hashes() idx = keys.index(file_hash) if order == "random": next_hash = _get_random_hash() prev_hash = _get_random_hash() else: next_hash = keys[(idx + 1) % len(keys)] prev_hash = keys[idx - 1] if idx > 0 else keys[-1] indexer = _find_indexer_for_hash(file_hash) filename = indexer.get_filename_by_hash(file_hash) if indexer else "" return { "file_hash": file_hash, "next_hash": next_hash, "prev_hash": prev_hash, "filename": filename, } def _render_page( navigation_data: dict, extra_meta: str = "", image_click_url: str = "", play_button: str = "", current_order: str | None = None, current_delay: int | None = None, use_navigate_urls: bool = False, ) -> HTMLResponse: """Render the frontend page with navigation data. Args: navigation_data: Dictionary with navigation info. extra_meta: Extra tags (e.g., auto-refresh). image_click_url: URL navigated to on image click. play_button: Play/pause button HTML. current_order: Current navigation order ('next' or 'random'). current_delay: Delay in seconds for auto-navigation. use_navigate_urls: If True, use /navigate/{path} URLs for prev/next instead of /{hash} URLs. """ with open("frontend.html") as f: content = f.read() template = string.Template(content) # Generate navigation URLs based on current mode if use_navigate_urls: next_path = navigation_data["next_path"] prev_path = navigation_data["prev_path"] if current_order is not None: next_url = _build_navigate_url( next_path, order=current_order, delay=current_delay ) prev_url = _build_navigate_url( prev_path, order=current_order, delay=current_delay ) else: next_url = f"{root_path}/navigate/{next_path}" prev_url = f"{root_path}/navigate/{prev_path}" else: if current_order is not None: next_url = _build_url( navigation_data["next_hash"], order=current_order, delay=current_delay, ) prev_url = _build_url( navigation_data["prev_hash"], order=current_order, delay=current_delay, ) else: next_url = "{root_path}/{next_hash}".format( root_path=root_path, next_hash=navigation_data["next_hash"] ) prev_url = "{root_path}/{prev_hash}".format( root_path=root_path, prev_hash=navigation_data["prev_hash"] ) # Build the toggle URL (alternate view mode) file_path = navigation_data.get("filename", "") if navigate_enabled: if use_navigate_urls: toggle_url = _build_url( navigation_data["file_hash"], order=current_order, delay=current_delay, ) else: toggle_url = _build_navigate_url( file_path, order=current_order, delay=current_delay, ) else: toggle_url = "#" # Build folder index sidebar if in navigate mode folder_index = "" sidebar_class = "hidden" if navigate_enabled and use_navigate_urls: current_path = file_path folder_index = _render_folder_index_html( current_path=current_path, current_order=current_order, current_delay=current_delay, ) sidebar_class = "" # Build the media element based on file type img_url = "{root_path}/api/{file_hash}/data".format( root_path=root_path, file_hash=navigation_data["file_hash"] ) content_type, _ = mimetypes.guess_type(file_path or "") data_attrs = f'data-hash="{navigation_data["file_hash"]}"' if navigate_enabled: data_attrs += f' data-path="{file_path}"' if content_type and content_type.startswith("video"): media_element = ( f'' ) else: media_element = ( f'' ) content = template.substitute( media_element=media_element, image_click_url=image_click_url or _get_random_hash(), next_url=next_url, prev_url=prev_url, filename=navigation_data["filename"], file_hash=navigation_data["file_hash"], file_path=file_path if navigate_enabled else "", toggle_url=toggle_url, extra_meta=extra_meta, play_button=play_button, folder_index=folder_index, sidebar_class=sidebar_class, container_class="", ) return HTMLResponse(content=content) @app.get("/{file_hash}") async def hash_page( file_hash: str, order: str | None = None, delay: int | None = None, request: Request = None, ): """Serve a page for a specific file hash with optional auto-refresh navigation. Args: file_hash: The hash identifier for the file. order: Navigation order - 'next' for sequential, 'random' for random. delay: Delay in seconds before auto-navigating to next file. """ if file_hash not in file_mapping: raise HTTPException(status_code=404, detail="File not found") if order is not None and order not in ("next", "random"): raise HTTPException( status_code=400, detail="Invalid order. Must be 'next' or 'random'" ) # Check if the file is encrypted and requires auth if request is not None: password = _get_zip_password(file_hash, request) if password is None: indexer = _find_indexer_for_hash(file_hash) if isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted( file_hash ): _raise_unauthorized() navigation_data = _get_navigation_data(file_hash, order=order) if order is not None and delay is not None: # Timer mode: auto-refresh with query params refresh_url = _build_url(navigation_data["next_hash"], order=order, delay=delay) refresh_meta = ( f'' ) image_click_url = _build_url(file_hash) # Create pause button to stop auto-refresh pause_button = ( f'' ) return _render_page( navigation_data, refresh_meta, image_click_url, play_button=pause_button, current_order=order, current_delay=delay, ) else: # Browse mode play_button = ( f'' ) return _render_page( navigation_data, play_button=play_button, current_order=None, current_delay=None, ) def _find_indexer_for_hash(file_hash: str): """Find the indexer that contains the file with the given hash""" for idx in indexers: if file_hash in idx._file_mapping: return idx return None def _get_zip_password(file_hash: str, request: Request) -> bytes | None: """Get ZIP password from HTTP Basic Auth if the file is encrypted. If the file is not encrypted, returns None (no password needed). If the file is encrypted and valid credentials are provided, returns the password as bytes. If the file is encrypted but no Authorization header is present, returns None so the caller can raise a 401. Args: file_hash: The hash of the file to check. request: The incoming HTTP request. Returns: The ZIP password as bytes if encrypted and auth provided, None otherwise. """ indexer = _find_indexer_for_hash(file_hash) if not isinstance(indexer, ZipFileIndexer): return None if not indexer.is_file_encrypted(file_hash): return None # File is encrypted - check for Authorization header auth_header = request.headers.get("authorization") if not auth_header: return None scheme, _, params = auth_header.partition(" ") if scheme.lower() != "basic": return None try: decoded = base64.b64decode(params).decode() except Exception: return None _, _, password = decoded.partition(":") return password.encode() def _raise_unauthorized() -> None: """Raise an HTTP 401 Unauthorized with Basic auth challenge header.""" raise HTTPException( status_code=401, detail="Authentication required", headers={"WWW-Authenticate": 'Basic realm="zip"'}, ) def _get_random_hash() -> str: """Get a random file hash from the indexed files""" if not file_mapping: raise HTTPException(status_code=404, detail="No files indexed") keys = list(file_mapping.keys()) return random.choice(keys) def _get_sorted_hashes() -> list[str]: """Return the cached filename-sorted hash list (built at index time).""" return sorted_hashes def _get_random_navigate_path() -> str | None: """Get a random internal path from ZipFileIndexers only.""" zip_paths: list[str] = [] for indexer in indexers: if isinstance(indexer, ZipFileIndexer): zip_paths.extend(indexer._file_mapping.values()) if not zip_paths: return None return random.choice(zip_paths) def _find_hash_by_path(path: str) -> str | None: """Find the hash for a given internal ZIP path across all ZipFileIndexers. Args: path: The internal filename inside a zip. Returns: The hash if found, None otherwise. """ for idx in indexers: if isinstance(idx, ZipFileIndexer): file_hash = idx.get_hash_by_path(path) if file_hash is not None: return file_hash return None def _get_path_from_hash(file_hash: str) -> str | None: """Get the internal ZIP path for a given hash. Args: file_hash: The hash of the file. Returns: The internal filename if found, None otherwise. """ indexer = _find_indexer_for_hash(file_hash) if indexer is not None: return indexer.get_filename_by_hash(file_hash) return None def _build_navigate_url( path: str, order: str | None = None, delay: int | None = None ) -> str: """Build a /navigate URL with optional order/delay query parameters.""" base = f"{root_path}/navigate/{path}" if order is not None and delay is not None: return f"{base}?order={order}&delay={delay}" return base def _get_navigation_data_by_path( path: str, order: str | None = None ) -> dict[str, str] | None: """Get navigation data for a file identified by its internal ZIP path. Args: path: The internal filename inside a zip. order: Navigation order - 'next' for sequential, 'random' for random, or None for default browse mode. Returns: Dictionary with navigation paths and filename, or None if path not found. """ file_hash = _find_hash_by_path(path) if file_hash is None: return None keys = _get_sorted_hashes() idx = keys.index(file_hash) if order == "random": next_hash = _get_random_hash() prev_hash = _get_random_hash() else: next_hash = keys[(idx + 1) % len(keys)] prev_hash = keys[idx - 1] if idx > 0 else keys[-1] next_path = _get_path_from_hash(next_hash) or "" prev_path = _get_path_from_hash(prev_hash) or "" return { "file_hash": file_hash, "next_path": next_path, "prev_path": prev_path, "filename": path, } def _collect_zip_paths() -> list[str]: """Collect all internal file paths from all ZipFileIndexers.""" paths: list[str] = [] for indexer in indexers: if isinstance(indexer, ZipFileIndexer): paths.extend(indexer._file_mapping.values()) return sorted(paths) def _get_folder_index(path: str | None) -> tuple[list[str], list[str]]: """Get the contents of a folder (subfolders and files) at a given path. Args: path: The folder path (e.g., 'folder/' or None for root). Returns: Tuple of (subfolder_names, file_names) sorted alphabetically. """ all_paths = _collect_zip_paths() prefix = path or "" folders: set[str] = set() files: set[str] = set() for p in all_paths: if p.startswith(prefix): remainder = p[len(prefix) :] if "/" in remainder: folder_name = remainder.split("/", 1)[0] folders.add(folder_name) else: files.add(remainder) return sorted(folders), sorted(files) def _render_folder_index_html( current_path: str | None = None, current_order: str | None = None, current_delay: int | None = None, ) -> str: """Render a folder index sidebar showing only the current folder's contents. Args: current_path: The currently viewed file path (for highlighting). current_order: Current navigation order. current_delay: Current navigation delay. Returns: HTML string for the folder index sidebar. """ all_paths = _collect_zip_paths() # Determine the folder to display: # - None/"" → root # - ends with "/" → that folder # - file path → parent folder if not current_path: folder_prefix = "" elif current_path.endswith("/"): folder_prefix = current_path else: # current_path is a file — show its parent folder slash_idx = current_path.rfind("/") folder_prefix = current_path[: slash_idx + 1] if slash_idx >= 0 else "" folders: set[str] = set() files: set[str] = set() for p in all_paths: if p.startswith(folder_prefix): remainder = p[len(folder_prefix) :] if "/" in remainder: folders.add(remainder.split("/", 1)[0]) else: files.add(remainder) lines: list[str] = [] # Breadcrumb lines.append("") href_params = "" if current_order is not None and current_delay is not None: href_params = f"?order={current_order}&delay={current_delay}" for folder in sorted(folders): folder_path = f"{folder_prefix}{folder}/" if folder_prefix else f"{folder}/" lines.append( f'
' f'📁 {folder}' f"
" ) for file in sorted(files): file_path = f"{folder_prefix}{file}" if folder_prefix else file is_current = file_path == current_path cls = "index-item file current" if is_current else "index-item file" lines.append( f'
' f'📄 {file}' f"
" ) return "\n".join(lines) def _render_folder_index_page( path: str, current_order: str | None = None, current_delay: int | None = None, ) -> HTMLResponse: """Render a folder index page showing the current folder's contents. Args: path: The folder path (e.g., 'folder/'). current_order: Current navigation order. current_delay: Current navigation delay. Returns: HTMLResponse with the folder index page. """ with open("frontend.html") as f: content = f.read() template = string.Template(content) # Build folder index sidebar HTML showing current folder contents folder_path = path.rstrip("/") if folder_path: folder_path += "/" folder_index_html = _render_folder_index_html( current_path=folder_path, current_order=current_order, current_delay=current_delay, ) content = template.substitute( media_element="", image_click_url="#", next_url="#", prev_url="#", filename=path.rstrip("/"), file_hash="", file_path=path.rstrip("/"), toggle_url="#", extra_meta="", play_button="", folder_index=folder_index_html, sidebar_class="", container_class="hidden", ) return HTMLResponse(content=content) @app.get("/navigate/{path:path}") async def navigate_page( path: str, order: str | None = None, delay: int | None = None, request: Request = None, ): """Serve a page for a file identified by its internal ZIP path. Navigation links use /navigate/{path} URLs instead of hashed URLs. Args: path: The internal filename inside a zip. order: Navigation order - 'next' for sequential, 'random' for random. delay: Delay in seconds before auto-navigating to next file. """ if not navigate_enabled: raise HTTPException(status_code=404, detail="File not found") if order is not None and order not in ("next", "random"): raise HTTPException( status_code=400, detail="Invalid order. Must be 'next' or 'random'" ) # Handle folder index view (path ends with / or is root) if not path or path.endswith("/"): folder_path = path.rstrip("/") or None folders, files = _get_folder_index(folder_path) if not folders and not files: raise HTTPException(status_code=404, detail="File not found") return _render_folder_index_page( path if path else "/", current_order=order, current_delay=delay, ) navigation_data = _get_navigation_data_by_path(path, order=order) if navigation_data is None: raise HTTPException(status_code=404, detail="File not found") file_hash = navigation_data["file_hash"] # Check if the file is encrypted and requires auth if request is not None: password = _get_zip_password(file_hash, request) if password is None: indexer = _find_indexer_for_hash(file_hash) if isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted( file_hash ): _raise_unauthorized() if order is not None and delay is not None: # Timer mode: auto-refresh with query params refresh_url = _build_navigate_url( navigation_data["next_path"], order=order, delay=delay ) refresh_meta = ( f'' ) image_click_url = _build_navigate_url(path) # Create pause button to stop auto-refresh pause_button = f'' return _render_page( navigation_data, refresh_meta, image_click_url, play_button=pause_button, current_order=order, current_delay=delay, use_navigate_urls=True, ) else: # Browse mode play_button = ( f'' ) random_path = _get_random_navigate_path() image_click_url = ( f"{root_path}/navigate/{random_path}" if random_path else _get_random_hash() ) return _render_page( navigation_data, image_click_url=image_click_url, play_button=play_button, current_order=None, current_delay=None, use_navigate_urls=True, ) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run the file server") parser.add_argument( "source", type=str, help="Path to directory, ZIP archive, or glob pattern (e.g., *.zip, path/to/zips/*.zip)", ) parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to bind to") parser.add_argument("--port", type=int, default=8000, help="Port to bind to") parser.add_argument( "--salt", type=str, default=None, help="Salt for hashing file paths" ) parser.add_argument( "--navigate", action="store_true", help="Enable path-based navigation (/navigate/{path})", ) parser.add_argument( "--root-path", type=str, default=None, help="URL path prefix (e.g., /images for serving at example.com/images)", ) args = parser.parse_args() initialize_server(args) import uvicorn uvicorn.run(app, host=args.host, port=args.port)