Files
weirsoe-party-protocol/fupogfakta/services.py
dev-bot dfa197b33b
All checks were successful
CI / test-and-quality (pull_request) Successful in 3m37s
CI / test-and-quality (push) Successful in 3m38s
refactor(gameplay): keep host transition payloads in cartridge
2026-03-17 16:06:46 +00:00

358 lines
13 KiB
Python

import random
from dataclasses import dataclass
from typing import Any
from django.db import transaction
from django.utils import timezone
from .models import GameSession, Guess, LieAnswer, Player, Question, RoundConfig, RoundQuestion, ScoreEvent
from .payloads import (
build_finish_game_phase_event,
build_finish_game_response,
build_reveal_scoreboard_response,
build_scoreboard_phase_event,
build_start_next_round_phase_event,
build_start_next_round_response,
)
@dataclass(frozen=True)
class RoundTransitionResult:
    """Immutable outcome returned by ``start_next_round``."""

    # Session row as loaded under ``select_for_update`` during the transition.
    session: GameSession
    # Config of the round the session is now on.
    round_config: RoundConfig
    # Question attached to that round.
    round_question: RoundQuestion
    # True only when this call actually moved the session from SCOREBOARD to LIE.
    should_broadcast: bool
    # Body built by ``build_start_next_round_response``.
    response_payload: dict[str, Any]
    # Both phase-event fields stay None unless a transition happened.
    phase_event_name: str | None = None
    phase_event_payload: dict[str, Any] | None = None
@dataclass(frozen=True)
class FinishGameResult:
    """Immutable outcome returned by ``finish_game``."""

    # Session row as loaded under ``select_for_update``.
    session: GameSession
    # True only when this call moved the session from SCOREBOARD to FINISHED.
    should_broadcast: bool
    # Body built by ``build_finish_game_response``.
    response_payload: dict[str, Any]
    # Both phase-event fields stay None unless a transition happened.
    phase_event_name: str | None = None
    phase_event_payload: dict[str, Any] | None = None
@dataclass(frozen=True)
class ScoreboardTransitionResult:
    """Immutable outcome returned by ``promote_reveal_to_scoreboard``."""

    # Session the leaderboard was computed for.
    session: GameSession
    # Player rows as ``{"id", "nickname", "score"}`` dicts, ordered by score
    # descending and then nickname.
    leaderboard: list[dict[str, Any]]
    # True only when this call moved the session from REVEAL to SCOREBOARD.
    should_broadcast: bool
    # Body built by ``build_reveal_scoreboard_response``.
    response_payload: dict[str, Any] | None = None
    # Both phase-event fields stay None unless a transition happened.
    phase_event_name: str | None = None
    phase_event_payload: dict[str, Any] | None = None
def get_current_round_question(session: GameSession) -> RoundQuestion | None:
    """Return the newest round question for the session's current round.

    "Newest" means highest primary key. The related ``question`` is fetched
    in the same query. Returns ``None`` when the round has no question yet.
    """
    for_this_round = RoundQuestion.objects.filter(
        session=session,
        round_number=session.current_round,
    )
    newest_first = for_this_round.select_related("question").order_by("-id")
    return newest_first.first()
def reset_round_question_bootstrap_state(round_question: RoundQuestion) -> RoundQuestion:
    """Wipe per-round player input and restart the round question's clock.

    Deletes every guess and lie attached to ``round_question``, clears any
    stored ``mixed_answers``, stamps ``shown_at`` with the current time and
    persists only the fields that were touched.

    Returns:
        The same ``round_question`` instance, for call chaining.
    """
    # Guesses are removed before lies, matching the original order.
    for answer_model in (Guess, LieAnswer):
        answer_model.objects.filter(round_question=round_question).delete()

    had_mixed_answers = bool(round_question.mixed_answers)
    if had_mixed_answers:
        round_question.mixed_answers = []
    round_question.shown_at = timezone.now()

    touched = ["mixed_answers", "shown_at"] if had_mixed_answers else ["shown_at"]
    round_question.save(update_fields=touched)
    return round_question
