Compare commits
6 Commits
1c6935307c
...
022d3c69f0
| Author | SHA1 | Date | |
|---|---|---|---|
| 022d3c69f0 | |||
| 3bb18bf7f8 | |||
| a739e7307d | |||
| f856c92394 | |||
| 22bf57f896 | |||
| 8a8fff37e0 |
4
.gitignore
vendored
4
.gitignore
vendored
@ -10,3 +10,7 @@ wheels/
|
||||
.venv
|
||||
|
||||
.nanocoder
|
||||
|
||||
# Task lists
|
||||
TODO.md
|
||||
TASKS.md
|
||||
|
||||
BIN
2024_seasonal_wallpapers_pack.zip
Normal file
BIN
2024_seasonal_wallpapers_pack.zip
Normal file
Binary file not shown.
@ -62,20 +62,23 @@
|
||||
</div>
|
||||
<script>
|
||||
function getRefreshParams() {
|
||||
var path = window.location.pathname.slice(1);
|
||||
var parts = path.split('/');
|
||||
if (parts.length < 3) return null;
|
||||
var order = parts[0];
|
||||
var params = new URLSearchParams(window.location.search);
|
||||
var order = params.get('order');
|
||||
if (order !== 'next' && order !== 'random') return null;
|
||||
var delay = parseInt(parts[1]);
|
||||
var delay = parseInt(params.get('delay'));
|
||||
if (isNaN(delay)) return null;
|
||||
var hash = window.location.pathname.slice(1);
|
||||
return {
|
||||
delay: delay,
|
||||
order: order,
|
||||
hash: parts[2]
|
||||
hash: hash
|
||||
};
|
||||
}
|
||||
|
||||
function buildUrl(hash, order, delay) {
|
||||
return '/' + hash + '?order=' + order + '&delay=' + delay;
|
||||
}
|
||||
|
||||
document.addEventListener('keydown', function(e) {
|
||||
e.preventDefault();
|
||||
if (e.code === 'Space') {
|
||||
@ -87,15 +90,15 @@
|
||||
document.getElementById('next-btn').click();
|
||||
} else if (e.code === 'Equal' || e.key.toLowerCase() === 'j') {
|
||||
var params = getRefreshParams();
|
||||
if (params) window.location.href = '/' + params.order + '/' + (params.delay + 1) + '/' + params.hash;
|
||||
if (params) window.location.href = buildUrl(params.hash, params.order, params.delay + 1);
|
||||
} else if (e.code === 'Minus' || e.key.toLowerCase() === 'k') {
|
||||
var params = getRefreshParams();
|
||||
if (params && params.delay > 1) window.location.href = '/' + params.order + '/' + (params.delay - 1) + '/' + params.hash;
|
||||
if (params && params.delay > 1) window.location.href = buildUrl(params.hash, params.order, params.delay - 1);
|
||||
} else if (e.key.toLowerCase() === 'o') {
|
||||
var params = getRefreshParams();
|
||||
if (params) {
|
||||
var newOrder = params.order === 'next' ? 'random' : 'next';
|
||||
window.location.href = '/' + newOrder + '/' + params.delay + '/' + params.hash;
|
||||
window.location.href = buildUrl(params.hash, newOrder, params.delay);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
128
main.py
128
main.py
@ -1,4 +1,3 @@
|
||||
from typing import Annotated
|
||||
import argparse
|
||||
import hashlib
|
||||
import mimetypes
|
||||
@ -7,13 +6,11 @@ import random
|
||||
import secrets
|
||||
import string
|
||||
import zipfile
|
||||
from base64 import b64encode
|
||||
from glob import glob
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import FastAPI, HTTPException, Depends
|
||||
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.responses import (
|
||||
FileResponse,
|
||||
HTMLResponse,
|
||||
@ -25,28 +22,6 @@ app = FastAPI()
|
||||
file_mapping = {}
|
||||
indexers = []
|
||||
|
||||
security = HTTPBasic()
|
||||
expected_password: str | None = None
|
||||
|
||||
|
||||
async def get_current_username(
|
||||
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
|
||||
) -> str:
|
||||
"""Verify Basic Authentication credentials"""
|
||||
if expected_password is not None and credentials.password != expected_password:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="Incorrect password",
|
||||
headers={"WWW-Authenticate": "Basic"},
|
||||
)
|
||||
return credentials.username
|
||||
|
||||
|
||||
def set_auth_password(password: str | None):
|
||||
"""Set the expected password for authentication"""
|
||||
global expected_password
|
||||
expected_password = password
|
||||
|
||||
|
||||
class FileIndexer:
|
||||
def __init__(self, path: str, salt: str | None = None):
|
||||
@ -158,7 +133,7 @@ async def health_check():
|
||||
|
||||
|
||||
@app.get("/api/{file_hash}/data")
|
||||
async def get_file_data(file_hash: str, username: str = Depends(get_current_username)):
|
||||
async def get_file_data(file_hash: str):
|
||||
"""Serve a specific file by its hash"""
|
||||
if file_hash not in file_mapping:
|
||||
raise HTTPException(status_code=404, detail="File not found")
|
||||
@ -181,22 +156,23 @@ async def get_file_data(file_hash: str, username: str = Depends(get_current_user
|
||||
)
|
||||
|
||||
|
||||
def _build_url(
|
||||
file_hash: str, order: str | None = None, delay: int | None = None
|
||||
) -> str:
|
||||
"""Build a URL with optional order/delay query parameters."""
|
||||
base = "/{hash}".format(hash=file_hash)
|
||||
if order is not None and delay is not None:
|
||||
return "{base}?order={order}&delay={delay}".format(
|
||||
base=base, order=order, delay=delay
|
||||
)
|
||||
return base
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root(username: str = Depends(get_current_username)):
|
||||
async def root(order: str | None = None, delay: int | None = None):
|
||||
"""Redirect to a random file hash"""
|
||||
random_hash = _get_random_hash()
|
||||
return RedirectResponse(url="/{hash}".format(hash=random_hash))
|
||||
|
||||
|
||||
@app.get("/{order}/{delay}")
|
||||
async def order_delay(
|
||||
order: str, delay: int, username: str = Depends(get_current_username)
|
||||
):
|
||||
"""Redirect to random file with order and delay"""
|
||||
random_hash = _get_random_hash()
|
||||
return RedirectResponse(
|
||||
url="/{order}/{delay}/{hash}".format(order=order, delay=delay, hash=random_hash)
|
||||
)
|
||||
return RedirectResponse(url=_build_url(random_hash, order, delay))
|
||||
|
||||
|
||||
def _get_navigation_data(file_hash: str, order: str | None = None):
|
||||
@ -247,16 +223,16 @@ def _render_page(
|
||||
|
||||
# Generate navigation URLs based on current mode
|
||||
if current_order is not None:
|
||||
# Timer mode: preserve current order and delay
|
||||
next_url = "/{order}/{delay}/{next_hash}".format(
|
||||
# Timer mode: preserve current order and delay via query params
|
||||
next_url = _build_url(
|
||||
navigation_data["next_hash"],
|
||||
order=current_order,
|
||||
delay=current_delay,
|
||||
next_hash=navigation_data["next_hash"],
|
||||
)
|
||||
prev_url = "/{order}/{delay}/{prev_hash}".format(
|
||||
prev_url = _build_url(
|
||||
navigation_data["prev_hash"],
|
||||
order=current_order,
|
||||
delay=current_delay,
|
||||
prev_hash=navigation_data["prev_hash"],
|
||||
)
|
||||
else:
|
||||
# Browse mode: generate browse mode URLs
|
||||
@ -277,49 +253,38 @@ def _render_page(
|
||||
|
||||
|
||||
@app.get("/{file_hash}")
|
||||
async def hash_page(file_hash: str, username: str = Depends(get_current_username)):
|
||||
"""Serve a page for a specific file hash with navigation"""
|
||||
async def hash_page(file_hash: str, order: str | None = None, delay: int | None = None):
|
||||
"""Serve a page for a specific file hash with optional auto-refresh navigation.
|
||||
|
||||
Args:
|
||||
file_hash: The hash identifier for the file.
|
||||
order: Navigation order - 'next' for sequential, 'random' for random.
|
||||
delay: Delay in seconds before auto-navigating to next file.
|
||||
"""
|
||||
if file_hash not in file_mapping:
|
||||
raise HTTPException(status_code=404, detail="File not found")
|
||||
|
||||
navigation_data = _get_navigation_data(file_hash, order=None)
|
||||
play_button = '<a href="/next/5/{file_hash}" class="play-btn" title="Play next 5">⏵</a>'.format(
|
||||
file_hash=file_hash
|
||||
)
|
||||
return _render_page(
|
||||
navigation_data, play_button=play_button, current_order=None, current_delay=None
|
||||
)
|
||||
|
||||
|
||||
@app.get("/{order}/{delay}/{file_hash}")
|
||||
async def hash_page_with_refresh(
|
||||
order: str,
|
||||
delay: int,
|
||||
file_hash: str,
|
||||
username: str = Depends(get_current_username),
|
||||
):
|
||||
"""Serve a page for a specific file hash with auto-refresh navigation"""
|
||||
if file_hash not in file_mapping:
|
||||
raise HTTPException(status_code=404, detail="File not found")
|
||||
|
||||
if order not in ("next", "random"):
|
||||
if order is not None and order not in ("next", "random"):
|
||||
raise HTTPException(
|
||||
status_code=400, detail="Invalid order. Must be 'next' or 'random'"
|
||||
)
|
||||
|
||||
navigation_data = _get_navigation_data(file_hash, order=order)
|
||||
|
||||
refresh_url = "/{order}/{delay}/{next_hash}".format(
|
||||
order=order, delay=delay, next_hash=navigation_data["next_hash"]
|
||||
if order is not None and delay is not None:
|
||||
# Timer mode: auto-refresh with query params
|
||||
refresh_url = _build_url(navigation_data["next_hash"], order=order, delay=delay)
|
||||
refresh_meta = (
|
||||
f'<meta http-equiv="refresh" content="{delay};url={refresh_url}">'
|
||||
)
|
||||
|
||||
refresh_meta = f'<meta http-equiv="refresh" content="{delay};url={refresh_url}">'
|
||||
image_click_url = "/{file_hash}".format(file_hash=file_hash)
|
||||
image_click_url = _build_url(file_hash)
|
||||
|
||||
# Create pause button to stop auto-refresh
|
||||
pause_button = '<a href="/{file_hash}" class="play-btn" title="Pause">⏸</a>'.format(
|
||||
pause_button = (
|
||||
'<a href="{file_hash}" class="play-btn" title="Pause">⏸</a>'.format(
|
||||
file_hash=file_hash
|
||||
)
|
||||
)
|
||||
|
||||
return _render_page(
|
||||
navigation_data,
|
||||
@ -329,6 +294,17 @@ async def hash_page_with_refresh(
|
||||
current_order=order,
|
||||
current_delay=delay,
|
||||
)
|
||||
else:
|
||||
# Browse mode
|
||||
play_button = '<a href="/{file_hash}?order=next&delay=5" class="play-btn" title="Play next 5">⏵</a>'.format(
|
||||
file_hash=file_hash
|
||||
)
|
||||
return _render_page(
|
||||
navigation_data,
|
||||
play_button=play_button,
|
||||
current_order=None,
|
||||
current_delay=None,
|
||||
)
|
||||
|
||||
|
||||
def _find_indexer_for_hash(file_hash: str):
|
||||
@ -359,13 +335,9 @@ if __name__ == "__main__":
|
||||
parser.add_argument(
|
||||
"--salt", type=str, default=None, help="Salt for hashing file paths"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--password", type=str, default=None, help="Password for Basic Authentication"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
initialize_server(args)
|
||||
set_auth_password(args.password)
|
||||
|
||||
import uvicorn
|
||||
|
||||
|
||||
@ -15,7 +15,6 @@ def _reset_state() -> None:
|
||||
"""Reset all global state to defaults."""
|
||||
main.file_mapping.clear()
|
||||
main.indexers.clear()
|
||||
main.expected_password = None
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@ -83,7 +82,6 @@ def args_directory(sample_files: dict[str, Path], tmp_path: Path) -> argparse.Na
|
||||
host="127.0.0.1",
|
||||
port=0,
|
||||
salt="test-salt",
|
||||
password=None,
|
||||
)
|
||||
|
||||
|
||||
@ -95,51 +93,32 @@ def args_zip(sample_zip: dict[str, Path], tmp_path: Path) -> argparse.Namespace:
|
||||
host="127.0.0.1",
|
||||
port=0,
|
||||
salt="test-salt",
|
||||
password=None,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def initialized_dir(args_directory: argparse.Namespace) -> None:
|
||||
"""Initialize the server with sample directory files (no auth)."""
|
||||
"""Initialize the server with sample directory files."""
|
||||
main.initialize_server(args_directory)
|
||||
main.set_auth_password(None)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def initialized_zip(args_zip: argparse.Namespace) -> None:
|
||||
"""Initialize the server with sample zip files (no auth)."""
|
||||
"""Initialize the server with sample zip files."""
|
||||
main.initialize_server(args_zip)
|
||||
main.set_auth_password(None)
|
||||
|
||||
|
||||
def _dummy_auth_header() -> str:
|
||||
"""Create a dummy Basic Auth header (any creds work when no password is set)."""
|
||||
import base64
|
||||
|
||||
creds = "test:test"
|
||||
return f"Basic {base64.b64encode(creds.encode()).decode()}"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client_dir(initialized_dir: None) -> Generator[AsyncClient, None, None]:
|
||||
"""Async HTTP client against the app initialized with directory files.
|
||||
|
||||
Sends dummy auth headers since HTTPBasic() always requires them.
|
||||
"""
|
||||
"""Async HTTP client against the app initialized with directory files."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
ac.headers["Authorization"] = _dummy_auth_header()
|
||||
yield ac
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client_zip(initialized_zip: None) -> Generator[AsyncClient, None, None]:
|
||||
"""Async HTTP client against the app initialized with zip files.
|
||||
|
||||
Sends dummy auth headers since HTTPBasic() always requires them.
|
||||
"""
|
||||
"""Async HTTP client against the app initialized with zip files."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
ac.headers["Authorization"] = _dummy_auth_header()
|
||||
yield ac
|
||||
|
||||
@ -1,189 +0,0 @@
|
||||
"""Tests for authentication."""
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
import main
|
||||
|
||||
|
||||
def _basic_auth_header(username: str, password: str) -> str:
|
||||
"""Create a Basic Auth header value."""
|
||||
creds = f"{username}:{password}"
|
||||
return f"Basic {base64.b64encode(creds.encode()).decode()}"
|
||||
|
||||
|
||||
def _make_args(tmp_path: Path) -> argparse.Namespace:
|
||||
"""Create an argparse.Namespace for the given path."""
|
||||
return argparse.Namespace(
|
||||
source=str(tmp_path),
|
||||
host="127.0.0.1",
|
||||
port=0,
|
||||
salt="auth-salt",
|
||||
password=None,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def auth_setup(tmp_path: Path) -> tuple[str, str]:
|
||||
"""Set up server with sample files and password protection.
|
||||
|
||||
Returns:
|
||||
Tuple of (username, password).
|
||||
"""
|
||||
(tmp_path / "test.txt").write_text("hello")
|
||||
main.initialize_server(_make_args(tmp_path))
|
||||
main.set_auth_password("secret123")
|
||||
return ("user", "secret123")
|
||||
|
||||
|
||||
class TestNoPasswordSet:
|
||||
"""Tests when no password is configured.
|
||||
|
||||
Note: HTTPBasic() always requires an Authorization header.
|
||||
When expected_password is None, any credentials pass.
|
||||
"""
|
||||
|
||||
async def test_health_always_open(self, client_dir: AsyncClient) -> None:
|
||||
"""Health check has no auth dependency — always accessible."""
|
||||
response = await client_dir.get("/api/health")
|
||||
assert response.status_code == 200
|
||||
|
||||
async def test_protected_endpoint_requires_auth_header(
|
||||
self, initialized_dir: None
|
||||
) -> None:
|
||||
"""Even with no password, HTTPBasic requires an auth header."""
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
# No auth header → 401 from HTTPBasic
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
response = await ac.get(f"/api/{file_hash}/data")
|
||||
assert response.status_code == 401
|
||||
|
||||
async def test_any_credentials_pass_when_no_password(
|
||||
self, client_dir: AsyncClient
|
||||
) -> None:
|
||||
"""Any credentials pass when no password is set."""
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
ac.headers["Authorization"] = _basic_auth_header("any", "thing")
|
||||
response = await ac.get(f"/api/{file_hash}/data")
|
||||
assert response.status_code == 200
|
||||
|
||||
async def test_root_requires_auth_header(self, initialized_dir: None) -> None:
|
||||
"""Root endpoint requires auth header even with no password."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
response = await ac.get("/", follow_redirects=False)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
class TestCorrectPassword:
|
||||
"""Tests with correct password."""
|
||||
|
||||
async def test_health_with_correct_password(
|
||||
self, auth_setup: tuple[str, str]
|
||||
) -> None:
|
||||
"""Health check works (it has no auth, always 200)."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
response = await ac.get("/api/health")
|
||||
assert response.status_code == 200
|
||||
|
||||
async def test_file_access_with_correct_password(
|
||||
self, auth_setup: tuple[str, str]
|
||||
) -> None:
|
||||
"""File access works with correct password."""
|
||||
username, password = auth_setup
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
ac.headers["Authorization"] = _basic_auth_header(username, password)
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await ac.get(f"/api/{file_hash}/data")
|
||||
assert response.status_code == 200
|
||||
|
||||
async def test_root_with_correct_password(
|
||||
self, auth_setup: tuple[str, str]
|
||||
) -> None:
|
||||
"""Root redirect works with correct password."""
|
||||
username, password = auth_setup
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
ac.headers["Authorization"] = _basic_auth_header(username, password)
|
||||
response = await ac.get("/", follow_redirects=False)
|
||||
assert response.status_code in (307, 302, 301)
|
||||
|
||||
async def test_hash_page_with_correct_password(
|
||||
self, auth_setup: tuple[str, str]
|
||||
) -> None:
|
||||
"""Hash page works with correct password."""
|
||||
username, password = auth_setup
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
ac.headers["Authorization"] = _basic_auth_header(username, password)
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await ac.get(f"/{file_hash}")
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
class TestWrongPassword:
|
||||
"""Tests with incorrect password."""
|
||||
|
||||
async def test_file_access_with_wrong_password(
|
||||
self, auth_setup: tuple[str, str]
|
||||
) -> None:
|
||||
"""File access returns 401 with wrong password."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
ac.headers["Authorization"] = _basic_auth_header("user", "wrong")
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await ac.get(f"/api/{file_hash}/data")
|
||||
assert response.status_code == 401
|
||||
|
||||
async def test_root_with_wrong_password(self, auth_setup: tuple[str, str]) -> None:
|
||||
"""Root redirect returns 401 with wrong password."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
ac.headers["Authorization"] = _basic_auth_header("user", "wrong")
|
||||
response = await ac.get("/", follow_redirects=False)
|
||||
assert response.status_code == 401
|
||||
|
||||
async def test_no_auth_header_returns_401(
|
||||
self, auth_setup: tuple[str, str]
|
||||
) -> None:
|
||||
"""Missing auth header returns 401."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await ac.get(f"/api/{file_hash}/data")
|
||||
assert response.status_code == 401
|
||||
|
||||
async def test_includes_www_authenticate_header(
|
||||
self, auth_setup: tuple[str, str]
|
||||
) -> None:
|
||||
"""401 response includes WWW-Authenticate header."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await ac.get(f"/api/{file_hash}/data")
|
||||
assert response.status_code == 401
|
||||
assert "www-authenticate" in response.headers
|
||||
|
||||
|
||||
class TestSetAuthPassword:
|
||||
"""Tests for set_auth_password function."""
|
||||
|
||||
def test_sets_password(self) -> None:
|
||||
"""Password is set correctly."""
|
||||
main.set_auth_password("newpass")
|
||||
assert main.expected_password == "newpass"
|
||||
|
||||
def test_clears_password_with_none(self) -> None:
|
||||
"""Passing None clears the password."""
|
||||
main.set_auth_password("something")
|
||||
main.set_auth_password(None)
|
||||
assert main.expected_password is None
|
||||
@ -101,20 +101,28 @@ class TestRootRedirect:
|
||||
assert hash_from_url in main.file_mapping
|
||||
|
||||
|
||||
class TestOrderDelayRoute:
|
||||
"""Tests for GET /{order}/{delay}."""
|
||||
class TestRootRedirectWithOrderDelay:
|
||||
"""Tests for GET / with order/delay query parameters."""
|
||||
|
||||
async def test_next_order_redirects(self, client_dir: AsyncClient) -> None:
|
||||
"""/next/5 redirects to /next/5/{hash}."""
|
||||
response = await client_dir.get("/next/5", follow_redirects=False)
|
||||
"""/?order=next&delay=5 redirects to /{hash}?order=next&delay=5."""
|
||||
response = await client_dir.get(
|
||||
"/", params={"order": "next", "delay": 5}, follow_redirects=False
|
||||
)
|
||||
assert response.status_code in (307, 302, 301)
|
||||
assert "/next/5/" in response.headers["location"]
|
||||
location = response.headers["location"]
|
||||
assert "order=next" in location
|
||||
assert "delay=5" in location
|
||||
|
||||
async def test_random_order_redirects(self, client_dir: AsyncClient) -> None:
|
||||
"""/random/3 redirects to /random/3/{hash}."""
|
||||
response = await client_dir.get("/random/3", follow_redirects=False)
|
||||
"""/?order=random&delay=3 redirects to /{hash}?order=random&delay=3."""
|
||||
response = await client_dir.get(
|
||||
"/", params={"order": "random", "delay": 3}, follow_redirects=False
|
||||
)
|
||||
assert response.status_code in (307, 302, 301)
|
||||
assert "/random/3/" in response.headers["location"]
|
||||
location = response.headers["location"]
|
||||
assert "order=random" in location
|
||||
assert "delay=3" in location
|
||||
|
||||
|
||||
class TestHashPage:
|
||||
@ -142,6 +150,12 @@ class TestHashPage:
|
||||
assert 'class="chevron left"' in response.text
|
||||
assert 'class="chevron right"' in response.text
|
||||
|
||||
async def test_page_contains_play_button(self, client_dir: AsyncClient) -> None:
|
||||
"""HTML page contains play button with query param URL."""
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await client_dir.get(f"/{file_hash}")
|
||||
assert "?order=next&delay=5" in response.text
|
||||
|
||||
async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None:
|
||||
"""Returns 404 for a hash that doesn't exist."""
|
||||
response = await client_dir.get("/nonexistent-hash")
|
||||
@ -149,37 +163,48 @@ class TestHashPage:
|
||||
|
||||
|
||||
class TestHashPageWithRefresh:
|
||||
"""Tests for GET /{order}/{delay}/{file_hash}."""
|
||||
"""Tests for GET /{file_hash}?order=...&delay=... (auto-refresh mode)."""
|
||||
|
||||
async def test_next_order_returns_html(self, client_dir: AsyncClient) -> None:
|
||||
"""Next order returns HTML with refresh meta tag."""
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await client_dir.get(f"/next/5/{file_hash}")
|
||||
response = await client_dir.get(
|
||||
f"/{file_hash}", params={"order": "next", "delay": 5}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert 'http-equiv="refresh"' in response.text
|
||||
|
||||
async def test_random_order_returns_html(self, client_dir: AsyncClient) -> None:
|
||||
"""Random order returns HTML with refresh meta tag."""
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await client_dir.get(f"/random/3/{file_hash}")
|
||||
response = await client_dir.get(
|
||||
f"/{file_hash}", params={"order": "random", "delay": 3}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert 'http-equiv="refresh"' in response.text
|
||||
|
||||
async def test_invalid_order_returns_400(self, client_dir: AsyncClient) -> None:
|
||||
"""Invalid order parameter returns 400."""
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await client_dir.get(f"/shuffle/5/{file_hash}")
|
||||
response = await client_dir.get(
|
||||
f"/{file_hash}", params={"order": "shuffle", "delay": 5}
|
||||
)
|
||||
assert response.status_code == 400
|
||||
|
||||
async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None:
|
||||
"""Returns 404 for a hash that doesn't exist."""
|
||||
response = await client_dir.get("/next/5/nonexistent-hash")
|
||||
response = await client_dir.get(
|
||||
"/nonexistent-hash", params={"order": "next", "delay": 5}
|
||||
)
|
||||
assert response.status_code == 404
|
||||
|
||||
async def test_refresh_url_points_to_next_file(
|
||||
self, client_dir: AsyncClient
|
||||
) -> None:
|
||||
"""Refresh meta tag points to the next file in sequence."""
|
||||
"""Refresh meta tag points to the next file in sequence with query params."""
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await client_dir.get(f"/next/5/{file_hash}")
|
||||
assert "url=/next/5/" in response.text
|
||||
response = await client_dir.get(
|
||||
f"/{file_hash}", params={"order": "next", "delay": 5}
|
||||
)
|
||||
assert "order=next" in response.text
|
||||
assert "delay=5" in response.text
|
||||
|
||||
@ -16,7 +16,6 @@ def seeded_indexers(sample_files: dict[str, Path], tmp_path: Path) -> None:
|
||||
host="127.0.0.1",
|
||||
port=0,
|
||||
salt="nav-test-salt",
|
||||
password=None,
|
||||
)
|
||||
main.initialize_server(args)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user