159 lines
5.3 KiB
Python
159 lines
5.3 KiB
Python
import importlib.util
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import types
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
MODULE_PATH = Path(__file__).with_name("main.py")
|
|
|
|
|
|
class FakeDB:
|
|
storages = {}
|
|
|
|
def create_storage(self, name, purpose=None):
|
|
self.storages.setdefault(name, {})
|
|
|
|
def set(self, storage, key, value):
|
|
self.storages.setdefault(storage, {})[key] = value
|
|
|
|
def get(self, storage, key):
|
|
return self.storages.get(storage, {}).get(key)
|
|
|
|
def list_items(self, storage):
|
|
return list(self.storages.get(storage, {}).keys())
|
|
|
|
def delete_item(self, storage, key):
|
|
self.storages.get(storage, {}).pop(key, None)
|
|
|
|
|
|
class ContactApiTests(unittest.TestCase):
|
|
def setUp(self):
|
|
FakeDB.storages = {}
|
|
self.temp_dir = tempfile.TemporaryDirectory()
|
|
self.rate_limit_file = os.path.join(self.temp_dir.name, "ratelimit.json")
|
|
os.environ["LUNAS_KEY"] = "topsecret"
|
|
self.module = self._load_module()
|
|
self.module.RATE_LIMIT_FILE = self.rate_limit_file
|
|
|
|
def tearDown(self):
|
|
self.temp_dir.cleanup()
|
|
os.environ.pop("LUNAS_KEY", None)
|
|
sys.modules.pop("contact_api_main_under_test", None)
|
|
sys.modules.pop("_db_com", None)
|
|
|
|
def _load_module(self):
|
|
fake_db_module = types.ModuleType("_db_com")
|
|
fake_db_module.database = FakeDB
|
|
sys.modules["_db_com"] = fake_db_module
|
|
|
|
spec = importlib.util.spec_from_file_location("contact_api_main_under_test", MODULE_PATH)
|
|
module = importlib.util.module_from_spec(spec)
|
|
assert spec.loader is not None
|
|
spec.loader.exec_module(module)
|
|
return module
|
|
|
|
def _res(self, response):
|
|
return response["_res"]
|
|
|
|
def test_submit_message_persists_record(self):
|
|
response = self.module.main(
|
|
{
|
|
"method": "POST",
|
|
"headers": {"origin": "https://luna.reversed.dev", "user-agent": "pytest"},
|
|
"body": json.dumps(
|
|
{
|
|
"username": "Space",
|
|
"email": "space@example.com",
|
|
"message": "Hello Luna, this is a valid test message.",
|
|
}
|
|
),
|
|
}
|
|
)
|
|
|
|
self.assertEqual(response["_code"], 201)
|
|
payload = self._res(response)
|
|
self.assertTrue(payload["ok"])
|
|
stored = FakeDB.storages["portfolio_contact_messages"][payload["id"]]
|
|
record = json.loads(stored)
|
|
self.assertEqual(record["username"], "Space")
|
|
self.assertFalse(record["seen"])
|
|
|
|
def test_new_route_requires_key(self):
|
|
response = self.module.main({"method": "GET", "route": "new", "headers": {}})
|
|
self.assertEqual(response["_code"], 401)
|
|
self.assertFalse(self._res(response)["ok"])
|
|
|
|
def test_new_seen_and_delete_flow(self):
|
|
created = self.module.main(
|
|
{
|
|
"method": "POST",
|
|
"headers": {"origin": "https://luna.reversed.dev"},
|
|
"body": json.dumps(
|
|
{
|
|
"username": "Space",
|
|
"email": "space@example.com",
|
|
"message": "Hello Luna, this should show up in the admin inbox.",
|
|
}
|
|
),
|
|
}
|
|
)
|
|
message_id = self._res(created)["id"]
|
|
|
|
auth_headers = {"x-lunas-key": "topsecret", "origin": "https://luna.reversed.dev"}
|
|
listed = self.module.main({"method": "GET", "route": "new", "headers": auth_headers})
|
|
self.assertEqual(listed["_code"], 200)
|
|
self.assertEqual(self._res(listed)["count"], 1)
|
|
self.assertEqual(self._res(listed)["messages"][0]["id"], message_id)
|
|
|
|
seen = self.module.main(
|
|
{
|
|
"method": "POST",
|
|
"route": "seen",
|
|
"headers": auth_headers,
|
|
"body": json.dumps({"id": message_id}),
|
|
}
|
|
)
|
|
self.assertEqual(seen["_code"], 200)
|
|
self.assertEqual(self._res(seen)["updated"], [message_id])
|
|
|
|
listed_after_seen = self.module.main({"method": "GET", "route": "new", "headers": auth_headers})
|
|
self.assertEqual(self._res(listed_after_seen)["count"], 0)
|
|
|
|
deleted = self.module.main(
|
|
{
|
|
"method": "POST",
|
|
"route": "delete",
|
|
"headers": auth_headers,
|
|
"body": json.dumps({"id": message_id}),
|
|
}
|
|
)
|
|
self.assertEqual(deleted["_code"], 200)
|
|
self.assertEqual(self._res(deleted)["deleted"], [message_id])
|
|
|
|
def test_rate_limit_returns_retry_after(self):
|
|
payload = {
|
|
"username": "Space",
|
|
"email": "space@example.com",
|
|
"message": "Hello Luna, this message is long enough to pass validation.",
|
|
}
|
|
args = {
|
|
"method": "POST",
|
|
"headers": {"x-forwarded-for": "1.2.3.4", "origin": "https://luna.reversed.dev"},
|
|
"body": json.dumps(payload),
|
|
}
|
|
|
|
for _ in range(self.module.RATE_LIMIT_MAX_REQUESTS):
|
|
response = self.module.main(args)
|
|
self.assertEqual(response["_code"], 201)
|
|
|
|
limited = self.module.main(args)
|
|
self.assertEqual(limited["_code"], 429)
|
|
self.assertIn("Retry-After", limited["_headers"])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|