"""Phase transitions and score resolution for a game session.

Each public transition function returns a frozen result object that bundles
the (possibly updated) session, the HTTP-style response payload and, when the
call actually changed the phase, the event name/payload to publish.
"""

import random
from collections import Counter
from dataclasses import dataclass
from datetime import timedelta
from typing import Any

from django.db import transaction
from django.utils import timezone

from .models import Category, GameSession, Guess, LieAnswer, Player, Question, RoundConfig, RoundQuestion, ScoreEvent
from .payloads import (
    build_finish_game_phase_event,
    build_finish_game_response,
    build_lie_started_payload,
    build_question_shown_payload,
    build_question_shown_response,
    build_reveal_scoreboard_response,
    build_scoreboard_phase_event,
    build_start_next_round_phase_event,
    build_start_next_round_response,
    build_start_round_response,
)

# RoundConfig settings that a follow-up round copies verbatim from the round
# before it (the category is handled separately because it is compared by id).
_INHERITED_ROUND_FIELDS = ("lie_seconds", "guess_seconds", "points_correct", "points_bluff")


@dataclass(frozen=True)
class RoundTransitionResult:
    """Outcome of a transition that lands in (or replays) a round's LIE phase."""

    session: GameSession
    round_config: RoundConfig
    round_question: RoundQuestion
    # True when this call produced a phase event (see the two fields below).
    should_broadcast: bool
    response_payload: dict[str, Any]
    phase_event_name: str | None = None
    phase_event_payload: dict[str, Any] | None = None


@dataclass(frozen=True)
class FinishGameResult:
    """Outcome of ``finish_game``."""

    session: GameSession
    # False when the session was already FINISHED and nothing changed.
    should_broadcast: bool
    response_payload: dict[str, Any]
    phase_event_name: str | None = None
    phase_event_payload: dict[str, Any] | None = None


@dataclass(frozen=True)
class ScoreboardTransitionResult:
    """Outcome of ``promote_reveal_to_scoreboard``."""

    session: GameSession
    # Rows of {"id", "nickname", "score"} ordered by score desc, then nickname.
    leaderboard: list[dict]
    # True only when this call moved the session from REVEAL to SCOREBOARD.
    should_broadcast: bool
    response_payload: dict[str, Any] | None = None
    phase_event_name: str | None = None
    phase_event_payload: dict[str, Any] | None = None


def _build_leaderboard(session: GameSession) -> list[dict]:
    """Return the session's players as dicts, best score first (ties by nickname)."""
    return list(
        Player.objects.filter(session=session)
        .order_by("-score", "nickname")
        .values("id", "nickname", "score")
    )


def get_round_question(session: GameSession, round_number: int) -> RoundQuestion | None:
    """Return the newest RoundQuestion for ``round_number``, or None.

    "Newest" means highest primary key; the related ``question`` is fetched in
    the same query.
    """
    return (
        RoundQuestion.objects.filter(session=session, round_number=round_number)
        .select_related("question")
        .order_by("-id")
        .first()
    )


def get_current_round_question(session: GameSession) -> RoundQuestion | None:
    """Return the RoundQuestion for ``session.current_round``, or None."""
    return get_round_question(session, session.current_round)


def reset_round_question_bootstrap_state(round_question: RoundQuestion) -> RoundQuestion:
    """Wipe per-round answers and restart the round question's clock.

    Deletes every Guess and LieAnswer attached to ``round_question``, clears a
    non-empty ``mixed_answers`` list and stamps ``shown_at`` with the current
    time. Returns the same instance.
    """
    Guess.objects.filter(round_question=round_question).delete()
    LieAnswer.objects.filter(round_question=round_question).delete()

    update_fields: list[str] = []
    if round_question.mixed_answers:
        round_question.mixed_answers = []
        update_fields.append("mixed_answers")
    round_question.shown_at = timezone.now()
    update_fields.append("shown_at")

    round_question.save(update_fields=update_fields)
    return round_question


