"""HTTP views and helpers for game sessions: setup, joining, and round start."""

import json
import random
from collections import Counter
from datetime import timedelta

from django.contrib.auth.decorators import login_required
from django.db import IntegrityError, transaction
from django.http import HttpRequest, JsonResponse
from django.utils import timezone
from django.views.decorators.http import require_GET, require_POST

from fupogfakta.models import (
    Category,
    GameSession,
    Guess,
    LieAnswer,
    Player,
    Question,
    RoundConfig,
    RoundQuestion,
    ScoreEvent,
)
from realtime.broadcast import sync_broadcast_phase_event

from .i18n import api_error

SESSION_CODE_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
SESSION_CODE_LENGTH = 6
MAX_CODE_GENERATION_ATTEMPTS = 20

# Player-count limits reported (and enforced for "can_start_round") by the
# phase view model.
MIN_PLAYERS_TO_START = 3
MAX_PLAYERS_MVP = 5

JOINABLE_STATUSES = {
    GameSession.Status.LOBBY,
    GameSession.Status.LIE,
    GameSession.Status.GUESS,
    GameSession.Status.REVEAL,
    GameSession.Status.SCOREBOARD,
}


def _json_body(request: HttpRequest) -> dict:
    """Return the request body parsed as a JSON object, or ``{}``.

    An empty body, malformed JSON, undecodable bytes, and valid JSON that is
    not an object (list, string, number, ...) all yield ``{}`` so callers can
    safely use ``payload.get(...)``.
    """
    if not request.body:
        return {}
    try:
        payload = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return {}
    # Callers treat the payload as a mapping; reject any other JSON type.
    return payload if isinstance(payload, dict) else {}


def _generate_session_code() -> str:
    """Return a random code of SESSION_CODE_LENGTH chars from the alphabet."""
    return "".join(random.choices(SESSION_CODE_ALPHABET, k=SESSION_CODE_LENGTH))


def _normalize_session_code(code: str) -> str:
    """Strip surrounding whitespace and upper-case a session code."""
    return code.strip().upper()


def _create_unique_session_code() -> str:
    """Generate a code not used by any existing GameSession.

    Raises:
        RuntimeError: if no free code is found within
            MAX_CODE_GENERATION_ATTEMPTS tries.
    """
    for _ in range(MAX_CODE_GENERATION_ATTEMPTS):
        code = _generate_session_code()
        if not GameSession.objects.filter(code=code).exists():
            return code
    raise RuntimeError("Could not generate unique session code")


def _build_player_ref(player: Player | None) -> dict | None:
    """Return ``{"player_id", "nickname"}`` for *player*, or None."""
    if player is None:
        return None
    return {
        "player_id": player.id,
        "nickname": player.nickname,
    }


def _build_reveal_payload(round_question: RoundQuestion | None) -> dict | None:
    """Serialize a round question's lies and guesses for the reveal phase.

    Returns None when *round_question* is None. Lies and guesses are ordered
    by creation time, then id.
    """
    if round_question is None:
        return None

    lies = [
        {
            **_build_player_ref(lie.player),
            "text": lie.text,
            "created_at": lie.created_at.isoformat(),
        }
        for lie in round_question.lies.select_related("player").order_by("created_at", "id")
    ]

    guesses = []
    guess_rows = round_question.guesses.select_related("player", "fooled_player").order_by("created_at", "id")
    for guess in guess_rows:
        guess_payload = {
            **_build_player_ref(guess.player),
            "selected_text": guess.selected_text,
            "is_correct": guess.is_correct,
            "created_at": guess.created_at.isoformat(),
            "fooled_player_id": guess.fooled_player_id,
        }
        # The nickname key is only present when the guess fooled someone.
        if guess.fooled_player is not None:
            guess_payload["fooled_player_nickname"] = guess.fooled_player.nickname
        guesses.append(guess_payload)

    return {
        "round_question_id": round_question.id,
        "round_number": round_question.round_number,
        "prompt": round_question.question.prompt,
        "correct_answer": round_question.correct_answer,
        "lies": lies,
        "guesses": guesses,
    }


def _build_leaderboard(session: GameSession) -> list[dict]:
    """Return the session's players (id, nickname, score), best score first."""
    return list(
        Player.objects.filter(session=session)
        .order_by("-score", "nickname")
        .values("id", "nickname", "score")
    )


def _get_current_round_question(session: GameSession) -> RoundQuestion | None:
    """Return the newest RoundQuestion for the session's current round, if any."""
    return (
        RoundQuestion.objects.filter(session=session, round_number=session.current_round)
        .select_related("question")
        .order_by("-id")
        .first()
    )


