import random from dataclasses import dataclass from typing import Any from django.db import transaction from django.utils import timezone from .models import GameSession, Guess, LieAnswer, Player, Question, RoundConfig, RoundQuestion, ScoreEvent from .payloads import build_finish_game_phase_event, build_start_next_round_phase_event @dataclass(frozen=True) class RoundTransitionResult: session: GameSession round_config: RoundConfig round_question: RoundQuestion should_broadcast: bool phase_event_name: str | None = None phase_event_payload: dict[str, Any] | None = None @dataclass(frozen=True) class FinishGameResult: session: GameSession should_broadcast: bool phase_event_name: str | None = None phase_event_payload: dict[str, Any] | None = None @dataclass(frozen=True) class ScoreboardTransitionResult: session: GameSession leaderboard: list[dict] should_broadcast: bool def get_current_round_question(session: GameSession) -> RoundQuestion | None: return ( RoundQuestion.objects.filter(session=session, round_number=session.current_round) .select_related("question") .order_by("-id") .first() ) def reset_round_question_bootstrap_state(round_question: RoundQuestion) -> RoundQuestion: Guess.objects.filter(round_question=round_question).delete() LieAnswer.objects.filter(round_question=round_question).delete() update_fields: list[str] = [] if round_question.mixed_answers: round_question.mixed_answers = [] update_fields.append("mixed_answers") round_question.shown_at = timezone.now() update_fields.append("shown_at") round_question.save(update_fields=update_fields) return round_question def select_round_question(session: GameSession, round_config: RoundConfig) -> RoundQuestion: existing_round_question = get_current_round_question(session) if existing_round_question is not None: return existing_round_question used_question_ids = RoundQuestion.objects.filter(session=session).values_list("question_id", flat=True) available_questions = Question.objects.filter( category=round_config.category, is_active=True, ).exclude(pk__in=used_question_ids) if not available_questions.exists(): raise ValueError("no_available_questions") question = random.choice(list(available_questions)) return RoundQuestion.objects.create( session=session, round_number=session.current_round, question=question, correct_answer=question.correct_answer, ) def prepare_mixed_answers(round_question: RoundQuestion) -> list[str]: deduped_answers = list(round_question.mixed_answers or []) if deduped_answers: return deduped_answers lie_texts = list(round_question.lies.values_list("text", flat=True)) seen = set() for text in [round_question.correct_answer, *lie_texts]: normalized = text.strip().casefold() if not normalized or normalized in seen: continue seen.add(normalized) deduped_answers.append(text.strip()) if len(deduped_answers) < 2: raise ValueError("not_enough_answers_to_mix") random.shuffle(deduped_answers) round_question.mixed_answers = deduped_answers round_question.save(update_fields=["mixed_answers"]) return deduped_answers def start_next_round(session: GameSession) -> RoundTransitionResult: with transaction.atomic(): locked_session = GameSession.objects.select_for_update().get(pk=session.pk) next_round_config = None round_question = None should_broadcast = False phase_event_name = None phase_event_payload = None if locked_session.status == GameSession.Status.SCOREBOARD: previous_round_config = RoundConfig.objects.filter( session=locked_session, number=locked_session.current_round, ).select_related("category").first() if previous_round_config is None: raise ValueError("round_config_missing") next_round_number = locked_session.current_round + 1 next_round_config = RoundConfig( session=locked_session, number=next_round_number, category=previous_round_config.category, lie_seconds=previous_round_config.lie_seconds, guess_seconds=previous_round_config.guess_seconds, points_correct=previous_round_config.points_correct, points_bluff=previous_round_config.points_bluff, started_from_scoreboard=True, ) locked_session.current_round = next_round_number round_question = reset_round_question_bootstrap_state(select_round_question(locked_session, next_round_config)) next_round_config.save() locked_session.status = GameSession.Status.LIE locked_session.save(update_fields=["current_round", "status"]) should_broadcast = True phase_event = build_start_next_round_phase_event(locked_session, next_round_config, round_question) phase_event_name = phase_event["name"] phase_event_payload = phase_event["payload"] elif locked_session.status == GameSession.Status.LIE: if locked_session.current_round <= 1: raise ValueError("next_round_invalid_phase") next_round_config = RoundConfig.objects.filter( session=locked_session, number=locked_session.current_round, ).select_related("category").first() round_question = get_current_round_question(locked_session) if ( next_round_config is None or not next_round_config.started_from_scoreboard or round_question is None ): raise ValueError("next_round_invalid_phase") else: raise ValueError("next_round_invalid_phase") return RoundTransitionResult( session=locked_session, round_config=next_round_config, round_question=round_question, should_broadcast=should_broadcast, phase_event_name=phase_event_name, phase_event_payload=phase_event_payload, ) def finish_game(session: GameSession) -> FinishGameResult: with transaction.atomic(): locked_session = GameSession.objects.select_for_update().get(pk=session.pk) should_broadcast = False phase_event_name = None phase_event_payload = None if locked_session.status == GameSession.Status.SCOREBOARD: locked_session.status = GameSession.Status.FINISHED locked_session.save(update_fields=["status"]) should_broadcast = True phase_event = build_finish_game_phase_event(locked_session) phase_event_name = phase_event["name"] phase_event_payload = phase_event["payload"] elif locked_session.status != GameSession.Status.FINISHED: raise ValueError("finish_game_invalid_phase") return FinishGameResult( session=locked_session, should_broadcast=should_broadcast, phase_event_name=phase_event_name, phase_event_payload=phase_event_payload, ) def promote_reveal_to_scoreboard(session: GameSession) -> ScoreboardTransitionResult: if session.status != GameSession.Status.REVEAL: leaderboard = list( Player.objects.filter(session=session) .order_by("-score", "nickname") .values("id", "nickname", "score") ) return ScoreboardTransitionResult(session=session, leaderboard=leaderboard, should_broadcast=False) current_round_question = get_current_round_question(session) if current_round_question is None: leaderboard = list( Player.objects.filter(session=session) .order_by("-score", "nickname") .values("id", "nickname", "score") ) return ScoreboardTransitionResult(session=session, leaderboard=leaderboard, should_broadcast=False) players_count = Player.objects.filter(session=session).count() guess_count = Guess.objects.filter(round_question=current_round_question).count() has_score_events = ScoreEvent.objects.filter( session=session, meta__round_question_id=current_round_question.id, ).exists() reveal_is_resolved = has_score_events or (players_count > 0 and guess_count >= players_count) if not reveal_is_resolved: leaderboard = list( Player.objects.filter(session=session) .order_by("-score", "nickname") .values("id", "nickname", "score") ) return ScoreboardTransitionResult(session=session, leaderboard=leaderboard, should_broadcast=False) with transaction.atomic(): locked_session = GameSession.objects.select_for_update().get(pk=session.pk) if locked_session.status != GameSession.Status.REVEAL: scoreboard_session = locked_session should_broadcast = False else: locked_session.status = GameSession.Status.SCOREBOARD locked_session.save(update_fields=["status"]) scoreboard_session = locked_session should_broadcast = True leaderboard = list( Player.objects.filter(session=scoreboard_session) .order_by("-score", "nickname") .values("id", "nickname", "score") ) return ScoreboardTransitionResult( session=scoreboard_session, leaderboard=leaderboard, should_broadcast=should_broadcast, ) def resolve_scores( session: GameSession, round_question: RoundQuestion, round_config: RoundConfig, ) -> tuple[list[ScoreEvent], list[dict]]: guesses = list(round_question.guesses.select_related("player")) if not guesses: raise ValueError("no_guesses_submitted") bluff_counts: dict[int, int] = {} for guess in guesses: if guess.fooled_player_id: bluff_counts[guess.fooled_player_id] = bluff_counts.get(guess.fooled_player_id, 0) + 1 score_events = [] for guess in guesses: if guess.is_correct: guess.player.score += round_config.points_correct guess.player.save(update_fields=["score"]) score_events.append( ScoreEvent( session=session, player=guess.player, delta=round_config.points_correct, reason="guess_correct", meta={"round_question_id": round_question.id, "guess_id": guess.id}, ) ) for player_id, fooled_count in bluff_counts.items(): delta = fooled_count * round_config.points_bluff player = Player.objects.get(pk=player_id, session=session) player.score += delta player.save(update_fields=["score"]) score_events.append( ScoreEvent( session=session, player=player, delta=delta, reason="bluff_success", meta={"round_question_id": round_question.id, "fooled_count": fooled_count}, ) ) ScoreEvent.objects.bulk_create(score_events) leaderboard = list( Player.objects.filter(session=session) .order_by("-score", "nickname") .values("id", "nickname", "score") ) return score_events, leaderboard