467 lines
18 KiB
Python
467 lines
18 KiB
Python
import random
|
|
from datetime import timedelta
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
from django.db import transaction
|
|
from django.utils import timezone
|
|
|
|
from .models import Category, GameSession, Guess, LieAnswer, Player, Question, RoundConfig, RoundQuestion, ScoreEvent
|
|
from .payloads import (
|
|
build_finish_game_phase_event,
|
|
build_finish_game_response,
|
|
build_lie_started_payload,
|
|
build_question_shown_payload,
|
|
build_question_shown_response,
|
|
build_reveal_scoreboard_response,
|
|
build_scoreboard_phase_event,
|
|
build_start_next_round_phase_event,
|
|
build_start_next_round_response,
|
|
build_start_round_response,
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class RoundTransitionResult:
|
|
session: GameSession
|
|
round_config: RoundConfig
|
|
round_question: RoundQuestion
|
|
should_broadcast: bool
|
|
response_payload: dict[str, Any]
|
|
phase_event_name: str | None = None
|
|
phase_event_payload: dict[str, Any] | None = None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FinishGameResult:
|
|
session: GameSession
|
|
should_broadcast: bool
|
|
response_payload: dict[str, Any]
|
|
phase_event_name: str | None = None
|
|
phase_event_payload: dict[str, Any] | None = None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ScoreboardTransitionResult:
|
|
session: GameSession
|
|
leaderboard: list[dict]
|
|
should_broadcast: bool
|
|
response_payload: dict[str, Any] | None = None
|
|
phase_event_name: str | None = None
|
|
phase_event_payload: dict[str, Any] | None = None
|
|
|
|
|
|
def get_current_round_question(session: GameSession) -> RoundQuestion | None:
|
|
return (
|
|
RoundQuestion.objects.filter(session=session, round_number=session.current_round)
|
|
.select_related("question")
|
|
.order_by("-id")
|
|
.first()
|
|
)
|
|
|
|
|
|
|
|
def reset_round_question_bootstrap_state(round_question: RoundQuestion) -> RoundQuestion:
|
|
Guess.objects.filter(round_question=round_question).delete()
|
|
LieAnswer.objects.filter(round_question=round_question).delete()
|
|
|
|
update_fields: list[str] = []
|
|
if round_question.mixed_answers:
|
|
round_question.mixed_answers = []
|
|
update_fields.append("mixed_answers")
|
|
|
|
round_question.shown_at = timezone.now()
|
|
update_fields.append("shown_at")
|
|
|
|
round_question.save(update_fields=update_fields)
|
|
return round_question
|
|
|
|
|
|
|
|
def select_round_question(session: GameSession, round_config: RoundConfig) -> RoundQuestion:
|
|
existing_round_question = get_current_round_question(session)
|
|
if existing_round_question is not None and existing_round_question.question.category_id == round_config.category_id:
|
|
return existing_round_question
|
|
|
|
used_question_ids = RoundQuestion.objects.filter(session=session).values_list("question_id", flat=True)
|
|
available_questions = Question.objects.filter(
|
|
category=round_config.category,
|
|
is_active=True,
|
|
).exclude(pk__in=used_question_ids)
|
|
|
|
if not available_questions.exists():
|
|
raise ValueError("no_available_questions")
|
|
|
|
question = random.choice(list(available_questions))
|
|
if existing_round_question is not None:
|
|
existing_round_question.question = question
|
|
existing_round_question.correct_answer = question.correct_answer
|
|
existing_round_question.save(update_fields=["question", "correct_answer"])
|
|
return existing_round_question
|
|
|
|
return RoundQuestion.objects.create(
|
|
session=session,
|
|
round_number=session.current_round,
|
|
question=question,
|
|
correct_answer=question.correct_answer,
|
|
)
|
|
|
|
|
|
|
|
def prepare_mixed_answers(round_question: RoundQuestion) -> list[str]:
|
|
deduped_answers = list(round_question.mixed_answers or [])
|
|
if deduped_answers:
|
|
return deduped_answers
|
|
|
|
lie_texts = list(round_question.lies.values_list("text", flat=True))
|
|
seen = set()
|
|
for text in [round_question.correct_answer, *lie_texts]:
|
|
normalized = text.strip().casefold()
|
|
if not normalized or normalized in seen:
|
|
continue
|
|
seen.add(normalized)
|
|
deduped_answers.append(text.strip())
|
|
|
|
if len(deduped_answers) < 2:
|
|
raise ValueError("not_enough_answers_to_mix")
|
|
|
|
random.shuffle(deduped_answers)
|
|
round_question.mixed_answers = deduped_answers
|
|
round_question.save(update_fields=["mixed_answers"])
|
|
return deduped_answers
|
|
|
|
|
|
|
|
|
|
|
|
def start_round(session: GameSession, category_slug: str) -> RoundTransitionResult:
|
|
try:
|
|
category = Category.objects.get(slug=category_slug, is_active=True)
|
|
except Category.DoesNotExist:
|
|
raise ValueError("category_not_found")
|
|
|
|
if not Question.objects.filter(category=category, is_active=True).exists():
|
|
raise ValueError("category_has_no_questions")
|
|
|
|
with transaction.atomic():
|
|
locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
|
|
if locked_session.status != GameSession.Status.LOBBY:
|
|
raise ValueError("round_start_invalid_phase")
|
|
|
|
if RoundConfig.objects.filter(session=locked_session, number=locked_session.current_round).exists():
|
|
raise ValueError("round_already_configured")
|
|
|
|
round_config = RoundConfig(
|
|
session=locked_session,
|
|
number=locked_session.current_round,
|
|
category=category,
|
|
)
|
|
|
|
round_question = select_round_question(locked_session, round_config)
|
|
|
|
round_config.save()
|
|
locked_session.status = GameSession.Status.LIE
|
|
locked_session.save(update_fields=["status"])
|
|
|
|
phase_event = {
|
|
"name": "phase.lie_started",
|
|
"payload": build_lie_started_payload(locked_session, round_config, round_question),
|
|
}
|
|
return RoundTransitionResult(
|
|
session=locked_session,
|
|
round_config=round_config,
|
|
round_question=round_question,
|
|
should_broadcast=True,
|
|
response_payload=build_start_round_response(locked_session, round_config, round_question),
|
|
phase_event_name=phase_event["name"],
|
|
phase_event_payload=phase_event["payload"],
|
|
)
|
|
|
|
|
|
def show_question(session: GameSession) -> RoundTransitionResult:
|
|
if session.status != GameSession.Status.LIE:
|
|
raise ValueError("show_question_invalid_phase")
|
|
|
|
try:
|
|
round_config = RoundConfig.objects.get(session=session, number=session.current_round)
|
|
except RoundConfig.DoesNotExist:
|
|
raise ValueError("round_config_missing")
|
|
|
|
round_question = get_current_round_question(session)
|
|
if round_question is None:
|
|
round_question = select_round_question(session, round_config)
|
|
|
|
lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
|
|
lie_deadline_iso = lie_deadline_at.isoformat()
|
|
phase_event = {
|
|
"name": "phase.question_shown",
|
|
"payload": build_question_shown_payload(round_question, lie_deadline_iso, round_config.lie_seconds),
|
|
}
|
|
return RoundTransitionResult(
|
|
session=session,
|
|
round_config=round_config,
|
|
round_question=round_question,
|
|
should_broadcast=True,
|
|
response_payload=build_question_shown_response(round_question, lie_deadline_iso, round_config.lie_seconds),
|
|
phase_event_name=phase_event["name"],
|
|
phase_event_payload=phase_event["payload"],
|
|
)
|
|
|
|
def start_next_round(session: GameSession) -> RoundTransitionResult:
|
|
with transaction.atomic():
|
|
locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
|
|
next_round_config = None
|
|
round_question = None
|
|
should_broadcast = False
|
|
|
|
phase_event_name = None
|
|
phase_event_payload = None
|
|
|
|
if locked_session.status == GameSession.Status.SCOREBOARD:
|
|
previous_round_config = RoundConfig.objects.filter(
|
|
session=locked_session,
|
|
number=locked_session.current_round,
|
|
).select_related("category").first()
|
|
if previous_round_config is None:
|
|
raise ValueError("round_config_missing")
|
|
|
|
next_round_number = locked_session.current_round + 1
|
|
next_round_config, _created = RoundConfig.objects.get_or_create(
|
|
session=locked_session,
|
|
number=next_round_number,
|
|
defaults={
|
|
"category": previous_round_config.category,
|
|
"lie_seconds": previous_round_config.lie_seconds,
|
|
"guess_seconds": previous_round_config.guess_seconds,
|
|
"points_correct": previous_round_config.points_correct,
|
|
"points_bluff": previous_round_config.points_bluff,
|
|
"started_from_scoreboard": True,
|
|
},
|
|
)
|
|
round_config_update_fields: list[str] = []
|
|
if next_round_config.category_id != previous_round_config.category_id:
|
|
next_round_config.category = previous_round_config.category
|
|
round_config_update_fields.append("category")
|
|
if next_round_config.lie_seconds != previous_round_config.lie_seconds:
|
|
next_round_config.lie_seconds = previous_round_config.lie_seconds
|
|
round_config_update_fields.append("lie_seconds")
|
|
if next_round_config.guess_seconds != previous_round_config.guess_seconds:
|
|
next_round_config.guess_seconds = previous_round_config.guess_seconds
|
|
round_config_update_fields.append("guess_seconds")
|
|
if next_round_config.points_correct != previous_round_config.points_correct:
|
|
next_round_config.points_correct = previous_round_config.points_correct
|
|
round_config_update_fields.append("points_correct")
|
|
if next_round_config.points_bluff != previous_round_config.points_bluff:
|
|
next_round_config.points_bluff = previous_round_config.points_bluff
|
|
round_config_update_fields.append("points_bluff")
|
|
if not next_round_config.started_from_scoreboard:
|
|
next_round_config.started_from_scoreboard = True
|
|
round_config_update_fields.append("started_from_scoreboard")
|
|
if round_config_update_fields:
|
|
next_round_config.save(update_fields=round_config_update_fields)
|
|
|
|
locked_session.current_round = next_round_number
|
|
|
|
round_question = reset_round_question_bootstrap_state(select_round_question(locked_session, next_round_config))
|
|
|
|
locked_session.status = GameSession.Status.LIE
|
|
locked_session.save(update_fields=["current_round", "status"])
|
|
should_broadcast = True
|
|
phase_event = build_start_next_round_phase_event(locked_session, next_round_config, round_question)
|
|
phase_event_name = phase_event["name"]
|
|
phase_event_payload = phase_event["payload"]
|
|
elif locked_session.status == GameSession.Status.LIE:
|
|
if locked_session.current_round <= 1:
|
|
raise ValueError("next_round_invalid_phase")
|
|
|
|
next_round_config = RoundConfig.objects.filter(
|
|
session=locked_session,
|
|
number=locked_session.current_round,
|
|
).select_related("category").first()
|
|
round_question = get_current_round_question(locked_session)
|
|
if (
|
|
next_round_config is None
|
|
or not next_round_config.started_from_scoreboard
|
|
or round_question is None
|
|
):
|
|
raise ValueError("next_round_invalid_phase")
|
|
else:
|
|
raise ValueError("next_round_invalid_phase")
|
|
|
|
return RoundTransitionResult(
|
|
session=locked_session,
|
|
round_config=next_round_config,
|
|
round_question=round_question,
|
|
should_broadcast=should_broadcast,
|
|
response_payload=build_start_next_round_response(
|
|
locked_session,
|
|
next_round_config,
|
|
round_question,
|
|
),
|
|
phase_event_name=phase_event_name,
|
|
phase_event_payload=phase_event_payload,
|
|
)
|
|
|
|
|
|
|
|
def finish_game(session: GameSession) -> FinishGameResult:
|
|
with transaction.atomic():
|
|
locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
|
|
should_broadcast = False
|
|
phase_event_name = None
|
|
phase_event_payload = None
|
|
|
|
if locked_session.status == GameSession.Status.SCOREBOARD:
|
|
locked_session.status = GameSession.Status.FINISHED
|
|
locked_session.save(update_fields=["status"])
|
|
should_broadcast = True
|
|
phase_event = build_finish_game_phase_event(locked_session)
|
|
phase_event_name = phase_event["name"]
|
|
phase_event_payload = phase_event["payload"]
|
|
elif locked_session.status != GameSession.Status.FINISHED:
|
|
raise ValueError("finish_game_invalid_phase")
|
|
|
|
return FinishGameResult(
|
|
session=locked_session,
|
|
should_broadcast=should_broadcast,
|
|
response_payload=build_finish_game_response(locked_session),
|
|
phase_event_name=phase_event_name,
|
|
phase_event_payload=phase_event_payload,
|
|
)
|
|
|
|
|
|
|
|
def promote_reveal_to_scoreboard(session: GameSession) -> ScoreboardTransitionResult:
|
|
if session.status != GameSession.Status.REVEAL:
|
|
leaderboard = list(
|
|
Player.objects.filter(session=session)
|
|
.order_by("-score", "nickname")
|
|
.values("id", "nickname", "score")
|
|
)
|
|
return ScoreboardTransitionResult(
|
|
session=session,
|
|
leaderboard=leaderboard,
|
|
should_broadcast=False,
|
|
response_payload=build_reveal_scoreboard_response(session, leaderboard),
|
|
)
|
|
|
|
current_round_question = get_current_round_question(session)
|
|
if current_round_question is None:
|
|
leaderboard = list(
|
|
Player.objects.filter(session=session)
|
|
.order_by("-score", "nickname")
|
|
.values("id", "nickname", "score")
|
|
)
|
|
return ScoreboardTransitionResult(
|
|
session=session,
|
|
leaderboard=leaderboard,
|
|
should_broadcast=False,
|
|
response_payload=build_reveal_scoreboard_response(session, leaderboard),
|
|
)
|
|
|
|
players_count = Player.objects.filter(session=session).count()
|
|
guess_count = Guess.objects.filter(round_question=current_round_question).count()
|
|
has_score_events = ScoreEvent.objects.filter(
|
|
session=session,
|
|
meta__round_question_id=current_round_question.id,
|
|
).exists()
|
|
reveal_is_resolved = has_score_events or (players_count > 0 and guess_count >= players_count)
|
|
if not reveal_is_resolved:
|
|
leaderboard = list(
|
|
Player.objects.filter(session=session)
|
|
.order_by("-score", "nickname")
|
|
.values("id", "nickname", "score")
|
|
)
|
|
return ScoreboardTransitionResult(
|
|
session=session,
|
|
leaderboard=leaderboard,
|
|
should_broadcast=False,
|
|
response_payload=build_reveal_scoreboard_response(session, leaderboard),
|
|
)
|
|
|
|
with transaction.atomic():
|
|
locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
|
|
if locked_session.status != GameSession.Status.REVEAL:
|
|
scoreboard_session = locked_session
|
|
should_broadcast = False
|
|
else:
|
|
locked_session.status = GameSession.Status.SCOREBOARD
|
|
locked_session.save(update_fields=["status"])
|
|
scoreboard_session = locked_session
|
|
should_broadcast = True
|
|
|
|
leaderboard = list(
|
|
Player.objects.filter(session=scoreboard_session)
|
|
.order_by("-score", "nickname")
|
|
.values("id", "nickname", "score")
|
|
)
|
|
phase_event_name = None
|
|
phase_event_payload = None
|
|
if should_broadcast:
|
|
phase_event = build_scoreboard_phase_event(scoreboard_session, leaderboard)
|
|
phase_event_name = phase_event["name"]
|
|
phase_event_payload = phase_event["payload"]
|
|
return ScoreboardTransitionResult(
|
|
session=scoreboard_session,
|
|
leaderboard=leaderboard,
|
|
should_broadcast=should_broadcast,
|
|
response_payload=build_reveal_scoreboard_response(scoreboard_session, leaderboard),
|
|
phase_event_name=phase_event_name,
|
|
phase_event_payload=phase_event_payload,
|
|
)
|
|
|
|
|
|
|
|
def resolve_scores(
|
|
session: GameSession,
|
|
round_question: RoundQuestion,
|
|
round_config: RoundConfig,
|
|
) -> tuple[list[ScoreEvent], list[dict]]:
|
|
guesses = list(round_question.guesses.select_related("player"))
|
|
if not guesses:
|
|
raise ValueError("no_guesses_submitted")
|
|
|
|
bluff_counts: dict[int, int] = {}
|
|
for guess in guesses:
|
|
if guess.fooled_player_id:
|
|
bluff_counts[guess.fooled_player_id] = bluff_counts.get(guess.fooled_player_id, 0) + 1
|
|
|
|
score_events = []
|
|
|
|
for guess in guesses:
|
|
if guess.is_correct:
|
|
guess.player.score += round_config.points_correct
|
|
guess.player.save(update_fields=["score"])
|
|
score_events.append(
|
|
ScoreEvent(
|
|
session=session,
|
|
player=guess.player,
|
|
delta=round_config.points_correct,
|
|
reason="guess_correct",
|
|
meta={"round_question_id": round_question.id, "guess_id": guess.id},
|
|
)
|
|
)
|
|
|
|
for player_id, fooled_count in bluff_counts.items():
|
|
delta = fooled_count * round_config.points_bluff
|
|
player = Player.objects.get(pk=player_id, session=session)
|
|
player.score += delta
|
|
player.save(update_fields=["score"])
|
|
score_events.append(
|
|
ScoreEvent(
|
|
session=session,
|
|
player=player,
|
|
delta=delta,
|
|
reason="bluff_success",
|
|
meta={"round_question_id": round_question.id, "fooled_count": fooled_count},
|
|
)
|
|
)
|
|
|
|
ScoreEvent.objects.bulk_create(score_events)
|
|
leaderboard = list(
|
|
Player.objects.filter(session=session)
|
|
.order_by("-score", "nickname")
|
|
.values("id", "nickname", "score")
|
|
)
|
|
return score_events, leaderboard
|