Files
weirsoe-party-protocol/realtime/tests.py
Asger Geel Weirsøe d2dbd8c802
Some checks failed
CI / test-and-quality (push) Has been cancelled
CI / test-and-quality (pull_request) Successful in 2m43s
docs: design doc for fup og fakta game engine + platform architecture
Captures all brainstormed decisions:
- Pluggable game cartridge platform (GameDriver interface)
- Celery + Redis timer-driven phase transitions
- Session owner play/pause/exit controls (no skip)
- Escalating scoring per round, incremental reveal scoring
- Emoji reactions during guess phase → post-game awards
- Relational per-user config presets with game-specific models
- Ephemeral game state (no persistence after exit/finish)
- Full WebSocket event reference and data lifecycle

Also: updated TODO.md (WebSocket done, persisted answers done),
created CLAUDE.md, and PROMPT.md for ralph-loop.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-09 07:35:55 +01:00

73 lines
2.6 KiB
Python

from channels.testing import WebsocketCommunicator
from django.contrib.auth import get_user_model
from django.test import TestCase
from fupogfakta.models import GameSession, Player
from partyhub.asgi import application
User = get_user_model()
class GameConsumerConnectTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(username="host", password="pw")
self.session = GameSession.objects.create(host=self.user, code="AABBCC")
self.player = Player.objects.create(session=self.session, nickname="Tester")
async def test_player_connect_and_ping(self):
token = self.player.session_token
communicator = WebsocketCommunicator(
application,
f"/ws/game/AABBCC/?session_token={token}",
)
connected, _ = await communicator.connect()
self.assertTrue(connected)
await communicator.send_json_to({"type": "ping"})
response = await communicator.receive_json_from()
self.assertEqual(response["type"], "pong")
await communicator.disconnect()
async def test_connect_without_token_rejected(self):
communicator = WebsocketCommunicator(application, "/ws/game/AABBCC/")
connected, code = await communicator.connect()
self.assertFalse(connected)
self.assertEqual(code, 4001)
async def test_connect_invalid_token_rejected(self):
communicator = WebsocketCommunicator(
application,
"/ws/game/AABBCC/?session_token=invalid-token",
)
connected, code = await communicator.connect()
self.assertFalse(connected)
self.assertEqual(code, 4003)
async def test_host_connect_without_token(self):
communicator = WebsocketCommunicator(
application,
"/ws/game/AABBCC/?role=host",
)
connected, _ = await communicator.connect()
self.assertTrue(connected)
await communicator.disconnect()
async def test_broadcast_reaches_connected_client(self):
token = self.player.session_token
communicator = WebsocketCommunicator(
application,
f"/ws/game/AABBCC/?session_token={token}",
)
connected, _ = await communicator.connect()
self.assertTrue(connected)
from realtime.broadcast import broadcast_phase_event
await broadcast_phase_event("AABBCC", "phase.test_event", {"hello": "world"})
message = await communicator.receive_json_from(timeout=2)
self.assertEqual(message["type"], "phase.test_event")
self.assertEqual(message["hello"], "world")
await communicator.disconnect()