First pass at handling password-protected zips
This commit is contained in:
parent
022d3c69f0
commit
b373313cf8
209
main.py
209
main.py
@ -1,4 +1,5 @@
|
||||
import argparse
|
||||
import base64
|
||||
import hashlib
|
||||
import mimetypes
|
||||
import os
|
||||
@ -10,9 +11,8 @@ from glob import glob
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.responses import (
|
||||
FileResponse,
|
||||
HTMLResponse,
|
||||
RedirectResponse,
|
||||
StreamingResponse,
|
||||
@ -67,25 +67,81 @@ class FileIndexer:
|
||||
|
||||
|
||||
class ZipFileIndexer(FileIndexer):
|
||||
def __init__(self, path: str, salt: str | None = None):
|
||||
super().__init__(path, salt)
|
||||
self._password_cache: dict[str, bool] = {}
|
||||
|
||||
def _open_zip(self):
|
||||
"""Open the ZIP file, using pyzipper if available for AES support."""
|
||||
try:
|
||||
import pyzipper
|
||||
|
||||
return pyzipper.AESZipFile(self.path, "r")
|
||||
except ImportError:
|
||||
return zipfile.ZipFile(self.path, "r")
|
||||
|
||||
def _index(self) -> dict[str, str]:
|
||||
"""Index all files in the zip file"""
|
||||
mapping = {}
|
||||
with zipfile.ZipFile(self.path, "r") as zip_file:
|
||||
with self._open_zip() as zip_file:
|
||||
for file_info in zip_file.infolist():
|
||||
if not file_info.is_dir():
|
||||
file_hash = self._hash_path(file_info.filename)
|
||||
mapping[file_hash] = file_info.filename
|
||||
return mapping
|
||||
|
||||
def get_file_by_hash(self, file_hash: str):
|
||||
"""Get file content by hash"""
|
||||
def _needs_password(self, filename: str) -> bool:
|
||||
"""Check if a file inside the zip requires a password.
|
||||
|
||||
Uses lazy detection: tries to read the file without a password.
|
||||
Results are cached so the ZIP is not reopened on subsequent calls.
|
||||
|
||||
Args:
|
||||
filename: The internal filename inside the zip.
|
||||
|
||||
Returns:
|
||||
True if the file is password-protected, False otherwise.
|
||||
"""
|
||||
if filename in self._password_cache:
|
||||
return self._password_cache[filename]
|
||||
|
||||
try:
|
||||
with self._open_zip() as zip_file:
|
||||
zip_file.read(filename)
|
||||
self._password_cache[filename] = False
|
||||
return False
|
||||
except Exception:
|
||||
self._password_cache[filename] = True
|
||||
return True
|
||||
|
||||
def is_file_encrypted(self, file_hash: str) -> bool:
|
||||
"""Check if the file identified by hash is password-protected.
|
||||
|
||||
Args:
|
||||
file_hash: The hash of the file.
|
||||
|
||||
Returns:
|
||||
True if the file requires a password, False otherwise.
|
||||
"""
|
||||
filename = self._file_mapping.get(file_hash)
|
||||
if filename is None:
|
||||
return False
|
||||
return self._needs_password(filename)
|
||||
|
||||
def get_file_by_hash(self, file_hash: str, password: bytes | None = None):
|
||||
"""Get file content by hash.
|
||||
|
||||
Args:
|
||||
file_hash: The hash of the file.
|
||||
password: Optional password for encrypted files.
|
||||
"""
|
||||
if file_hash not in self._file_mapping:
|
||||
return None
|
||||
|
||||
filename = self._file_mapping[file_hash]
|
||||
|
||||
with zipfile.ZipFile(self.path, "r") as zip_file:
|
||||
yield from BytesIO(zip_file.read(filename))
|
||||
with self._open_zip() as zip_file:
|
||||
yield from BytesIO(zip_file.read(filename, pwd=password))
|
||||
|
||||
def get_filename_by_hash(self, file_hash: str) -> str | None:
|
||||
"""Get filename by hash"""
|
||||
@ -133,8 +189,8 @@ async def health_check():
|
||||
|
||||
|
||||
@app.get("/api/{file_hash}/data")
|
||||
async def get_file_data(file_hash: str):
|
||||
"""Serve a specific file by its hash"""
|
||||
async def get_file_data(file_hash: str, request: Request):
|
||||
"""Serve a specific file by its hash."""
|
||||
if file_hash not in file_mapping:
|
||||
raise HTTPException(status_code=404, detail="File not found")
|
||||
|
||||
@ -142,36 +198,68 @@ async def get_file_data(file_hash: str):
|
||||
if not indexer:
|
||||
raise HTTPException(status_code=404, detail="File not found")
|
||||
|
||||
password = _get_zip_password(file_hash, request)
|
||||
if (
|
||||
password is None
|
||||
and isinstance(indexer, ZipFileIndexer)
|
||||
and indexer.is_file_encrypted(file_hash)
|
||||
):
|
||||
_raise_unauthorized()
|
||||
|
||||
filename = indexer.get_filename_by_hash(file_hash)
|
||||
content_type, _ = mimetypes.guess_type(filename or "")
|
||||
if not content_type:
|
||||
content_type = "application/octet-stream"
|
||||
|
||||
return StreamingResponse(
|
||||
indexer.get_file_by_hash(file_hash),
|
||||
media_type=content_type,
|
||||
headers={
|
||||
"Content-Disposition": f"inline; filename={os.path.basename(filename or '')}",
|
||||
},
|
||||
)
|
||||
if isinstance(indexer, ZipFileIndexer):
|
||||
try:
|
||||
content = b"".join(indexer.get_file_by_hash(file_hash, password=password))
|
||||
except RuntimeError:
|
||||
_raise_unauthorized()
|
||||
return StreamingResponse(
|
||||
iter([content]),
|
||||
media_type=content_type,
|
||||
headers={
|
||||
"Content-Disposition": f"inline; filename={os.path.basename(filename or '')}",
|
||||
},
|
||||
)
|
||||
else:
|
||||
return StreamingResponse(
|
||||
indexer.get_file_by_hash(file_hash),
|
||||
media_type=content_type,
|
||||
headers={
|
||||
"Content-Disposition": f"inline; filename={os.path.basename(filename or '')}",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def _build_url(
|
||||
file_hash: str, order: str | None = None, delay: int | None = None
|
||||
) -> str:
|
||||
"""Build a URL with optional order/delay query parameters."""
|
||||
base = "/{hash}".format(hash=file_hash)
|
||||
base = f"/{file_hash}"
|
||||
if order is not None and delay is not None:
|
||||
return "{base}?order={order}&delay={delay}".format(
|
||||
base=base, order=order, delay=delay
|
||||
)
|
||||
return f"{base}?order={order}&delay={delay}"
|
||||
return base
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root(order: str | None = None, delay: int | None = None):
|
||||
"""Redirect to a random file hash"""
|
||||
async def root(
|
||||
order: str | None = None, delay: int | None = None, request: Request = None
|
||||
):
|
||||
"""Redirect to a random file hash."""
|
||||
random_hash = _get_random_hash()
|
||||
|
||||
# Check if the random file is encrypted and requires auth
|
||||
if request is not None:
|
||||
password = _get_zip_password(random_hash, request)
|
||||
if password is None:
|
||||
indexer = _find_indexer_for_hash(random_hash)
|
||||
if isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted(
|
||||
random_hash
|
||||
):
|
||||
_raise_unauthorized()
|
||||
|
||||
return RedirectResponse(url=_build_url(random_hash, order, delay))
|
||||
|
||||
|
||||
@ -216,7 +304,7 @@ def _render_page(
|
||||
current_delay: int | None = None,
|
||||
) -> HTMLResponse:
|
||||
"""Render the frontend page with navigation data"""
|
||||
with open("frontend.html", "r") as f:
|
||||
with open("frontend.html") as f:
|
||||
content = f.read()
|
||||
|
||||
template = string.Template(content)
|
||||
@ -253,7 +341,12 @@ def _render_page(
|
||||
|
||||
|
||||
@app.get("/{file_hash}")
|
||||
async def hash_page(file_hash: str, order: str | None = None, delay: int | None = None):
|
||||
async def hash_page(
|
||||
file_hash: str,
|
||||
order: str | None = None,
|
||||
delay: int | None = None,
|
||||
request: Request = None,
|
||||
):
|
||||
"""Serve a page for a specific file hash with optional auto-refresh navigation.
|
||||
|
||||
Args:
|
||||
@ -269,6 +362,16 @@ async def hash_page(file_hash: str, order: str | None = None, delay: int | None
|
||||
status_code=400, detail="Invalid order. Must be 'next' or 'random'"
|
||||
)
|
||||
|
||||
# Check if the file is encrypted and requires auth
|
||||
if request is not None:
|
||||
password = _get_zip_password(file_hash, request)
|
||||
if password is None:
|
||||
indexer = _find_indexer_for_hash(file_hash)
|
||||
if isinstance(indexer, ZipFileIndexer) and indexer.is_file_encrypted(
|
||||
file_hash
|
||||
):
|
||||
_raise_unauthorized()
|
||||
|
||||
navigation_data = _get_navigation_data(file_hash, order=order)
|
||||
|
||||
if order is not None and delay is not None:
|
||||
@ -281,9 +384,7 @@ async def hash_page(file_hash: str, order: str | None = None, delay: int | None
|
||||
|
||||
# Create pause button to stop auto-refresh
|
||||
pause_button = (
|
||||
'<a href="{file_hash}" class="play-btn" title="Pause">⏸</a>'.format(
|
||||
file_hash=file_hash
|
||||
)
|
||||
f'<a href="{file_hash}" class="play-btn" title="Pause">⏸</a>'
|
||||
)
|
||||
|
||||
return _render_page(
|
||||
@ -296,8 +397,9 @@ async def hash_page(file_hash: str, order: str | None = None, delay: int | None
|
||||
)
|
||||
else:
|
||||
# Browse mode
|
||||
play_button = '<a href="/{file_hash}?order=next&delay=5" class="play-btn" title="Play next 5">⏵</a>'.format(
|
||||
file_hash=file_hash
|
||||
play_button = (
|
||||
f'<a href="/{file_hash}?order=next&delay=5" '
|
||||
'class="play-btn" title="Play next 5">⏵</a>'
|
||||
)
|
||||
return _render_page(
|
||||
navigation_data,
|
||||
@ -315,6 +417,55 @@ def _find_indexer_for_hash(file_hash: str):
|
||||
return None
|
||||
|
||||
|
||||
def _get_zip_password(file_hash: str, request: Request) -> bytes | None:
|
||||
"""Get ZIP password from HTTP Basic Auth if the file is encrypted.
|
||||
|
||||
If the file is not encrypted, returns None (no password needed).
|
||||
If the file is encrypted and valid credentials are provided, returns
|
||||
the password as bytes.
|
||||
If the file is encrypted but no Authorization header is present, returns
|
||||
None so the caller can raise a 401.
|
||||
|
||||
Args:
|
||||
file_hash: The hash of the file to check.
|
||||
request: The incoming HTTP request.
|
||||
|
||||
Returns:
|
||||
The ZIP password as bytes if encrypted and auth provided, None otherwise.
|
||||
"""
|
||||
indexer = _find_indexer_for_hash(file_hash)
|
||||
if not isinstance(indexer, ZipFileIndexer):
|
||||
return None
|
||||
if not indexer.is_file_encrypted(file_hash):
|
||||
return None
|
||||
|
||||
# File is encrypted - check for Authorization header
|
||||
auth_header = request.headers.get("authorization")
|
||||
if not auth_header:
|
||||
return None
|
||||
|
||||
scheme, _, params = auth_header.partition(" ")
|
||||
if scheme.lower() != "basic":
|
||||
return None
|
||||
|
||||
try:
|
||||
decoded = base64.b64decode(params).decode()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
_, _, password = decoded.partition(":")
|
||||
return password.encode()
|
||||
|
||||
|
||||
def _raise_unauthorized() -> None:
|
||||
"""Raise an HTTP 401 Unauthorized with Basic auth challenge header."""
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="Authentication required",
|
||||
headers={"WWW-Authenticate": 'Basic realm="zip"'},
|
||||
)
|
||||
|
||||
|
||||
def _get_random_hash() -> str:
|
||||
"""Get a random file hash from the indexed files"""
|
||||
if not file_mapping:
|
||||
|
||||
@ -36,5 +36,6 @@ dev = [
|
||||
"httpx>=0.28.1",
|
||||
"pytest>=9.0.3",
|
||||
"pytest-asyncio>=1.3.0",
|
||||
"pyzipper>=0.3.6",
|
||||
"ruff>=0.15.5",
|
||||
]
|
||||
|
||||
@ -2,14 +2,19 @@
|
||||
|
||||
import argparse
|
||||
import zipfile
|
||||
from collections.abc import Generator
|
||||
from pathlib import Path
|
||||
from typing import Generator
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
import main
|
||||
|
||||
try:
|
||||
import pyzipper
|
||||
except ImportError:
|
||||
pyzipper = None
|
||||
|
||||
|
||||
def _reset_state() -> None:
|
||||
"""Reset all global state to defaults."""
|
||||
@ -18,7 +23,7 @@ def _reset_state() -> None:
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_globals() -> Generator[None, None, None]:
|
||||
def reset_globals() -> Generator[None]:
|
||||
"""Reset global state before and after each test."""
|
||||
_reset_state()
|
||||
yield
|
||||
@ -109,7 +114,7 @@ def initialized_zip(args_zip: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client_dir(initialized_dir: None) -> Generator[AsyncClient, None, None]:
|
||||
async def client_dir(initialized_dir: None) -> Generator[AsyncClient]:
|
||||
"""Async HTTP client against the app initialized with directory files."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
@ -117,8 +122,63 @@ async def client_dir(initialized_dir: None) -> Generator[AsyncClient, None, None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client_zip(initialized_zip: None) -> Generator[AsyncClient, None, None]:
|
||||
async def client_zip(initialized_zip: None) -> Generator[AsyncClient]:
|
||||
"""Async HTTP client against the app initialized with zip files."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
yield ac
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_encrypted_zip(tmp_path: Path) -> tuple[Path, dict[str, str], str]:
|
||||
"""Create a ZIP file with password-protected files.
|
||||
|
||||
Returns:
|
||||
Tuple of (zip_path, inner_files dict, password).
|
||||
inner_files maps logical names to internal paths.
|
||||
"""
|
||||
if pyzipper is None:
|
||||
pytest.skip("pyzipper not installed")
|
||||
|
||||
zip_path = tmp_path / "encrypted.zip"
|
||||
password = "secret123"
|
||||
inner_files: dict[str, str] = {}
|
||||
|
||||
with pyzipper.AESZipFile(zip_path, "w", encryption=pyzipper.WZ_AES) as zf:
|
||||
zf.setpassword(password.encode())
|
||||
zf.writestr("protected.txt", "secret content")
|
||||
inner_files["protected"] = "protected.txt"
|
||||
zf.writestr("folder/secret.png", b"\x89PNG fake")
|
||||
inner_files["secret_image"] = "folder/secret.png"
|
||||
|
||||
return zip_path, inner_files, password
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def args_encrypted_zip(
|
||||
sample_encrypted_zip: tuple[Path, dict[str, str], str], tmp_path: Path
|
||||
) -> argparse.Namespace:
|
||||
"""Argparse namespace pointing at the encrypted sample zip."""
|
||||
zip_path, _, _ = sample_encrypted_zip
|
||||
return argparse.Namespace(
|
||||
source=str(zip_path),
|
||||
host="127.0.0.1",
|
||||
port=0,
|
||||
salt="test-salt",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def initialized_encrypted_zip(args_encrypted_zip: argparse.Namespace) -> None:
|
||||
"""Initialize the server with encrypted zip files."""
|
||||
main.initialize_server(args_encrypted_zip)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def client_encrypted_zip(
|
||||
initialized_encrypted_zip: None,
|
||||
) -> Generator[AsyncClient]:
|
||||
"""Async HTTP client against the app initialized with encrypted zip files."""
|
||||
transport = ASGITransport(app=main.app)
|
||||
async with AsyncClient(transport=transport, base_url="http://test") as ac:
|
||||
yield ac
|
||||
|
||||
188
tests/test_encrypted_zip.py
Normal file
188
tests/test_encrypted_zip.py
Normal file
@ -0,0 +1,188 @@
|
||||
"""Tests for password-protected ZIP file handling."""
|
||||
|
||||
import base64
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from httpx import AsyncClient
|
||||
|
||||
import main
|
||||
|
||||
|
||||
class TestLazyEncryptionDetection:
|
||||
"""Tests for ZipFileIndexer._needs_password and is_file_encrypted."""
|
||||
|
||||
def test_detects_encrypted_file(
|
||||
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
|
||||
) -> None:
|
||||
"""Encrypted files are detected as needing a password."""
|
||||
zip_path, inner_files, _ = sample_encrypted_zip
|
||||
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
|
||||
protected_hash = indexer._hash_path(inner_files["protected"])
|
||||
assert indexer.is_file_encrypted(protected_hash) is True
|
||||
|
||||
def test_detects_unencrypted_file(
|
||||
self, sample_zip: dict[str, Path], tmp_path: Path
|
||||
) -> None:
|
||||
"""Unencrypted files are detected as not needing a password."""
|
||||
zip_path = tmp_path / "test_archive.zip"
|
||||
indexer = main.ZipFileIndexer(str(zip_path), salt="test-salt")
|
||||
top_hash = indexer._hash_path("top.txt")
|
||||
assert indexer.is_file_encrypted(top_hash) is False
|
||||
|
||||
def test_caches_detection_result(
|
||||
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
|
||||
) -> None:
|
||||
"""Second call to is_file_encrypted uses the cache."""
|
||||
zip_path, inner_files, _ = sample_encrypted_zip
|
||||
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
|
||||
protected_hash = indexer._hash_path(inner_files["protected"])
|
||||
|
||||
# First call populates the cache
|
||||
assert indexer.is_file_encrypted(protected_hash) is True
|
||||
assert "protected.txt" in indexer._password_cache
|
||||
|
||||
# Second call returns cached value
|
||||
assert indexer.is_file_encrypted(protected_hash) is True
|
||||
|
||||
def test_returns_false_for_unknown_hash(
|
||||
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
|
||||
) -> None:
|
||||
"""Unknown hash returns False (not encrypted)."""
|
||||
zip_path, _, _ = sample_encrypted_zip
|
||||
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
|
||||
assert indexer.is_file_encrypted("nonexistent-hash") is False
|
||||
|
||||
def test_get_file_with_correct_password(
|
||||
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
|
||||
) -> None:
|
||||
"""Reading an encrypted file with the correct password succeeds."""
|
||||
zip_path, inner_files, password = sample_encrypted_zip
|
||||
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
|
||||
protected_hash = indexer._hash_path(inner_files["protected"])
|
||||
content = b"".join(
|
||||
indexer.get_file_by_hash(protected_hash, password=password.encode())
|
||||
)
|
||||
assert content == b"secret content"
|
||||
|
||||
def test_get_file_with_wrong_password_raises(
|
||||
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
|
||||
) -> None:
|
||||
"""Reading an encrypted file with the wrong password raises."""
|
||||
zip_path, inner_files, _ = sample_encrypted_zip
|
||||
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
|
||||
protected_hash = indexer._hash_path(inner_files["protected"])
|
||||
with pytest.raises(RuntimeError):
|
||||
list(indexer.get_file_by_hash(protected_hash, password=b"wrong"))
|
||||
|
||||
def test_get_unencrypted_file_without_password(
|
||||
self, sample_zip: dict[str, Path], tmp_path: Path
|
||||
) -> None:
|
||||
"""Reading an unencrypted file works without a password."""
|
||||
zip_path = tmp_path / "test_archive.zip"
|
||||
indexer = main.ZipFileIndexer(str(zip_path), salt="test-salt")
|
||||
top_hash = indexer._hash_path("top.txt")
|
||||
content = b"".join(indexer.get_file_by_hash(top_hash))
|
||||
assert content == b"top level content"
|
||||
|
||||
|
||||
class TestEncryptedZipEndpoints:
|
||||
"""Tests for endpoints serving encrypted ZIP files."""
|
||||
|
||||
def _basic_auth(self, password: str) -> str:
|
||||
"""Build a Basic Auth header value."""
|
||||
credentials = f"user:{password}"
|
||||
return f"Basic {base64.b64encode(credentials.encode()).decode()}"
|
||||
|
||||
async def test_encrypted_file_requires_auth_on_data_endpoint(
|
||||
self, client_encrypted_zip: AsyncClient
|
||||
) -> None:
|
||||
"""Encrypted file returns 401 without auth on /api/{hash}/data."""
|
||||
for hash_val, filepath in main.file_mapping.items():
|
||||
if filepath == "protected.txt":
|
||||
response = await client_encrypted_zip.get(f"/api/{hash_val}/data")
|
||||
assert response.status_code == 401
|
||||
break
|
||||
else:
|
||||
pytest.fail("protected.txt not found in file_mapping")
|
||||
|
||||
async def test_encrypted_file_requires_auth_on_hash_page(
|
||||
self, client_encrypted_zip: AsyncClient
|
||||
) -> None:
|
||||
"""Encrypted file returns 401 without auth on /{hash}."""
|
||||
for hash_val, filepath in main.file_mapping.items():
|
||||
if filepath == "protected.txt":
|
||||
response = await client_encrypted_zip.get(f"/{hash_val}")
|
||||
assert response.status_code == 401
|
||||
break
|
||||
else:
|
||||
pytest.fail("protected.txt not found in file_mapping")
|
||||
|
||||
async def test_encrypted_file_succeeds_with_correct_auth(
|
||||
self, client_encrypted_zip: AsyncClient
|
||||
) -> None:
|
||||
"""Encrypted file returns 200 with correct auth on /api/{hash}/data."""
|
||||
for hash_val, filepath in main.file_mapping.items():
|
||||
if filepath == "protected.txt":
|
||||
response = await client_encrypted_zip.get(
|
||||
f"/api/{hash_val}/data",
|
||||
headers={"Authorization": self._basic_auth("secret123")},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.content == b"secret content"
|
||||
break
|
||||
else:
|
||||
pytest.fail("protected.txt not found in file_mapping")
|
||||
|
||||
async def test_encrypted_file_wrong_password_returns_401(
|
||||
self, client_encrypted_zip: AsyncClient
|
||||
) -> None:
|
||||
"""Encrypted file returns 401 with wrong password."""
|
||||
for hash_val, filepath in main.file_mapping.items():
|
||||
if filepath == "protected.txt":
|
||||
response = await client_encrypted_zip.get(
|
||||
f"/api/{hash_val}/data",
|
||||
headers={"Authorization": self._basic_auth("wrong")},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
break
|
||||
else:
|
||||
pytest.fail("protected.txt not found in file_mapping")
|
||||
|
||||
async def test_www_authenticate_header_present(
|
||||
self, client_encrypted_zip: AsyncClient
|
||||
) -> None:
|
||||
"""401 response includes WWW-Authenticate header."""
|
||||
for hash_val, filepath in main.file_mapping.items():
|
||||
if filepath == "protected.txt":
|
||||
response = await client_encrypted_zip.get(f"/api/{hash_val}/data")
|
||||
assert response.status_code == 401
|
||||
assert "www-authenticate" in response.headers
|
||||
assert "Basic" in response.headers["www-authenticate"]
|
||||
break
|
||||
else:
|
||||
pytest.fail("protected.txt not found in file_mapping")
|
||||
|
||||
async def test_encrypted_hash_page_succeeds_with_auth(
|
||||
self, client_encrypted_zip: AsyncClient
|
||||
) -> None:
|
||||
"""Encrypted file hash page returns 200 with correct auth."""
|
||||
for hash_val, filepath in main.file_mapping.items():
|
||||
if filepath == "protected.txt":
|
||||
response = await client_encrypted_zip.get(
|
||||
f"/{hash_val}",
|
||||
headers={"Authorization": self._basic_auth("secret123")},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "text/html" in response.headers["content-type"]
|
||||
break
|
||||
else:
|
||||
pytest.fail("protected.txt not found in file_mapping")
|
||||
|
||||
async def test_unencrypted_zip_no_auth_required(
|
||||
self, client_zip: AsyncClient
|
||||
) -> None:
|
||||
"""Files from an unencrypted ZIP work without any auth."""
|
||||
file_hash = list(main.file_mapping.keys())[0]
|
||||
response = await client_zip.get(f"/api/{file_hash}/data")
|
||||
assert response.status_code == 200
|
||||
@ -1,6 +1,5 @@
|
||||
"""Tests for FastAPI endpoints."""
|
||||
|
||||
import pytest
|
||||
from httpx import AsyncClient
|
||||
|
||||
import main
|
||||
|
||||
@ -3,8 +3,6 @@
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from main import FileIndexer
|
||||
|
||||
|
||||
|
||||
@ -2,8 +2,6 @@
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from main import ZipFileIndexer
|
||||
|
||||
|
||||
|
||||
44
uv.lock
generated
44
uv.lock
generated
@ -124,6 +124,7 @@ dev = [
|
||||
{ name = "httpx" },
|
||||
{ name = "pytest" },
|
||||
{ name = "pytest-asyncio" },
|
||||
{ name = "pyzipper" },
|
||||
{ name = "ruff" },
|
||||
]
|
||||
|
||||
@ -141,6 +142,7 @@ dev = [
|
||||
{ name = "httpx", specifier = ">=0.28.1" },
|
||||
{ name = "pytest", specifier = ">=9.0.3" },
|
||||
{ name = "pytest-asyncio", specifier = ">=1.3.0" },
|
||||
{ name = "pyzipper", specifier = ">=0.3.6" },
|
||||
{ name = "ruff", specifier = ">=0.15.5" },
|
||||
]
|
||||
|
||||
@ -244,6 +246,36 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pycryptodomex"
|
||||
version = "3.23.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/c9/85/e24bf90972a30b0fcd16c73009add1d7d7cd9140c2498a68252028899e41/pycryptodomex-3.23.0.tar.gz", hash = "sha256:71909758f010c82bc99b0abf4ea12012c98962fbf0583c2164f8b84533c2e4da", size = 4922157, upload-time = "2025-05-17T17:23:41.434Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/2e/00/10edb04777069a42490a38c137099d4b17ba6e36a4e6e28bdc7470e9e853/pycryptodomex-3.23.0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:7b37e08e3871efe2187bc1fd9320cc81d87caf19816c648f24443483005ff886", size = 2498764, upload-time = "2025-05-17T17:22:21.453Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/6b/3f/2872a9c2d3a27eac094f9ceaa5a8a483b774ae69018040ea3240d5b11154/pycryptodomex-3.23.0-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:91979028227543010d7b2ba2471cf1d1e398b3f183cb105ac584df0c36dac28d", size = 1643012, upload-time = "2025-05-17T17:22:23.702Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/70/af/774c2e2b4f6570fbf6a4972161adbb183aeeaa1863bde31e8706f123bf92/pycryptodomex-3.23.0-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6b8962204c47464d5c1c4038abeadd4514a133b28748bcd9fa5b6d62e3cec6fa", size = 2187643, upload-time = "2025-05-17T17:22:26.37Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/de/a3/71065b24cb889d537954cedc3ae5466af00a2cabcff8e29b73be047e9a19/pycryptodomex-3.23.0-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a33986a0066860f7fcf7c7bd2bc804fa90e434183645595ae7b33d01f3c91ed8", size = 2273762, upload-time = "2025-05-17T17:22:28.313Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/c9/0b/ff6f43b7fbef4d302c8b981fe58467b8871902cdc3eb28896b52421422cc/pycryptodomex-3.23.0-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c7947ab8d589e3178da3d7cdeabe14f841b391e17046954f2fbcd941705762b5", size = 2313012, upload-time = "2025-05-17T17:22:30.57Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/02/de/9d4772c0506ab6da10b41159493657105d3f8bb5c53615d19452afc6b315/pycryptodomex-3.23.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:c25e30a20e1b426e1f0fa00131c516f16e474204eee1139d1603e132acffc314", size = 2186856, upload-time = "2025-05-17T17:22:32.819Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/28/ad/8b30efcd6341707a234e5eba5493700a17852ca1ac7a75daa7945fcf6427/pycryptodomex-3.23.0-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:da4fa650cef02db88c2b98acc5434461e027dce0ae8c22dd5a69013eaf510006", size = 2347523, upload-time = "2025-05-17T17:22:35.386Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/0f/02/16868e9f655b7670dbb0ac4f2844145cbc42251f916fc35c414ad2359849/pycryptodomex-3.23.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:58b851b9effd0d072d4ca2e4542bf2a4abcf13c82a29fd2c93ce27ee2a2e9462", size = 2272825, upload-time = "2025-05-17T17:22:37.632Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/ca/18/4ca89ac737230b52ac8ffaca42f9c6f1fd07c81a6cd821e91af79db60632/pycryptodomex-3.23.0-cp313-cp313t-win32.whl", hash = "sha256:a9d446e844f08299236780f2efa9898c818fe7e02f17263866b8550c7d5fb328", size = 1772078, upload-time = "2025-05-17T17:22:40Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/73/34/13e01c322db027682e00986873eca803f11c56ade9ba5bbf3225841ea2d4/pycryptodomex-3.23.0-cp313-cp313t-win_amd64.whl", hash = "sha256:bc65bdd9fc8de7a35a74cab1c898cab391a4add33a8fe740bda00f5976ca4708", size = 1803656, upload-time = "2025-05-17T17:22:42.139Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/54/68/9504c8796b1805d58f4425002bcca20f12880e6fa4dc2fc9a668705c7a08/pycryptodomex-3.23.0-cp313-cp313t-win_arm64.whl", hash = "sha256:c885da45e70139464f082018ac527fdaad26f1657a99ee13eecdce0f0ca24ab4", size = 1707172, upload-time = "2025-05-17T17:22:44.704Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/dd/9c/1a8f35daa39784ed8adf93a694e7e5dc15c23c741bbda06e1d45f8979e9e/pycryptodomex-3.23.0-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:06698f957fe1ab229a99ba2defeeae1c09af185baa909a31a5d1f9d42b1aaed6", size = 2499240, upload-time = "2025-05-17T17:22:46.953Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/7a/62/f5221a191a97157d240cf6643747558759126c76ee92f29a3f4aee3197a5/pycryptodomex-3.23.0-cp37-abi3-macosx_10_9_x86_64.whl", hash = "sha256:b2c2537863eccef2d41061e82a881dcabb04944c5c06c5aa7110b577cc487545", size = 1644042, upload-time = "2025-05-17T17:22:49.098Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/8c/fd/5a054543c8988d4ed7b612721d7e78a4b9bf36bc3c5ad45ef45c22d0060e/pycryptodomex-3.23.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:43c446e2ba8df8889e0e16f02211c25b4934898384c1ec1ec04d7889c0333587", size = 2186227, upload-time = "2025-05-17T17:22:51.139Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/c8/a9/8862616a85cf450d2822dbd4fff1fcaba90877907a6ff5bc2672cafe42f8/pycryptodomex-3.23.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f489c4765093fb60e2edafdf223397bc716491b2b69fe74367b70d6999257a5c", size = 2272578, upload-time = "2025-05-17T17:22:53.676Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/46/9f/bda9c49a7c1842820de674ab36c79f4fbeeee03f8ff0e4f3546c3889076b/pycryptodomex-3.23.0-cp37-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bdc69d0d3d989a1029df0eed67cc5e8e5d968f3724f4519bd03e0ec68df7543c", size = 2312166, upload-time = "2025-05-17T17:22:56.585Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/03/cc/870b9bf8ca92866ca0186534801cf8d20554ad2a76ca959538041b7a7cf4/pycryptodomex-3.23.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:6bbcb1dd0f646484939e142462d9e532482bc74475cecf9c4903d4e1cd21f003", size = 2185467, upload-time = "2025-05-17T17:22:59.237Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/96/e3/ce9348236d8e669fea5dd82a90e86be48b9c341210f44e25443162aba187/pycryptodomex-3.23.0-cp37-abi3-musllinux_1_2_i686.whl", hash = "sha256:8a4fcd42ccb04c31268d1efeecfccfd1249612b4de6374205376b8f280321744", size = 2346104, upload-time = "2025-05-17T17:23:02.112Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/a5/e9/e869bcee87beb89040263c416a8a50204f7f7a83ac11897646c9e71e0daf/pycryptodomex-3.23.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:55ccbe27f049743a4caf4f4221b166560d3438d0b1e5ab929e07ae1702a4d6fd", size = 2271038, upload-time = "2025-05-17T17:23:04.872Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/8d/67/09ee8500dd22614af5fbaa51a4aee6e342b5fa8aecf0a6cb9cbf52fa6d45/pycryptodomex-3.23.0-cp37-abi3-win32.whl", hash = "sha256:189afbc87f0b9f158386bf051f720e20fa6145975f1e76369303d0f31d1a8d7c", size = 1771969, upload-time = "2025-05-17T17:23:07.115Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/69/96/11f36f71a865dd6df03716d33bd07a67e9d20f6b8d39820470b766af323c/pycryptodomex-3.23.0-cp37-abi3-win_amd64.whl", hash = "sha256:52e5ca58c3a0b0bd5e100a9fbc8015059b05cffc6c66ce9d98b4b45e023443b9", size = 1803124, upload-time = "2025-05-17T17:23:09.267Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/f9/93/45c1cdcbeb182ccd2e144c693eaa097763b08b38cded279f0053ed53c553/pycryptodomex-3.23.0-cp37-abi3-win_arm64.whl", hash = "sha256:02d87b80778c171445d67e23d1caef279bf4b25c3597050ccd2e13970b57fd51", size = 1707161, upload-time = "2025-05-17T17:23:11.414Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pydantic"
|
||||
version = "2.12.5"
|
||||
@ -373,6 +405,18 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/c6/78/397db326746f0a342855b81216ae1f0a32965deccfd7c830a2dbc66d2483/pytokens-0.4.1-py3-none-any.whl", hash = "sha256:26cef14744a8385f35d0e095dc8b3a7583f6c953c2e3d269c7f82484bf5ad2de", size = 13729, upload-time = "2026-01-30T01:03:45.029Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pyzipper"
|
||||
version = "0.3.6"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "pycryptodomex" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/ac/97/2f03c67b40e531b30f0e1357476b4db989097a92cd30c6d2389cfa12db49/pyzipper-0.3.6.tar.gz", hash = "sha256:0adca90a00c36a93fbe49bfa8c5add452bfe4ef85a1b8e3638739dd1c7b26bfc", size = 31377, upload-time = "2022-07-31T09:58:34.854Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/2d/b8/9d5d7cf4d96db8efa39f232fb152e87231fdaa5072229e6517f77a18d9c7/pyzipper-0.3.6-py2.py3-none-any.whl", hash = "sha256:6d097f465bfa47796b1494e12ea65d1478107d38e13bc56f6e58eedc4f6c1a87", size = 67652, upload-time = "2022-07-31T09:58:31.945Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ruff"
|
||||
version = "0.15.5"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user