image_server/tests/test_encrypted_zip.py

202 lines
8.7 KiB
Python

"""Tests for password-protected ZIP file handling."""
import base64
from pathlib import Path
import pytest
from httpx import AsyncClient
import main
class TestLazyEncryptionDetection:
"""Tests for ZipFileIndexer._needs_password and is_file_encrypted."""
def test_detects_encrypted_file(
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
) -> None:
"""Encrypted files are detected as needing a password."""
zip_path, inner_files, _ = sample_encrypted_zip
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
encrypted_hash = indexer._hash_path(inner_files["encrypted"])
assert indexer.is_file_encrypted(encrypted_hash) is True
def test_detects_unencrypted_file(
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
) -> None:
"""Unencrypted files are detected as not needing a password."""
zip_path, inner_files, _ = sample_encrypted_zip
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
unencrypted_hash = indexer._hash_path(inner_files["unencrypted"])
assert indexer.is_file_encrypted(unencrypted_hash) is False
def test_caches_detection_result(
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
) -> None:
"""Second call to is_file_encrypted uses the cache."""
zip_path, inner_files, _ = sample_encrypted_zip
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
encrypted_hash = indexer._hash_path(inner_files["encrypted"])
# First call populates the cache
assert indexer.is_file_encrypted(encrypted_hash) is True
assert "encrypted.txt" in indexer._password_cache
# Second call returns cached value
assert indexer.is_file_encrypted(encrypted_hash) is True
def test_returns_false_for_unknown_hash(
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
) -> None:
"""Unknown hash returns False (not encrypted)."""
zip_path, _, _ = sample_encrypted_zip
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
assert indexer.is_file_encrypted("nonexistent-hash") is False
def test_get_file_with_correct_password(
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
) -> None:
"""Reading an encrypted file with the correct password succeeds."""
zip_path, inner_files, password = sample_encrypted_zip
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
encrypted_hash = indexer._hash_path(inner_files["encrypted"])
content = b"".join(
indexer.get_file_by_hash(encrypted_hash, password=password.encode())
)
assert content == b"You have read this file\n"
def test_get_file_with_wrong_password_raises(
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
) -> None:
"""Reading an encrypted file with the wrong password raises."""
zip_path, inner_files, _ = sample_encrypted_zip
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
encrypted_hash = indexer._hash_path(inner_files["encrypted"])
with pytest.raises(RuntimeError):
list(indexer.get_file_by_hash(encrypted_hash, password=b"wrong"))
def test_get_unencrypted_file_without_password(
self, sample_encrypted_zip: tuple[Path, dict[str, str], str]
) -> None:
"""Reading an unencrypted file works without a password."""
zip_path, inner_files, _ = sample_encrypted_zip
indexer = main.ZipFileIndexer(str(zip_path), salt="test")
unencrypted_hash = indexer._hash_path(inner_files["unencrypted"])
content = b"".join(indexer.get_file_by_hash(unencrypted_hash))
assert content == b"You have read this file\n"
class TestEncryptedZipEndpoints:
"""Tests for endpoints serving encrypted ZIP files."""
def _basic_auth(self, password: str) -> str:
"""Build a Basic Auth header value."""
credentials = f"user:{password}"
return f"Basic {base64.b64encode(credentials.encode()).decode()}"
async def test_encrypted_file_requires_auth_on_data_endpoint(
self, client_encrypted_zip: AsyncClient
) -> None:
"""Encrypted file returns 401 without auth on /api/{hash}/data."""
for hash_val, filepath in main.file_mapping.items():
if filepath == "encrypted.txt":
response = await client_encrypted_zip.get(f"/api/{hash_val}/data")
assert response.status_code == 401
break
else:
pytest.fail("encrypted.txt not found in file_mapping")
async def test_encrypted_file_requires_auth_on_hash_page(
self, client_encrypted_zip: AsyncClient
) -> None:
"""Encrypted file returns 401 without auth on /{hash}."""
for hash_val, filepath in main.file_mapping.items():
if filepath == "encrypted.txt":
response = await client_encrypted_zip.get(f"/{hash_val}")
assert response.status_code == 401
break
else:
pytest.fail("encrypted.txt not found in file_mapping")
async def test_encrypted_file_succeeds_with_correct_auth(
self, client_encrypted_zip: AsyncClient
) -> None:
"""Encrypted file returns 200 with correct auth on /api/{hash}/data."""
for hash_val, filepath in main.file_mapping.items():
if filepath == "encrypted.txt":
response = await client_encrypted_zip.get(
f"/api/{hash_val}/data",
headers={"Authorization": self._basic_auth("password")},
)
assert response.status_code == 200
assert response.content == b"You have read this file\n"
break
else:
pytest.fail("encrypted.txt not found in file_mapping")
async def test_encrypted_file_wrong_password_returns_401(
self, client_encrypted_zip: AsyncClient
) -> None:
"""Encrypted file returns 401 with wrong password."""
for hash_val, filepath in main.file_mapping.items():
if filepath == "encrypted.txt":
response = await client_encrypted_zip.get(
f"/api/{hash_val}/data",
headers={"Authorization": self._basic_auth("wrong")},
)
assert response.status_code == 401
break
else:
pytest.fail("encrypted.txt not found in file_mapping")
async def test_www_authenticate_header_present(
self, client_encrypted_zip: AsyncClient
) -> None:
"""401 response includes WWW-Authenticate header."""
for hash_val, filepath in main.file_mapping.items():
if filepath == "encrypted.txt":
response = await client_encrypted_zip.get(f"/api/{hash_val}/data")
assert response.status_code == 401
assert "www-authenticate" in response.headers
assert "Basic" in response.headers["www-authenticate"]
break
else:
pytest.fail("encrypted.txt not found in file_mapping")
async def test_encrypted_hash_page_succeeds_with_auth(
self, client_encrypted_zip: AsyncClient
) -> None:
"""Encrypted file hash page returns 200 with correct auth."""
for hash_val, filepath in main.file_mapping.items():
if filepath == "encrypted.txt":
response = await client_encrypted_zip.get(
f"/{hash_val}",
headers={"Authorization": self._basic_auth("password")},
)
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
break
else:
pytest.fail("encrypted.txt not found in file_mapping")
async def test_unencrypted_file_no_auth_required(
self, client_encrypted_zip: AsyncClient
) -> None:
"""Unencrypted files in the same ZIP work without any auth."""
for hash_val, filepath in main.file_mapping.items():
if filepath == "unencrypted.txt":
response = await client_encrypted_zip.get(f"/api/{hash_val}/data")
assert response.status_code == 200
assert response.content == b"You have read this file\n"
break
else:
pytest.fail("unencrypted.txt not found in file_mapping")
async def test_unencrypted_zip_no_auth_required(
self, client_zip: AsyncClient
) -> None:
"""Files from an unencrypted ZIP work without any auth."""
file_hash = list(main.file_mapping.keys())[0]
response = await client_zip.get(f"/api/{file_hash}/data")
assert response.status_code == 200