import argparse
import base64
import hashlib
import mimetypes
import os
import random
import secrets
import string
import zipfile
from glob import glob
from io import BytesIO
from pathlib import Path
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import (
HTMLResponse,
RedirectResponse,
StreamingResponse,
)
app = FastAPI()
file_mapping = {}
indexers = []
class FileIndexer:
def __init__(self, path: str, salt: str | None = None):
self.path = Path(path)
self._salt = salt
self._file_mapping = self._index()
@property
def salt(self) -> str:
"""Generate a random salt for hashing"""
if self._salt is None:
self._salt = secrets.token_hex(16)
return self._salt
def _hash_path(self, filepath: str) -> str:
"""Generate a salted hash of the file path"""
return hashlib.sha256((filepath + self.salt).encode()).hexdigest()
def _index(self) -> dict[str, str]:
"""Index all files in the directory"""
mapping = {}
for root, _, files in os.walk(self.path):
for file in files:
filepath = os.path.join(root, file)
file_hash = self._hash_path(filepath)
mapping[file_hash] = filepath
return mapping
def get_file_by_hash(self, file_hash: str):
"""Get file content by hash"""
if file_hash not in self._file_mapping:
return None
file_path = self._file_mapping[file_hash]
with open(file_path, "rb") as f:
yield from f
def get_filename_by_hash(self, file_hash: str) -> str | None:
"""Get filename by hash"""
if file_hash not in self._file_mapping:
return None
return self._file_mapping[file_hash]
class ZipFileIndexer(FileIndexer):
def __init__(self, path: str, salt: str | None = None):
super().__init__(path, salt)
self._password_cache: dict[str, bool] = {}
def _open_zip(self):
"""Open the ZIP file for reading."""
return zipfile.ZipFile(self.path, "r")
def _index(self) -> dict[str, str]:
"""Index all files in the zip file"""
mapping = {}
with self._open_zip() as zip_file:
for file_info in zip_file.infolist():
if not file_info.is_dir():
file_hash = self._hash_path(file_info.filename)
mapping[file_hash] = file_info.filename
return mapping
def _needs_password(self, filename: str) -> bool:
"""Check if a file inside the zip requires a password.
Uses lazy detection: tries to read the file without a password.
Results are cached so the ZIP is not reopened on subsequent calls.
Args:
filename: The internal filename inside the zip.
Returns:
True if the file is password-protected, False otherwise.
"""
if filename in self._password_cache:
return self._password_cache[filename]
try:
with self._open_zip() as zip_file:
zip_file.read(filename)
self._password_cache[filename] = False
return False
except Exception:
self._password_cache[filename] = True
return True
def is_file_encrypted(self, file_hash: str) -> bool:
"""Check if the file identified by hash is password-protected.
Args:
file_hash: The hash of the file.
Returns:
True if the file requires a password, False otherwise.
"""
filename = self._file_mapping.get(file_hash)
if filename is None:
return False
return self._needs_password(filename)
def get_file_by_hash(self, file_hash: str, password: bytes | None = None):
"""Get file content by hash.
Args:
file_hash: The hash of the file.
password: Optional password for encrypted files.
"""
if file_hash not in self._file_mapping:
return None
filename = self._file_mapping[file_hash]
with self._open_zip() as zip_file:
yield from BytesIO(zip_file.read(filename, pwd=password))
def get_filename_by_hash(self, file_hash: str) -> str | None:
"""Get filename by hash"""
if file_hash not in self._file_mapping:
return None
return self._file_mapping[file_hash]
INDEXER_MAP = {".zip": ZipFileIndexer}
def initialize_server(args: argparse.Namespace):
"""Initialize the server with directory or glob indexing"""
global file_mapping, indexers
src_path = Path(args.source)
shared_salt = args.salt
if shared_salt is None:
shared_salt = secrets.token_hex(16)
if src_path.is_dir():
indexer = FileIndexer(str(src_path), shared_salt)
indexers.append(indexer)
file_mapping.update(indexer._file_mapping)
else:
pattern = args.source
matching_files = glob(pattern)
if not matching_files:
raise SystemExit(f"No files match pattern {pattern}")
for file_path in matching_files:
file_ext = Path(file_path).suffix
if file_ext in INDEXER_MAP:
indexer = INDEXER_MAP[file_ext](file_path, shared_salt)
indexers.append(indexer)
file_mapping.update(indexer._file_mapping)
print(f"Indexed {len(file_mapping)} files from {len(indexers)} source(s)")
@app.get("/api/health")
async def health_check():
return {"status": "healthy", "file_count": len(file_mapping)}
@app.get("/api/{file_hash}/data")
async def get_file_data(file_hash: str, request: Request):
"""Serve a specific file by its hash."""
if file_hash not in file_mapping:
raise HTTPException(status_code=404, detail="File not found")
indexer = _find_indexer_for_hash(file_hash)
if not indexer:
raise HTTPException(status_code=404, detail="File not found")
password = _get_zip_password(file_hash, request)
if (
password is None
and isinstance(indexer, ZipFileIndexer)
and indexer.is_file_encrypted(file_hash)
):
_raise_unauthorized()
filename = indexer.get_filename_by_hash(file_hash)
content_type, _ = mimetypes.guess_type(filename or "")
if not content_type:
content_type = "application/octet-stream"
if isinstance(indexer, ZipFileIndexer):
try:
content = b"".join(indexer.get_file_by_hash(file_hash, password=password))
except RuntimeError:
_raise_unauthorized()
return StreamingResponse(
iter([content]),
media_type=content_type,
headers={
"Content-Disposition": f"inline; filename={os.path.basename(filename or '')}",
},
)
else:
return StreamingResponse(
indexer.get_file_by_hash(file_hash),
media_type=content_type,
headers={
"Content-Disposition": f"inline; filename={os.path.basename(filename or '')}",
},
)
def _build_url(
file_hash: str, order: str | None = None, delay: int | None = None
) -> str:
"""Build a URL with optional order/delay query parameters."""
base = f"/{file_hash}"
if order is not None and delay is not None:
return f"{base}?order={order}&delay={delay}"
return base
@app.get("/")
async def root(
order: str | None = None, delay: int | None = None, request: Request = None
):
"""Redirect to a random file hash."""
random_hash = _get_random_hash()
# Check if the random file is encrypted and requires auth
if request is not None:
password = _get_zip_password(random_hash, request)
if password is None:
indexer = _find_indexer_for_hash(random_hash)
if isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted(
random_hash
):
_raise_unauthorized()
return RedirectResponse(url=_build_url(random_hash, order, delay))
def _get_navigation_data(file_hash: str, order: str | None = None):
"""Get navigation data for a file hash.
Args:
file_hash: The current file's hash.
order: Navigation order - 'next' for sequential, 'random' for random,
or None for default browse mode.
Returns:
Dictionary with navigation hashes and filename.
"""
keys = list(file_mapping.keys())
idx = keys.index(file_hash)
if order == "random":
next_hash = _get_random_hash()
prev_hash = _get_random_hash()
else:
next_hash = keys[(idx + 1) % len(keys)]
prev_hash = keys[idx - 1] if idx > 0 else keys[-1]
indexer = _find_indexer_for_hash(file_hash)
filename = indexer.get_filename_by_hash(file_hash) if indexer else ""
return {
"file_hash": file_hash,
"next_hash": next_hash,
"prev_hash": prev_hash,
"filename": filename,
}
def _render_page(
navigation_data: dict,
extra_meta: str = "",
image_click_url: str = "",
play_button: str = "",
current_order: str | None = None,
current_delay: int | None = None,
) -> HTMLResponse:
"""Render the frontend page with navigation data"""
with open("frontend.html") as f:
content = f.read()
template = string.Template(content)
# Generate navigation URLs based on current mode
if current_order is not None:
# Timer mode: preserve current order and delay via query params
next_url = _build_url(
navigation_data["next_hash"],
order=current_order,
delay=current_delay,
)
prev_url = _build_url(
navigation_data["prev_hash"],
order=current_order,
delay=current_delay,
)
else:
# Browse mode: generate browse mode URLs
next_url = "/{next_hash}".format(next_hash=navigation_data["next_hash"])
prev_url = "/{prev_hash}".format(prev_hash=navigation_data["prev_hash"])
content = template.substitute(
img_url="/api/{file_hash}/data".format(file_hash=navigation_data["file_hash"]),
image_click_url=image_click_url or _get_random_hash(),
next_url=next_url,
prev_url=prev_url,
filename=navigation_data["filename"],
extra_meta=extra_meta,
play_button=play_button,
)
return HTMLResponse(content=content)
@app.get("/{file_hash}")
async def hash_page(
file_hash: str,
order: str | None = None,
delay: int | None = None,
request: Request = None,
):
"""Serve a page for a specific file hash with optional auto-refresh navigation.
Args:
file_hash: The hash identifier for the file.
order: Navigation order - 'next' for sequential, 'random' for random.
delay: Delay in seconds before auto-navigating to next file.
"""
if file_hash not in file_mapping:
raise HTTPException(status_code=404, detail="File not found")
if order is not None and order not in ("next", "random"):
raise HTTPException(
status_code=400, detail="Invalid order. Must be 'next' or 'random'"
)
# Check if the file is encrypted and requires auth
if request is not None:
password = _get_zip_password(file_hash, request)
if password is None:
indexer = _find_indexer_for_hash(file_hash)
if isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted(
file_hash
):
_raise_unauthorized()
navigation_data = _get_navigation_data(file_hash, order=order)
if order is not None and delay is not None:
# Timer mode: auto-refresh with query params
refresh_url = _build_url(navigation_data["next_hash"], order=order, delay=delay)
refresh_meta = (
f''
)
image_click_url = _build_url(file_hash)
# Create pause button to stop auto-refresh
pause_button = f'⏸'
return _render_page(
navigation_data,
refresh_meta,
image_click_url,
play_button=pause_button,
current_order=order,
current_delay=delay,
)
else:
# Browse mode
play_button = (
f'⏵'
)
return _render_page(
navigation_data,
play_button=play_button,
current_order=None,
current_delay=None,
)
def _find_indexer_for_hash(file_hash: str):
"""Find the indexer that contains the file with the given hash"""
for idx in indexers:
if file_hash in idx._file_mapping:
return idx
return None
def _get_zip_password(file_hash: str, request: Request) -> bytes | None:
"""Get ZIP password from HTTP Basic Auth if the file is encrypted.
If the file is not encrypted, returns None (no password needed).
If the file is encrypted and valid credentials are provided, returns
the password as bytes.
If the file is encrypted but no Authorization header is present, returns
None so the caller can raise a 401.
Args:
file_hash: The hash of the file to check.
request: The incoming HTTP request.
Returns:
The ZIP password as bytes if encrypted and auth provided, None otherwise.
"""
indexer = _find_indexer_for_hash(file_hash)
if not isinstance(indexer, ZipFileIndexer):
return None
if not indexer.is_file_encrypted(file_hash):
return None
# File is encrypted - check for Authorization header
auth_header = request.headers.get("authorization")
if not auth_header:
return None
scheme, _, params = auth_header.partition(" ")
if scheme.lower() != "basic":
return None
try:
decoded = base64.b64decode(params).decode()
except Exception:
return None
_, _, password = decoded.partition(":")
return password.encode()
def _raise_unauthorized() -> None:
"""Raise an HTTP 401 Unauthorized with Basic auth challenge header."""
raise HTTPException(
status_code=401,
detail="Authentication required",
headers={"WWW-Authenticate": 'Basic realm="zip"'},
)
def _get_random_hash() -> str:
"""Get a random file hash from the indexed files"""
if not file_mapping:
raise HTTPException(status_code=404, detail="No files indexed")
keys = list(file_mapping.keys())
return random.choice(keys)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run the file server")
parser.add_argument(
"source",
type=str,
help="Path to directory, ZIP archive, or glob pattern (e.g., *.zip, path/to/zips/*.zip)",
)
parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to bind to")
parser.add_argument("--port", type=int, default=8000, help="Port to bind to")
parser.add_argument(
"--salt", type=str, default=None, help="Salt for hashing file paths"
)
args = parser.parse_args()
initialize_server(args)
import uvicorn
uvicorn.run(app, host=args.host, port=args.port)