refactor(fupogfakta): extract first lobby gameplay slice (#312)
All checks were successful
CI / test-and-quality (push) Successful in 3m8s
CI / test-and-quality (pull_request) Successful in 3m13s

This commit is contained in:
2026-03-17 05:37:31 +00:00
parent 592c265331
commit 2ee235c6c0
4 changed files with 322 additions and 186 deletions

70
fupogfakta/payloads.py Normal file
View File

@@ -0,0 +1,70 @@
from datetime import timedelta
from .models import GameSession, Player, RoundConfig, RoundQuestion
def build_player_ref(player: Player | None) -> dict | None:
if player is None:
return None
return {
"player_id": player.id,
"nickname": player.nickname,
}
def build_reveal_payload(round_question: RoundQuestion | None) -> dict | None:
if round_question is None:
return None
lies = [
{
**build_player_ref(lie.player),
"text": lie.text,
"created_at": lie.created_at.isoformat(),
}
for lie in round_question.lies.select_related("player").order_by("created_at", "id")
]
guesses = []
for guess in round_question.guesses.select_related("player", "fooled_player").order_by("created_at", "id"):
guess_payload = {
**build_player_ref(guess.player),
"selected_text": guess.selected_text,
"is_correct": guess.is_correct,
"created_at": guess.created_at.isoformat(),
"fooled_player_id": guess.fooled_player_id,
}
if guess.fooled_player is not None:
guess_payload["fooled_player_nickname"] = guess.fooled_player.nickname
guesses.append(guess_payload)
return {
"round_question_id": round_question.id,
"round_number": round_question.round_number,
"prompt": round_question.question.prompt,
"correct_answer": round_question.correct_answer,
"lies": lies,
"guesses": guesses,
}
def build_leaderboard(session: GameSession) -> list[dict]:
return list(
Player.objects.filter(session=session)
.order_by("-score", "nickname")
.values("id", "nickname", "score")
)
def build_lie_started_payload(session: GameSession, round_config: RoundConfig, round_question: RoundQuestion) -> dict:
lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
return {
"round_number": session.current_round,
"category": {"slug": round_config.category.slug, "name": round_config.category.name},
"round_question_id": round_question.id,
"prompt": round_question.question.prompt,
"shown_at": round_question.shown_at.isoformat(),
"lie_deadline_at": lie_deadline_at.isoformat(),
"lie_seconds": round_config.lie_seconds,
}

115
fupogfakta/services.py Normal file
View File

@@ -0,0 +1,115 @@
import random
from .models import GameSession, Player, Question, RoundConfig, RoundQuestion, ScoreEvent
def get_current_round_question(session: GameSession) -> RoundQuestion | None:
return (
RoundQuestion.objects.filter(session=session, round_number=session.current_round)
.select_related("question")
.order_by("-id")
.first()
)
def select_round_question(session: GameSession, round_config: RoundConfig) -> RoundQuestion:
existing_round_question = get_current_round_question(session)
if existing_round_question is not None:
return existing_round_question
used_question_ids = RoundQuestion.objects.filter(session=session).values_list("question_id", flat=True)
available_questions = Question.objects.filter(
category=round_config.category,
is_active=True,
).exclude(pk__in=used_question_ids)
if not available_questions.exists():
raise ValueError("no_available_questions")
question = random.choice(list(available_questions))
return RoundQuestion.objects.create(
session=session,
round_number=session.current_round,
question=question,
correct_answer=question.correct_answer,
)
def prepare_mixed_answers(round_question: RoundQuestion) -> list[str]:
deduped_answers = list(round_question.mixed_answers or [])
if deduped_answers:
return deduped_answers
lie_texts = list(round_question.lies.values_list("text", flat=True))
seen = set()
for text in [round_question.correct_answer, *lie_texts]:
normalized = text.strip().casefold()
if not normalized or normalized in seen:
continue
seen.add(normalized)
deduped_answers.append(text.strip())
if len(deduped_answers) < 2:
raise ValueError("not_enough_answers_to_mix")
random.shuffle(deduped_answers)
round_question.mixed_answers = deduped_answers
round_question.save(update_fields=["mixed_answers"])
return deduped_answers
def resolve_scores(
session: GameSession,
round_question: RoundQuestion,
round_config: RoundConfig,
) -> tuple[list[ScoreEvent], list[dict]]:
guesses = list(round_question.guesses.select_related("player"))
if not guesses:
raise ValueError("no_guesses_submitted")
bluff_counts: dict[int, int] = {}
for guess in guesses:
if guess.fooled_player_id:
bluff_counts[guess.fooled_player_id] = bluff_counts.get(guess.fooled_player_id, 0) + 1
score_events = []
for guess in guesses:
if guess.is_correct:
guess.player.score += round_config.points_correct
guess.player.save(update_fields=["score"])
score_events.append(
ScoreEvent(
session=session,
player=guess.player,
delta=round_config.points_correct,
reason="guess_correct",
meta={"round_question_id": round_question.id, "guess_id": guess.id},
)
)
for player_id, fooled_count in bluff_counts.items():
delta = fooled_count * round_config.points_bluff
player = Player.objects.get(pk=player_id, session=session)
player.score += delta
player.save(update_fields=["score"])
score_events.append(
ScoreEvent(
session=session,
player=player,
delta=delta,
reason="bluff_success",
meta={"round_question_id": round_question.id, "fooled_count": fooled_count},
)
)
ScoreEvent.objects.bulk_create(score_events)
leaderboard = list(
Player.objects.filter(session=session)
.order_by("-score", "nickname")
.values("id", "nickname", "score")
)
return score_events, leaderboard

View File

@@ -1,2 +1,127 @@
from unittest.mock import patch
# Create your tests here. from django.contrib.auth import get_user_model
from django.test import TestCase
from fupogfakta.models import Category, GameSession, Guess, LieAnswer, Player, Question, RoundConfig, RoundQuestion, ScoreEvent
from fupogfakta.payloads import build_lie_started_payload, build_reveal_payload
from fupogfakta.services import get_current_round_question, prepare_mixed_answers, resolve_scores, select_round_question
User = get_user_model()
class FupOgFaktaExtractionSliceTests(TestCase):
def setUp(self):
self.host = User.objects.create_user(username="host", password="secret123")
self.session = GameSession.objects.create(host=self.host, code="ABCD23")
self.category = Category.objects.create(name="Historie", slug="historie", is_active=True)
self.question_one = Question.objects.create(
category=self.category,
prompt="Hvornår faldt muren?",
correct_answer="1989",
is_active=True,
)
self.question_two = Question.objects.create(
category=self.category,
prompt="Hvornår kom euroen?",
correct_answer="1999",
is_active=True,
)
self.round_config = RoundConfig.objects.create(session=self.session, number=1, category=self.category)
self.alice = Player.objects.create(session=self.session, nickname="Alice")
self.bob = Player.objects.create(session=self.session, nickname="Bob")
self.clara = Player.objects.create(session=self.session, nickname="Clara")
def test_select_round_question_skips_already_used_questions_for_session(self):
RoundQuestion.objects.create(
session=self.session,
round_number=99,
question=self.question_one,
correct_answer=self.question_one.correct_answer,
)
round_question = select_round_question(self.session, self.round_config)
self.assertEqual(round_question.question, self.question_two)
self.assertEqual(get_current_round_question(self.session), round_question)
def test_prepare_mixed_answers_dedupes_blank_and_case_variants(self):
round_question = RoundQuestion.objects.create(
session=self.session,
round_number=1,
question=self.question_one,
correct_answer="1989",
)
LieAnswer.objects.create(round_question=round_question, player=self.alice, text=" 1989 ")
LieAnswer.objects.create(round_question=round_question, player=self.bob, text="Nitten niogfirs")
LieAnswer.objects.create(round_question=round_question, player=self.clara, text=" ")
with patch("fupogfakta.services.random.shuffle", side_effect=lambda answers: None):
answers = prepare_mixed_answers(round_question)
self.assertEqual(answers, ["1989", "Nitten niogfirs"])
round_question.refresh_from_db()
self.assertEqual(round_question.mixed_answers, answers)
def test_resolve_scores_applies_correct_and_bluff_points(self):
round_question = RoundQuestion.objects.create(
session=self.session,
round_number=1,
question=self.question_one,
correct_answer="1989",
)
Guess.objects.create(
round_question=round_question,
player=self.alice,
selected_text="1989",
is_correct=True,
)
Guess.objects.create(
round_question=round_question,
player=self.bob,
selected_text="Berlin",
is_correct=False,
fooled_player=self.clara,
)
Guess.objects.create(
round_question=round_question,
player=self.clara,
selected_text="Berlin",
is_correct=False,
fooled_player=self.clara,
)
score_events, leaderboard = resolve_scores(self.session, round_question, self.round_config)
self.assertEqual(len(score_events), 2)
self.alice.refresh_from_db()
self.clara.refresh_from_db()
self.assertEqual(self.alice.score, self.round_config.points_correct)
self.assertEqual(self.clara.score, self.round_config.points_bluff * 2)
self.assertEqual(ScoreEvent.objects.filter(session=self.session, meta__round_question_id=round_question.id).count(), 2)
self.assertEqual([entry["nickname"] for entry in leaderboard], ["Alice", "Clara", "Bob"])
def test_payload_builders_expose_fupogfakta_round_contract(self):
round_question = RoundQuestion.objects.create(
session=self.session,
round_number=1,
question=self.question_one,
correct_answer="1989",
)
lie = LieAnswer.objects.create(round_question=round_question, player=self.bob, text="1991")
Guess.objects.create(
round_question=round_question,
player=self.alice,
selected_text="1991",
is_correct=False,
fooled_player=self.bob,
)
lie_payload = build_lie_started_payload(self.session, self.round_config, round_question)
reveal_payload = build_reveal_payload(round_question)
self.assertEqual(lie_payload["category"], {"slug": self.category.slug, "name": self.category.name})
self.assertEqual(lie_payload["round_question_id"], round_question.id)
self.assertEqual(reveal_payload["correct_answer"], "1989")
self.assertEqual(reveal_payload["lies"][0]["player_id"], lie.player_id)
self.assertEqual(reveal_payload["guesses"][0]["fooled_player_nickname"], self.bob.nickname)

View File

@@ -8,16 +8,17 @@ from django.http import HttpRequest, JsonResponse
from django.utils import timezone from django.utils import timezone
from django.views.decorators.http import require_GET, require_POST from django.views.decorators.http import require_GET, require_POST
from fupogfakta.models import ( from fupogfakta.models import Category, GameSession, Guess, LieAnswer, Player, Question, RoundConfig, RoundQuestion, ScoreEvent
Category, from fupogfakta.payloads import (
GameSession, build_leaderboard as _build_leaderboard,
Guess, build_lie_started_payload as _build_lie_started_payload,
LieAnswer, build_reveal_payload as _build_reveal_payload,
Player, )
Question, from fupogfakta.services import (
RoundConfig, get_current_round_question as _get_current_round_question,
RoundQuestion, prepare_mixed_answers as _prepare_mixed_answers,
ScoreEvent, resolve_scores as _resolve_scores,
select_round_question as _select_round_question,
) )
from realtime.broadcast import sync_broadcast_phase_event from realtime.broadcast import sync_broadcast_phase_event
@@ -64,181 +65,6 @@ def _create_unique_session_code() -> str:
raise RuntimeError("Could not generate unique session code") raise RuntimeError("Could not generate unique session code")
def _build_player_ref(player: Player | None) -> dict | None:
if player is None:
return None
return {
"player_id": player.id,
"nickname": player.nickname,
}
def _build_reveal_payload(round_question: RoundQuestion | None) -> dict | None:
if round_question is None:
return None
lies = [
{
**_build_player_ref(lie.player),
"text": lie.text,
"created_at": lie.created_at.isoformat(),
}
for lie in round_question.lies.select_related("player").order_by("created_at", "id")
]
guesses = []
for guess in round_question.guesses.select_related("player", "fooled_player").order_by("created_at", "id"):
guess_payload = {
**_build_player_ref(guess.player),
"selected_text": guess.selected_text,
"is_correct": guess.is_correct,
"created_at": guess.created_at.isoformat(),
"fooled_player_id": guess.fooled_player_id,
}
if guess.fooled_player is not None:
guess_payload["fooled_player_nickname"] = guess.fooled_player.nickname
guesses.append(guess_payload)
return {
"round_question_id": round_question.id,
"round_number": round_question.round_number,
"prompt": round_question.question.prompt,
"correct_answer": round_question.correct_answer,
"lies": lies,
"guesses": guesses,
}
def _build_leaderboard(session: GameSession) -> list[dict]:
return list(
Player.objects.filter(session=session)
.order_by("-score", "nickname")
.values("id", "nickname", "score")
)
def _get_current_round_question(session: GameSession) -> RoundQuestion | None:
return (
RoundQuestion.objects.filter(session=session, round_number=session.current_round)
.select_related("question")
.order_by("-id")
.first()
)
def _select_round_question(session: GameSession, round_config: RoundConfig) -> RoundQuestion:
existing_round_question = _get_current_round_question(session)
if existing_round_question is not None:
return existing_round_question
used_question_ids = RoundQuestion.objects.filter(session=session).values_list("question_id", flat=True)
available_questions = Question.objects.filter(
category=round_config.category,
is_active=True,
).exclude(pk__in=used_question_ids)
if not available_questions.exists():
raise ValueError("no_available_questions")
question = random.choice(list(available_questions))
return RoundQuestion.objects.create(
session=session,
round_number=session.current_round,
question=question,
correct_answer=question.correct_answer,
)
def _build_lie_started_payload(session: GameSession, round_config: RoundConfig, round_question: RoundQuestion) -> dict:
lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
return {
"round_number": session.current_round,
"category": {"slug": round_config.category.slug, "name": round_config.category.name},
"round_question_id": round_question.id,
"prompt": round_question.question.prompt,
"shown_at": round_question.shown_at.isoformat(),
"lie_deadline_at": lie_deadline_at.isoformat(),
"lie_seconds": round_config.lie_seconds,
}
def _prepare_mixed_answers(round_question: RoundQuestion) -> list[str]:
deduped_answers = list(round_question.mixed_answers or [])
if deduped_answers:
return deduped_answers
lie_texts = list(round_question.lies.values_list("text", flat=True))
seen = set()
for text in [round_question.correct_answer, *lie_texts]:
normalized = text.strip().casefold()
if not normalized or normalized in seen:
continue
seen.add(normalized)
deduped_answers.append(text.strip())
if len(deduped_answers) < 2:
raise ValueError("not_enough_answers_to_mix")
random.shuffle(deduped_answers)
round_question.mixed_answers = deduped_answers
round_question.save(update_fields=["mixed_answers"])
return deduped_answers
def _resolve_scores(session: GameSession, round_question: RoundQuestion, round_config: RoundConfig) -> tuple[list[ScoreEvent], list[dict]]:
guesses = list(round_question.guesses.select_related("player"))
if not guesses:
raise ValueError("no_guesses_submitted")
bluff_counts: dict[int, int] = {}
for guess in guesses:
if guess.fooled_player_id:
bluff_counts[guess.fooled_player_id] = bluff_counts.get(guess.fooled_player_id, 0) + 1
score_events = []
for guess in guesses:
if guess.is_correct:
guess.player.score += round_config.points_correct
guess.player.save(update_fields=["score"])
score_events.append(
ScoreEvent(
session=session,
player=guess.player,
delta=round_config.points_correct,
reason="guess_correct",
meta={"round_question_id": round_question.id, "guess_id": guess.id},
)
)
for player_id, fooled_count in bluff_counts.items():
delta = fooled_count * round_config.points_bluff
player = Player.objects.get(pk=player_id, session=session)
player.score += delta
player.save(update_fields=["score"])
score_events.append(
ScoreEvent(
session=session,
player=player,
delta=delta,
reason="bluff_success",
meta={"round_question_id": round_question.id, "fooled_count": fooled_count},
)
)
ScoreEvent.objects.bulk_create(score_events)
return score_events, _build_leaderboard(session)
def _maybe_promote_reveal_to_scoreboard(session: GameSession) -> GameSession: def _maybe_promote_reveal_to_scoreboard(session: GameSession) -> GameSession:
if session.status != GameSession.Status.REVEAL: if session.status != GameSession.Status.REVEAL:
return session return session