Compare commits
No commits in common. "022d3c69f063d654625a4d0e126bea2b753b1ac6" and "1c6935307ca4055f68d78bd9389ab40f3562d861" have entirely different histories.
022d3c69f0
...
1c6935307c
4
.gitignore
vendored
4
.gitignore
vendored
@ -10,7 +10,3 @@ wheels/
|
|||||||
.venv
|
.venv
|
||||||
|
|
||||||
.nanocoder
|
.nanocoder
|
||||||
|
|
||||||
# Task lists
|
|
||||||
TODO.md
|
|
||||||
TASKS.md
|
|
||||||
|
|||||||
Binary file not shown.
@ -62,23 +62,20 @@
|
|||||||
</div>
|
</div>
|
||||||
<script>
|
<script>
|
||||||
function getRefreshParams() {
|
function getRefreshParams() {
|
||||||
var params = new URLSearchParams(window.location.search);
|
var path = window.location.pathname.slice(1);
|
||||||
var order = params.get('order');
|
var parts = path.split('/');
|
||||||
|
if (parts.length < 3) return null;
|
||||||
|
var order = parts[0];
|
||||||
if (order !== 'next' && order !== 'random') return null;
|
if (order !== 'next' && order !== 'random') return null;
|
||||||
var delay = parseInt(params.get('delay'));
|
var delay = parseInt(parts[1]);
|
||||||
if (isNaN(delay)) return null;
|
if (isNaN(delay)) return null;
|
||||||
var hash = window.location.pathname.slice(1);
|
|
||||||
return {
|
return {
|
||||||
delay: delay,
|
delay: delay,
|
||||||
order: order,
|
order: order,
|
||||||
hash: hash
|
hash: parts[2]
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
function buildUrl(hash, order, delay) {
|
|
||||||
return '/' + hash + '?order=' + order + '&delay=' + delay;
|
|
||||||
}
|
|
||||||
|
|
||||||
document.addEventListener('keydown', function(e) {
|
document.addEventListener('keydown', function(e) {
|
||||||
e.preventDefault();
|
e.preventDefault();
|
||||||
if (e.code === 'Space') {
|
if (e.code === 'Space') {
|
||||||
@ -90,15 +87,15 @@
|
|||||||
document.getElementById('next-btn').click();
|
document.getElementById('next-btn').click();
|
||||||
} else if (e.code === 'Equal' || e.key.toLowerCase() === 'j') {
|
} else if (e.code === 'Equal' || e.key.toLowerCase() === 'j') {
|
||||||
var params = getRefreshParams();
|
var params = getRefreshParams();
|
||||||
if (params) window.location.href = buildUrl(params.hash, params.order, params.delay + 1);
|
if (params) window.location.href = '/' + params.order + '/' + (params.delay + 1) + '/' + params.hash;
|
||||||
} else if (e.code === 'Minus' || e.key.toLowerCase() === 'k') {
|
} else if (e.code === 'Minus' || e.key.toLowerCase() === 'k') {
|
||||||
var params = getRefreshParams();
|
var params = getRefreshParams();
|
||||||
if (params && params.delay > 1) window.location.href = buildUrl(params.hash, params.order, params.delay - 1);
|
if (params && params.delay > 1) window.location.href = '/' + params.order + '/' + (params.delay - 1) + '/' + params.hash;
|
||||||
} else if (e.key.toLowerCase() === 'o') {
|
} else if (e.key.toLowerCase() === 'o') {
|
||||||
var params = getRefreshParams();
|
var params = getRefreshParams();
|
||||||
if (params) {
|
if (params) {
|
||||||
var newOrder = params.order === 'next' ? 'random' : 'next';
|
var newOrder = params.order === 'next' ? 'random' : 'next';
|
||||||
window.location.href = buildUrl(params.hash, newOrder, params.delay);
|
window.location.href = '/' + newOrder + '/' + params.delay + '/' + params.hash;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
128
main.py
128
main.py
@ -1,3 +1,4 @@
|
|||||||
|
from typing import Annotated
|
||||||
import argparse
|
import argparse
|
||||||
import hashlib
|
import hashlib
|
||||||
import mimetypes
|
import mimetypes
|
||||||
@ -6,11 +7,13 @@ import random
|
|||||||
import secrets
|
import secrets
|
||||||
import string
|
import string
|
||||||
import zipfile
|
import zipfile
|
||||||
|
from base64 import b64encode
|
||||||
from glob import glob
|
from glob import glob
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from fastapi import FastAPI, HTTPException
|
from fastapi import FastAPI, HTTPException, Depends
|
||||||
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||||||
from fastapi.responses import (
|
from fastapi.responses import (
|
||||||
FileResponse,
|
FileResponse,
|
||||||
HTMLResponse,
|
HTMLResponse,
|
||||||
@ -22,6 +25,28 @@ app = FastAPI()
|
|||||||
file_mapping = {}
|
file_mapping = {}
|
||||||
indexers = []
|
indexers = []
|
||||||
|
|
||||||
|
security = HTTPBasic()
|
||||||
|
expected_password: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
async def get_current_username(
|
||||||
|
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
|
||||||
|
) -> str:
|
||||||
|
"""Verify Basic Authentication credentials"""
|
||||||
|
if expected_password is not None and credentials.password != expected_password:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=401,
|
||||||
|
detail="Incorrect password",
|
||||||
|
headers={"WWW-Authenticate": "Basic"},
|
||||||
|
)
|
||||||
|
return credentials.username
|
||||||
|
|
||||||
|
|
||||||
|
def set_auth_password(password: str | None):
|
||||||
|
"""Set the expected password for authentication"""
|
||||||
|
global expected_password
|
||||||
|
expected_password = password
|
||||||
|
|
||||||
|
|
||||||
class FileIndexer:
|
class FileIndexer:
|
||||||
def __init__(self, path: str, salt: str | None = None):
|
def __init__(self, path: str, salt: str | None = None):
|
||||||
@ -133,7 +158,7 @@ async def health_check():
|
|||||||
|
|
||||||
|
|
||||||
@app.get("/api/{file_hash}/data")
|
@app.get("/api/{file_hash}/data")
|
||||||
async def get_file_data(file_hash: str):
|
async def get_file_data(file_hash: str, username: str = Depends(get_current_username)):
|
||||||
"""Serve a specific file by its hash"""
|
"""Serve a specific file by its hash"""
|
||||||
if file_hash not in file_mapping:
|
if file_hash not in file_mapping:
|
||||||
raise HTTPException(status_code=404, detail="File not found")
|
raise HTTPException(status_code=404, detail="File not found")
|
||||||
@ -156,23 +181,22 @@ async def get_file_data(file_hash: str):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _build_url(
|
|
||||||
file_hash: str, order: str | None = None, delay: int | None = None
|
|
||||||
) -> str:
|
|
||||||
"""Build a URL with optional order/delay query parameters."""
|
|
||||||
base = "/{hash}".format(hash=file_hash)
|
|
||||||
if order is not None and delay is not None:
|
|
||||||
return "{base}?order={order}&delay={delay}".format(
|
|
||||||
base=base, order=order, delay=delay
|
|
||||||
)
|
|
||||||
return base
|
|
||||||
|
|
||||||
|
|
||||||
@app.get("/")
|
@app.get("/")
|
||||||
async def root(order: str | None = None, delay: int | None = None):
|
async def root(username: str = Depends(get_current_username)):
|
||||||
"""Redirect to a random file hash"""
|
"""Redirect to a random file hash"""
|
||||||
random_hash = _get_random_hash()
|
random_hash = _get_random_hash()
|
||||||
return RedirectResponse(url=_build_url(random_hash, order, delay))
|
return RedirectResponse(url="/{hash}".format(hash=random_hash))
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/{order}/{delay}")
|
||||||
|
async def order_delay(
|
||||||
|
order: str, delay: int, username: str = Depends(get_current_username)
|
||||||
|
):
|
||||||
|
"""Redirect to random file with order and delay"""
|
||||||
|
random_hash = _get_random_hash()
|
||||||
|
return RedirectResponse(
|
||||||
|
url="/{order}/{delay}/{hash}".format(order=order, delay=delay, hash=random_hash)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _get_navigation_data(file_hash: str, order: str | None = None):
|
def _get_navigation_data(file_hash: str, order: str | None = None):
|
||||||
@ -223,16 +247,16 @@ def _render_page(
|
|||||||
|
|
||||||
# Generate navigation URLs based on current mode
|
# Generate navigation URLs based on current mode
|
||||||
if current_order is not None:
|
if current_order is not None:
|
||||||
# Timer mode: preserve current order and delay via query params
|
# Timer mode: preserve current order and delay
|
||||||
next_url = _build_url(
|
next_url = "/{order}/{delay}/{next_hash}".format(
|
||||||
navigation_data["next_hash"],
|
|
||||||
order=current_order,
|
order=current_order,
|
||||||
delay=current_delay,
|
delay=current_delay,
|
||||||
|
next_hash=navigation_data["next_hash"],
|
||||||
)
|
)
|
||||||
prev_url = _build_url(
|
prev_url = "/{order}/{delay}/{prev_hash}".format(
|
||||||
navigation_data["prev_hash"],
|
|
||||||
order=current_order,
|
order=current_order,
|
||||||
delay=current_delay,
|
delay=current_delay,
|
||||||
|
prev_hash=navigation_data["prev_hash"],
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Browse mode: generate browse mode URLs
|
# Browse mode: generate browse mode URLs
|
||||||
@ -253,38 +277,49 @@ def _render_page(
|
|||||||
|
|
||||||
|
|
||||||
@app.get("/{file_hash}")
|
@app.get("/{file_hash}")
|
||||||
async def hash_page(file_hash: str, order: str | None = None, delay: int | None = None):
|
async def hash_page(file_hash: str, username: str = Depends(get_current_username)):
|
||||||
"""Serve a page for a specific file hash with optional auto-refresh navigation.
|
"""Serve a page for a specific file hash with navigation"""
|
||||||
|
|
||||||
Args:
|
|
||||||
file_hash: The hash identifier for the file.
|
|
||||||
order: Navigation order - 'next' for sequential, 'random' for random.
|
|
||||||
delay: Delay in seconds before auto-navigating to next file.
|
|
||||||
"""
|
|
||||||
if file_hash not in file_mapping:
|
if file_hash not in file_mapping:
|
||||||
raise HTTPException(status_code=404, detail="File not found")
|
raise HTTPException(status_code=404, detail="File not found")
|
||||||
|
|
||||||
if order is not None and order not in ("next", "random"):
|
navigation_data = _get_navigation_data(file_hash, order=None)
|
||||||
|
play_button = '<a href="/next/5/{file_hash}" class="play-btn" title="Play next 5">⏵</a>'.format(
|
||||||
|
file_hash=file_hash
|
||||||
|
)
|
||||||
|
return _render_page(
|
||||||
|
navigation_data, play_button=play_button, current_order=None, current_delay=None
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/{order}/{delay}/{file_hash}")
|
||||||
|
async def hash_page_with_refresh(
|
||||||
|
order: str,
|
||||||
|
delay: int,
|
||||||
|
file_hash: str,
|
||||||
|
username: str = Depends(get_current_username),
|
||||||
|
):
|
||||||
|
"""Serve a page for a specific file hash with auto-refresh navigation"""
|
||||||
|
if file_hash not in file_mapping:
|
||||||
|
raise HTTPException(status_code=404, detail="File not found")
|
||||||
|
|
||||||
|
if order not in ("next", "random"):
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=400, detail="Invalid order. Must be 'next' or 'random'"
|
status_code=400, detail="Invalid order. Must be 'next' or 'random'"
|
||||||
)
|
)
|
||||||
|
|
||||||
navigation_data = _get_navigation_data(file_hash, order=order)
|
navigation_data = _get_navigation_data(file_hash, order=order)
|
||||||
|
|
||||||
if order is not None and delay is not None:
|
refresh_url = "/{order}/{delay}/{next_hash}".format(
|
||||||
# Timer mode: auto-refresh with query params
|
order=order, delay=delay, next_hash=navigation_data["next_hash"]
|
||||||
refresh_url = _build_url(navigation_data["next_hash"], order=order, delay=delay)
|
|
||||||
refresh_meta = (
|
|
||||||
f'<meta http-equiv="refresh" content="{delay};url={refresh_url}">'
|
|
||||||
)
|
)
|
||||||
image_click_url = _build_url(file_hash)
|
|
||||||
|
refresh_meta = f'<meta http-equiv="refresh" content="{delay};url={refresh_url}">'
|
||||||
|
image_click_url = "/{file_hash}".format(file_hash=file_hash)
|
||||||
|
|
||||||
# Create pause button to stop auto-refresh
|
# Create pause button to stop auto-refresh
|
||||||
pause_button = (
|
pause_button = '<a href="/{file_hash}" class="play-btn" title="Pause">⏸</a>'.format(
|
||||||
'<a href="{file_hash}" class="play-btn" title="Pause">⏸</a>'.format(
|
|
||||||
file_hash=file_hash
|
file_hash=file_hash
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
return _render_page(
|
return _render_page(
|
||||||
navigation_data,
|
navigation_data,
|
||||||
@ -294,17 +329,6 @@ async def hash_page(file_hash: str, order: str | None = None, delay: int | None
|
|||||||
current_order=order,
|
current_order=order,
|
||||||
current_delay=delay,
|
current_delay=delay,
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
# Browse mode
|
|
||||||
play_button = '<a href="/{file_hash}?order=next&delay=5" class="play-btn" title="Play next 5">⏵</a>'.format(
|
|
||||||
file_hash=file_hash
|
|
||||||
)
|
|
||||||
return _render_page(
|
|
||||||
navigation_data,
|
|
||||||
play_button=play_button,
|
|
||||||
current_order=None,
|
|
||||||
current_delay=None,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _find_indexer_for_hash(file_hash: str):
|
def _find_indexer_for_hash(file_hash: str):
|
||||||
@ -335,9 +359,13 @@ if __name__ == "__main__":
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--salt", type=str, default=None, help="Salt for hashing file paths"
|
"--salt", type=str, default=None, help="Salt for hashing file paths"
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--password", type=str, default=None, help="Password for Basic Authentication"
|
||||||
|
)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
initialize_server(args)
|
initialize_server(args)
|
||||||
|
set_auth_password(args.password)
|
||||||
|
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
|
||||||
|
|||||||
@ -15,6 +15,7 @@ def _reset_state() -> None:
|
|||||||
"""Reset all global state to defaults."""
|
"""Reset all global state to defaults."""
|
||||||
main.file_mapping.clear()
|
main.file_mapping.clear()
|
||||||
main.indexers.clear()
|
main.indexers.clear()
|
||||||
|
main.expected_password = None
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
@ -82,6 +83,7 @@ def args_directory(sample_files: dict[str, Path], tmp_path: Path) -> argparse.Na
|
|||||||
host="127.0.0.1",
|
host="127.0.0.1",
|
||||||
port=0,
|
port=0,
|
||||||
salt="test-salt",
|
salt="test-salt",
|
||||||
|
password=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -93,32 +95,51 @@ def args_zip(sample_zip: dict[str, Path], tmp_path: Path) -> argparse.Namespace:
|
|||||||
host="127.0.0.1",
|
host="127.0.0.1",
|
||||||
port=0,
|
port=0,
|
||||||
salt="test-salt",
|
salt="test-salt",
|
||||||
|
password=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def initialized_dir(args_directory: argparse.Namespace) -> None:
|
def initialized_dir(args_directory: argparse.Namespace) -> None:
|
||||||
"""Initialize the server with sample directory files."""
|
"""Initialize the server with sample directory files (no auth)."""
|
||||||
main.initialize_server(args_directory)
|
main.initialize_server(args_directory)
|
||||||
|
main.set_auth_password(None)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def initialized_zip(args_zip: argparse.Namespace) -> None:
|
def initialized_zip(args_zip: argparse.Namespace) -> None:
|
||||||
"""Initialize the server with sample zip files."""
|
"""Initialize the server with sample zip files (no auth)."""
|
||||||
main.initialize_server(args_zip)
|
main.initialize_server(args_zip)
|
||||||
|
main.set_auth_password(None)
|
||||||
|
|
||||||
|
|
||||||
|
def _dummy_auth_header() -> str:
|
||||||
|
"""Create a dummy Basic Auth header (any creds work when no password is set)."""
|
||||||
|
import base64
|
||||||
|
|
||||||
|
creds = "test:test"
|
||||||
|
return f"Basic {base64.b64encode(creds.encode()).decode()}"
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
async def client_dir(initialized_dir: None) -> Generator[AsyncClient, None, None]:
|
async def client_dir(initialized_dir: None) -> Generator[AsyncClient, None, None]:
|
||||||
"""Async HTTP client against the app initialized with directory files."""
|
"""Async HTTP client against the app initialized with directory files.
|
||||||
|
|
||||||
|
Sends dummy auth headers since HTTPBasic() always requires them.
|
||||||
|
"""
|
||||||
transport = ASGITransport(app=main.app)
|
transport = ASGITransport(app=main.app)
|
||||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
ac.headers["Authorization"] = _dummy_auth_header()
|
||||||
yield ac
|
yield ac
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
async def client_zip(initialized_zip: None) -> Generator[AsyncClient, None, None]:
|
async def client_zip(initialized_zip: None) -> Generator[AsyncClient, None, None]:
|
||||||
"""Async HTTP client against the app initialized with zip files."""
|
"""Async HTTP client against the app initialized with zip files.
|
||||||
|
|
||||||
|
Sends dummy auth headers since HTTPBasic() always requires them.
|
||||||
|
"""
|
||||||
transport = ASGITransport(app=main.app)
|
transport = ASGITransport(app=main.app)
|
||||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
ac.headers["Authorization"] = _dummy_auth_header()
|
||||||
yield ac
|
yield ac
|
||||||
|
|||||||
189
tests/test_auth.py
Normal file
189
tests/test_auth.py
Normal file
@ -0,0 +1,189 @@
|
|||||||
|
"""Tests for authentication."""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import base64
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from httpx import ASGITransport, AsyncClient
|
||||||
|
|
||||||
|
import main
|
||||||
|
|
||||||
|
|
||||||
|
def _basic_auth_header(username: str, password: str) -> str:
|
||||||
|
"""Create a Basic Auth header value."""
|
||||||
|
creds = f"{username}:{password}"
|
||||||
|
return f"Basic {base64.b64encode(creds.encode()).decode()}"
|
||||||
|
|
||||||
|
|
||||||
|
def _make_args(tmp_path: Path) -> argparse.Namespace:
|
||||||
|
"""Create an argparse.Namespace for the given path."""
|
||||||
|
return argparse.Namespace(
|
||||||
|
source=str(tmp_path),
|
||||||
|
host="127.0.0.1",
|
||||||
|
port=0,
|
||||||
|
salt="auth-salt",
|
||||||
|
password=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def auth_setup(tmp_path: Path) -> tuple[str, str]:
|
||||||
|
"""Set up server with sample files and password protection.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (username, password).
|
||||||
|
"""
|
||||||
|
(tmp_path / "test.txt").write_text("hello")
|
||||||
|
main.initialize_server(_make_args(tmp_path))
|
||||||
|
main.set_auth_password("secret123")
|
||||||
|
return ("user", "secret123")
|
||||||
|
|
||||||
|
|
||||||
|
class TestNoPasswordSet:
|
||||||
|
"""Tests when no password is configured.
|
||||||
|
|
||||||
|
Note: HTTPBasic() always requires an Authorization header.
|
||||||
|
When expected_password is None, any credentials pass.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def test_health_always_open(self, client_dir: AsyncClient) -> None:
|
||||||
|
"""Health check has no auth dependency — always accessible."""
|
||||||
|
response = await client_dir.get("/api/health")
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
async def test_protected_endpoint_requires_auth_header(
|
||||||
|
self, initialized_dir: None
|
||||||
|
) -> None:
|
||||||
|
"""Even with no password, HTTPBasic requires an auth header."""
|
||||||
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
|
# No auth header → 401 from HTTPBasic
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
response = await ac.get(f"/api/{file_hash}/data")
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
async def test_any_credentials_pass_when_no_password(
|
||||||
|
self, client_dir: AsyncClient
|
||||||
|
) -> None:
|
||||||
|
"""Any credentials pass when no password is set."""
|
||||||
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
ac.headers["Authorization"] = _basic_auth_header("any", "thing")
|
||||||
|
response = await ac.get(f"/api/{file_hash}/data")
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
async def test_root_requires_auth_header(self, initialized_dir: None) -> None:
|
||||||
|
"""Root endpoint requires auth header even with no password."""
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
response = await ac.get("/", follow_redirects=False)
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
class TestCorrectPassword:
|
||||||
|
"""Tests with correct password."""
|
||||||
|
|
||||||
|
async def test_health_with_correct_password(
|
||||||
|
self, auth_setup: tuple[str, str]
|
||||||
|
) -> None:
|
||||||
|
"""Health check works (it has no auth, always 200)."""
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
response = await ac.get("/api/health")
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
async def test_file_access_with_correct_password(
|
||||||
|
self, auth_setup: tuple[str, str]
|
||||||
|
) -> None:
|
||||||
|
"""File access works with correct password."""
|
||||||
|
username, password = auth_setup
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
ac.headers["Authorization"] = _basic_auth_header(username, password)
|
||||||
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
|
response = await ac.get(f"/api/{file_hash}/data")
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
async def test_root_with_correct_password(
|
||||||
|
self, auth_setup: tuple[str, str]
|
||||||
|
) -> None:
|
||||||
|
"""Root redirect works with correct password."""
|
||||||
|
username, password = auth_setup
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
ac.headers["Authorization"] = _basic_auth_header(username, password)
|
||||||
|
response = await ac.get("/", follow_redirects=False)
|
||||||
|
assert response.status_code in (307, 302, 301)
|
||||||
|
|
||||||
|
async def test_hash_page_with_correct_password(
|
||||||
|
self, auth_setup: tuple[str, str]
|
||||||
|
) -> None:
|
||||||
|
"""Hash page works with correct password."""
|
||||||
|
username, password = auth_setup
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
ac.headers["Authorization"] = _basic_auth_header(username, password)
|
||||||
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
|
response = await ac.get(f"/{file_hash}")
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
class TestWrongPassword:
|
||||||
|
"""Tests with incorrect password."""
|
||||||
|
|
||||||
|
async def test_file_access_with_wrong_password(
|
||||||
|
self, auth_setup: tuple[str, str]
|
||||||
|
) -> None:
|
||||||
|
"""File access returns 401 with wrong password."""
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
ac.headers["Authorization"] = _basic_auth_header("user", "wrong")
|
||||||
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
|
response = await ac.get(f"/api/{file_hash}/data")
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
async def test_root_with_wrong_password(self, auth_setup: tuple[str, str]) -> None:
|
||||||
|
"""Root redirect returns 401 with wrong password."""
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
ac.headers["Authorization"] = _basic_auth_header("user", "wrong")
|
||||||
|
response = await ac.get("/", follow_redirects=False)
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
async def test_no_auth_header_returns_401(
|
||||||
|
self, auth_setup: tuple[str, str]
|
||||||
|
) -> None:
|
||||||
|
"""Missing auth header returns 401."""
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
|
response = await ac.get(f"/api/{file_hash}/data")
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
async def test_includes_www_authenticate_header(
|
||||||
|
self, auth_setup: tuple[str, str]
|
||||||
|
) -> None:
|
||||||
|
"""401 response includes WWW-Authenticate header."""
|
||||||
|
transport = ASGITransport(app=main.app)
|
||||||
|
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||||
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
|
response = await ac.get(f"/api/{file_hash}/data")
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert "www-authenticate" in response.headers
|
||||||
|
|
||||||
|
|
||||||
|
class TestSetAuthPassword:
|
||||||
|
"""Tests for set_auth_password function."""
|
||||||
|
|
||||||
|
def test_sets_password(self) -> None:
|
||||||
|
"""Password is set correctly."""
|
||||||
|
main.set_auth_password("newpass")
|
||||||
|
assert main.expected_password == "newpass"
|
||||||
|
|
||||||
|
def test_clears_password_with_none(self) -> None:
|
||||||
|
"""Passing None clears the password."""
|
||||||
|
main.set_auth_password("something")
|
||||||
|
main.set_auth_password(None)
|
||||||
|
assert main.expected_password is None
|
||||||
@ -101,28 +101,20 @@ class TestRootRedirect:
|
|||||||
assert hash_from_url in main.file_mapping
|
assert hash_from_url in main.file_mapping
|
||||||
|
|
||||||
|
|
||||||
class TestRootRedirectWithOrderDelay:
|
class TestOrderDelayRoute:
|
||||||
"""Tests for GET / with order/delay query parameters."""
|
"""Tests for GET /{order}/{delay}."""
|
||||||
|
|
||||||
async def test_next_order_redirects(self, client_dir: AsyncClient) -> None:
|
async def test_next_order_redirects(self, client_dir: AsyncClient) -> None:
|
||||||
"""/?order=next&delay=5 redirects to /{hash}?order=next&delay=5."""
|
"""/next/5 redirects to /next/5/{hash}."""
|
||||||
response = await client_dir.get(
|
response = await client_dir.get("/next/5", follow_redirects=False)
|
||||||
"/", params={"order": "next", "delay": 5}, follow_redirects=False
|
|
||||||
)
|
|
||||||
assert response.status_code in (307, 302, 301)
|
assert response.status_code in (307, 302, 301)
|
||||||
location = response.headers["location"]
|
assert "/next/5/" in response.headers["location"]
|
||||||
assert "order=next" in location
|
|
||||||
assert "delay=5" in location
|
|
||||||
|
|
||||||
async def test_random_order_redirects(self, client_dir: AsyncClient) -> None:
|
async def test_random_order_redirects(self, client_dir: AsyncClient) -> None:
|
||||||
"""/?order=random&delay=3 redirects to /{hash}?order=random&delay=3."""
|
"""/random/3 redirects to /random/3/{hash}."""
|
||||||
response = await client_dir.get(
|
response = await client_dir.get("/random/3", follow_redirects=False)
|
||||||
"/", params={"order": "random", "delay": 3}, follow_redirects=False
|
|
||||||
)
|
|
||||||
assert response.status_code in (307, 302, 301)
|
assert response.status_code in (307, 302, 301)
|
||||||
location = response.headers["location"]
|
assert "/random/3/" in response.headers["location"]
|
||||||
assert "order=random" in location
|
|
||||||
assert "delay=3" in location
|
|
||||||
|
|
||||||
|
|
||||||
class TestHashPage:
|
class TestHashPage:
|
||||||
@ -150,12 +142,6 @@ class TestHashPage:
|
|||||||
assert 'class="chevron left"' in response.text
|
assert 'class="chevron left"' in response.text
|
||||||
assert 'class="chevron right"' in response.text
|
assert 'class="chevron right"' in response.text
|
||||||
|
|
||||||
async def test_page_contains_play_button(self, client_dir: AsyncClient) -> None:
|
|
||||||
"""HTML page contains play button with query param URL."""
|
|
||||||
file_hash = list(main.file_mapping.keys())[0]
|
|
||||||
response = await client_dir.get(f"/{file_hash}")
|
|
||||||
assert "?order=next&delay=5" in response.text
|
|
||||||
|
|
||||||
async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None:
|
async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None:
|
||||||
"""Returns 404 for a hash that doesn't exist."""
|
"""Returns 404 for a hash that doesn't exist."""
|
||||||
response = await client_dir.get("/nonexistent-hash")
|
response = await client_dir.get("/nonexistent-hash")
|
||||||
@ -163,48 +149,37 @@ class TestHashPage:
|
|||||||
|
|
||||||
|
|
||||||
class TestHashPageWithRefresh:
|
class TestHashPageWithRefresh:
|
||||||
"""Tests for GET /{file_hash}?order=...&delay=... (auto-refresh mode)."""
|
"""Tests for GET /{order}/{delay}/{file_hash}."""
|
||||||
|
|
||||||
async def test_next_order_returns_html(self, client_dir: AsyncClient) -> None:
|
async def test_next_order_returns_html(self, client_dir: AsyncClient) -> None:
|
||||||
"""Next order returns HTML with refresh meta tag."""
|
"""Next order returns HTML with refresh meta tag."""
|
||||||
file_hash = list(main.file_mapping.keys())[0]
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
response = await client_dir.get(
|
response = await client_dir.get(f"/next/5/{file_hash}")
|
||||||
f"/{file_hash}", params={"order": "next", "delay": 5}
|
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert 'http-equiv="refresh"' in response.text
|
assert 'http-equiv="refresh"' in response.text
|
||||||
|
|
||||||
async def test_random_order_returns_html(self, client_dir: AsyncClient) -> None:
|
async def test_random_order_returns_html(self, client_dir: AsyncClient) -> None:
|
||||||
"""Random order returns HTML with refresh meta tag."""
|
"""Random order returns HTML with refresh meta tag."""
|
||||||
file_hash = list(main.file_mapping.keys())[0]
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
response = await client_dir.get(
|
response = await client_dir.get(f"/random/3/{file_hash}")
|
||||||
f"/{file_hash}", params={"order": "random", "delay": 3}
|
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert 'http-equiv="refresh"' in response.text
|
assert 'http-equiv="refresh"' in response.text
|
||||||
|
|
||||||
async def test_invalid_order_returns_400(self, client_dir: AsyncClient) -> None:
|
async def test_invalid_order_returns_400(self, client_dir: AsyncClient) -> None:
|
||||||
"""Invalid order parameter returns 400."""
|
"""Invalid order parameter returns 400."""
|
||||||
file_hash = list(main.file_mapping.keys())[0]
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
response = await client_dir.get(
|
response = await client_dir.get(f"/shuffle/5/{file_hash}")
|
||||||
f"/{file_hash}", params={"order": "shuffle", "delay": 5}
|
|
||||||
)
|
|
||||||
assert response.status_code == 400
|
assert response.status_code == 400
|
||||||
|
|
||||||
async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None:
|
async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None:
|
||||||
"""Returns 404 for a hash that doesn't exist."""
|
"""Returns 404 for a hash that doesn't exist."""
|
||||||
response = await client_dir.get(
|
response = await client_dir.get("/next/5/nonexistent-hash")
|
||||||
"/nonexistent-hash", params={"order": "next", "delay": 5}
|
|
||||||
)
|
|
||||||
assert response.status_code == 404
|
assert response.status_code == 404
|
||||||
|
|
||||||
async def test_refresh_url_points_to_next_file(
|
async def test_refresh_url_points_to_next_file(
|
||||||
self, client_dir: AsyncClient
|
self, client_dir: AsyncClient
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Refresh meta tag points to the next file in sequence with query params."""
|
"""Refresh meta tag points to the next file in sequence."""
|
||||||
file_hash = list(main.file_mapping.keys())[0]
|
file_hash = list(main.file_mapping.keys())[0]
|
||||||
response = await client_dir.get(
|
response = await client_dir.get(f"/next/5/{file_hash}")
|
||||||
f"/{file_hash}", params={"order": "next", "delay": 5}
|
assert "url=/next/5/" in response.text
|
||||||
)
|
|
||||||
assert "order=next" in response.text
|
|
||||||
assert "delay=5" in response.text
|
|
||||||
|
|||||||
@ -16,6 +16,7 @@ def seeded_indexers(sample_files: dict[str, Path], tmp_path: Path) -> None:
|
|||||||
host="127.0.0.1",
|
host="127.0.0.1",
|
||||||
port=0,
|
port=0,
|
||||||
salt="nav-test-salt",
|
salt="nav-test-salt",
|
||||||
|
password=None,
|
||||||
)
|
)
|
||||||
main.initialize_server(args)
|
main.initialize_server(args)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user