def select_round_question(session: GameSession, round_config: RoundConfig) -> RoundQuestion:
    """Return the question for the session's current round, creating it if needed.

    If a ``RoundQuestion`` already exists for the current round it is reused.
    Otherwise a random active question from ``round_config.category`` that the
    session has not used yet is picked and stored as a new ``RoundQuestion``.

    Args:
        session: Session whose ``current_round`` identifies the round.
        round_config: Supplies the category to draw the question from.

    Returns:
        The existing or newly created ``RoundQuestion``.

    Raises:
        ValueError: ``"no_available_questions"`` when the category has no
            unused active question left.
    """
    existing_round_question = get_current_round_question(session)
    if existing_round_question is not None:
        return existing_round_question

    used_question_ids = RoundQuestion.objects.filter(session=session).values_list("question_id", flat=True)
    # Materialize the candidate pool once. A separate ``exists()`` check costs
    # an extra query and can disagree with the later fetch, which would make
    # ``random.choice`` raise IndexError instead of the documented ValueError.
    candidates = list(
        Question.objects.filter(
            category=round_config.category,
            is_active=True,
        ).exclude(pk__in=used_question_ids)
    )
    if not candidates:
        raise ValueError("no_available_questions")

    question = random.choice(candidates)
    return RoundQuestion.objects.create(
        session=session,
        round_number=session.current_round,
        question=question,
        correct_answer=question.correct_answer,
    )
def prepare_mixed_answers(round_question: RoundQuestion) -> list[str]:
    """Return the shuffled answer options for a round question.

    A previously stored, non-empty ``mixed_answers`` list is returned as a
    copy without touching the database. Otherwise the correct answer and all
    submitted lies are stripped, de-duplicated case-insensitively (first
    occurrence wins, so the correct answer takes precedence), shuffled,
    saved on the round question and returned.

    Raises:
        ValueError: ``"not_enough_answers_to_mix"`` when fewer than two
            distinct non-blank answers remain.
    """
    stored = list(round_question.mixed_answers or [])
    if stored:
        return stored

    submitted_lies = list(round_question.lies.values_list("text", flat=True))
    candidates = [round_question.correct_answer]
    candidates.extend(submitted_lies)

    # Dicts keep insertion order, so the first spelling of each answer wins.
    first_spelling: dict[str, str] = {}
    for raw in candidates:
        cleaned = raw.strip()
        key = cleaned.casefold()
        if key and key not in first_spelling:
            first_spelling[key] = cleaned

    options = list(first_spelling.values())
    if len(options) < 2:
        raise ValueError("not_enough_answers_to_mix")

    random.shuffle(options)
    round_question.mixed_answers = options
    round_question.save(update_fields=["mixed_answers"])
    return options