def select_round_question(
    session: GameSession,
    round_config: RoundConfig,
    *,
    round_number: int | None = None,
) -> RoundQuestion:
    """Return the RoundQuestion for a round, picking a random question if needed.

    An existing RoundQuestion is reused as-is when its question already belongs
    to ``round_config``'s category. Otherwise a random active question from
    that category, not yet used in this session, is chosen and either written
    onto the existing row or used to create a new one.

    Args:
        session: Session the round belongs to.
        round_config: Supplies the category to draw from.
        round_number: Round to select for; defaults to ``session.current_round``.

    Raises:
        ValueError: ``"no_available_questions"`` when the category has no
            unused active question left.
    """
    target_round_number = session.current_round if round_number is None else round_number
    existing_round_question = get_round_question(session, target_round_number)
    if (
        existing_round_question is not None
        and existing_round_question.question.category_id == round_config.category_id
    ):
        return existing_round_question

    used_question_ids = RoundQuestion.objects.filter(session=session).values_list("question_id", flat=True)
    # Materialize once: a separate exists() check would cost a second query and
    # could disagree with the list that random.choice() ends up drawing from.
    candidates = list(
        Question.objects.filter(
            category=round_config.category,
            is_active=True,
        ).exclude(pk__in=used_question_ids)
    )
    if not candidates:
        raise ValueError("no_available_questions")
    question = random.choice(candidates)

    if existing_round_question is not None:
        existing_round_question.question = question
        existing_round_question.correct_answer = question.correct_answer
        existing_round_question.save(update_fields=["question", "correct_answer"])
        return existing_round_question

    return RoundQuestion.objects.create(
        session=session,
        round_number=target_round_number,
        question=question,
        correct_answer=question.correct_answer,
    )


def prepare_mixed_answers(round_question: RoundQuestion) -> list[str]:
    """Return the shuffled answer list for a round question, building it once.

    If ``mixed_answers`` is already populated it is returned unchanged.
    Otherwise the correct answer and all submitted lie texts are stripped,
    de-duplicated case-insensitively (first occurrence wins, so the correct
    answer takes precedence over an identical lie), shuffled and persisted.

    Raises:
        ValueError: ``"not_enough_answers_to_mix"`` when fewer than two
            distinct non-empty answers remain.
    """
    deduped_answers = list(round_question.mixed_answers or [])
    if deduped_answers:
        return deduped_answers

    lie_texts = list(round_question.lies.values_list("text", flat=True))
    seen: set[str] = set()
    for text in [round_question.correct_answer, *lie_texts]:
        stripped = text.strip()
        normalized = stripped.casefold()
        if not normalized or normalized in seen:
            continue
        seen.add(normalized)
        deduped_answers.append(stripped)

    if len(deduped_answers) < 2:
        raise ValueError("not_enough_answers_to_mix")

    random.shuffle(deduped_answers)
    round_question.mixed_answers = deduped_answers
    round_question.save(update_fields=["mixed_answers"])
    return deduped_answers


def start_round(session: GameSession, category_slug: str) -> RoundTransitionResult:
    """Configure the current round from the lobby and move the session to LIE.

    Args:
        session: Session to start; re-read under a row lock before mutation.
        category_slug: Slug of an active category that has active questions.

    Raises:
        ValueError: ``"category_not_found"``, ``"category_has_no_questions"``,
            ``"round_start_invalid_phase"`` (session is not in LOBBY),
            ``"round_already_configured"``, or ``"no_available_questions"``
            (propagated from ``select_round_question``).
    """
    try:
        category = Category.objects.get(slug=category_slug, is_active=True)
    except Category.DoesNotExist as exc:
        raise ValueError("category_not_found") from exc

    if not Question.objects.filter(category=category, is_active=True).exists():
        raise ValueError("category_has_no_questions")

    with transaction.atomic():
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        if locked_session.status != GameSession.Status.LOBBY:
            raise ValueError("round_start_invalid_phase")
        if RoundConfig.objects.filter(session=locked_session, number=locked_session.current_round).exists():
            raise ValueError("round_already_configured")

        round_config = RoundConfig(
            session=locked_session,
            number=locked_session.current_round,
            category=category,
        )
        # Pick the question before saving the config so a category with no
        # unused questions raises without leaving a RoundConfig row behind.
        round_question = select_round_question(locked_session, round_config)
        round_config.save()

        locked_session.status = GameSession.Status.LIE
        locked_session.save(update_fields=["status"])

    return RoundTransitionResult(
        session=locked_session,
        round_config=round_config,
        round_question=round_question,
        should_broadcast=True,
        response_payload=build_start_round_response(locked_session, round_config, round_question),
        phase_event_name="phase.lie_started",
        phase_event_payload=build_lie_started_payload(locked_session, round_config, round_question),
    )


