Compare commits

..

No commits in common. "022d3c69f063d654625a4d0e126bea2b753b1ac6" and "1c6935307ca4055f68d78bd9389ab40f3562d861" have entirely different histories.

8 changed files with 331 additions and 124 deletions

4
.gitignore vendored
View File

@ -10,7 +10,3 @@ wheels/
.venv
.nanocoder
# Task lists
TODO.md
TASKS.md

Binary file not shown.

View File

@ -62,23 +62,20 @@
</div>
<script>
function getRefreshParams() {
var params = new URLSearchParams(window.location.search);
var order = params.get('order');
var path = window.location.pathname.slice(1);
var parts = path.split('/');
if (parts.length < 3) return null;
var order = parts[0];
if (order !== 'next' && order !== 'random') return null;
var delay = parseInt(params.get('delay'));
var delay = parseInt(parts[1]);
if (isNaN(delay)) return null;
var hash = window.location.pathname.slice(1);
return {
delay: delay,
order: order,
hash: hash
hash: parts[2]
};
}
function buildUrl(hash, order, delay) {
return '/' + hash + '?order=' + order + '&delay=' + delay;
}
document.addEventListener('keydown', function(e) {
e.preventDefault();
if (e.code === 'Space') {
@ -90,15 +87,15 @@
document.getElementById('next-btn').click();
} else if (e.code === 'Equal' || e.key.toLowerCase() === 'j') {
var params = getRefreshParams();
if (params) window.location.href = buildUrl(params.hash, params.order, params.delay + 1);
if (params) window.location.href = '/' + params.order + '/' + (params.delay + 1) + '/' + params.hash;
} else if (e.code === 'Minus' || e.key.toLowerCase() === 'k') {
var params = getRefreshParams();
if (params && params.delay > 1) window.location.href = buildUrl(params.hash, params.order, params.delay - 1);
if (params && params.delay > 1) window.location.href = '/' + params.order + '/' + (params.delay - 1) + '/' + params.hash;
} else if (e.key.toLowerCase() === 'o') {
var params = getRefreshParams();
if (params) {
var newOrder = params.order === 'next' ? 'random' : 'next';
window.location.href = buildUrl(params.hash, newOrder, params.delay);
window.location.href = '/' + newOrder + '/' + params.delay + '/' + params.hash;
}
}
});

128
main.py
View File