def start_next_round(session: GameSession) -> RoundTransitionResult:
    """Advance a session from SCOREBOARD into the LIE phase of the next round.

    The session row is re-read under ``select_for_update`` inside a
    transaction. Two states are accepted:

    * SCOREBOARD: a new ``RoundConfig`` is cloned from the current round's
      config, the round counter is incremented, a question is selected and
      reset, and the session moves to LIE. The result asks for a broadcast.
    * LIE: accepted only as a repeat of an already completed transition
      (round > 1, config flagged ``started_from_scoreboard``, question
      present). Nothing is written and no broadcast is requested.

    Raises:
        ValueError: ``"round_config_missing"`` when the current round has no
            config; ``"next_round_invalid_phase"`` for any other state or a
            LIE state that does not look like a completed transition.
            ``"no_available_questions"`` can propagate from
            ``select_round_question``.
    """
    # NOTE(review): indentation was reconstructed from a flattened listing;
    # the final return is assumed to sit after the atomic block - confirm.
    with transaction.atomic():
        # Lock the row so concurrent host requests serialize on this session.
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        next_round_config = None
        round_question = None
        should_broadcast = False
        phase_event_name = None
        phase_event_payload = None
        if locked_session.status == GameSession.Status.SCOREBOARD:
            previous_round_config = RoundConfig.objects.filter(
                session=locked_session,
                number=locked_session.current_round,
            ).select_related("category").first()
            if previous_round_config is None:
                raise ValueError("round_config_missing")
            next_round_number = locked_session.current_round + 1
            # The new round inherits every setting from the previous one.
            next_round_config = RoundConfig(
                session=locked_session,
                number=next_round_number,
                category=previous_round_config.category,
                lie_seconds=previous_round_config.lie_seconds,
                guess_seconds=previous_round_config.guess_seconds,
                points_correct=previous_round_config.points_correct,
                points_bluff=previous_round_config.points_bluff,
                started_from_scoreboard=True,
            )
            # The counter must be bumped first: select_round_question looks up
            # and creates the question by ``locked_session.current_round``.
            locked_session.current_round = next_round_number
            round_question = reset_round_question_bootstrap_state(select_round_question(locked_session, next_round_config))
            # The config is saved only after a question was secured.
            next_round_config.save()
            locked_session.status = GameSession.Status.LIE
            locked_session.save(update_fields=["current_round", "status"])
            should_broadcast = True
            phase_event = build_start_next_round_phase_event(locked_session, next_round_config, round_question)
            phase_event_name = phase_event["name"]
            phase_event_payload = phase_event["payload"]
        elif locked_session.status == GameSession.Status.LIE:
            # Repeat call: the transition already happened, so only re-read state.
            if locked_session.current_round <= 1:
                raise ValueError("next_round_invalid_phase")
            next_round_config = RoundConfig.objects.filter(
                session=locked_session,
                number=locked_session.current_round,
            ).select_related("category").first()
            round_question = get_current_round_question(locked_session)
            if (
                next_round_config is None
                or not next_round_config.started_from_scoreboard
                or round_question is None
            ):
                raise ValueError("next_round_invalid_phase")
        else:
            raise ValueError("next_round_invalid_phase")
    return RoundTransitionResult(
        session=locked_session,
        round_config=next_round_config,
        round_question=round_question,
        should_broadcast=should_broadcast,
        response_payload=build_start_next_round_response(
            locked_session,
            next_round_config,
            round_question,
        ),
        phase_event_name=phase_event_name,
        phase_event_payload=phase_event_payload,
    )
def finish_game(session: GameSession) -> FinishGameResult:
    """Move a session from SCOREBOARD to FINISHED.

    The session row is re-read under ``select_for_update`` inside a
    transaction. A session that is already FINISHED is accepted as a repeat
    call: nothing is written and no broadcast is requested.

    Raises:
        ValueError: ``"finish_game_invalid_phase"`` when the session is in
            neither SCOREBOARD nor FINISHED.
    """
    event_name = None
    event_payload = None

    with transaction.atomic():
        current = GameSession.objects.select_for_update().get(pk=session.pk)
        transitioned = current.status == GameSession.Status.SCOREBOARD

        if transitioned:
            current.status = GameSession.Status.FINISHED
            current.save(update_fields=["status"])
            event = build_finish_game_phase_event(current)
            event_name = event["name"]
            event_payload = event["payload"]
        elif current.status != GameSession.Status.FINISHED:
            raise ValueError("finish_game_invalid_phase")

    return FinishGameResult(
        session=current,
        should_broadcast=transitioned,
        response_payload=build_finish_game_response(current),
        phase_event_name=event_name,
        phase_event_payload=event_payload,
    )
def _leaderboard_snapshot(session: GameSession) -> list[dict]:
    """Return the session's players as id/nickname/score dicts, best score first."""
    return list(
        Player.objects.filter(session=session)
        .order_by("-score", "nickname")
        .values("id", "nickname", "score")
    )


def _scoreboard_noop_result(session: GameSession) -> ScoreboardTransitionResult:
    """Build the non-broadcasting result used when no phase change happens."""
    leaderboard = _leaderboard_snapshot(session)
    return ScoreboardTransitionResult(
        session=session,
        leaderboard=leaderboard,
        should_broadcast=False,
        response_payload=build_reveal_scoreboard_response(session, leaderboard),
    )