def show_question(session: GameSession) -> RoundTransitionResult:
    """Build the "question shown" event for the current round of a LIE-phase session.

    The session status is not changed. A RoundQuestion is selected on the fly
    if the current round has none yet.

    Raises:
        ValueError: ``"show_question_invalid_phase"`` (session is not in LIE),
            ``"round_config_missing"``, or ``"no_available_questions"``
            (propagated from ``select_round_question``).
    """
    if session.status != GameSession.Status.LIE:
        raise ValueError("show_question_invalid_phase")

    try:
        round_config = RoundConfig.objects.get(session=session, number=session.current_round)
    except RoundConfig.DoesNotExist as exc:
        raise ValueError("round_config_missing") from exc

    round_question = get_current_round_question(session)
    if round_question is None:
        round_question = select_round_question(session, round_config)

    # NOTE(review): assumes shown_at is always populated on the row - confirm
    # against the RoundQuestion model definition.
    lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
    lie_deadline_iso = lie_deadline_at.isoformat()

    return RoundTransitionResult(
        session=session,
        round_config=round_config,
        round_question=round_question,
        should_broadcast=True,
        response_payload=build_question_shown_response(round_question, lie_deadline_iso, round_config.lie_seconds),
        phase_event_name="phase.question_shown",
        phase_event_payload=build_question_shown_payload(round_question, lie_deadline_iso, round_config.lie_seconds),
    )


def _sync_next_round_config(next_round_config: RoundConfig, previous_round_config: RoundConfig) -> None:
    """Make ``next_round_config`` mirror the previous round and mark its origin.

    Copies the category and every field in ``_INHERITED_ROUND_FIELDS`` that
    differs, forces ``started_from_scoreboard`` to True, and saves only the
    fields that actually changed (no write at all when nothing changed).
    """
    changed_fields: list[str] = []

    if next_round_config.category_id != previous_round_config.category_id:
        next_round_config.category = previous_round_config.category
        changed_fields.append("category")

    for field_name in _INHERITED_ROUND_FIELDS:
        inherited_value = getattr(previous_round_config, field_name)
        if getattr(next_round_config, field_name) != inherited_value:
            setattr(next_round_config, field_name, inherited_value)
            changed_fields.append(field_name)

    if not next_round_config.started_from_scoreboard:
        next_round_config.started_from_scoreboard = True
        changed_fields.append("started_from_scoreboard")

    if changed_fields:
        next_round_config.save(update_fields=changed_fields)