def _select_round_question(session: GameSession, round_config: RoundConfig) -> RoundQuestion:
    """Return the current round's question, creating one if none exists.

    A new question is picked at random from the active questions of the
    round's category that this session has not used yet.

    Raises:
        ValueError: ``"no_available_questions"`` when every candidate has
            already been used (or none is active).
    """
    existing_round_question = _get_current_round_question(session)
    if existing_round_question is not None:
        return existing_round_question

    used_question_ids = RoundQuestion.objects.filter(session=session).values_list("question_id", flat=True)
    # Materialize once: the list serves both the emptiness check and the pick.
    candidates = list(
        Question.objects.filter(
            category=round_config.category,
            is_active=True,
        ).exclude(pk__in=used_question_ids)
    )
    if not candidates:
        raise ValueError("no_available_questions")

    question = random.choice(candidates)
    return RoundQuestion.objects.create(
        session=session,
        round_number=session.current_round,
        question=question,
        correct_answer=question.correct_answer,
    )


def _build_lie_started_payload(session: GameSession, round_config: RoundConfig, round_question: RoundQuestion) -> dict:
    """Build the ``phase.lie_started`` event payload.

    The lie deadline is ``shown_at`` plus the round's ``lie_seconds``.
    """
    lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
    return {
        "round_number": session.current_round,
        "category": {"slug": round_config.category.slug, "name": round_config.category.name},
        "round_question_id": round_question.id,
        "prompt": round_question.question.prompt,
        "shown_at": round_question.shown_at.isoformat(),
        "lie_deadline_at": lie_deadline_at.isoformat(),
        "lie_seconds": round_config.lie_seconds,
    }


def _prepare_mixed_answers(round_question: RoundQuestion) -> list[str]:
    """Return the shuffled answer list for a round question, creating it once.

    If ``mixed_answers`` is already stored it is returned unchanged. Otherwise
    the correct answer and all lies are de-duplicated (case-insensitively,
    ignoring surrounding whitespace), shuffled, saved, and returned.

    Raises:
        ValueError: ``"not_enough_answers_to_mix"`` when fewer than two
            distinct answers remain.
    """
    deduped_answers = list(round_question.mixed_answers or [])
    if deduped_answers:
        return deduped_answers

    lie_texts = list(round_question.lies.values_list("text", flat=True))
    seen = set()
    for text in [round_question.correct_answer, *lie_texts]:
        normalized = text.strip().casefold()
        if not normalized or normalized in seen:
            continue
        seen.add(normalized)
        deduped_answers.append(text.strip())

    if len(deduped_answers) < 2:
        raise ValueError("not_enough_answers_to_mix")

    random.shuffle(deduped_answers)
    round_question.mixed_answers = deduped_answers
    round_question.save(update_fields=["mixed_answers"])
    return deduped_answers


def _resolve_scores(
    session: GameSession, round_question: RoundQuestion, round_config: RoundConfig
) -> tuple[list[ScoreEvent], list[dict]]:
    """Apply round scoring and return ``(score_events, leaderboard)``.

    Each correct guess earns its player ``points_correct``; each player whose
    lie was picked earns ``points_bluff`` per fooled guess. Player scores are
    saved and one ScoreEvent per award is bulk-created.

    Raises:
        ValueError: ``"no_guesses_submitted"`` when the question has no guesses.
    """
    guesses = list(round_question.guesses.select_related("player"))
    if not guesses:
        raise ValueError("no_guesses_submitted")

    # fooled player id -> number of guesses that picked that player's lie
    bluff_counts = Counter(guess.fooled_player_id for guess in guesses if guess.fooled_player_id)

    score_events = []
    for guess in guesses:
        if not guess.is_correct:
            continue
        guess.player.score += round_config.points_correct
        guess.player.save(update_fields=["score"])
        score_events.append(
            ScoreEvent(
                session=session,
                player=guess.player,
                delta=round_config.points_correct,
                reason="guess_correct",
                meta={"round_question_id": round_question.id, "guess_id": guess.id},
            )
        )

    for player_id, fooled_count in bluff_counts.items():
        delta = fooled_count * round_config.points_bluff
        # Re-read the player so points saved in the loop above are included.
        player = Player.objects.get(pk=player_id, session=session)
        player.score += delta
        player.save(update_fields=["score"])
        score_events.append(
            ScoreEvent(
                session=session,
                player=player,
                delta=delta,
                reason="bluff_success",
                meta={"round_question_id": round_question.id, "fooled_count": fooled_count},
            )
        )

    ScoreEvent.objects.bulk_create(score_events)
    return score_events, _build_leaderboard(session)


