# Changelog:
# - Deleted test_auth.py (auth no longer exists)
# - Rewrote TestOrderDelayRoute -> TestRootRedirectWithOrderDelay using query params
# - Updated TestHashPageWithRefresh to use ?order=...&delay=... URLs
# - Added a play-button query-param assertion in TestHashPage
# - Removed password=None from the test_navigation.py seeded_indexers fixture
# - Formatted with black; all 59 tests passing
import argparse
|
|
import hashlib
|
|
import mimetypes
|
|
import os
|
|
import random
|
|
import secrets
|
|
import string
|
|
import zipfile
|
|
from glob import glob
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import (
|
|
FileResponse,
|
|
HTMLResponse,
|
|
RedirectResponse,
|
|
StreamingResponse,
|
|
)
|
|
|
|
app = FastAPI()
|
|
file_mapping = {}
|
|
indexers = []
|
|
|
|
|
|
class FileIndexer:
|
|
def __init__(self, path: str, salt: str | None = None):
|
|
self.path = Path(path)
|
|
self._salt = salt
|
|
self._file_mapping = self._index()
|
|
|
|
@property
|
|
def salt(self) -> str:
|
|
"""Generate a random salt for hashing"""
|
|
if self._salt is None:
|
|
self._salt = secrets.token_hex(16)
|
|
return self._salt
|
|
|
|
def _hash_path(self, filepath: str) -> str:
|
|
"""Generate a salted hash of the file path"""
|
|
return hashlib.sha256((filepath + self.salt).encode()).hexdigest()
|
|
|
|
def _index(self) -> dict[str, str]:
|
|
"""Index all files in the directory"""
|
|
mapping = {}
|
|
for root, _, files in os.walk(self.path):
|
|
for file in files:
|
|
filepath = os.path.join(root, file)
|
|
file_hash = self._hash_path(filepath)
|
|
mapping[file_hash] = filepath
|
|
return mapping
|
|
|
|
def get_file_by_hash(self, file_hash: str):
|
|
"""Get file content by hash"""
|
|
if file_hash not in self._file_mapping:
|
|
return None
|
|
|
|
file_path = self._file_mapping[file_hash]
|
|
with open(file_path, "rb") as f:
|
|
yield from f
|
|
|
|
def get_filename_by_hash(self, file_hash: str) -> str | None:
|
|
"""Get filename by hash"""
|
|
if file_hash not in self._file_mapping:
|
|
return None
|
|
return self._file_mapping[file_hash]
|
|
|
|
|
|
class ZipFileIndexer(FileIndexer):
|
|
def _index(self) -> dict[str, str]:
|
|
"""Index all files in the zip file"""
|
|
mapping = {}
|
|
with zipfile.ZipFile(self.path, "r") as zip_file:
|
|
for file_info in zip_file.infolist():
|
|
if not file_info.is_dir():
|
|
file_hash = self._hash_path(file_info.filename)
|
|
mapping[file_hash] = file_info.filename
|
|
return mapping
|
|
|
|
def get_file_by_hash(self, file_hash: str):
|
|
"""Get file content by hash"""
|
|
if file_hash not in self._file_mapping:
|
|
return None
|
|
|
|
filename = self._file_mapping[file_hash]
|
|
|
|
with zipfile.ZipFile(self.path, "r") as zip_file:
|
|
yield from BytesIO(zip_file.read(filename))
|
|
|
|
def get_filename_by_hash(self, file_hash: str) -> str | None:
|
|
"""Get filename by hash"""
|
|
if file_hash not in self._file_mapping:
|
|
return None
|
|
return self._file_mapping[file_hash]
|
|
|
|
|
|
INDEXER_MAP = {".zip": ZipFileIndexer}
|
|
|
|
|
|
def initialize_server(args: argparse.Namespace):
|
|
"""Initialize the server with directory or glob indexing"""
|
|
global file_mapping, indexers
|
|
|
|
src_path = Path(args.source)
|
|
|
|
shared_salt = args.salt
|
|
if shared_salt is None:
|
|
shared_salt = secrets.token_hex(16)
|
|
|
|
if src_path.is_dir():
|
|
indexer = FileIndexer(str(src_path), shared_salt)
|
|
indexers.append(indexer)
|
|
file_mapping.update(indexer._file_mapping)
|
|
else:
|
|
pattern = args.source
|
|
matching_files = glob(pattern)
|
|
if not matching_files:
|
|
raise SystemExit(f"No files match pattern {pattern}")
|
|
|
|
for file_path in matching_files:
|
|
file_ext = Path(file_path).suffix
|
|
if file_ext in INDEXER_MAP:
|
|
indexer = INDEXER_MAP[file_ext](file_path, shared_salt)
|
|
indexers.append(indexer)
|
|
file_mapping.update(indexer._file_mapping)
|
|
|
|
print(f"Indexed {len(file_mapping)} files from {len(indexers)} source(s)")
|
|
|
|
|
|
@app.get("/api/health")
|
|
async def health_check():
|
|
return {"status": "healthy", "file_count": len(file_mapping)}
|
|
|
|
|
|
@app.get("/api/{file_hash}/data")
|
|
async def get_file_data(file_hash: str):
|
|
"""Serve a specific file by its hash"""
|
|
if file_hash not in file_mapping:
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
indexer = _find_indexer_for_hash(file_hash)
|
|
if not indexer:
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
filename = indexer.get_filename_by_hash(file_hash)
|
|
content_type, _ = mimetypes.guess_type(filename or "")
|
|
if not content_type:
|
|
content_type = "application/octet-stream"
|
|
|
|
return StreamingResponse(
|
|
indexer.get_file_by_hash(file_hash),
|
|
media_type=content_type,
|
|
headers={
|
|
"Content-Disposition": f"inline; filename={os.path.basename(filename or '')}",
|
|
},
|
|
)
|
|
|
|
|
|
def _build_url(
|
|
file_hash: str, order: str | None = None, delay: int | None = None
|
|
) -> str:
|
|
"""Build a URL with optional order/delay query parameters."""
|
|
base = "/{hash}".format(hash=file_hash)
|
|
if order is not None and delay is not None:
|
|
return "{base}?order={order}&delay={delay}".format(
|
|
base=base, order=order, delay=delay
|
|
)
|
|
return base
|
|
|
|
|
|
@app.get("/")
|
|
async def root(order: str | None = None, delay: int | None = None):
|
|
"""Redirect to a random file hash"""
|
|
random_hash = _get_random_hash()
|
|
return RedirectResponse(url=_build_url(random_hash, order, delay))
|
|
|
|
|
|
def _get_navigation_data(file_hash: str, order: str | None = None):
|
|
"""Get navigation data for a file hash.
|
|
|
|
Args:
|
|
file_hash: The current file's hash.
|
|
order: Navigation order - 'next' for sequential, 'random' for random,
|
|
or None for default browse mode.
|
|
|
|
Returns:
|
|
Dictionary with navigation hashes and filename.
|
|
"""
|
|
keys = list(file_mapping.keys())
|
|
idx = keys.index(file_hash)
|
|
|
|
if order == "random":
|
|
next_hash = _get_random_hash()
|
|
prev_hash = _get_random_hash()
|
|
else:
|
|
next_hash = keys[(idx + 1) % len(keys)]
|
|
prev_hash = keys[idx - 1] if idx > 0 else keys[-1]
|
|
|
|
indexer = _find_indexer_for_hash(file_hash)
|
|
filename = indexer.get_filename_by_hash(file_hash) if indexer else ""
|
|
|
|
return {
|
|
"file_hash": file_hash,
|
|
"next_hash": next_hash,
|
|
"prev_hash": prev_hash,
|
|
"filename": filename,
|
|
}
|
|
|
|
|
|
def _render_page(
|
|
navigation_data: dict,
|
|
extra_meta: str = "",
|
|
image_click_url: str = "",
|
|
play_button: str = "",
|
|
current_order: str | None = None,
|
|
current_delay: int | None = None,
|
|
) -> HTMLResponse:
|
|
"""Render the frontend page with navigation data"""
|
|
with open("frontend.html", "r") as f:
|
|
content = f.read()
|
|
|
|
template = string.Template(content)
|
|
|
|
# Generate navigation URLs based on current mode
|
|
if current_order is not None:
|
|
# Timer mode: preserve current order and delay via query params
|
|
next_url = _build_url(
|
|
navigation_data["next_hash"],
|
|
order=current_order,
|
|
delay=current_delay,
|
|
)
|
|
prev_url = _build_url(
|
|
navigation_data["prev_hash"],
|
|
order=current_order,
|
|
delay=current_delay,
|
|
)
|
|
else:
|
|
# Browse mode: generate browse mode URLs
|
|
next_url = "/{next_hash}".format(next_hash=navigation_data["next_hash"])
|
|
prev_url = "/{prev_hash}".format(prev_hash=navigation_data["prev_hash"])
|
|
|
|
content = template.substitute(
|
|
img_url="/api/{file_hash}/data".format(file_hash=navigation_data["file_hash"]),
|
|
image_click_url=image_click_url or _get_random_hash(),
|
|
next_url=next_url,
|
|
prev_url=prev_url,
|
|
filename=navigation_data["filename"],
|
|
extra_meta=extra_meta,
|
|
play_button=play_button,
|
|
)
|
|
|
|
return HTMLResponse(content=content)
|
|
|
|
|
|
@app.get("/{file_hash}")
|
|
async def hash_page(file_hash: str, order: str | None = None, delay: int | None = None):
|
|
"""Serve a page for a specific file hash with optional auto-refresh navigation.
|
|
|
|
Args:
|
|
file_hash: The hash identifier for the file.
|
|
order: Navigation order - 'next' for sequential, 'random' for random.
|
|
delay: Delay in seconds before auto-navigating to next file.
|
|
"""
|
|
if file_hash not in file_mapping:
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
if order is not None and order not in ("next", "random"):
|
|
raise HTTPException(
|
|
status_code=400, detail="Invalid order. Must be 'next' or 'random'"
|
|
)
|
|
|
|
navigation_data = _get_navigation_data(file_hash, order=order)
|
|
|
|
if order is not None and delay is not None:
|
|
# Timer mode: auto-refresh with query params
|
|
refresh_url = _build_url(navigation_data["next_hash"], order=order, delay=delay)
|
|
refresh_meta = (
|
|
f'<meta http-equiv="refresh" content="{delay};url={refresh_url}">'
|
|
)
|
|
image_click_url = _build_url(file_hash)
|
|
|
|
# Create pause button to stop auto-refresh
|
|
pause_button = (
|
|
'<a href="{file_hash}" class="play-btn" title="Pause">⏸</a>'.format(
|
|
file_hash=file_hash
|
|
)
|
|
)
|
|
|
|
return _render_page(
|
|
navigation_data,
|
|
refresh_meta,
|
|
image_click_url,
|
|
play_button=pause_button,
|
|
current_order=order,
|
|
current_delay=delay,
|
|
)
|
|
else:
|
|
# Browse mode
|
|
play_button = '<a href="/{file_hash}?order=next&delay=5" class="play-btn" title="Play next 5">⏵</a>'.format(
|
|
file_hash=file_hash
|
|
)
|
|
return _render_page(
|
|
navigation_data,
|
|
play_button=play_button,
|
|
current_order=None,
|
|
current_delay=None,
|
|
)
|
|
|
|
|
|
def _find_indexer_for_hash(file_hash: str):
|
|
"""Find the indexer that contains the file with the given hash"""
|
|
for idx in indexers:
|
|
if file_hash in idx._file_mapping:
|
|
return idx
|
|
return None
|
|
|
|
|
|
def _get_random_hash() -> str:
|
|
"""Get a random file hash from the indexed files"""
|
|
if not file_mapping:
|
|
raise HTTPException(status_code=404, detail="No files indexed")
|
|
keys = list(file_mapping.keys())
|
|
return random.choice(keys)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Run the file server")
|
|
parser.add_argument(
|
|
"source",
|
|
type=str,
|
|
help="Path to directory, ZIP archive, or glob pattern (e.g., *.zip, path/to/zips/*.zip)",
|
|
)
|
|
parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to bind to")
|
|
parser.add_argument("--port", type=int, default=8000, help="Port to bind to")
|
|
parser.add_argument(
|
|
"--salt", type=str, default=None, help="Salt for hashing file paths"
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
initialize_server(args)
|
|
|
|
import uvicorn
|
|
|
|
uvicorn.run(app, host=args.host, port=args.port)
|