def start_next_round(session: GameSession) -> RoundTransitionResult:
    """Advance from SCOREBOARD into the next round's LIE phase.

    From SCOREBOARD: creates (or re-syncs) the next RoundConfig as a copy of
    the current one, bumps ``current_round``, selects and resets the round
    question, switches to LIE and returns a result with a phase event.

    From LIE: treated as a repeat of an already-performed transition. If the
    current round is past round 1, was started from the scoreboard and has a
    question, the same response is returned with ``should_broadcast=False``
    and no phase event.

    Raises:
        ValueError: ``"round_config_missing"`` when the current round has no
            config, ``"next_round_invalid_phase"`` for any other state, or
            ``"no_available_questions"`` (propagated from
            ``select_round_question``).
    """
    with transaction.atomic():
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        should_broadcast = False
        phase_event_name = None
        phase_event_payload = None

        if locked_session.status == GameSession.Status.SCOREBOARD:
            previous_round_config = (
                RoundConfig.objects.filter(
                    session=locked_session,
                    number=locked_session.current_round,
                )
                .select_related("category")
                .first()
            )
            if previous_round_config is None:
                raise ValueError("round_config_missing")

            next_round_number = locked_session.current_round + 1
            next_round_config, _created = RoundConfig.objects.get_or_create(
                session=locked_session,
                number=next_round_number,
                defaults={
                    "category": previous_round_config.category,
                    **{name: getattr(previous_round_config, name) for name in _INHERITED_ROUND_FIELDS},
                    "started_from_scoreboard": True,
                },
            )
            # A pre-existing config for the next round may be stale; align it.
            _sync_next_round_config(next_round_config, previous_round_config)

            locked_session.current_round = next_round_number
            round_question = reset_round_question_bootstrap_state(
                select_round_question(locked_session, next_round_config, round_number=next_round_number)
            )
            locked_session.status = GameSession.Status.LIE
            locked_session.save(update_fields=["current_round", "status"])

            should_broadcast = True
            phase_event = build_start_next_round_phase_event(locked_session, next_round_config, round_question)
            phase_event_name = phase_event["name"]
            phase_event_payload = phase_event["payload"]

        elif locked_session.status == GameSession.Status.LIE:
            if locked_session.current_round <= 1:
                raise ValueError("next_round_invalid_phase")

            next_round_config = (
                RoundConfig.objects.filter(
                    session=locked_session,
                    number=locked_session.current_round,
                )
                .select_related("category")
                .first()
            )
            round_question = get_current_round_question(locked_session)
            if (
                next_round_config is None
                or not next_round_config.started_from_scoreboard
                or round_question is None
            ):
                raise ValueError("next_round_invalid_phase")

        else:
            raise ValueError("next_round_invalid_phase")

    return RoundTransitionResult(
        session=locked_session,
        round_config=next_round_config,
        round_question=round_question,
        should_broadcast=should_broadcast,
        response_payload=build_start_next_round_response(
            locked_session,
            next_round_config,
            round_question,
        ),
        phase_event_name=phase_event_name,
        phase_event_payload=phase_event_payload,
    )


def finish_game(session: GameSession) -> FinishGameResult:
    """Move a SCOREBOARD session to FINISHED.

    Calling it on an already FINISHED session returns the same response with
    ``should_broadcast=False`` and no phase event.

    Raises:
        ValueError: ``"finish_game_invalid_phase"`` when the session is in
            neither SCOREBOARD nor FINISHED.
    """
    with transaction.atomic():
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        should_broadcast = False
        phase_event_name = None
        phase_event_payload = None

        if locked_session.status == GameSession.Status.SCOREBOARD:
            locked_session.status = GameSession.Status.FINISHED
            locked_session.save(update_fields=["status"])

            should_broadcast = True
            phase_event = build_finish_game_phase_event(locked_session)
            phase_event_name = phase_event["name"]
            phase_event_payload = phase_event["payload"]
        elif locked_session.status != GameSession.Status.FINISHED:
            raise ValueError("finish_game_invalid_phase")

    return FinishGameResult(
        session=locked_session,
        should_broadcast=should_broadcast,
        response_payload=build_finish_game_response(locked_session),
        phase_event_name=phase_event_name,
        phase_event_payload=phase_event_payload,
    )


def _unchanged_scoreboard_result(session: GameSession) -> ScoreboardTransitionResult:
    """Return a non-broadcasting result carrying the current leaderboard."""
    leaderboard = _build_leaderboard(session)
    return ScoreboardTransitionResult(
        session=session,
        leaderboard=leaderboard,
        should_broadcast=False,
        response_payload=build_reveal_scoreboard_response(session, leaderboard),
    )