def _maybe_promote_reveal_to_scoreboard(session: GameSession) -> GameSession:
    """Move a REVEAL session to SCOREBOARD once the round has score events.

    Returns the session unchanged when it is not in REVEAL, has no current
    round question, or has no ScoreEvent for that question. On promotion a
    ``phase.scoreboard`` event is broadcast and the refreshed session returned.
    """
    if session.status != GameSession.Status.REVEAL:
        return session

    current_round_question = _get_current_round_question(session)
    if current_round_question is None:
        return session

    has_round_scores = ScoreEvent.objects.filter(
        session=session,
        meta__round_question_id=current_round_question.id,
    ).exists()
    if not has_round_scores:
        return session

    with transaction.atomic():
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        # Another request may have moved the session on while we waited.
        if locked_session.status != GameSession.Status.REVEAL:
            return locked_session
        locked_session.status = GameSession.Status.SCOREBOARD
        locked_session.save(update_fields=["status"])

    leaderboard = _build_leaderboard(session)
    sync_broadcast_phase_event(
        session.code,
        "phase.scoreboard",
        {"leaderboard": leaderboard, "current_round": session.current_round},
    )
    session.refresh_from_db(fields=["status"])
    return session


def _build_phase_view_model(session: GameSession, *, players_count: int, has_round_question: bool) -> dict:
    """Describe what hosts and players may do in the session's current phase."""
    status = session.status
    in_lobby = status == GameSession.Status.LOBBY
    in_lie = status == GameSession.Status.LIE
    in_guess = status == GameSession.Status.GUESS
    in_scoreboard = status == GameSession.Status.SCOREBOARD
    in_finished = status == GameSession.Status.FINISHED

    min_players_reached = players_count >= MIN_PLAYERS_TO_START
    max_players_allowed = players_count <= MAX_PLAYERS_MVP

    scoreboard_ready = status in {
        GameSession.Status.REVEAL,
        GameSession.Status.SCOREBOARD,
        GameSession.Status.FINISHED,
    }

    return {
        "status": status,
        "current_phase": status,
        "round_number": session.current_round,
        "players_count": players_count,
        "constraints": {
            "min_players_to_start": MIN_PLAYERS_TO_START,
            "max_players_mvp": MAX_PLAYERS_MVP,
            "min_players_reached": min_players_reached,
            "max_players_allowed": max_players_allowed,
        },
        "readiness": {
            "question_ready": has_round_question,
            "scoreboard_ready": scoreboard_ready,
            "can_advance_to_next_round": in_scoreboard,
        },
        "host": {
            "can_start_round": in_lobby and min_players_reached and max_players_allowed,
            "can_show_question": False,
            "can_mix_answers": False,
            "can_calculate_scores": False,
            "can_reveal_scoreboard": False,
            "can_start_next_round": in_scoreboard,
            "can_finish_game": in_scoreboard,
        },
        "player": {
            "can_join": status in JOINABLE_STATUSES,
            "can_submit_lie": in_lie and has_round_question,
            "can_submit_guess": in_guess and has_round_question,
            "can_view_final_result": in_finished,
        },
    }


@require_POST
@login_required
def create_session(request: HttpRequest) -> JsonResponse:
    """Create a game session hosted by the logged-in user (201)."""
    code = _create_unique_session_code()
    session = GameSession.objects.create(host=request.user, code=code)
    return JsonResponse(
        {
            "session": {
                "code": session.code,
                "status": session.status,
                "host_id": session.host_id,
                "current_round": session.current_round,
            }
        },
        status=201,
    )