def promote_reveal_to_scoreboard(session: GameSession) -> ScoreboardTransitionResult:
    """Move a session from REVEAL to SCOREBOARD once the reveal is resolved.

    The reveal counts as resolved when a ``ScoreEvent`` exists for the current
    round question, or when there is at least one player and at least as many
    guesses as players. If the session is not in REVEAL, has no current round
    question, or the reveal is unresolved, the current leaderboard is returned
    with ``should_broadcast=False`` and nothing is written.

    Otherwise the session row is re-read under ``select_for_update`` and the
    status is checked again, so only one concurrent caller performs the
    transition and asks for a broadcast.

    Returns:
        A ``ScoreboardTransitionResult``. Phase-event fields are set only when
        this call performed the transition.
    """
    # Cheap pre-checks on the caller's (possibly stale) copy avoid taking the
    # row lock when no transition is possible.
    if session.status != GameSession.Status.REVEAL:
        return _scoreboard_noop_result(session)

    current_round_question = get_current_round_question(session)
    if current_round_question is None:
        return _scoreboard_noop_result(session)

    players_count = Player.objects.filter(session=session).count()
    guess_count = Guess.objects.filter(round_question=current_round_question).count()
    has_score_events = ScoreEvent.objects.filter(
        session=session,
        meta__round_question_id=current_round_question.id,
    ).exists()
    reveal_is_resolved = has_score_events or (players_count > 0 and guess_count >= players_count)
    if not reveal_is_resolved:
        return _scoreboard_noop_result(session)

    with transaction.atomic():
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        # Re-check under the lock: another request may have promoted already.
        promoted = locked_session.status == GameSession.Status.REVEAL
        if promoted:
            locked_session.status = GameSession.Status.SCOREBOARD
            locked_session.save(update_fields=["status"])

    if not promoted:
        return _scoreboard_noop_result(locked_session)

    leaderboard = _leaderboard_snapshot(locked_session)
    phase_event = build_scoreboard_phase_event(locked_session, leaderboard)
    return ScoreboardTransitionResult(
        session=locked_session,
        leaderboard=leaderboard,
        should_broadcast=True,
        response_payload=build_reveal_scoreboard_response(locked_session, leaderboard),
        phase_event_name=phase_event["name"],
        phase_event_payload=phase_event["payload"],
    )
def resolve_scores(
    session: GameSession,
    round_question: RoundQuestion,
    round_config: RoundConfig,
) -> tuple[list[ScoreEvent], list[dict]]:
    """Award points for a round and return the score events and leaderboard.

    Each correct guess earns its player ``round_config.points_correct``. Each
    player whose lie was picked earns ``round_config.points_bluff`` per guess
    that fell for it. One ``ScoreEvent`` is recorded per award, tagged with
    the round question id in ``meta``.

    All score changes and event rows are written in a single transaction, so
    a failure midway cannot leave scores credited without matching events.

    Args:
        session: Session the players and events belong to.
        round_question: Round question whose guesses are scored.
        round_config: Supplies the point values.

    Returns:
        ``(score_events, leaderboard)``. ``leaderboard`` holds
        ``{"id", "nickname", "score"}`` dicts ordered by score descending,
        then nickname.

    Raises:
        ValueError: ``"no_guesses_submitted"`` when the round has no guesses.
        Player.DoesNotExist: when a fooled player is not part of ``session``.
    """
    guesses = list(round_question.guesses.select_related("player"))
    if not guesses:
        raise ValueError("no_guesses_submitted")

    # Count how many guessers each liar fooled.
    bluff_counts: dict[int, int] = {}
    for guess in guesses:
        if guess.fooled_player_id:
            bluff_counts[guess.fooled_player_id] = bluff_counts.get(guess.fooled_player_id, 0) + 1

    score_events = []
    with transaction.atomic():
        for guess in guesses:
            if guess.is_correct:
                guess.player.score += round_config.points_correct
                guess.player.save(update_fields=["score"])
                score_events.append(
                    ScoreEvent(
                        session=session,
                        player=guess.player,
                        delta=round_config.points_correct,
                        reason="guess_correct",
                        meta={"round_question_id": round_question.id, "guess_id": guess.id},
                    )
                )

        for player_id, fooled_count in bluff_counts.items():
            delta = fooled_count * round_config.points_bluff
            # Re-read the row so points saved in the loop above are included.
            player = Player.objects.get(pk=player_id, session=session)
            player.score += delta
            player.save(update_fields=["score"])
            score_events.append(
                ScoreEvent(
                    session=session,
                    player=player,
                    delta=delta,
                    reason="bluff_success",
                    meta={"round_question_id": round_question.id, "fooled_count": fooled_count},
                )
            )

        ScoreEvent.objects.bulk_create(score_events)

    leaderboard = list(
        Player.objects.filter(session=session)
        .order_by("-score", "nickname")
        .values("id", "nickname", "score")
    )
    return score_events, leaderboard