953 lines
30 KiB
Python
953 lines
30 KiB
Python
import argparse
|
|
import base64
|
|
import hashlib
|
|
import mimetypes
|
|
import os
|
|
import random
|
|
import secrets
|
|
import string
|
|
import zipfile
|
|
from glob import glob
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
|
|
from fastapi import FastAPI, HTTPException, Request
|
|
from fastapi.responses import (
|
|
HTMLResponse,
|
|
RedirectResponse,
|
|
StreamingResponse,
|
|
)
|
|
|
|
app = FastAPI()
|
|
file_mapping: dict[str, str] = {}
|
|
indexers = []
|
|
sorted_hashes: list[str] = []
|
|
navigate_enabled = False
|
|
root_path: str = ""
|
|
|
|
|
|
class FileIndexer:
|
|
def __init__(self, path: str, salt: str | None = None):
|
|
self.path = Path(path)
|
|
self._salt = salt
|
|
self._file_mapping = self._index()
|
|
|
|
@property
|
|
def salt(self) -> str:
|
|
"""Generate a random salt for hashing"""
|
|
if self._salt is None:
|
|
self._salt = secrets.token_hex(16)
|
|
return self._salt
|
|
|
|
def _hash_path(self, filepath: str) -> str:
|
|
"""Generate a salted hash of the file path"""
|
|
return hashlib.sha256((filepath + self.salt).encode()).hexdigest()
|
|
|
|
def _index(self) -> dict[str, str]:
|
|
"""Index all files in the directory"""
|
|
mapping = {}
|
|
for root, _, files in os.walk(self.path):
|
|
for file in files:
|
|
filepath = os.path.join(root, file)
|
|
file_hash = self._hash_path(filepath)
|
|
mapping[file_hash] = filepath
|
|
return mapping
|
|
|
|
def get_file_by_hash(self, file_hash: str):
|
|
"""Get file content by hash"""
|
|
if file_hash not in self._file_mapping:
|
|
return None
|
|
|
|
file_path = self._file_mapping[file_hash]
|
|
with open(file_path, "rb") as f:
|
|
yield from f
|
|
|
|
def get_filename_by_hash(self, file_hash: str) -> str | None:
|
|
"""Get filename by hash"""
|
|
if file_hash not in self._file_mapping:
|
|
return None
|
|
return self._file_mapping[file_hash]
|
|
|
|
|
|
class ZipFileIndexer(FileIndexer):
|
|
def __init__(self, path: str, salt: str | None = None):
|
|
super().__init__(path, salt)
|
|
self._password_cache: dict[str, bool] = {}
|
|
|
|
def _open_zip(self):
|
|
"""Open the ZIP file for reading."""
|
|
return zipfile.ZipFile(self.path, "r")
|
|
|
|
def _index(self) -> dict[str, str]:
|
|
"""Index all files in the zip file"""
|
|
mapping = {}
|
|
with self._open_zip() as zip_file:
|
|
for file_info in zip_file.infolist():
|
|
if not file_info.is_dir():
|
|
file_hash = self._hash_path(file_info.filename)
|
|
mapping[file_hash] = file_info.filename
|
|
return mapping
|
|
|
|
def _needs_password(self, filename: str) -> bool:
|
|
"""Check if a file inside the zip requires a password.
|
|
|
|
Uses lazy detection: tries to read the file without a password.
|
|
Results are cached so the ZIP is not reopened on subsequent calls.
|
|
|
|
Args:
|
|
filename: The internal filename inside the zip.
|
|
|
|
Returns:
|
|
True if the file is password-protected, False otherwise.
|
|
"""
|
|
if filename in self._password_cache:
|
|
return self._password_cache[filename]
|
|
|
|
try:
|
|
with self._open_zip() as zip_file:
|
|
zip_file.read(filename)
|
|
self._password_cache[filename] = False
|
|
return False
|
|
except Exception:
|
|
self._password_cache[filename] = True
|
|
return True
|
|
|
|
def is_file_encrypted(self, file_hash: str) -> bool:
|
|
"""Check if the file identified by hash is password-protected.
|
|
|
|
Args:
|
|
file_hash: The hash of the file.
|
|
|
|
Returns:
|
|
True if the file requires a password, False otherwise.
|
|
"""
|
|
filename = self._file_mapping.get(file_hash)
|
|
if filename is None:
|
|
return False
|
|
return self._needs_password(filename)
|
|
|
|
def get_file_by_hash(self, file_hash: str, password: bytes | None = None):
|
|
"""Get file content by hash.
|
|
|
|
Args:
|
|
file_hash: The hash of the file.
|
|
password: Optional password for encrypted files.
|
|
"""
|
|
if file_hash not in self._file_mapping:
|
|
return None
|
|
|
|
filename = self._file_mapping[file_hash]
|
|
|
|
with self._open_zip() as zip_file:
|
|
yield from BytesIO(zip_file.read(filename, pwd=password))
|
|
|
|
def get_filename_by_hash(self, file_hash: str) -> str | None:
|
|
"""Get filename by hash"""
|
|
if file_hash not in self._file_mapping:
|
|
return None
|
|
return self._file_mapping[file_hash]
|
|
|
|
def get_hash_by_path(self, path: str) -> str | None:
|
|
"""Reverse-lookup: given an internal ZIP filename, find its hash.
|
|
|
|
Args:
|
|
path: The internal filename inside the zip.
|
|
|
|
Returns:
|
|
The hash if found, None otherwise.
|
|
"""
|
|
for file_hash, filename in self._file_mapping.items():
|
|
if filename == path:
|
|
return file_hash
|
|
return None
|
|
|
|
|
|
INDEXER_MAP = {".zip": ZipFileIndexer}
|
|
|
|
|
|
def initialize_server(args: argparse.Namespace):
|
|
"""Initialize the server with directory or glob indexing"""
|
|
global file_mapping, indexers, sorted_hashes, navigate_enabled, root_path
|
|
navigate_enabled = args.navigate
|
|
root_path = getattr(args, "root_path", None) or ""
|
|
if root_path:
|
|
# Ensure root_path starts with / but not ends with /
|
|
root_path = "/" + root_path.strip("/")
|
|
app.root_path = root_path
|
|
|
|
src_path = Path(args.source)
|
|
|
|
shared_salt = args.salt
|
|
if shared_salt is None:
|
|
shared_salt = secrets.token_hex(16)
|
|
|
|
if src_path.is_dir():
|
|
indexer = FileIndexer(str(src_path), shared_salt)
|
|
indexers.append(indexer)
|
|
file_mapping.update(indexer._file_mapping)
|
|
else:
|
|
pattern = args.source
|
|
matching_files = glob(pattern)
|
|
if not matching_files:
|
|
raise SystemExit(f"No files match pattern {pattern}")
|
|
|
|
for file_path in matching_files:
|
|
file_ext = Path(file_path).suffix
|
|
if file_ext in INDEXER_MAP:
|
|
indexer = INDEXER_MAP[file_ext](file_path, shared_salt)
|
|
indexers.append(indexer)
|
|
file_mapping.update(indexer._file_mapping)
|
|
|
|
print(f"Indexed {len(file_mapping)} files from {len(indexers)} source(s)")
|
|
|
|
# Build a sorted index for filename-ordered navigation
|
|
sorted_hashes = sorted(
|
|
file_mapping.keys(),
|
|
key=lambda h: (
|
|
(indexer.get_filename_by_hash(h) or "").lower()
|
|
if (indexer := _find_indexer_for_hash(h))
|
|
else ""
|
|
),
|
|
)
|
|
|
|
|
|
@app.get("/api/health")
|
|
async def health_check():
|
|
return {"status": "healthy", "file_count": len(file_mapping)}
|
|
|
|
|
|
@app.get("/api/{file_hash}/data")
|
|
async def get_file_data(file_hash: str, request: Request):
|
|
"""Serve a specific file by its hash."""
|
|
if file_hash not in file_mapping:
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
indexer = _find_indexer_for_hash(file_hash)
|
|
if not indexer:
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
password = _get_zip_password(file_hash, request)
|
|
if (
|
|
password is None
|
|
and isinstance(indexer, ZipFileIndexer)
|
|
and indexer.is_file_encrypted(file_hash)
|
|
):
|
|
_raise_unauthorized()
|
|
|
|
filename = indexer.get_filename_by_hash(file_hash)
|
|
content_type, _ = mimetypes.guess_type(filename or "")
|
|
if not content_type:
|
|
content_type = "application/octet-stream"
|
|
|
|
if isinstance(indexer, ZipFileIndexer):
|
|
try:
|
|
content = b"".join(indexer.get_file_by_hash(file_hash, password=password))
|
|
except RuntimeError:
|
|
_raise_unauthorized()
|
|
return StreamingResponse(
|
|
iter([content]),
|
|
media_type=content_type,
|
|
headers={
|
|
"Content-Disposition": f"inline; filename={os.path.basename(filename or '')}",
|
|
},
|
|
)
|
|
else:
|
|
return StreamingResponse(
|
|
indexer.get_file_by_hash(file_hash),
|
|
media_type=content_type,
|
|
headers={
|
|
"Content-Disposition": f"inline; filename={os.path.basename(filename or '')}",
|
|
},
|
|
)
|
|
|
|
|
|
def _build_url(
|
|
file_hash: str, order: str | None = None, delay: int | None = None
|
|
) -> str:
|
|
"""Build a URL with optional order/delay query parameters."""
|
|
base = f"{root_path}/{file_hash}"
|
|
if order is not None and delay is not None:
|
|
return f"{base}?order={order}&delay={delay}"
|
|
return base
|
|
|
|
|
|
@app.get("/")
|
|
async def root(
|
|
order: str | None = None, delay: int | None = None, request: Request = None
|
|
):
|
|
"""Redirect to a random file hash."""
|
|
random_hash = _get_random_hash()
|
|
|
|
# Check if the random file is encrypted and requires auth
|
|
if request is not None:
|
|
password = _get_zip_password(random_hash, request)
|
|
if password is None:
|
|
indexer = _find_indexer_for_hash(random_hash)
|
|
if isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted(
|
|
random_hash
|
|
):
|
|
_raise_unauthorized()
|
|
|
|
return RedirectResponse(url=_build_url(random_hash, order, delay), status_code=307)
|
|
|
|
|
|
def _get_navigation_data(file_hash: str, order: str | None = None):
|
|
"""Get navigation data for a file hash.
|
|
|
|
Args:
|
|
file_hash: The current file's hash.
|
|
order: Navigation order - 'next' for sequential, 'random' for random,
|
|
or None for default browse mode.
|
|
|
|
Returns:
|
|
Dictionary with navigation hashes and filename.
|
|
"""
|
|
keys = _get_sorted_hashes()
|
|
idx = keys.index(file_hash)
|
|
|
|
if order == "random":
|
|
next_hash = _get_random_hash()
|
|
prev_hash = _get_random_hash()
|
|
else:
|
|
next_hash = keys[(idx + 1) % len(keys)]
|
|
prev_hash = keys[idx - 1] if idx > 0 else keys[-1]
|
|
|
|
indexer = _find_indexer_for_hash(file_hash)
|
|
filename = indexer.get_filename_by_hash(file_hash) if indexer else ""
|
|
|
|
return {
|
|
"file_hash": file_hash,
|
|
"next_hash": next_hash,
|
|
"prev_hash": prev_hash,
|
|
"filename": filename,
|
|
}
|
|
|
|
|
|
def _render_page(
|
|
navigation_data: dict,
|
|
extra_meta: str = "",
|
|
image_click_url: str = "",
|
|
play_button: str = "",
|
|
current_order: str | None = None,
|
|
current_delay: int | None = None,
|
|
use_navigate_urls: bool = False,
|
|
) -> HTMLResponse:
|
|
"""Render the frontend page with navigation data.
|
|
|
|
Args:
|
|
navigation_data: Dictionary with navigation info.
|
|
extra_meta: Extra <meta> tags (e.g., auto-refresh).
|
|
image_click_url: URL navigated to on image click.
|
|
play_button: Play/pause button HTML.
|
|
current_order: Current navigation order ('next' or 'random').
|
|
current_delay: Delay in seconds for auto-navigation.
|
|
use_navigate_urls: If True, use /navigate/{path} URLs for prev/next
|
|
instead of /{hash} URLs.
|
|
"""
|
|
with open("frontend.html") as f:
|
|
content = f.read()
|
|
|
|
template = string.Template(content)
|
|
|
|
# Generate navigation URLs based on current mode
|
|
if use_navigate_urls:
|
|
next_path = navigation_data["next_path"]
|
|
prev_path = navigation_data["prev_path"]
|
|
if current_order is not None:
|
|
next_url = _build_navigate_url(
|
|
next_path, order=current_order, delay=current_delay
|
|
)
|
|
prev_url = _build_navigate_url(
|
|
prev_path, order=current_order, delay=current_delay
|
|
)
|
|
else:
|
|
next_url = f"{root_path}/navigate/{next_path}"
|
|
prev_url = f"{root_path}/navigate/{prev_path}"
|
|
else:
|
|
if current_order is not None:
|
|
next_url = _build_url(
|
|
navigation_data["next_hash"],
|
|
order=current_order,
|
|
delay=current_delay,
|
|
)
|
|
prev_url = _build_url(
|
|
navigation_data["prev_hash"],
|
|
order=current_order,
|
|
delay=current_delay,
|
|
)
|
|
else:
|
|
next_url = "{root_path}/{next_hash}".format(
|
|
root_path=root_path, next_hash=navigation_data["next_hash"]
|
|
)
|
|
prev_url = "{root_path}/{prev_hash}".format(
|
|
root_path=root_path, prev_hash=navigation_data["prev_hash"]
|
|
)
|
|
|
|
# Build the toggle URL (alternate view mode)
|
|
file_path = navigation_data.get("filename", "")
|
|
if navigate_enabled:
|
|
if use_navigate_urls:
|
|
toggle_url = _build_url(
|
|
navigation_data["file_hash"],
|
|
order=current_order,
|
|
delay=current_delay,
|
|
)
|
|
else:
|
|
toggle_url = _build_navigate_url(
|
|
file_path,
|
|
order=current_order,
|
|
delay=current_delay,
|
|
)
|
|
else:
|
|
toggle_url = "#"
|
|
|
|
# Build folder index sidebar if in navigate mode
|
|
folder_index = ""
|
|
sidebar_class = "hidden"
|
|
if navigate_enabled and use_navigate_urls:
|
|
current_path = file_path
|
|
folder_index = _render_folder_index_html(
|
|
current_path=current_path,
|
|
current_order=current_order,
|
|
current_delay=current_delay,
|
|
)
|
|
sidebar_class = ""
|
|
|
|
content = template.substitute(
|
|
img_url="{root_path}/api/{file_hash}/data".format(
|
|
root_path=root_path, file_hash=navigation_data["file_hash"]
|
|
),
|
|
image_click_url=image_click_url or _get_random_hash(),
|
|
next_url=next_url,
|
|
prev_url=prev_url,
|
|
filename=navigation_data["filename"],
|
|
file_hash=navigation_data["file_hash"],
|
|
file_path=file_path if navigate_enabled else "",
|
|
toggle_url=toggle_url,
|
|
extra_meta=extra_meta,
|
|
play_button=play_button,
|
|
folder_index=folder_index,
|
|
sidebar_class=sidebar_class,
|
|
container_class="",
|
|
)
|
|
|
|
return HTMLResponse(content=content)
|
|
|
|
|
|
@app.get("/{file_hash}")
|
|
async def hash_page(
|
|
file_hash: str,
|
|
order: str | None = None,
|
|
delay: int | None = None,
|
|
request: Request = None,
|
|
):
|
|
"""Serve a page for a specific file hash with optional auto-refresh navigation.
|
|
|
|
Args:
|
|
file_hash: The hash identifier for the file.
|
|
order: Navigation order - 'next' for sequential, 'random' for random.
|
|
delay: Delay in seconds before auto-navigating to next file.
|
|
"""
|
|
if file_hash not in file_mapping:
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
if order is not None and order not in ("next", "random"):
|
|
raise HTTPException(
|
|
status_code=400, detail="Invalid order. Must be 'next' or 'random'"
|
|
)
|
|
|
|
# Check if the file is encrypted and requires auth
|
|
if request is not None:
|
|
password = _get_zip_password(file_hash, request)
|
|
if password is None:
|
|
indexer = _find_indexer_for_hash(file_hash)
|
|
if isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted(
|
|
file_hash
|
|
):
|
|
_raise_unauthorized()
|
|
|
|
navigation_data = _get_navigation_data(file_hash, order=order)
|
|
|
|
if order is not None and delay is not None:
|
|
# Timer mode: auto-refresh with query params
|
|
refresh_url = _build_url(navigation_data["next_hash"], order=order, delay=delay)
|
|
refresh_meta = (
|
|
f'<meta http-equiv="refresh" content="{delay};url={refresh_url}">'
|
|
)
|
|
image_click_url = _build_url(file_hash)
|
|
|
|
# Create pause button to stop auto-refresh
|
|
pause_button = (
|
|
f'<a href="{root_path}/{file_hash}" class="play-btn" title="Pause">⏸</a>'
|
|
)
|
|
|
|
return _render_page(
|
|
navigation_data,
|
|
refresh_meta,
|
|
image_click_url,
|
|
play_button=pause_button,
|
|
current_order=order,
|
|
current_delay=delay,
|
|
)
|
|
else:
|
|
# Browse mode
|
|
play_button = (
|
|
f'<a href="{root_path}/{file_hash}?order=next&delay=5" '
|
|
'class="play-btn" title="Play next 5">⏵</a>'
|
|
)
|
|
return _render_page(
|
|
navigation_data,
|
|
play_button=play_button,
|
|
current_order=None,
|
|
current_delay=None,
|
|
)
|
|
|
|
|
|
def _find_indexer_for_hash(file_hash: str):
|
|
"""Find the indexer that contains the file with the given hash"""
|
|
for idx in indexers:
|
|
if file_hash in idx._file_mapping:
|
|
return idx
|
|
return None
|
|
|
|
|
|
def _get_zip_password(file_hash: str, request: Request) -> bytes | None:
|
|
"""Get ZIP password from HTTP Basic Auth if the file is encrypted.
|
|
|
|
If the file is not encrypted, returns None (no password needed).
|
|
If the file is encrypted and valid credentials are provided, returns
|
|
the password as bytes.
|
|
If the file is encrypted but no Authorization header is present, returns
|
|
None so the caller can raise a 401.
|
|
|
|
Args:
|
|
file_hash: The hash of the file to check.
|
|
request: The incoming HTTP request.
|
|
|
|
Returns:
|
|
The ZIP password as bytes if encrypted and auth provided, None otherwise.
|
|
"""
|
|
indexer = _find_indexer_for_hash(file_hash)
|
|
if not isinstance(indexer, ZipFileIndexer):
|
|
return None
|
|
if not indexer.is_file_encrypted(file_hash):
|
|
return None
|
|
|
|
# File is encrypted - check for Authorization header
|
|
auth_header = request.headers.get("authorization")
|
|
if not auth_header:
|
|
return None
|
|
|
|
scheme, _, params = auth_header.partition(" ")
|
|
if scheme.lower() != "basic":
|
|
return None
|
|
|
|
try:
|
|
decoded = base64.b64decode(params).decode()
|
|
except Exception:
|
|
return None
|
|
|
|
_, _, password = decoded.partition(":")
|
|
return password.encode()
|
|
|
|
|
|
def _raise_unauthorized() -> None:
|
|
"""Raise an HTTP 401 Unauthorized with Basic auth challenge header."""
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Authentication required",
|
|
headers={"WWW-Authenticate": 'Basic realm="zip"'},
|
|
)
|
|
|
|
|
|
def _get_random_hash() -> str:
|
|
"""Get a random file hash from the indexed files"""
|
|
if not file_mapping:
|
|
raise HTTPException(status_code=404, detail="No files indexed")
|
|
keys = list(file_mapping.keys())
|
|
return random.choice(keys)
|
|
|
|
|
|
def _get_sorted_hashes() -> list[str]:
|
|
"""Return the cached filename-sorted hash list (built at index time)."""
|
|
return sorted_hashes
|
|
|
|
|
|
def _get_random_navigate_path() -> str | None:
|
|
"""Get a random internal path from ZipFileIndexers only."""
|
|
zip_paths: list[str] = []
|
|
for indexer in indexers:
|
|
if isinstance(indexer, ZipFileIndexer):
|
|
zip_paths.extend(indexer._file_mapping.values())
|
|
if not zip_paths:
|
|
return None
|
|
return random.choice(zip_paths)
|
|
|
|
|
|
def _find_hash_by_path(path: str) -> str | None:
|
|
"""Find the hash for a given internal ZIP path across all ZipFileIndexers.
|
|
|
|
Args:
|
|
path: The internal filename inside a zip.
|
|
|
|
Returns:
|
|
The hash if found, None otherwise.
|
|
"""
|
|
for idx in indexers:
|
|
if isinstance(idx, ZipFileIndexer):
|
|
file_hash = idx.get_hash_by_path(path)
|
|
if file_hash is not None:
|
|
return file_hash
|
|
return None
|
|
|
|
|
|
def _get_path_from_hash(file_hash: str) -> str | None:
|
|
"""Get the internal ZIP path for a given hash.
|
|
|
|
Args:
|
|
file_hash: The hash of the file.
|
|
|
|
Returns:
|
|
The internal filename if found, None otherwise.
|
|
"""
|
|
indexer = _find_indexer_for_hash(file_hash)
|
|
if indexer is not None:
|
|
return indexer.get_filename_by_hash(file_hash)
|
|
return None
|
|
|
|
|
|
def _build_navigate_url(
|
|
path: str, order: str | None = None, delay: int | None = None
|
|
) -> str:
|
|
"""Build a /navigate URL with optional order/delay query parameters."""
|
|
base = f"{root_path}/navigate/{path}"
|
|
if order is not None and delay is not None:
|
|
return f"{base}?order={order}&delay={delay}"
|
|
return base
|
|
|
|
|
|
def _get_navigation_data_by_path(
|
|
path: str, order: str | None = None
|
|
) -> dict[str, str] | None:
|
|
"""Get navigation data for a file identified by its internal ZIP path.
|
|
|
|
Args:
|
|
path: The internal filename inside a zip.
|
|
order: Navigation order - 'next' for sequential, 'random' for random,
|
|
or None for default browse mode.
|
|
|
|
Returns:
|
|
Dictionary with navigation paths and filename, or None if path not found.
|
|
"""
|
|
file_hash = _find_hash_by_path(path)
|
|
if file_hash is None:
|
|
return None
|
|
|
|
keys = _get_sorted_hashes()
|
|
idx = keys.index(file_hash)
|
|
|
|
if order == "random":
|
|
next_hash = _get_random_hash()
|
|
prev_hash = _get_random_hash()
|
|
else:
|
|
next_hash = keys[(idx + 1) % len(keys)]
|
|
prev_hash = keys[idx - 1] if idx > 0 else keys[-1]
|
|
|
|
next_path = _get_path_from_hash(next_hash) or ""
|
|
prev_path = _get_path_from_hash(prev_hash) or ""
|
|
|
|
return {
|
|
"file_hash": file_hash,
|
|
"next_path": next_path,
|
|
"prev_path": prev_path,
|
|
"filename": path,
|
|
}
|
|
|
|
|
|
def _collect_zip_paths() -> list[str]:
|
|
"""Collect all internal file paths from all ZipFileIndexers."""
|
|
paths: list[str] = []
|
|
for indexer in indexers:
|
|
if isinstance(indexer, ZipFileIndexer):
|
|
paths.extend(indexer._file_mapping.values())
|
|
return sorted(paths)
|
|
|
|
|
|
def _get_folder_index(path: str | None) -> tuple[list[str], list[str]]:
|
|
"""Get the contents of a folder (subfolders and files) at a given path.
|
|
|
|
Args:
|
|
path: The folder path (e.g., 'folder/' or None for root).
|
|
|
|
Returns:
|
|
Tuple of (subfolder_names, file_names) sorted alphabetically.
|
|
"""
|
|
all_paths = _collect_zip_paths()
|
|
prefix = path or ""
|
|
|
|
folders: set[str] = set()
|
|
files: set[str] = set()
|
|
|
|
for p in all_paths:
|
|
if p.startswith(prefix):
|
|
remainder = p[len(prefix) :]
|
|
if "/" in remainder:
|
|
folder_name = remainder.split("/", 1)[0]
|
|
folders.add(folder_name)
|
|
else:
|
|
files.add(remainder)
|
|
|
|
return sorted(folders), sorted(files)
|
|
|
|
|
|
def _render_folder_index_html(
|
|
current_path: str | None = None,
|
|
current_order: str | None = None,
|
|
current_delay: int | None = None,
|
|
) -> str:
|
|
"""Render a folder index sidebar showing the full tree from root.
|
|
|
|
Args:
|
|
current_path: The currently viewed file path (for highlighting).
|
|
current_order: Current navigation order.
|
|
current_delay: Current navigation delay.
|
|
|
|
Returns:
|
|
HTML string for the folder index sidebar.
|
|
"""
|
|
all_paths = _collect_zip_paths()
|
|
lines: list[str] = []
|
|
|
|
# Breadcrumb
|
|
lines.append("<nav class='breadcrumb'>")
|
|
lines.append(f'<a href="{root_path}/navigate/">/</a>')
|
|
if current_path:
|
|
parts = current_path.split("/")
|
|
accumulated = ""
|
|
for part in parts[:-1]:
|
|
accumulated += f"{part}/"
|
|
lines.append(f'<a href="{root_path}/navigate/{accumulated}">{part}</a>')
|
|
lines.append("</nav>")
|
|
|
|
def _render_folder(folder_prefix: str, depth: int = 0) -> list[str]:
|
|
"""Recursively render a folder's contents."""
|
|
result: list[str] = []
|
|
indent = " " * depth
|
|
prefix = folder_prefix or ""
|
|
|
|
folders: set[str] = set()
|
|
files: set[str] = set()
|
|
for p in all_paths:
|
|
if p.startswith(prefix):
|
|
remainder = p[len(prefix) :]
|
|
if "/" in remainder:
|
|
folders.add(remainder.split("/", 1)[0])
|
|
else:
|
|
files.add(remainder)
|
|
|
|
for folder in sorted(folders):
|
|
folder_path = f"{prefix}{folder}/" if prefix else f"{folder}/"
|
|
result.append(
|
|
f'<div class="index-item folder">'
|
|
f'{indent}<a href="{root_path}/navigate/{folder_path}">📁 {folder}</a>'
|
|
f"</div>"
|
|
)
|
|
result.extend(_render_folder(folder_path, depth + 1))
|
|
|
|
for file in sorted(files):
|
|
file_path = f"{prefix}{file}" if prefix else file
|
|
is_current = file_path == current_path
|
|
cls = "index-item file current" if is_current else "index-item file"
|
|
href_params = ""
|
|
if current_order is not None and current_delay is not None:
|
|
href_params = f"?order={current_order}&delay={current_delay}"
|
|
result.append(
|
|
f'<div class="{cls}">'
|
|
f'{indent}<a href="{root_path}/navigate/{file_path}{href_params}">📄 {file}</a>'
|
|
f"</div>"
|
|
)
|
|
|
|
return result
|
|
|
|
lines.extend(_render_folder(""))
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _render_folder_index_page(
|
|
path: str,
|
|
folders: list[str],
|
|
files: list[str],
|
|
current_order: str | None = None,
|
|
current_delay: int | None = None,
|
|
) -> HTMLResponse:
|
|
"""Render a folder index page showing subfolders and files.
|
|
|
|
Args:
|
|
path: The folder path (e.g., 'folder/').
|
|
folders: List of subfolder names.
|
|
files: List of file names.
|
|
current_order: Current navigation order.
|
|
current_delay: Current navigation delay.
|
|
|
|
Returns:
|
|
HTMLResponse with the folder index page.
|
|
"""
|
|
with open("frontend.html") as f:
|
|
content = f.read()
|
|
|
|
template = string.Template(content)
|
|
|
|
# Build folder index sidebar HTML using recursive tree renderer
|
|
folder_path = path.rstrip("/") or None
|
|
folder_index_html = _render_folder_index_html(
|
|
current_path=folder_path,
|
|
current_order=current_order,
|
|
current_delay=current_delay,
|
|
)
|
|
|
|
content = template.substitute(
|
|
img_url="#",
|
|
image_click_url="#",
|
|
next_url="#",
|
|
prev_url="#",
|
|
filename=path.rstrip("/"),
|
|
file_hash="",
|
|
file_path=path.rstrip("/"),
|
|
toggle_url="#",
|
|
extra_meta="",
|
|
play_button="",
|
|
folder_index=folder_index_html,
|
|
sidebar_class="",
|
|
container_class="hidden",
|
|
)
|
|
|
|
return HTMLResponse(content=content)
|
|
|
|
|
|
@app.get("/navigate/{path:path}")
|
|
async def navigate_page(
|
|
path: str,
|
|
order: str | None = None,
|
|
delay: int | None = None,
|
|
request: Request = None,
|
|
):
|
|
"""Serve a page for a file identified by its internal ZIP path.
|
|
|
|
Navigation links use /navigate/{path} URLs instead of hashed URLs.
|
|
|
|
Args:
|
|
path: The internal filename inside a zip.
|
|
order: Navigation order - 'next' for sequential, 'random' for random.
|
|
delay: Delay in seconds before auto-navigating to next file.
|
|
"""
|
|
if not navigate_enabled:
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
if order is not None and order not in ("next", "random"):
|
|
raise HTTPException(
|
|
status_code=400, detail="Invalid order. Must be 'next' or 'random'"
|
|
)
|
|
|
|
# Handle folder index view (path ends with / or is root)
|
|
if not path or path.endswith("/"):
|
|
folder_path = path.rstrip("/") or None
|
|
folders, files = _get_folder_index(folder_path)
|
|
if not folders and not files:
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
return _render_folder_index_page(
|
|
path if path else "/",
|
|
folders,
|
|
files,
|
|
current_order=order,
|
|
current_delay=delay,
|
|
)
|
|
|
|
navigation_data = _get_navigation_data_by_path(path, order=order)
|
|
if navigation_data is None:
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
file_hash = navigation_data["file_hash"]
|
|
|
|
# Check if the file is encrypted and requires auth
|
|
if request is not None:
|
|
password = _get_zip_password(file_hash, request)
|
|
if password is None:
|
|
indexer = _find_indexer_for_hash(file_hash)
|
|
if isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted(
|
|
file_hash
|
|
):
|
|
_raise_unauthorized()
|
|
|
|
if order is not None and delay is not None:
|
|
# Timer mode: auto-refresh with query params
|
|
refresh_url = _build_navigate_url(
|
|
navigation_data["next_path"], order=order, delay=delay
|
|
)
|
|
refresh_meta = (
|
|
f'<meta http-equiv="refresh" content="{delay};url={refresh_url}">'
|
|
)
|
|
image_click_url = _build_navigate_url(path)
|
|
|
|
# Create pause button to stop auto-refresh
|
|
pause_button = f'<a href="{root_path}/navigate/{path}" class="play-btn" title="Pause">⏸</a>'
|
|
|
|
return _render_page(
|
|
navigation_data,
|
|
refresh_meta,
|
|
image_click_url,
|
|
play_button=pause_button,
|
|
current_order=order,
|
|
current_delay=delay,
|
|
use_navigate_urls=True,
|
|
)
|
|
else:
|
|
# Browse mode
|
|
play_button = (
|
|
f'<a href="{root_path}/navigate/{path}?order=next&delay=5" '
|
|
'class="play-btn" title="Play next 5">⏵</a>'
|
|
)
|
|
random_path = _get_random_navigate_path()
|
|
image_click_url = (
|
|
f"{root_path}/navigate/{random_path}" if random_path else _get_random_hash()
|
|
)
|
|
return _render_page(
|
|
navigation_data,
|
|
image_click_url=image_click_url,
|
|
play_button=play_button,
|
|
current_order=None,
|
|
current_delay=None,
|
|
use_navigate_urls=True,
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Run the file server")
|
|
parser.add_argument(
|
|
"source",
|
|
type=str,
|
|
help="Path to directory, ZIP archive, or glob pattern (e.g., *.zip, path/to/zips/*.zip)",
|
|
)
|
|
parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to bind to")
|
|
parser.add_argument("--port", type=int, default=8000, help="Port to bind to")
|
|
parser.add_argument(
|
|
"--salt", type=str, default=None, help="Salt for hashing file paths"
|
|
)
|
|
parser.add_argument(
|
|
"--navigate",
|
|
action="store_true",
|
|
help="Enable path-based navigation (/navigate/{path})",
|
|
)
|
|
parser.add_argument(
|
|
"--root-path",
|
|
type=str,
|
|
default=None,
|
|
help="URL path prefix (e.g., /images for serving at example.com/images)",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
initialize_server(args)
|
|
|
|
import uvicorn
|
|
|
|
uvicorn.run(app, host=args.host, port=args.port)
|