@require_POST
def join_session(request: HttpRequest) -> JsonResponse:
    """Add a player with the given nickname to a joinable session (201).

    Expects a JSON body with ``code`` and ``nickname`` (2-40 characters).
    Nicknames are unique per session, compared case-insensitively.
    """
    payload = _json_body(request)
    code = _normalize_session_code(str(payload.get("code", "")))
    nickname = str(payload.get("nickname", "")).strip()

    if not code:
        return api_error(request, code="session_code_required", status=400)
    if len(nickname) < 2 or len(nickname) > 40:
        return api_error(request, code="nickname_invalid", status=400)

    try:
        session = GameSession.objects.get(code=code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.status not in JOINABLE_STATUSES:
        return api_error(request, code="session_not_joinable", status=400)
    if Player.objects.filter(session=session, nickname__iexact=nickname).exists():
        return api_error(request, code="nickname_taken", status=409)

    player = Player.objects.create(session=session, nickname=nickname)
    return JsonResponse(
        {
            "player": {
                "id": player.id,
                "nickname": player.nickname,
                "session_token": player.session_token,
                "score": player.score,
            },
            "session": {
                "code": session.code,
                "status": session.status,
            },
        },
        status=201,
    )


@require_GET
def session_detail(request: HttpRequest, code: str) -> JsonResponse:
    """Return the full state of a session: players, question, reveal, scores.

    As a side effect, a REVEAL session whose round is already scored is
    promoted to SCOREBOARD before the state is serialized.
    """
    session_code = _normalize_session_code(code)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    players = list(
        session.players.order_by("nickname").values(
            "id",
            "nickname",
            "score",
            "is_connected",
        )
    )

    session = _maybe_promote_reveal_to_scoreboard(session)
    current_round_question = _get_current_round_question(session)

    round_question_payload = None
    if current_round_question:
        round_question_payload = {
            "id": current_round_question.id,
            "round_number": current_round_question.round_number,
            "prompt": current_round_question.question.prompt,
            "shown_at": current_round_question.shown_at.isoformat(),
            "answers": [{"text": text} for text in (current_round_question.mixed_answers or [])],
        }

    phase_view_model = _build_phase_view_model(
        session,
        players_count=len(players),
        has_round_question=bool(current_round_question),
    )

    reveal_payload = None
    if session.status in {GameSession.Status.REVEAL, GameSession.Status.SCOREBOARD} and current_round_question:
        reveal_payload = _build_reveal_payload(current_round_question)

    scoreboard_payload = None
    if session.status in {GameSession.Status.SCOREBOARD, GameSession.Status.FINISHED}:
        scoreboard_payload = _build_leaderboard(session)

    return JsonResponse(
        {
            "session": {
                "code": session.code,
                "status": session.status,
                "host_id": session.host_id,
                "current_round": session.current_round,
                "players_count": len(players),
            },
            "players": players,
            "round_question": round_question_payload,
            "reveal": reveal_payload,
            "scoreboard": scoreboard_payload,
            "phase_view_model": phase_view_model,
        }
    )


@require_POST
@login_required
def start_round(request: HttpRequest, code: str) -> JsonResponse:
    """Host-only: configure the current round, pick a question, enter LIE (201).

    Expects a JSON body with ``category_slug``. The round config, the question
    and the status change are written in one transaction; if no question can
    be selected the whole transaction is rolled back so the host can retry.
    """
    payload = _json_body(request)
    category_slug = str(payload.get("category_slug", "")).strip()
    if not category_slug:
        return api_error(request, code="category_slug_required", status=400)

    session_code = _normalize_session_code(code)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.host_id != request.user.id:
        return api_error(request, code="host_only_start_round", status=403)
    if session.status != GameSession.Status.LOBBY:
        return api_error(request, code="round_start_invalid_phase", status=400)

    try:
        category = Category.objects.get(slug=category_slug, is_active=True)
    except Category.DoesNotExist:
        return api_error(request, code="category_not_found", status=404)

    if not Question.objects.filter(category=category, is_active=True).exists():
        return api_error(request, code="category_has_no_questions", status=400)

    selection_error = None
    with transaction.atomic():
        session = GameSession.objects.select_for_update().get(pk=session.pk)
        # Re-check under the row lock: a concurrent request may have started.
        if session.status != GameSession.Status.LOBBY:
            return api_error(request, code="round_start_invalid_phase", status=400)

        round_config, created = RoundConfig.objects.get_or_create(
            session=session,
            number=session.current_round,
            defaults={"category": category},
        )
        if not created:
            return api_error(request, code="round_already_configured", status=409)

        try:
            round_question = _select_round_question(session, round_config)
        except ValueError as exc:
            # Undo the RoundConfig created above; otherwise every retry would
            # fail with "round_already_configured" and the lobby would be stuck.
            selection_error = str(exc)
            transaction.set_rollback(True)
        else:
            session.status = GameSession.Status.LIE
            session.save(update_fields=["status"])

    # Build the error response outside the block: no queries may run in a
    # transaction that has been marked for rollback.
    if selection_error is not None:
        return api_error(request, code=selection_error, status=400)

    lie_started_payload = _build_lie_started_payload(session, round_config, round_question)
    sync_broadcast_phase_event(
        session.code,
        "phase.lie_started",
        lie_started_payload,
    )

    return JsonResponse(
        {
            "session": {
                "code": session.code,
                "status": session.status,
                "current_round": session.current_round,
            },
            "round": {
                "number": round_config.number,
                "category": {
                    "slug": round_config.category.slug,
                    "name": round_config.category.name,
                },
            },
            "round_question": {
                "id": round_question.id,
                "prompt": round_question.question.prompt,
                "round_number": round_question.round_number,
                "shown_at": round_question.shown_at.isoformat(),
                "lie_deadline_at": lie_started_payload["lie_deadline_at"],
            },
            "config": {
                "lie_seconds": round_config.lie_seconds,
            },
        },
        status=201,
    )


@require_POST
@login_required
def show_question(request: HttpRequest, code: str) -> JsonResponse:
    """Host-only: (re)broadcast the current round's question during LIE (201)."""
    session_code = _normalize_session_code(code)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.host_id != request.user.id:
        return api_error(request, code="host_only_show_question", status=403)
    if session.status != GameSession.Status.LIE:
        return api_error(request, code="show_question_invalid_phase", status=400)

    try:
        round_config = RoundConfig.objects.get(session=session, number=session.current_round)
    except RoundConfig.DoesNotExist:
        return api_error(request, code="round_config_missing", status=400)

    # _select_round_question returns the existing question for the current
    # round when there is one, so no separate lookup is needed here.
    try:
        round_question = _select_round_question(session, round_config)
    except ValueError as exc:
        return api_error(request, code=str(exc), status=400)

    lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
    sync_broadcast_phase_event(
        session.code,
        "phase.question_shown",
        {
            "round_question_id": round_question.id,
            "prompt": round_question.question.prompt,
            "shown_at": round_question.shown_at.isoformat(),
            "lie_deadline_at": lie_deadline_at.isoformat(),
            "lie_seconds": round_config.lie_seconds,
        },
    )

    return JsonResponse(
        {
            "round_question": {
                "id": round_question.id,
                "prompt": round_question.question.prompt,
                "round_number": round_question.round_number,
                "shown_at": round_question.shown_at.isoformat(),
                "lie_deadline_at": lie_deadline_at.isoformat(),
            },
            "config": {
                "lie_seconds": round_config.lie_seconds,
            },
        },
        status=201,
    )
@require_POST
def submit_lie(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Record a player's lie for the current question (201).

    Expects a JSON body with ``player_id``, ``session_token`` and ``text``
    (1-255 characters). When every player in the session has submitted a lie,
    the answers are mixed, the session moves to GUESS and
    ``phase.guess_started`` is broadcast.
    """
    payload = _json_body(request)
    session_code = _normalize_session_code(code)
    player_id = payload.get("player_id")
    session_token = str(payload.get("session_token", "")).strip()
    lie_text = str(payload.get("text", "")).strip()

    if not player_id:
        return api_error(request, code="player_id_required", status=400)
    if not session_token:
        return api_error(request, code="session_token_required", status=400)
    if not lie_text or len(lie_text) > 255:
        return api_error(request, code="lie_text_invalid", status=400)

    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.status != GameSession.Status.LIE:
        return api_error(request, code="lie_submission_invalid_phase", status=400)

    try:
        player = Player.objects.get(pk=player_id, session=session)
    except (Player.DoesNotExist, ValueError, TypeError):
        # ValueError/TypeError: player_id from the JSON body is not a usable
        # primary key (e.g. "abc" or a list); treat it as an unknown player.
        return api_error(request, code="player_not_found_in_session", status=404)

    if player.session_token != session_token:
        return api_error(request, code="invalid_player_session_token", status=403)

    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return api_error(request, code="round_question_not_found", status=404)

    try:
        round_config = RoundConfig.objects.get(session=session, number=round_question.round_number)
    except RoundConfig.DoesNotExist:
        return api_error(request, code="round_config_missing", status=400)

    lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
    if timezone.now() > lie_deadline_at:
        return api_error(request, code="lie_submission_closed", status=400)

    try:
        # The inner atomic block confines a failed INSERT to a savepoint, so
        # catching IntegrityError does not break an enclosing transaction.
        with transaction.atomic():
            lie = LieAnswer.objects.create(round_question=round_question, player=player, text=lie_text)
    except IntegrityError:
        return api_error(request, code="lie_already_submitted", status=409)

    players_count = Player.objects.filter(session=session).count()
    lie_count = LieAnswer.objects.filter(round_question=round_question).count()

    session_status = session.status
    mixed_answers_payload = None

    if players_count > 0 and lie_count >= players_count:
        mix_error = None
        advanced = False
        # Same locking order as mix_answers (session, then round question) so
        # simultaneous final submissions mix the answers only once.
        with transaction.atomic():
            locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
            locked_round_question = RoundQuestion.objects.select_for_update().get(pk=round_question.pk)
            try:
                mixed_answers = _prepare_mixed_answers(locked_round_question)
            except ValueError as exc:
                mix_error = str(exc)
            else:
                if locked_session.status == GameSession.Status.LIE:
                    locked_session.status = GameSession.Status.GUESS
                    locked_session.save(update_fields=["status"])
                    advanced = True

        if mix_error is not None:
            return api_error(request, code=mix_error, status=400)

        session_status = locked_session.status
        mixed_answers_payload = [{"text": text} for text in mixed_answers]
        # Only the request that performed the transition announces it.
        if advanced:
            sync_broadcast_phase_event(
                session.code,
                "phase.guess_started",
                {
                    "round_question_id": round_question.id,
                    "answers": mixed_answers_payload,
                    "guess_seconds": round_config.guess_seconds,
                },
            )

    return JsonResponse(
        {
            "lie": {
                "id": lie.id,
                "player_id": player.id,
                "round_question_id": round_question.id,
                "text": lie.text,
                "created_at": lie.created_at.isoformat(),
            },
            "window": {
                "lie_deadline_at": lie_deadline_at.isoformat(),
            },
            "session": {
                "code": session.code,
                "status": session_status,
                "current_round": session.current_round,
            },
            "phase_transition": {
                "current_phase": session_status,
                "lies_submitted": lie_count,
                "players_expected": players_count,
                "auto_advanced": session_status == GameSession.Status.GUESS,
            },
            "answers": mixed_answers_payload,
        },
        status=201,
    )


@require_POST
@login_required
def mix_answers(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Host-only: mix the answers for a question and move LIE -> GUESS.

    Allowed in LIE and GUESS; in GUESS the already stored answer order is
    returned. ``phase.guess_started`` is broadcast with the mixed answers.
    """
    session_code = _normalize_session_code(code)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.host_id != request.user.id:
        return api_error(request, code="host_only_mix_answers", status=403)

    mixable_statuses = {GameSession.Status.LIE, GameSession.Status.GUESS}
    if session.status not in mixable_statuses:
        return api_error(request, code="mix_answers_invalid_phase", status=400)

    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return api_error(request, code="round_question_not_found", status=404)

    with transaction.atomic():
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        # Re-check under the row lock in case the phase changed meanwhile.
        if locked_session.status not in mixable_statuses:
            return api_error(request, code="mix_answers_invalid_phase", status=400)

        locked_round_question = RoundQuestion.objects.select_for_update().get(pk=round_question.pk)
        try:
            deduped_answers = _prepare_mixed_answers(locked_round_question)
        except ValueError as exc:
            return api_error(request, code=str(exc), status=400)

        if locked_session.status == GameSession.Status.LIE:
            locked_session.status = GameSession.Status.GUESS
            locked_session.save(update_fields=["status"])

    # A missing round config only removes the timer hint from the event.
    try:
        guess_config = RoundConfig.objects.get(session=session, number=session.current_round)
    except RoundConfig.DoesNotExist:
        guess_seconds = None
    else:
        guess_seconds = guess_config.guess_seconds

    answers_payload = [{"text": text} for text in deduped_answers]
    sync_broadcast_phase_event(
        session.code,
        "phase.guess_started",
        {
            "round_question_id": round_question.id,
            "answers": answers_payload,
            "guess_seconds": guess_seconds,
        },
    )

    return JsonResponse(
        {
            "session": {
                "code": session.code,
                "status": GameSession.Status.GUESS,
                "current_round": session.current_round,
            },
            "round_question": {
                "id": round_question.id,
                "round_number": round_question.round_number,
            },
            "answers": answers_payload,
        }
    )


@require_POST
def submit_guess(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Record a player's guess for the current question (201).

    Expects a JSON body with ``player_id``, ``session_token`` and
    ``selected_text`` (1-255 characters), which must match the correct answer
    or one of the submitted lies (case-insensitively). When every player has
    guessed, the round is scored, the session moves to REVEAL and
    ``phase.scores_calculated`` is broadcast.
    """
    payload = _json_body(request)
    session_code = _normalize_session_code(code)
    player_id = payload.get("player_id")
    session_token = str(payload.get("session_token", "")).strip()
    selected_text = str(payload.get("selected_text", "")).strip()

    if not player_id:
        return api_error(request, code="player_id_required", status=400)
    if not session_token:
        return api_error(request, code="session_token_required", status=400)
    if not selected_text or len(selected_text) > 255:
        return api_error(request, code="selected_text_invalid", status=400)

    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.status != GameSession.Status.GUESS:
        return api_error(request, code="guess_submission_invalid_phase", status=400)

    try:
        player = Player.objects.get(pk=player_id, session=session)
    except (Player.DoesNotExist, ValueError, TypeError):
        # ValueError/TypeError: player_id from the JSON body is not a usable
        # primary key (e.g. "abc" or a list); treat it as an unknown player.
        return api_error(request, code="player_not_found_in_session", status=404)

    if player.session_token != session_token:
        return api_error(request, code="invalid_player_session_token", status=403)

    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return api_error(request, code="round_question_not_found", status=404)

    try:
        round_config = RoundConfig.objects.get(session=session, number=round_question.round_number)
    except RoundConfig.DoesNotExist:
        return api_error(request, code="round_config_missing", status=400)

    # The guess window closes lie_seconds + guess_seconds after the question
    # was shown.
    guess_deadline_at = round_question.shown_at + timedelta(
        seconds=round_config.lie_seconds + round_config.guess_seconds
    )
    if timezone.now() > guess_deadline_at:
        return api_error(request, code="guess_submission_closed", status=400)

    correct_normalized = round_question.correct_answer.strip().casefold()
    allowed_answers = {
        correct_normalized,
        *(
            text.strip().casefold()
            for text in round_question.lies.values_list("text", flat=True)
            if text.strip()
        ),
    }
    selected_normalized = selected_text.casefold()
    if selected_normalized not in allowed_answers:
        return api_error(request, code="selected_answer_invalid", status=400)

    is_correct = selected_normalized == correct_normalized
    fooled_player_id = None
    if not is_correct:
        # Order explicitly so that, when several players wrote the same lie,
        # the earliest author is credited deterministically.
        fooled_player_id = (
            round_question.lies.filter(text__iexact=selected_text)
            .order_by("created_at", "id")
            .values_list("player_id", flat=True)
            .first()
        )

    try:
        # The inner atomic block confines a failed INSERT to a savepoint, so
        # catching IntegrityError does not break an enclosing transaction.
        with transaction.atomic():
            guess = Guess.objects.create(
                round_question=round_question,
                player=player,
                selected_text=selected_text,
                is_correct=is_correct,
                fooled_player_id=fooled_player_id,
            )
    except IntegrityError:
        return api_error(request, code="guess_already_submitted", status=409)

    players_count = Player.objects.filter(session=session).count()
    guess_count = Guess.objects.filter(round_question=round_question).count()

    session_status = session.status
    reveal_payload = None
    leaderboard = None

    if players_count > 0 and guess_count >= players_count:
        advanced = False
        # Hold the session row lock while checking for and writing score
        # events so simultaneous final guesses cannot score the round twice.
        with transaction.atomic():
            locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
            round_events = ScoreEvent.objects.filter(
                session=session,
                meta__round_question_id=round_question.id,
            )
            if round_events.exists():
                score_events = list(round_events.select_related("player"))
                leaderboard = _build_leaderboard(session)
            else:
                score_events, leaderboard = _resolve_scores(session, round_question, round_config)

            if locked_session.status == GameSession.Status.GUESS:
                locked_session.status = GameSession.Status.REVEAL
                locked_session.save(update_fields=["status"])
                advanced = True

        session_status = locked_session.status
        reveal_payload = _build_reveal_payload(round_question)

        # Only the request that performed the transition announces it.
        if advanced:
            score_deltas = [
                {"player_id": event.player_id, "delta": event.delta, "reason": event.reason}
                for event in score_events
            ]
            sync_broadcast_phase_event(
                session.code,
                "phase.scores_calculated",
                {
                    "round_question_id": round_question.id,
                    "score_deltas": score_deltas,
                    "leaderboard": leaderboard,
                },
            )

    return JsonResponse(
        {
            "guess": {
                "id": guess.id,
                "player_id": player.id,
                "round_question_id": round_question.id,
                "selected_text": guess.selected_text,
                "is_correct": guess.is_correct,
                "fooled_player_id": guess.fooled_player_id,
                "created_at": guess.created_at.isoformat(),
            },
            "window": {
                "guess_deadline_at": guess_deadline_at.isoformat(),
            },
            "session": {
                "code": session.code,
                "status": session_status,
                "current_round": session.current_round,
            },
            "phase_transition": {
                "current_phase": session_status,
                "guesses_submitted": guess_count,
                "players_expected": players_count,
                "auto_advanced": session_status == GameSession.Status.REVEAL,
            },
            "reveal": reveal_payload,
            "leaderboard": leaderboard,
        },
        status=201,
    )


@require_GET
@login_required
def reveal_scoreboard(request: HttpRequest, code: str) -> JsonResponse:
    """Host-only: return the leaderboard once the session is in SCOREBOARD/FINISHED.

    A REVEAL session whose round is already scored is promoted to SCOREBOARD
    first.
    """
    session_code = _normalize_session_code(code)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.host_id != request.user.id:
        return api_error(request, code="host_only_view_scoreboard", status=403)

    session = _maybe_promote_reveal_to_scoreboard(session)
    if session.status not in {GameSession.Status.SCOREBOARD, GameSession.Status.FINISHED}:
        return api_error(request, code="scoreboard_invalid_phase", status=400)

    return JsonResponse(
        {
            "session": {
                "code": session.code,
                "status": session.status,
                "current_round": session.current_round,
            },
            "leaderboard": _build_leaderboard(session),
        }
    )


@require_POST
@login_required
def start_next_round(request: HttpRequest, code: str) -> JsonResponse:
    """Host-only: from SCOREBOARD, start the next round with the same settings.

    The previous round's category, timers and point values are copied into a
    new RoundConfig. The config, the selected question, the round counter and
    the status change are written in one transaction; if no question can be
    selected everything is rolled back and the session stays in SCOREBOARD.
    """
    session_code = _normalize_session_code(code)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.host_id != request.user.id:
        return api_error(request, code="host_only_start_next_round", status=403)

    selection_error = None
    with transaction.atomic():
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        if locked_session.status != GameSession.Status.SCOREBOARD:
            return api_error(request, code="next_round_invalid_phase", status=400)

        previous_round_config = (
            RoundConfig.objects.filter(
                session=locked_session,
                number=locked_session.current_round,
            )
            .select_related("category")
            .first()
        )
        if previous_round_config is None:
            return api_error(request, code="round_config_missing", status=400)

        locked_session.current_round += 1
        next_round_config = RoundConfig.objects.create(
            session=locked_session,
            number=locked_session.current_round,
            category=previous_round_config.category,
            lie_seconds=previous_round_config.lie_seconds,
            guess_seconds=previous_round_config.guess_seconds,
            points_correct=previous_round_config.points_correct,
            points_bluff=previous_round_config.points_bluff,
        )

        try:
            round_question = _select_round_question(locked_session, next_round_config)
        except ValueError as exc:
            # Undo the RoundConfig created above. Committing it while
            # current_round stays unchanged would make the next attempt create
            # a second config for the same round number.
            selection_error = str(exc)
            transaction.set_rollback(True)
        else:
            locked_session.status = GameSession.Status.LIE
            locked_session.save(update_fields=["current_round", "status"])

    # Build the error response outside the block: no queries may run in a
    # transaction that has been marked for rollback.
    if selection_error is not None:
        return api_error(request, code=selection_error, status=400)

    lie_started_payload = _build_lie_started_payload(locked_session, next_round_config, round_question)
    sync_broadcast_phase_event(
        locked_session.code,
        "phase.lie_started",
        lie_started_payload,
    )

    return JsonResponse(
        {
            "session": {
                "code": locked_session.code,
                "status": locked_session.status,
                "current_round": locked_session.current_round,
            },
            "round": {
                "number": next_round_config.number,
                "category": {
                    "slug": next_round_config.category.slug,
                    "name": next_round_config.category.name,
                },
            },
            "round_question": {
                "id": round_question.id,
                "prompt": round_question.question.prompt,
                "round_number": round_question.round_number,
                "shown_at": round_question.shown_at.isoformat(),
                "lie_deadline_at": lie_started_payload["lie_deadline_at"],
            },
            "config": {
                "lie_seconds": next_round_config.lie_seconds,
            },
        }
    )


@require_POST
@login_required
def finish_game(request: HttpRequest, code: str) -> JsonResponse:
    """Host-only: from SCOREBOARD, mark the session FINISHED and announce the winner.

    The winner is the first leaderboard entry, or None when there are no
    players.
    """
    session_code = _normalize_session_code(code)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.host_id != request.user.id:
        return api_error(request, code="host_only_finish_game", status=403)

    with transaction.atomic():
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        if locked_session.status != GameSession.Status.SCOREBOARD:
            return api_error(request, code="finish_game_invalid_phase", status=400)
        locked_session.status = GameSession.Status.FINISHED
        locked_session.save(update_fields=["status"])

    leaderboard = _build_leaderboard(session)
    winner = leaderboard[0] if leaderboard else None

    sync_broadcast_phase_event(
        session.code,
        "phase.game_over",
        {"winner": winner, "leaderboard": leaderboard},
    )

    return JsonResponse(
        {
            "session": {
                "code": session.code,
                "status": GameSession.Status.FINISHED,
                "current_round": session.current_round,
            },
            "winner": winner,
            "leaderboard": leaderboard,
        }
    )


@require_POST
@login_required
def calculate_scores(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Host-only: score the current question during GUESS and move to REVEAL.

    Returns 409 when score events already exist for the question. Scoring is
    delegated to ``_resolve_scores`` and runs, together with the status
    change, in one transaction under the session row lock.
    """
    session_code = _normalize_session_code(code)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.host_id != request.user.id:
        return api_error(request, code="host_only_calculate_scores", status=403)

    already_calculated = ScoreEvent.objects.filter(
        session=session,
        meta__round_question_id=round_question_id,
    ).exists()
    if already_calculated:
        return api_error(request, code="scores_already_calculated", status=409)

    if session.status != GameSession.Status.GUESS:
        return api_error(request, code="calculate_scores_invalid_phase", status=400)

    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return api_error(request, code="round_question_not_found", status=404)

    try:
        round_config = RoundConfig.objects.get(session=session, number=round_question.round_number)
    except RoundConfig.DoesNotExist:
        return api_error(request, code="round_config_missing", status=400)

    with transaction.atomic():
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        # Re-check under the row lock: a concurrent request may already have
        # scored the round and left GUESS.
        if locked_session.status != GameSession.Status.GUESS:
            return api_error(request, code="calculate_scores_invalid_phase", status=400)

        try:
            score_events, leaderboard = _resolve_scores(session, round_question, round_config)
        except ValueError as exc:
            # Raised before anything is written ("no_guesses_submitted").
            return api_error(request, code=str(exc), status=400)

        locked_session.status = GameSession.Status.REVEAL
        locked_session.save(update_fields=["status"])

    score_deltas = [
        {"player_id": event.player_id, "delta": event.delta, "reason": event.reason}
        for event in score_events
    ]
    sync_broadcast_phase_event(
        session.code,
        "phase.scores_calculated",
        {
            "round_question_id": round_question.id,
            "score_deltas": score_deltas,
            "leaderboard": leaderboard,
        },
    )

    return JsonResponse(
        {
            "session": {
                "code": session.code,
                "status": GameSession.Status.REVEAL,
                "current_round": session.current_round,
            },
            "round_question": {
                "id": round_question.id,
                "round_number": round_question.round_number,
            },
            "reveal": _build_reveal_payload(round_question),
            "events_created": len(score_events),
            "leaderboard": leaderboard,
        }
    )