@ -1,3 +1,4 @@
from typing import Annotated
import argparse
import hashlib
import mimetypes
@ -6,11 +7,13 @@ import random
import secrets
import string
import zipfile
from base64 import b64encode
from glob import glob
from io import BytesIO
from pathlib import Path
from fastapi import FastAPI, HTTPException
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.responses import (
FileResponse,
HTMLResponse,
@ -22,6 +25,28 @@ app = FastAPI()
file_mapping = {}
indexers = []
security = HTTPBasic()
expected_password: str | None = None
async def get_current_username(
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
) -> str:
"""Verify Basic Authentication credentials"""
if expected_password is not None and credentials.password != expected_password:
raise HTTPException(
status_code=401,
detail="Incorrect password",
headers={"WWW-Authenticate": "Basic"},
)
return credentials.username
def set_auth_password(password: str | None):
"""Set the expected password for authentication"""
global expected_password
expected_password = password
class FileIndexer:
def __init__(self, path: str, salt: str | None = None):
@ -133,7 +158,7 @@ async def health_check():
@app.get("/api/{file_hash}/data")
async def get_file_data(file_hash: str):
async def get_file_data(file_hash: str, username: str = Depends(get_current_username)):
"""Serve a specific file by its hash"""
if file_hash not in file_mapping:
raise HTTPException(status_code=404, detail="File not found")
@ -156,23 +181,22 @@ async def get_file_data(file_hash: str):
)
def _build_url(
file_hash: str, order: str | None = None, delay: int | None = None
) -> str:
"""Build a URL with optional order/delay query parameters."""
base = "/{hash}".format(hash=file_hash)
if order is not None and delay is not None:
return "{base}?order={order}&delay={delay}".format(
base=base, order=order, delay=delay
)
return base
@app.get("/")
async def root(order: str | None = None, delay: int | None = None):
async def root(username: str = Depends(get_current_username)):
"""Redirect to a random file hash"""
random_hash = _get_random_hash()
return RedirectResponse(url=_build_url(random_hash, order, delay))
return RedirectResponse(url="/{hash}".format(hash=random_hash))
@app.get("/{order}/{delay}")
async def order_delay(
order: str, delay: int, username: str = Depends(get_current_username)
):
"""Redirect to random file with order and delay"""
random_hash = _get_random_hash()
return RedirectResponse(
url="/{order}/{delay}/{hash}".format(order=order, delay=delay, hash=random_hash)
)
def _get_navigation_data(file_hash: str, order: str | None = None):
@ -223,16 +247,16 @@ def _render_page(
# Generate navigation URLs based on current mode
if current_order is not None:
# Timer mode: preserve current order and delay via query params
next_url = _build_url(
navigation_data["next_hash"],
# Timer mode: preserve current order and delay
next_url = "/{order}/{delay}/{next_hash}".format(
order=current_order,
delay=current_delay,
next_hash=navigation_data["next_hash"],
)
prev_url = _build_url(
navigation_data["prev_hash"],
prev_url = "/{order}/{delay}/{prev_hash}".format(
order=current_order,
delay=current_delay,
prev_hash=navigation_data["prev_hash"],
)
else:
# Browse mode: generate browse mode URLs
@ -253,38 +277,49 @@ def _render_page(
@app.get("/{file_hash}")
async def hash_page(file_hash: str, order: str | None = None, delay: int | None = None):
"""Serve a page for a specific file hash with optional auto-refresh navigation.
Args:
file_hash: The hash identifier for the file.
order: Navigation order - 'next' for sequential, 'random' for random.
delay: Delay in seconds before auto-navigating to next file.
"""
async def hash_page(file_hash: str, username: str = Depends(get_current_username)):
"""Serve a page for a specific file hash with navigation"""
if file_hash not in file_mapping:
raise HTTPException(status_code=404, detail="File not found")
if order is not None and order not in ("next", "random"):
navigation_data = _get_navigation_data(file_hash, order=None)
play_button = '<a href="/next/5/{file_hash}" class="play-btn" title="Play next 5">⏵</a>'.format(
file_hash=file_hash
)
return _render_page(
navigation_data, play_button=play_button, current_order=None, current_delay=None
)
@app.get("/{order}/{delay}/{file_hash}")
async def hash_page_with_refresh(
order: str,
delay: int,
file_hash: str,
username: str = Depends(get_current_username),
):
"""Serve a page for a specific file hash with auto-refresh navigation"""
if file_hash not in file_mapping:
raise HTTPException(status_code=404, detail="File not found")
if order not in ("next", "random"):
raise HTTPException(
status_code=400, detail="Invalid order. Must be 'next' or 'random'"
)
navigation_data = _get_navigation_data(file_hash, order=order)
if order is not None and delay is not None:
# Timer mode: auto-refresh with query params
refresh_url = _build_url(navigation_data["next_hash"], order=order, delay=delay)
refresh_meta = (
f'<meta http-equiv="refresh" content="{delay};url={refresh_url}">'
refresh_url = "/{order}/{delay}/{next_hash}".format(
order=order, delay=delay, next_hash=navigation_data["next_hash"]
)
image_click_url = _build_url(file_hash)
refresh_meta = f'<meta http-equiv="refresh" content="{delay};url={refresh_url}">'
image_click_url = "/{file_hash}".format(file_hash=file_hash)
# Create pause button to stop auto-refresh
pause_button = (
'<a href="{file_hash}" class="play-btn" title="Pause">⏸</a>'.format(
pause_button = '<a href="/{file_hash}" class="play-btn" title="Pause">⏸</a>'.format(
file_hash=file_hash
)
)
return _render_page(
navigation_data,
@ -294,17 +329,6 @@ async def hash_page(file_hash: str, order: str | None = None, delay: int | None
current_order=order,
current_delay=delay,
)
else:
# Browse mode
play_button = '<a href="/{file_hash}?order=next&delay=5" class="play-btn" title="Play next 5">⏵</a>'.format(
file_hash=file_hash
)
return _render_page(
navigation_data,
play_button=play_button,
current_order=None,
current_delay=None,
)
def _find_indexer_for_hash(file_hash: str):
@ -335,9 +359,13 @@ if __name__ == "__main__":
parser.add_argument(
"--salt", type=str, default=None, help="Salt for hashing file paths"
)
parser.add_argument(
"--password", type=str, default=None, help="Password for Basic Authentication"
)
args = parser.parse_args()
initialize_server(args)
set_auth_password(args.password)
import uvicorn

View File

@ -15,6 +15,7 @@ def _reset_state() -> None:
"""Reset all global state to defaults."""
main.file_mapping.clear()
main.indexers.clear()
main.expected_password = None
@pytest.fixture(autouse=True)
@ -82,6 +83,7 @@ def args_directory(sample_files: dict[str, Path], tmp_path: Path) -> argparse.Na
host="127.0.0.1",
port=0,
salt="test-salt",
password=None,
)
@ -93,32 +95,51 @@ def args_zip(sample_zip: dict[str, Path], tmp_path: Path) -> argparse.Namespace:
host="127.0.0.1",
port=0,
salt="test-salt",
password=None,
)
@pytest.fixture
def initialized_dir(args_directory: argparse.Namespace) -> None:
"""Initialize the server with sample directory files."""
"""Initialize the server with sample directory files (no auth)."""
main.initialize_server(args_directory)
main.set_auth_password(None)
@pytest.fixture
def initialized_zip(args_zip: argparse.Namespace) -> None:
"""Initialize the server with sample zip files."""
"""Initialize the server with sample zip files (no auth)."""
main.initialize_server(args_zip)
main.set_auth_password(None)
def _dummy_auth_header() -> str:
"""Create a dummy Basic Auth header (any creds work when no password is set)."""
import base64
creds = "test:test"
return f"Basic {base64.b64encode(creds.encode()).decode()}"
@pytest.fixture
async def client_dir(initialized_dir: None) -> Generator[AsyncClient, None, None]:
"""Async HTTP client against the app initialized with directory files."""
"""Async HTTP client against the app initialized with directory files.
Sends dummy auth headers since HTTPBasic() always requires them.
"""
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
ac.headers["Authorization"] = _dummy_auth_header()
yield ac
@pytest.fixture
async def client_zip(initialized_zip: None) -> Generator[AsyncClient, None, None]:
"""Async HTTP client against the app initialized with zip files."""
"""Async HTTP client against the app initialized with zip files.
Sends dummy auth headers since HTTPBasic() always requires them.
"""
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
ac.headers["Authorization"] = _dummy_auth_header()
yield ac

189
tests/test_auth.py Normal file
View File

@ -0,0 +1,189 @@
"""Tests for authentication."""
import argparse
import base64
from pathlib import Path
import pytest
from httpx import ASGITransport, AsyncClient
import main
def _basic_auth_header(username: str, password: str) -> str:
"""Create a Basic Auth header value."""
creds = f"{username}:{password}"
return f"Basic {base64.b64encode(creds.encode()).decode()}"
def _make_args(tmp_path: Path) -> argparse.Namespace:
"""Create an argparse.Namespace for the given path."""
return argparse.Namespace(
source=str(tmp_path),
host="127.0.0.1",
port=0,
salt="auth-salt",
password=None,
)
@pytest.fixture
def auth_setup(tmp_path: Path) -> tuple[str, str]:
"""Set up server with sample files and password protection.
Returns:
Tuple of (username, password).
"""
(tmp_path / "test.txt").write_text("hello")
main.initialize_server(_make_args(tmp_path))
main.set_auth_password("secret123")
return ("user", "secret123")
class TestNoPasswordSet:
"""Tests when no password is configured.
Note: HTTPBasic() always requires an Authorization header.
When expected_password is None, any credentials pass.
"""
async def test_health_always_open(self, client_dir: AsyncClient) -> None:
"""Health check has no auth dependency — always accessible."""
response = await client_dir.get("/api/health")
assert response.status_code == 200
async def test_protected_endpoint_requires_auth_header(
self, initialized_dir: None
) -> None:
"""Even with no password, HTTPBasic requires an auth header."""
file_hash = list(main.file_mapping.keys())[0]
# No auth header → 401 from HTTPBasic
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
response = await ac.get(f"/api/{file_hash}/data")
assert response.status_code == 401
async def test_any_credentials_pass_when_no_password(
self, client_dir: AsyncClient
) -> None:
"""Any credentials pass when no password is set."""
file_hash = list(main.file_mapping.keys())[0]
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
ac.headers["Authorization"] = _basic_auth_header("any", "thing")
response = await ac.get(f"/api/{file_hash}/data")
assert response.status_code == 200
async def test_root_requires_auth_header(self, initialized_dir: None) -> None:
"""Root endpoint requires auth header even with no password."""
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
response = await ac.get("/", follow_redirects=False)
assert response.status_code == 401
class TestCorrectPassword:
"""Tests with correct password."""
async def test_health_with_correct_password(
self, auth_setup: tuple[str, str]
) -> None:
"""Health check works (it has no auth, always 200)."""
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
response = await ac.get("/api/health")
assert response.status_code == 200
async def test_file_access_with_correct_password(
self, auth_setup: tuple[str, str]
) -> None:
"""File access works with correct password."""
username, password = auth_setup
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
ac.headers["Authorization"] = _basic_auth_header(username, password)
file_hash = list(main.file_mapping.keys())[0]
response = await ac.get(f"/api/{file_hash}/data")
assert response.status_code == 200
async def test_root_with_correct_password(
self, auth_setup: tuple[str, str]
) -> None:
"""Root redirect works with correct password."""
username, password = auth_setup
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
ac.headers["Authorization"] = _basic_auth_header(username, password)
response = await ac.get("/", follow_redirects=False)
assert response.status_code in (307, 302, 301)
async def test_hash_page_with_correct_password(
self, auth_setup: tuple[str, str]
) -> None:
"""Hash page works with correct password."""
username, password = auth_setup
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
ac.headers["Authorization"] = _basic_auth_header(username, password)
file_hash = list(main.file_mapping.keys())[0]
response = await ac.get(f"/{file_hash}")
assert response.status_code == 200
class TestWrongPassword:
"""Tests with incorrect password."""
async def test_file_access_with_wrong_password(
self, auth_setup: tuple[str, str]
) -> None:
"""File access returns 401 with wrong password."""
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
ac.headers["Authorization"] = _basic_auth_header("user", "wrong")
file_hash = list(main.file_mapping.keys())[0]
response = await ac.get(f"/api/{file_hash}/data")
assert response.status_code == 401
async def test_root_with_wrong_password(self, auth_setup: tuple[str, str]) -> None:
"""Root redirect returns 401 with wrong password."""
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
ac.headers["Authorization"] = _basic_auth_header("user", "wrong")
response = await ac.get("/", follow_redirects=False)
assert response.status_code == 401
async def test_no_auth_header_returns_401(
self, auth_setup: tuple[str, str]
) -> None:
"""Missing auth header returns 401."""
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
file_hash = list(main.file_mapping.keys())[0]
response = await ac.get(f"/api/{file_hash}/data")
assert response.status_code == 401
async def test_includes_www_authenticate_header(
self, auth_setup: tuple[str, str]
) -> None:
"""401 response includes WWW-Authenticate header."""
transport = ASGITransport(app=main.app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
file_hash = list(main.file_mapping.keys())[0]
response = await ac.get(f"/api/{file_hash}/data")
assert response.status_code == 401
assert "www-authenticate" in response.headers
class TestSetAuthPassword:
"""Tests for set_auth_password function."""
def test_sets_password(self) -> None:
"""Password is set correctly."""
main.set_auth_password("newpass")
assert main.expected_password == "newpass"
def test_clears_password_with_none(self) -> None:
"""Passing None clears the password."""
main.set_auth_password("something")
main.set_auth_password(None)
assert main.expected_password is None

View File

@ -101,28 +101,20 @@ class TestRootRedirect:
assert hash_from_url in main.file_mapping
class TestRootRedirectWithOrderDelay:
"""Tests for GET / with order/delay query parameters."""
class TestOrderDelayRoute:
"""Tests for GET /{order}/{delay}."""
async def test_next_order_redirects(self, client_dir: AsyncClient) -> None:
"""/?order=next&delay=5 redirects to /{hash}?order=next&delay=5."""
response = await client_dir.get(
"/", params={"order": "next", "delay": 5}, follow_redirects=False
)
"""/next/5 redirects to /next/5/{hash}."""
response = await client_dir.get("/next/5", follow_redirects=False)
assert response.status_code in (307, 302, 301)
location = response.headers["location"]
assert "order=next" in location
assert "delay=5" in location
assert "/next/5/" in response.headers["location"]
async def test_random_order_redirects(self, client_dir: AsyncClient) -> None:
"""/?order=random&delay=3 redirects to /{hash}?order=random&delay=3."""
response = await client_dir.get(
"/", params={"order": "random", "delay": 3}, follow_redirects=False
)
"""/random/3 redirects to /random/3/{hash}."""
response = await client_dir.get("/random/3", follow_redirects=False)
assert response.status_code in (307, 302, 301)
location = response.headers["location"]
assert "order=random" in location
assert "delay=3" in location
assert "/random/3/" in response.headers["location"]
class TestHashPage:
@ -150,12 +142,6 @@ class TestHashPage:
assert 'class="chevron left"' in response.text
assert 'class="chevron right"' in response.text
async def test_page_contains_play_button(self, client_dir: AsyncClient) -> None:
"""HTML page contains play button with query param URL."""
file_hash = list(main.file_mapping.keys())[0]
response = await client_dir.get(f"/{file_hash}")
assert "?order=next&delay=5" in response.text
async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None:
"""Returns 404 for a hash that doesn't exist."""
response = await client_dir.get("/nonexistent-hash")
@ -163,48 +149,37 @@ class TestHashPage:
class TestHashPageWithRefresh:
"""Tests for GET /{file_hash}?order=...&delay=... (auto-refresh mode)."""
"""Tests for GET /{order}/{delay}/{file_hash}."""
async def test_next_order_returns_html(self, client_dir: AsyncClient) -> None:
"""Next order returns HTML with refresh meta tag."""
file_hash = list(main.file_mapping.keys())[0]
response = await client_dir.get(
f"/{file_hash}", params={"order": "next", "delay": 5}
)
response = await client_dir.get(f"/next/5/{file_hash}")
assert response.status_code == 200
assert 'http-equiv="refresh"' in response.text
async def test_random_order_returns_html(self, client_dir: AsyncClient) -> None:
"""Random order returns HTML with refresh meta tag."""
file_hash = list(main.file_mapping.keys())[0]
response = await client_dir.get(
f"/{file_hash}", params={"order": "random", "delay": 3}
)
response = await client_dir.get(f"/random/3/{file_hash}")
assert response.status_code == 200
assert 'http-equiv="refresh"' in response.text
async def test_invalid_order_returns_400(self, client_dir: AsyncClient) -> None:
"""Invalid order parameter returns 400."""
file_hash = list(main.file_mapping.keys())[0]
response = await client_dir.get(
f"/{file_hash}", params={"order": "shuffle", "delay": 5}
)
response = await client_dir.get(f"/shuffle/5/{file_hash}")
assert response.status_code == 400
async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None:
"""Returns 404 for a hash that doesn't exist."""
response = await client_dir.get(
"/nonexistent-hash", params={"order": "next", "delay": 5}
)
response = await client_dir.get("/next/5/nonexistent-hash")
assert response.status_code == 404
async def test_refresh_url_points_to_next_file(
self, client_dir: AsyncClient
) -> None:
"""Refresh meta tag points to the next file in sequence with query params."""
"""Refresh meta tag points to the next file in sequence."""
file_hash = list(main.file_mapping.keys())[0]
response = await client_dir.get(
f"/{file_hash}", params={"order": "next", "delay": 5}
)
assert "order=next" in response.text
assert "delay=5" in response.text
response = await client_dir.get(f"/next/5/{file_hash}")
assert "url=/next/5/" in response.text

View File

@ -16,6 +16,7 @@ def seeded_indexers(sample_files: dict[str, Path], tmp_path: Path) -> None:
host="127.0.0.1",
port=0,
salt="nav-test-salt",
password=None,
)
main.initialize_server(args)