From 1c6935307ca4055f68d78bd9389ab40f3562d861 Mon Sep 17 00:00:00 2001 From: Timothy Farrell Date: Thu, 23 Apr 2026 22:11:51 -0500 Subject: [PATCH] Add test suite --- pyproject.toml | 8 ++ tests/__init__.py | 0 tests/conftest.py | 145 ++++++++++++++++++++++++++++ tests/test_auth.py | 189 +++++++++++++++++++++++++++++++++++++ tests/test_endpoints.py | 185 ++++++++++++++++++++++++++++++++++++ tests/test_file_indexer.py | 154 ++++++++++++++++++++++++++++++ tests/test_navigation.py | 125 ++++++++++++++++++++++++ tests/test_zip_indexer.py | 121 ++++++++++++++++++++++++ uv.lock | 102 +++++++++++++++++++- 9 files changed, 1028 insertions(+), 1 deletion(-) create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_auth.py create mode 100644 tests/test_endpoints.py create mode 100644 tests/test_file_indexer.py create mode 100644 tests/test_navigation.py create mode 100644 tests/test_zip_indexer.py diff --git a/pyproject.toml b/pyproject.toml index f828af4..4ba98e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,11 @@ dev = [ "ruff>=0.15.5", ] +[tool.pytest.ini_options] +asyncio_mode = "auto" + +testpaths = ["tests"] + [tool.ruff] target-version = "py313" line-length = 100 @@ -28,5 +33,8 @@ quote-style = "double" [dependency-groups] dev = [ + "httpx>=0.28.1", + "pytest>=9.0.3", + "pytest-asyncio>=1.3.0", "ruff>=0.15.5", ] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..0eb6884 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,145 @@ +"""Shared fixtures for the test suite.""" + +import argparse +import zipfile +from pathlib import Path +from typing import Generator + +import pytest +from httpx import ASGITransport, AsyncClient + +import main + + +def _reset_state() -> None: + """Reset all global state to defaults.""" + main.file_mapping.clear() + main.indexers.clear() + main.expected_password = None + + +@pytest.fixture(autouse=True) +def reset_globals() -> Generator[None, None, None]: + """Reset global state before and after each test.""" + _reset_state() + yield + _reset_state() + + +@pytest.fixture +def sample_files(tmp_path: Path) -> dict[str, Path]: + """Create a directory with sample files for testing. + + Returns: + Dict mapping logical names to file paths. + """ + subdir = tmp_path / "subdir" + subdir.mkdir() + + files: dict[str, Path] = {} + files["root_file"] = tmp_path / "root.txt" + files["root_file"].write_text("root content") + + files["sub_file"] = subdir / "nested.txt" + files["sub_file"].write_text("nested content") + + files["binary_file"] = tmp_path / "data.bin" + files["binary_file"].write_bytes(b"\x00\x01\x02\x03") + + files["image_file"] = tmp_path / "photo.jpg" + files["image_file"].write_bytes(b"\xff\xd8\xff\xe0fake_jpeg_data") + + return files + + +@pytest.fixture +def sample_zip(tmp_path: Path) -> dict[str, Path]: + """Create a ZIP file with sample files for testing. + + Returns: + Dict mapping logical names to file paths inside the zip. + """ + zip_path = tmp_path / "test_archive.zip" + inner_files: dict[str, Path] = {} + + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("top.txt", "top level content") + inner_files["top"] = Path("top.txt") + + zf.writestr("folder/deep.txt", "deep content") + inner_files["deep"] = Path("folder/deep.txt") + + zf.writestr("folder/image.png", b"\x89PNG fake png") + inner_files["image"] = Path("folder/image.png") + + return inner_files + + +@pytest.fixture +def args_directory(sample_files: dict[str, Path], tmp_path: Path) -> argparse.Namespace: + """Argparse namespace pointing at the sample directory.""" + return argparse.Namespace( + source=str(tmp_path), + host="127.0.0.1", + port=0, + salt="test-salt", + password=None, + ) + + +@pytest.fixture +def args_zip(sample_zip: dict[str, Path], tmp_path: Path) -> argparse.Namespace: + """Argparse namespace pointing at the sample zip.""" + return argparse.Namespace( + source=str(tmp_path / "test_archive.zip"), + host="127.0.0.1", + port=0, + salt="test-salt", + password=None, + ) + + +@pytest.fixture +def initialized_dir(args_directory: argparse.Namespace) -> None: + """Initialize the server with sample directory files (no auth).""" + main.initialize_server(args_directory) + main.set_auth_password(None) + + +@pytest.fixture +def initialized_zip(args_zip: argparse.Namespace) -> None: + """Initialize the server with sample zip files (no auth).""" + main.initialize_server(args_zip) + main.set_auth_password(None) + + +def _dummy_auth_header() -> str: + """Create a dummy Basic Auth header (any creds work when no password is set).""" + import base64 + + creds = "test:test" + return f"Basic {base64.b64encode(creds.encode()).decode()}" + + +@pytest.fixture +async def client_dir(initialized_dir: None) -> Generator[AsyncClient, None, None]: + """Async HTTP client against the app initialized with directory files. + + Sends dummy auth headers since HTTPBasic() always requires them. + """ + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + ac.headers["Authorization"] = _dummy_auth_header() + yield ac + + +@pytest.fixture +async def client_zip(initialized_zip: None) -> Generator[AsyncClient, None, None]: + """Async HTTP client against the app initialized with zip files. + + Sends dummy auth headers since HTTPBasic() always requires them. + """ + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + ac.headers["Authorization"] = _dummy_auth_header() + yield ac diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..dd5eb62 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,189 @@ +"""Tests for authentication.""" + +import argparse +import base64 +from pathlib import Path + +import pytest +from httpx import ASGITransport, AsyncClient + +import main + + +def _basic_auth_header(username: str, password: str) -> str: + """Create a Basic Auth header value.""" + creds = f"{username}:{password}" + return f"Basic {base64.b64encode(creds.encode()).decode()}" + + +def _make_args(tmp_path: Path) -> argparse.Namespace: + """Create an argparse.Namespace for the given path.""" + return argparse.Namespace( + source=str(tmp_path), + host="127.0.0.1", + port=0, + salt="auth-salt", + password=None, + ) + + +@pytest.fixture +def auth_setup(tmp_path: Path) -> tuple[str, str]: + """Set up server with sample files and password protection. + + Returns: + Tuple of (username, password). + """ + (tmp_path / "test.txt").write_text("hello") + main.initialize_server(_make_args(tmp_path)) + main.set_auth_password("secret123") + return ("user", "secret123") + + +class TestNoPasswordSet: + """Tests when no password is configured. + + Note: HTTPBasic() always requires an Authorization header. + When expected_password is None, any credentials pass. + """ + + async def test_health_always_open(self, client_dir: AsyncClient) -> None: + """Health check has no auth dependency — always accessible.""" + response = await client_dir.get("/api/health") + assert response.status_code == 200 + + async def test_protected_endpoint_requires_auth_header( + self, initialized_dir: None + ) -> None: + """Even with no password, HTTPBasic requires an auth header.""" + file_hash = list(main.file_mapping.keys())[0] + # No auth header → 401 from HTTPBasic + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.get(f"/api/{file_hash}/data") + assert response.status_code == 401 + + async def test_any_credentials_pass_when_no_password( + self, client_dir: AsyncClient + ) -> None: + """Any credentials pass when no password is set.""" + file_hash = list(main.file_mapping.keys())[0] + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + ac.headers["Authorization"] = _basic_auth_header("any", "thing") + response = await ac.get(f"/api/{file_hash}/data") + assert response.status_code == 200 + + async def test_root_requires_auth_header(self, initialized_dir: None) -> None: + """Root endpoint requires auth header even with no password.""" + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.get("/", follow_redirects=False) + assert response.status_code == 401 + + +class TestCorrectPassword: + """Tests with correct password.""" + + async def test_health_with_correct_password( + self, auth_setup: tuple[str, str] + ) -> None: + """Health check works (it has no auth, always 200).""" + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.get("/api/health") + assert response.status_code == 200 + + async def test_file_access_with_correct_password( + self, auth_setup: tuple[str, str] + ) -> None: + """File access works with correct password.""" + username, password = auth_setup + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + ac.headers["Authorization"] = _basic_auth_header(username, password) + file_hash = list(main.file_mapping.keys())[0] + response = await ac.get(f"/api/{file_hash}/data") + assert response.status_code == 200 + + async def test_root_with_correct_password( + self, auth_setup: tuple[str, str] + ) -> None: + """Root redirect works with correct password.""" + username, password = auth_setup + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + ac.headers["Authorization"] = _basic_auth_header(username, password) + response = await ac.get("/", follow_redirects=False) + assert response.status_code in (307, 302, 301) + + async def test_hash_page_with_correct_password( + self, auth_setup: tuple[str, str] + ) -> None: + """Hash page works with correct password.""" + username, password = auth_setup + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + ac.headers["Authorization"] = _basic_auth_header(username, password) + file_hash = list(main.file_mapping.keys())[0] + response = await ac.get(f"/{file_hash}") + assert response.status_code == 200 + + +class TestWrongPassword: + """Tests with incorrect password.""" + + async def test_file_access_with_wrong_password( + self, auth_setup: tuple[str, str] + ) -> None: + """File access returns 401 with wrong password.""" + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + ac.headers["Authorization"] = _basic_auth_header("user", "wrong") + file_hash = list(main.file_mapping.keys())[0] + response = await ac.get(f"/api/{file_hash}/data") + assert response.status_code == 401 + + async def test_root_with_wrong_password(self, auth_setup: tuple[str, str]) -> None: + """Root redirect returns 401 with wrong password.""" + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + ac.headers["Authorization"] = _basic_auth_header("user", "wrong") + response = await ac.get("/", follow_redirects=False) + assert response.status_code == 401 + + async def test_no_auth_header_returns_401( + self, auth_setup: tuple[str, str] + ) -> None: + """Missing auth header returns 401.""" + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + file_hash = list(main.file_mapping.keys())[0] + response = await ac.get(f"/api/{file_hash}/data") + assert response.status_code == 401 + + async def test_includes_www_authenticate_header( + self, auth_setup: tuple[str, str] + ) -> None: + """401 response includes WWW-Authenticate header.""" + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + file_hash = list(main.file_mapping.keys())[0] + response = await ac.get(f"/api/{file_hash}/data") + assert response.status_code == 401 + assert "www-authenticate" in response.headers + + +class TestSetAuthPassword: + """Tests for set_auth_password function.""" + + def test_sets_password(self) -> None: + """Password is set correctly.""" + main.set_auth_password("newpass") + assert main.expected_password == "newpass" + + def test_clears_password_with_none(self) -> None: + """Passing None clears the password.""" + main.set_auth_password("something") + main.set_auth_password(None) + assert main.expected_password is None diff --git a/tests/test_endpoints.py b/tests/test_endpoints.py new file mode 100644 index 0000000..0198043 --- /dev/null +++ b/tests/test_endpoints.py @@ -0,0 +1,185 @@ +"""Tests for FastAPI endpoints.""" + +import pytest +from httpx import AsyncClient + +import main + + +class TestHealthCheck: + """Tests for GET /api/health.""" + + async def test_returns_200(self, client_dir: AsyncClient) -> None: + """Health endpoint returns 200 OK.""" + response = await client_dir.get("/api/health") + assert response.status_code == 200 + + async def test_returns_file_count(self, client_dir: AsyncClient) -> None: + """Health endpoint reports correct file count.""" + response = await client_dir.get("/api/health") + data = response.json() + assert data["status"] == "healthy" + assert data["file_count"] == len(main.file_mapping) + + async def test_returns_zero_when_empty(self) -> None: + """Health endpoint returns 0 when no files are indexed.""" + main.file_mapping.clear() + from httpx import ASGITransport + + transport = ASGITransport(app=main.app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + response = await ac.get("/api/health") + data = response.json() + assert data["file_count"] == 0 + + +class TestGetFileData: + """Tests for GET /api/{file_hash}/data.""" + + async def test_returns_file_content(self, client_dir: AsyncClient) -> None: + """Returns the content of a valid file.""" + file_hash = list(main.file_mapping.keys())[0] + response = await client_dir.get(f"/api/{file_hash}/data") + assert response.status_code == 200 + assert len(response.content) > 0 + + async def test_returns_correct_content_type_for_text( + self, client_dir: AsyncClient + ) -> None: + """Text files get text/plain content type.""" + # Find a .txt file hash + for file_hash, filepath in main.file_mapping.items(): + if filepath.endswith(".txt"): + response = await client_dir.get(f"/api/{file_hash}/data") + assert response.status_code == 200 + assert "text/plain" in response.headers["content-type"] + break + + async def test_returns_correct_content_type_for_image( + self, client_dir: AsyncClient + ) -> None: + """Image files get appropriate content type.""" + for file_hash, filepath in main.file_mapping.items(): + if filepath.endswith(".jpg"): + response = await client_dir.get(f"/api/{file_hash}/data") + assert response.status_code == 200 + assert "image/jpeg" in response.headers["content-type"] + break + + async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None: + """Returns 404 for a hash that doesn't exist.""" + response = await client_dir.get("/api/nonexistent-hash/data") + assert response.status_code == 404 + + async def test_has_content_disposition_header( + self, client_dir: AsyncClient + ) -> None: + """Response includes Content-Disposition header.""" + file_hash = list(main.file_mapping.keys())[0] + response = await client_dir.get(f"/api/{file_hash}/data") + assert "content-disposition" in response.headers + assert "inline" in response.headers["content-disposition"] + + async def test_zip_file_content(self, client_zip: AsyncClient) -> None: + """Files from a zip archive are served correctly.""" + file_hash = list(main.file_mapping.keys())[0] + response = await client_zip.get(f"/api/{file_hash}/data") + assert response.status_code == 200 + assert len(response.content) > 0 + + +class TestRootRedirect: + """Tests for GET /.""" + + async def test_redirects_to_random_hash(self, client_dir: AsyncClient) -> None: + """Root redirects (307) to a random file hash page.""" + response = await client_dir.get("/", follow_redirects=False) + assert response.status_code in (307, 302, 301) + location = response.headers["location"] + # Location should be a hash in the mapping + hash_from_url = location.lstrip("/") + assert hash_from_url in main.file_mapping + + +class TestOrderDelayRoute: + """Tests for GET /{order}/{delay}.""" + + async def test_next_order_redirects(self, client_dir: AsyncClient) -> None: + """/next/5 redirects to /next/5/{hash}.""" + response = await client_dir.get("/next/5", follow_redirects=False) + assert response.status_code in (307, 302, 301) + assert "/next/5/" in response.headers["location"] + + async def test_random_order_redirects(self, client_dir: AsyncClient) -> None: + """/random/3 redirects to /random/3/{hash}.""" + response = await client_dir.get("/random/3", follow_redirects=False) + assert response.status_code in (307, 302, 301) + assert "/random/3/" in response.headers["location"] + + +class TestHashPage: + """Tests for GET /{file_hash}.""" + + async def test_returns_html_page(self, client_dir: AsyncClient) -> None: + """Returns an HTML page for a valid file hash.""" + file_hash = list(main.file_mapping.keys())[0] + response = await client_dir.get(f"/{file_hash}") + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + async def test_page_contains_image_url(self, client_dir: AsyncClient) -> None: + """HTML page contains the image URL.""" + file_hash = list(main.file_mapping.keys())[0] + response = await client_dir.get(f"/{file_hash}") + assert f"/api/{file_hash}/data" in response.text + + async def test_page_contains_prev_next_buttons( + self, client_dir: AsyncClient + ) -> None: + """HTML page contains prev and next navigation.""" + file_hash = list(main.file_mapping.keys())[0] + response = await client_dir.get(f"/{file_hash}") + assert 'class="chevron left"' in response.text + assert 'class="chevron right"' in response.text + + async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None: + """Returns 404 for a hash that doesn't exist.""" + response = await client_dir.get("/nonexistent-hash") + assert response.status_code == 404 + + +class TestHashPageWithRefresh: + """Tests for GET /{order}/{delay}/{file_hash}.""" + + async def test_next_order_returns_html(self, client_dir: AsyncClient) -> None: + """Next order returns HTML with refresh meta tag.""" + file_hash = list(main.file_mapping.keys())[0] + response = await client_dir.get(f"/next/5/{file_hash}") + assert response.status_code == 200 + assert 'http-equiv="refresh"' in response.text + + async def test_random_order_returns_html(self, client_dir: AsyncClient) -> None: + """Random order returns HTML with refresh meta tag.""" + file_hash = list(main.file_mapping.keys())[0] + response = await client_dir.get(f"/random/3/{file_hash}") + assert response.status_code == 200 + assert 'http-equiv="refresh"' in response.text + + async def test_invalid_order_returns_400(self, client_dir: AsyncClient) -> None: + """Invalid order parameter returns 400.""" + file_hash = list(main.file_mapping.keys())[0] + response = await client_dir.get(f"/shuffle/5/{file_hash}") + assert response.status_code == 400 + + async def test_returns_404_for_invalid_hash(self, client_dir: AsyncClient) -> None: + """Returns 404 for a hash that doesn't exist.""" + response = await client_dir.get("/next/5/nonexistent-hash") + assert response.status_code == 404 + + async def test_refresh_url_points_to_next_file( + self, client_dir: AsyncClient + ) -> None: + """Refresh meta tag points to the next file in sequence.""" + file_hash = list(main.file_mapping.keys())[0] + response = await client_dir.get(f"/next/5/{file_hash}") + assert "url=/next/5/" in response.text diff --git a/tests/test_file_indexer.py b/tests/test_file_indexer.py new file mode 100644 index 0000000..23ae171 --- /dev/null +++ b/tests/test_file_indexer.py @@ -0,0 +1,154 @@ +"""Tests for the FileIndexer class.""" + +import hashlib +from pathlib import Path + +import pytest + +from main import FileIndexer + + +class TestHashPath: + """Tests for FileIndexer._hash_path.""" + + def test_consistent_hash_with_same_salt( + self, sample_files: dict[str, Path] + ) -> None: + """Hash is deterministic when salt is fixed.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="fixed") + path = str(sample_files["root_file"]) + hash1 = indexer._hash_path(path) + hash2 = indexer._hash_path(path) + assert hash1 == hash2 + + def test_different_hash_with_different_salt( + self, sample_files: dict[str, Path] + ) -> None: + """Different salts produce different hashes for the same path.""" + indexer1 = FileIndexer(str(sample_files["root_file"].parent), salt="salt-a") + indexer2 = FileIndexer(str(sample_files["root_file"].parent), salt="salt-b") + path = str(sample_files["root_file"]) + assert indexer1._hash_path(path) != indexer2._hash_path(path) + + def test_different_hash_for_different_paths( + self, sample_files: dict[str, Path] + ) -> None: + """Different paths produce different hashes.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="fixed") + hash1 = indexer._hash_path(str(sample_files["root_file"])) + hash2 = indexer._hash_path(str(sample_files["sub_file"])) + assert hash1 != hash2 + + def test_hash_is_sha256_hex(self, sample_files: dict[str, Path]) -> None: + """Hash output is a 64-character hex string (SHA-256).""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="fixed") + path = str(sample_files["root_file"]) + result = indexer._hash_path(path) + assert len(result) == 64 + assert all(c in "0123456789abcdef" for c in result) + + def test_hash_includes_salt(self, sample_files: dict[str, Path]) -> None: + """Hash is computed from path + salt concatenated.""" + salt = "mysalt" + path = "/some/file.txt" + expected = hashlib.sha256((path + salt).encode()).hexdigest() + indexer = FileIndexer(str(sample_files["root_file"].parent), salt=salt) + assert indexer._hash_path(path) == expected + + +class TestSalt: + """Tests for FileIndexer.salt property.""" + + def test_provided_salt_is_returned(self, sample_files: dict[str, Path]) -> None: + """Explicitly provided salt is returned unchanged.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="explicit") + assert indexer.salt == "explicit" + + def test_none_salt_generates_random(self, sample_files: dict[str, Path]) -> None: + """None salt generates a random hex string.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt=None) + salt = indexer.salt + assert salt is not None + assert len(salt) == 32 # secrets.token_hex(16) = 32 hex chars + assert all(c in "0123456789abcdef" for c in salt) + + def test_salt_is_stable_after_first_access( + self, sample_files: dict[str, Path] + ) -> None: + """Salt doesn't change between accesses.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt=None) + first = indexer.salt + second = indexer.salt + assert first == second + + +class TestIndex: + """Tests for FileIndexer._index and indexing behavior.""" + + def test_indexes_all_files(self, sample_files: dict[str, Path]) -> None: + """All files in the directory tree are indexed.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="test") + assert ( + len(indexer._file_mapping) == 4 + ) # root.txt, nested.txt, data.bin, photo.jpg + + def test_hash_maps_to_correct_path(self, sample_files: dict[str, Path]) -> None: + """Each hash maps to the correct file path.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="test") + expected_hash = indexer._hash_path(str(sample_files["root_file"])) + assert indexer._file_mapping[expected_hash] == str(sample_files["root_file"]) + + def test_empty_directory(self, tmp_path: Path) -> None: + """Empty directory produces an empty mapping.""" + indexer = FileIndexer(str(tmp_path), salt="test") + assert indexer._file_mapping == {} + + def test_nested_directories(self, sample_files: dict[str, Path]) -> None: + """Files in subdirectories are included.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="test") + sub_hash = indexer._hash_path(str(sample_files["sub_file"])) + assert sub_hash in indexer._file_mapping + + +class TestGetFileByHash: + """Tests for FileIndexer.get_file_by_hash.""" + + def test_returns_file_content(self, sample_files: dict[str, Path]) -> None: + """Returns the binary content of the file.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="test") + file_hash = indexer._hash_path(str(sample_files["root_file"])) + content = b"".join(indexer.get_file_by_hash(file_hash)) + assert content == b"root content" + + def test_returns_empty_for_invalid_hash( + self, sample_files: dict[str, Path] + ) -> None: + """Returns an empty generator for a hash that doesn't exist.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="test") + content = b"".join(indexer.get_file_by_hash("nonexistent")) + assert content == b"" + + def test_returns_binary_content_correctly( + self, sample_files: dict[str, Path] + ) -> None: + """Binary file content is returned byte-for-byte.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="test") + file_hash = indexer._hash_path(str(sample_files["binary_file"])) + content = b"".join(indexer.get_file_by_hash(file_hash)) + assert content == b"\x00\x01\x02\x03" + + +class TestGetFilenameByHash: + """Tests for FileIndexer.get_filename_by_hash.""" + + def test_returns_filename(self, sample_files: dict[str, Path]) -> None: + """Returns the full file path for a valid hash.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="test") + file_hash = indexer._hash_path(str(sample_files["root_file"])) + result = indexer.get_filename_by_hash(file_hash) + assert result == str(sample_files["root_file"]) + + def test_returns_none_for_invalid_hash(self, sample_files: dict[str, Path]) -> None: + """Returns None for a hash that doesn't exist.""" + indexer = FileIndexer(str(sample_files["root_file"].parent), salt="test") + assert indexer.get_filename_by_hash("nonexistent") is None diff --git a/tests/test_navigation.py b/tests/test_navigation.py new file mode 100644 index 0000000..4ca0583 --- /dev/null +++ b/tests/test_navigation.py @@ -0,0 +1,125 @@ +"""Tests for navigation helper functions.""" + +import argparse +from pathlib import Path + +import pytest + +import main + + +@pytest.fixture +def seeded_indexers(sample_files: dict[str, Path], tmp_path: Path) -> None: + """Initialize with a fixed salt so hashes are predictable.""" + args = argparse.Namespace( + source=str(tmp_path), + host="127.0.0.1", + port=0, + salt="nav-test-salt", + password=None, + ) + main.initialize_server(args) + + +class TestGetNavigationData: + """Tests for _get_navigation_data.""" + + def test_returns_all_keys(self, seeded_indexers: None) -> None: + """Navigation data contains all required keys.""" + file_hash = list(main.file_mapping.keys())[0] + data = main._get_navigation_data(file_hash, order=None) + assert "file_hash" in data + assert "next_hash" in data + assert "prev_hash" in data + assert "filename" in data + + def test_file_hash_matches_input(self, seeded_indexers: None) -> None: + """Returned file_hash matches the input.""" + file_hash = list(main.file_mapping.keys())[0] + data = main._get_navigation_data(file_hash, order=None) + assert data["file_hash"] == file_hash + + def test_sequential_next_and_prev(self, seeded_indexers: None) -> None: + """In sequential mode, next and prev are adjacent in the list.""" + keys = list(main.file_mapping.keys()) + if len(keys) < 2: + pytest.skip("Need at least 2 files for sequential navigation") + file_hash = keys[0] + data = main._get_navigation_data(file_hash, order=None) + assert data["next_hash"] == keys[1] + # First item's prev wraps to last + assert data["prev_hash"] == keys[-1] + + def test_middle_item_navigation(self, seeded_indexers: None) -> None: + """Middle item has correct prev and next.""" + keys = list(main.file_mapping.keys()) + if len(keys) < 3: + pytest.skip("Need at least 3 files for middle-item test") + mid_idx = len(keys) // 2 + file_hash = keys[mid_idx] + data = main._get_navigation_data(file_hash, order=None) + assert data["next_hash"] == keys[mid_idx + 1] + assert data["prev_hash"] == keys[mid_idx - 1] + + def test_last_item_wraps_next(self, seeded_indexers: None) -> None: + """Last item's next wraps to the first item.""" + keys = list(main.file_mapping.keys()) + if len(keys) < 2: + pytest.skip("Need at least 2 files for wrap test") + file_hash = keys[-1] + data = main._get_navigation_data(file_hash, order=None) + assert data["next_hash"] == keys[0] + + def test_random_order_returns_different_hashes(self, seeded_indexers: None) -> None: + """Random order returns hashes that may differ from current.""" + keys = list(main.file_mapping.keys()) + if len(keys) < 3: + pytest.skip("Need at least 3 files for random test") + file_hash = keys[0] + data = main._get_navigation_data(file_hash, order="random") + # next and prev should be random (not necessarily adjacent) + assert data["next_hash"] in keys + assert data["prev_hash"] in keys + + def test_filename_is_returned(self, seeded_indexers: None) -> None: + """Filename is populated from the indexer.""" + file_hash = list(main.file_mapping.keys())[0] + data = main._get_navigation_data(file_hash, order=None) + assert data["filename"] is not None + assert isinstance(data["filename"], str) + + +class TestGetRandomHash: + """Tests for _get_random_hash.""" + + def test_returns_valid_hash(self, seeded_indexers: None) -> None: + """Returns a hash that exists in the mapping.""" + random_hash = main._get_random_hash() + assert random_hash in main.file_mapping + + def test_raises_when_empty(self) -> None: + """Raises HTTPException when no files are indexed.""" + main.file_mapping.clear() + with pytest.raises(main.HTTPException) as exc_info: + main._get_random_hash() + assert exc_info.value.status_code == 404 + + +class TestFindIndexerForHash: + """Tests for _find_indexer_for_hash.""" + + def test_finds_correct_indexer(self, seeded_indexers: None) -> None: + """Returns the indexer containing the given hash.""" + file_hash = list(main.file_mapping.keys())[0] + indexer = main._find_indexer_for_hash(file_hash) + assert indexer is not None + assert file_hash in indexer._file_mapping + + def test_returns_none_for_unknown_hash(self, seeded_indexers: None) -> None: + """Returns None for a hash not in any indexer.""" + assert main._find_indexer_for_hash("nonexistent") is None + + def test_returns_none_when_no_indexers(self) -> None: + """Returns None when no indexers are registered.""" + main.indexers.clear() + assert main._find_indexer_for_hash("any-hash") is None diff --git a/tests/test_zip_indexer.py b/tests/test_zip_indexer.py new file mode 100644 index 0000000..7140b39 --- /dev/null +++ b/tests/test_zip_indexer.py @@ -0,0 +1,121 @@ +"""Tests for the ZipFileIndexer class.""" + +from pathlib import Path + +import pytest + +from main import ZipFileIndexer + + +class TestZipIndex: + """Tests for ZipFileIndexer._index.""" + + def test_indexes_real_zip(self, tmp_path: Path) -> None: + """Index a real zip file with multiple entries.""" + import zipfile + + zip_path = tmp_path / "archive.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("a.txt", "aaa") + zf.writestr("b/c.txt", "ccc") + zf.writestr("b/d.txt", "ddd") + + indexer = ZipFileIndexer(str(zip_path), salt="test") + assert len(indexer._file_mapping) == 3 + + def test_excludes_directories(self, tmp_path: Path) -> None: + """Directory entries in the zip are excluded from the index.""" + import zipfile + + zip_path = tmp_path / "archive.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("folder/", "") # directory entry + zf.writestr("folder/file.txt", "content") + + indexer = ZipFileIndexer(str(zip_path), salt="test") + assert len(indexer._file_mapping) == 1 + filename = list(indexer._file_mapping.values())[0] + assert filename == "folder/file.txt" + + def test_hash_matches_filename(self, tmp_path: Path) -> None: + """Hash is computed from the filename inside the zip.""" + import zipfile + + zip_path = tmp_path / "archive.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("hello.txt", "world") + + indexer = ZipFileIndexer(str(zip_path), salt="test") + expected_hash = indexer._hash_path("hello.txt") + assert expected_hash in indexer._file_mapping + assert indexer._file_mapping[expected_hash] == "hello.txt" + + +class TestZipGetFileByHash: + """Tests for ZipFileIndexer.get_file_by_hash.""" + + def test_returns_file_content(self, tmp_path: Path) -> None: + """Returns the content of a file inside the zip.""" + import zipfile + + zip_path = tmp_path / "archive.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("data.txt", "zip content here") + + indexer = ZipFileIndexer(str(zip_path), salt="test") + file_hash = indexer._hash_path("data.txt") + content = b"".join(indexer.get_file_by_hash(file_hash)) + assert content == b"zip content here" + + def test_returns_empty_for_invalid_hash(self, tmp_path: Path) -> None: + """Returns an empty generator for a hash that doesn't exist in the zip.""" + import zipfile + + zip_path = tmp_path / "archive.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("file.txt", "data") + + indexer = ZipFileIndexer(str(zip_path), salt="test") + content = b"".join(indexer.get_file_by_hash("nonexistent-hash")) + assert content == b"" + + def test_returns_binary_content(self, tmp_path: Path) -> None: + """Binary content from zip is returned correctly.""" + import zipfile + + zip_path = tmp_path / "archive.zip" + binary_data = b"\x00\xff\x80\x7f" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("binary.bin", binary_data) + + indexer = ZipFileIndexer(str(zip_path), salt="test") + file_hash = indexer._hash_path("binary.bin") + content = b"".join(indexer.get_file_by_hash(file_hash)) + assert content == binary_data + + +class TestZipGetFilenameByHash: + """Tests for ZipFileIndexer.get_filename_by_hash.""" + + def test_returns_filename(self, tmp_path: Path) -> None: + """Returns the internal filename for a valid hash.""" + import zipfile + + zip_path = tmp_path / "archive.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("folder/nested.txt", "content") + + indexer = ZipFileIndexer(str(zip_path), salt="test") + file_hash = indexer._hash_path("folder/nested.txt") + assert indexer.get_filename_by_hash(file_hash) == "folder/nested.txt" + + def test_returns_none_for_invalid_hash(self, tmp_path: Path) -> None: + """Returns None for a hash that doesn't exist.""" + import zipfile + + zip_path = tmp_path / "archive.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("file.txt", "data") + + indexer = ZipFileIndexer(str(zip_path), salt="test") + assert indexer.get_filename_by_hash("bad-hash") is None diff --git a/uv.lock b/uv.lock index eeafa79..9b8a7b3 100644 --- a/uv.lock +++ b/uv.lock @@ -59,6 +59,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8e/0d/52d98722666d6fc6c3dd4c76df339501d6efd40e0ff95e6186a7b7f0befd/black-26.3.1-py3-none-any.whl", hash = "sha256:2bd5aa94fc267d38bb21a70d7410a89f1a1d318841855f698746f8e7f51acd1b", size = 207542, upload-time = "2026-03-12T03:36:01.668Z" }, ] +[[package]] +name = "certifi" +version = "2026.4.22" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/25/ee/6caf7a40c36a1220410afe15a1cc64993a1f864871f698c0f93acb72842a/certifi-2026.4.22.tar.gz", hash = "sha256:8d455352a37b71bf76a79caa83a3d6c25afee4a385d632127b6afb3963f1c580", size = 137077, upload-time = "2026-04-22T11:26:11.191Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/22/30/7cd8fdcdfbc5b869528b079bfb76dcdf6056b1a2097a662e5e8c04f42965/certifi-2026.4.22-py3-none-any.whl", hash = "sha256:3cb2210c8f88ba2318d29b0388d1023c8492ff72ecdde4ebdaddbb13a31b1c4a", size = 135707, upload-time = "2026-04-22T11:26:09.372Z" }, +] + [[package]] name = "click" version = "8.3.1" @@ -112,6 +121,9 @@ dev = [ [package.dev-dependencies] dev = [ + { name = "httpx" }, + { name = "pytest" }, + { name = "pytest-asyncio" }, { name = "ruff" }, ] @@ -125,7 +137,12 @@ requires-dist = [ provides-extras = ["dev"] [package.metadata.requires-dev] -dev = [{ name = "ruff", specifier = ">=0.15.5" }] +dev = [ + { name = "httpx", specifier = ">=0.28.1" }, + { name = "pytest", specifier = ">=9.0.3" }, + { name = "pytest-asyncio", specifier = ">=1.3.0" }, + { name = "ruff", specifier = ">=0.15.5" }, +] [[package]] name = "h11" @@ -136,6 +153,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" }, ] +[[package]] +name = "httpcore" +version = "1.0.9" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" }, +] + +[[package]] +name = "httpx" +version = "0.28.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "certifi" }, + { name = "httpcore" }, + { name = "idna" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, +] + [[package]] name = "idna" version = "3.11" @@ -145,6 +190,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008, upload-time = "2025-10-12T14:55:18.883Z" }, ] +[[package]] +name = "iniconfig" +version = "2.3.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" }, +] + [[package]] name = "mypy-extensions" version = "1.1.0" @@ -181,6 +235,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/75/a6/a0a304dc33b49145b21f4808d763822111e67d1c3a32b524a1baf947b6e1/platformdirs-4.9.6-py3-none-any.whl", hash = "sha256:e61adb1d5e5cb3441b4b7710bea7e4c12250ca49439228cc1021c00dcfac0917", size = 21348, upload-time = "2026-04-09T00:04:09.463Z" }, ] +[[package]] +name = "pluggy" +version = "1.6.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, +] + [[package]] name = "pydantic" version = "2.12.5" @@ -249,6 +312,43 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9f/ed/068e41660b832bb0b1aa5b58011dea2a3fe0ba7861ff38c4d4904c1c1a99/pydantic_core-2.41.5-cp314-cp314t-win_arm64.whl", hash = "sha256:35b44f37a3199f771c3eaa53051bc8a70cd7b54f333531c59e29fd4db5d15008", size = 1974769, upload-time = "2025-11-04T13:42:01.186Z" }, ] +[[package]] +name = "pygments" +version = "2.20.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/c3/b2/bc9c9196916376152d655522fdcebac55e66de6603a76a02bca1b6414f6c/pygments-2.20.0.tar.gz", hash = "sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f", size = 4955991, upload-time = "2026-03-29T13:29:33.898Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" }, +] + +[[package]] +name = "pytest" +version = "9.0.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama", marker = "sys_platform == 'win32'" }, + { name = "iniconfig" }, + { name = "packaging" }, + { name = "pluggy" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/7d/0d/549bd94f1a0a402dc8cf64563a117c0f3765662e2e668477624baeec44d5/pytest-9.0.3.tar.gz", hash = "sha256:b86ada508af81d19edeb213c681b1d48246c1a91d304c6c81a427674c17eb91c", size = 1572165, upload-time = "2026-04-07T17:16:18.027Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" }, +] + +[[package]] +name = "pytest-asyncio" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/90/2c/8af215c0f776415f3590cac4f9086ccefd6fd463befeae41cd4d3f193e5a/pytest_asyncio-1.3.0.tar.gz", hash = "sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5", size = 50087, upload-time = "2025-11-10T16:07:47.256Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" }, +] + [[package]] name = "pytokens" version = "0.4.1"