def promote_reveal_to_scoreboard(session: GameSession) -> ScoreboardTransitionResult:
    """Move a resolved REVEAL phase on to SCOREBOARD.

    The reveal counts as resolved when a ScoreEvent exists for the current
    round question, or when there is at least one player and the number of
    guesses has reached the number of players. In every other situation
    (wrong phase, no round question, unresolved reveal) the session is left
    untouched and a non-broadcasting result with the current leaderboard is
    returned. This function never raises for an invalid phase.
    """
    if session.status != GameSession.Status.REVEAL:
        return _unchanged_scoreboard_result(session)

    current_round_question = get_current_round_question(session)
    if current_round_question is None:
        return _unchanged_scoreboard_result(session)

    players_count = Player.objects.filter(session=session).count()
    guess_count = Guess.objects.filter(round_question=current_round_question).count()
    has_score_events = ScoreEvent.objects.filter(
        session=session,
        meta__round_question_id=current_round_question.id,
    ).exists()
    reveal_is_resolved = has_score_events or (players_count > 0 and guess_count >= players_count)
    if not reveal_is_resolved:
        return _unchanged_scoreboard_result(session)

    with transaction.atomic():
        scoreboard_session = GameSession.objects.select_for_update().get(pk=session.pk)
        # Re-check under the row lock: the checks above ran on an unlocked
        # instance, so the phase may have moved on in the meantime.
        should_broadcast = scoreboard_session.status == GameSession.Status.REVEAL
        if should_broadcast:
            scoreboard_session.status = GameSession.Status.SCOREBOARD
            scoreboard_session.save(update_fields=["status"])

    leaderboard = _build_leaderboard(scoreboard_session)
    phase_event_name = None
    phase_event_payload = None
    if should_broadcast:
        phase_event = build_scoreboard_phase_event(scoreboard_session, leaderboard)
        phase_event_name = phase_event["name"]
        phase_event_payload = phase_event["payload"]

    return ScoreboardTransitionResult(
        session=scoreboard_session,
        leaderboard=leaderboard,
        should_broadcast=should_broadcast,
        response_payload=build_reveal_scoreboard_response(scoreboard_session, leaderboard),
        phase_event_name=phase_event_name,
        phase_event_payload=phase_event_payload,
    )


def resolve_scores(
    session: GameSession,
    round_question: RoundQuestion,
    round_config: RoundConfig,
) -> tuple[list[ScoreEvent], list[dict]]:
    """Award points for a round and return the score events plus leaderboard.

    Each correct guess earns its player ``round_config.points_correct``. Each
    player referenced by a guess's ``fooled_player_id`` earns
    ``round_config.points_bluff`` per guess that references them. Player
    scores and the ScoreEvent rows are written in one transaction so a failure
    partway through cannot leave scores updated without matching events.

    Returns:
        ``(score_events, leaderboard)`` where the leaderboard is a list of
        ``{"id", "nickname", "score"}`` dicts ordered by score desc, nickname.

    Raises:
        ValueError: ``"no_guesses_submitted"`` when the round has no guesses.
        Player.DoesNotExist: when a fooled player id does not belong to
            ``session``.
    """
    guesses = list(round_question.guesses.select_related("player"))
    if not guesses:
        raise ValueError("no_guesses_submitted")

    # player id -> number of guesses that reference that player as the fooler.
    bluff_counts = Counter(guess.fooled_player_id for guess in guesses if guess.fooled_player_id)

    score_events: list[ScoreEvent] = []
    with transaction.atomic():
        for guess in guesses:
            if not guess.is_correct:
                continue
            guess.player.score += round_config.points_correct
            guess.player.save(update_fields=["score"])
            score_events.append(
                ScoreEvent(
                    session=session,
                    player=guess.player,
                    delta=round_config.points_correct,
                    reason="guess_correct",
                    meta={"round_question_id": round_question.id, "guess_id": guess.id},
                )
            )

        for player_id, fooled_count in bluff_counts.items():
            delta = fooled_count * round_config.points_bluff
            # Re-read from the database so points saved by the loop above for
            # the same player are included before adding the bluff bonus.
            player = Player.objects.get(pk=player_id, session=session)
            player.score += delta
            player.save(update_fields=["score"])
            score_events.append(
                ScoreEvent(
                    session=session,
                    player=player,
                    delta=delta,
                    reason="bluff_success",
                    meta={"round_question_id": round_question.id, "fooled_count": fooled_count},
                )
            )

        ScoreEvent.objects.bulk_create(score_events)

    return score_events, _build_leaderboard(session)