import random from dataclasses import dataclass from django.db import transaction from .models import GameSession, Guess, LieAnswer, Player, Question, RoundConfig, RoundQuestion, ScoreEvent @dataclass(frozen=True) class RoundTransitionResult: session: GameSession round_config: RoundConfig round_question: RoundQuestion should_broadcast: bool @dataclass(frozen=True) class FinishGameResult: session: GameSession should_broadcast: bool @dataclass(frozen=True) class ScoreboardTransitionResult: session: GameSession leaderboard: list[dict] should_broadcast: bool def get_current_round_question(session: GameSession) -> RoundQuestion | None: return ( RoundQuestion.objects.filter(session=session, round_number=session.current_round) .select_related("question") .order_by("-id") .first() ) def reset_round_question_bootstrap_state(round_question: RoundQuestion) -> RoundQuestion: Guess.objects.filter(round_question=round_question).delete() LieAnswer.objects.filter(round_question=round_question).delete() if round_question.mixed_answers: round_question.mixed_answers = [] round_question.save(update_fields=["mixed_answers"]) return round_question def select_round_question(session: GameSession, round_config: RoundConfig) -> RoundQuestion: existing_round_question = get_current_round_question(session) if existing_round_question is not None: return existing_round_question used_question_ids = RoundQuestion.objects.filter(session=session).values_list("question_id", flat=True) available_questions = Question.objects.filter( category=round_config.category, is_active=True, ).exclude(pk__in=used_question_ids) if not available_questions.exists(): raise ValueError("no_available_questions") question = random.choice(list(available_questions)) return RoundQuestion.objects.create( session=session, round_number=session.current_round, question=question, correct_answer=question.correct_answer, ) def prepare_mixed_answers(round_question: RoundQuestion) -> list[str]: deduped_answers = list(round_question.mixed_answers or []) if deduped_answers: return deduped_answers lie_texts = list(round_question.lies.values_list("text", flat=True)) seen = set() for text in [round_question.correct_answer, *lie_texts]: normalized = text.strip().casefold() if not normalized or normalized in seen: continue seen.add(normalized) deduped_answers.append(text.strip()) if len(deduped_answers) < 2: raise ValueError("not_enough_answers_to_mix") random.shuffle(deduped_answers) round_question.mixed_answers = deduped_answers round_question.save(update_fields=["mixed_answers"]) return deduped_answers def start_next_round(session: GameSession) -> RoundTransitionResult: with transaction.atomic(): locked_session = GameSession.objects.select_for_update().get(pk=session.pk) next_round_config = None round_question = None should_broadcast = False if locked_session.status == GameSession.Status.SCOREBOARD: previous_round_config = RoundConfig.objects.filter( session=locked_session, number=locked_session.current_round, ).select_related("category").first() if previous_round_config is None: raise ValueError("round_config_missing") next_round_number = locked_session.current_round + 1 next_round_config = RoundConfig( session=locked_session, number=next_round_number, category=previous_round_config.category, lie_seconds=previous_round_config.lie_seconds, guess_seconds=previous_round_config.guess_seconds, points_correct=previous_round_config.points_correct, points_bluff=previous_round_config.points_bluff, started_from_scoreboard=True, ) locked_session.current_round = next_round_number round_question = reset_round_question_bootstrap_state(select_round_question(locked_session, next_round_config)) next_round_config.save() locked_session.status = GameSession.Status.LIE locked_session.save(update_fields=["current_round", "status"]) should_broadcast = True elif locked_session.status == GameSession.Status.LIE: if locked_session.current_round <= 1: raise ValueError("next_round_invalid_phase") next_round_config = RoundConfig.objects.filter( session=locked_session, number=locked_session.current_round, ).select_related("category").first() round_question = get_current_round_question(locked_session) if ( next_round_config is None or not next_round_config.started_from_scoreboard or round_question is None ): raise ValueError("next_round_invalid_phase") else: raise ValueError("next_round_invalid_phase") return RoundTransitionResult( session=locked_session, round_config=next_round_config, round_question=round_question, should_broadcast=should_broadcast, ) def finish_game(session: GameSession) -> FinishGameResult: with transaction.atomic(): locked_session = GameSession.objects.select_for_update().get(pk=session.pk) should_broadcast = False if locked_session.status == GameSession.Status.SCOREBOARD: locked_session.status = GameSession.Status.FINISHED locked_session.save(update_fields=["status"]) should_broadcast = True elif locked_session.status != GameSession.Status.FINISHED: raise ValueError("finish_game_invalid_phase") return FinishGameResult(session=locked_session, should_broadcast=should_broadcast) def promote_reveal_to_scoreboard(session: GameSession) -> ScoreboardTransitionResult: if session.status != GameSession.Status.REVEAL: leaderboard = list( Player.objects.filter(session=session) .order_by("-score", "nickname") .values("id", "nickname", "score") ) return ScoreboardTransitionResult(session=session, leaderboard=leaderboard, should_broadcast=False) current_round_question = get_current_round_question(session) if current_round_question is None: leaderboard = list( Player.objects.filter(session=session) .order_by("-score", "nickname") .values("id", "nickname", "score") ) return ScoreboardTransitionResult(session=session, leaderboard=leaderboard, should_broadcast=False) players_count = Player.objects.filter(session=session).count() guess_count = Guess.objects.filter(round_question=current_round_question).count() has_score_events = ScoreEvent.objects.filter( session=session, meta__round_question_id=current_round_question.id, ).exists() reveal_is_resolved = has_score_events or (players_count > 0 and guess_count >= players_count) if not reveal_is_resolved: leaderboard = list( Player.objects.filter(session=session) .order_by("-score", "nickname") .values("id", "nickname", "score") ) return ScoreboardTransitionResult(session=session, leaderboard=leaderboard, should_broadcast=False) with transaction.atomic(): locked_session = GameSession.objects.select_for_update().get(pk=session.pk) if locked_session.status != GameSession.Status.REVEAL: scoreboard_session = locked_session should_broadcast = False else: locked_session.status = GameSession.Status.SCOREBOARD locked_session.save(update_fields=["status"]) scoreboard_session = locked_session should_broadcast = True leaderboard = list( Player.objects.filter(session=scoreboard_session) .order_by("-score", "nickname") .values("id", "nickname", "score") ) return ScoreboardTransitionResult( session=scoreboard_session, leaderboard=leaderboard, should_broadcast=should_broadcast, ) def resolve_scores( session: GameSession, round_question: RoundQuestion, round_config: RoundConfig, ) -> tuple[list[ScoreEvent], list[dict]]: guesses = list(round_question.guesses.select_related("player")) if not guesses: raise ValueError("no_guesses_submitted") bluff_counts: dict[int, int] = {} for guess in guesses: if guess.fooled_player_id: bluff_counts[guess.fooled_player_id] = bluff_counts.get(guess.fooled_player_id, 0) + 1 score_events = [] for guess in guesses: if guess.is_correct: guess.player.score += round_config.points_correct guess.player.save(update_fields=["score"]) score_events.append( ScoreEvent( session=session, player=guess.player, delta=round_config.points_correct, reason="guess_correct", meta={"round_question_id": round_question.id, "guess_id": guess.id}, ) ) for player_id, fooled_count in bluff_counts.items(): delta = fooled_count * round_config.points_bluff player = Player.objects.get(pk=player_id, session=session) player.score += delta player.save(update_fields=["score"]) score_events.append( ScoreEvent( session=session, player=player, delta=delta, reason="bluff_success", meta={"round_question_id": round_question.id, "fooled_count": fooled_count}, ) ) ScoreEvent.objects.bulk_create(score_events) leaderboard = list( Player.objects.filter(session=session) .order_by("-score", "nickname") .values("id", "nickname", "score") ) return score_events, leaderboard