Files
weirsoe-party-protocol/fupogfakta/services.py
Asger Geel Weirsøe a81bc1250c
Some checks failed
CI / test-and-quality (push) Failing after 4m4s
Big visual overhaul docker compsoe file etc
2026-03-23 14:11:30 +01:00

495 lines
19 KiB
Python

import random
from datetime import timedelta
from dataclasses import dataclass
from typing import Any
from django.db import transaction
from django.utils import timezone
from .models import Category, GameSession, Guess, LieAnswer, Player, Question, RoundConfig, RoundQuestion, ScoreEvent
from .payloads import (
build_finish_game_phase_event,
build_finish_game_response,
build_lie_started_payload,
build_question_shown_payload,
build_question_shown_response,
build_reveal_scoreboard_response,
build_scoreboard_phase_event,
build_start_next_round_phase_event,
build_start_next_round_response,
build_start_round_response,
)
@dataclass(frozen=True)
class RoundTransitionResult:
session: GameSession
round_config: RoundConfig
round_question: RoundQuestion
should_broadcast: bool
response_payload: dict[str, Any]
phase_event_name: str | None = None
phase_event_payload: dict[str, Any] | None = None
@dataclass(frozen=True)
class FinishGameResult:
session: GameSession
should_broadcast: bool
response_payload: dict[str, Any]
phase_event_name: str | None = None
phase_event_payload: dict[str, Any] | None = None
@dataclass(frozen=True)
class ScoreboardTransitionResult:
session: GameSession
leaderboard: list[dict]
should_broadcast: bool
response_payload: dict[str, Any] | None = None
phase_event_name: str | None = None
phase_event_payload: dict[str, Any] | None = None
def get_round_question(session: GameSession, round_number: int) -> RoundQuestion | None:
return (
RoundQuestion.objects.filter(session=session, round_number=round_number)
.select_related("question")
.order_by("-id")
.first()
)
def get_current_round_question(session: GameSession) -> RoundQuestion | None:
return get_round_question(session, session.current_round)
def reset_round_question_bootstrap_state(round_question: RoundQuestion) -> RoundQuestion:
Guess.objects.filter(round_question=round_question).delete()
LieAnswer.objects.filter(round_question=round_question).delete()
update_fields: list[str] = []
if round_question.mixed_answers:
round_question.mixed_answers = []
update_fields.append("mixed_answers")
round_question.shown_at = timezone.now()
update_fields.append("shown_at")
round_question.save(update_fields=update_fields)
return round_question
def select_round_question(
session: GameSession,
round_config: RoundConfig,
*,
round_number: int | None = None,
) -> RoundQuestion:
target_round_number = session.current_round if round_number is None else round_number
existing_round_question = get_round_question(session, target_round_number)
if existing_round_question is not None and existing_round_question.question.category_id == round_config.category_id:
return existing_round_question
used_question_ids = RoundQuestion.objects.filter(session=session).values_list("question_id", flat=True)
available_questions = Question.objects.filter(
category=round_config.category,
is_active=True,
).exclude(pk__in=used_question_ids)
if not available_questions.exists():
raise ValueError("no_available_questions")
question = random.choice(list(available_questions))
if existing_round_question is not None:
existing_round_question.question = question
existing_round_question.correct_answer = question.correct_answer
existing_round_question.save(update_fields=["question", "correct_answer"])
return existing_round_question
return RoundQuestion.objects.create(
session=session,
round_number=target_round_number,
question=question,
correct_answer=question.correct_answer,
)
def prepare_mixed_answers(round_question: RoundQuestion) -> list[str]:
deduped_answers = list(round_question.mixed_answers or [])
if deduped_answers:
return deduped_answers
lie_texts = list(round_question.lies.values_list("text", flat=True))
seen = set()
for text in [round_question.correct_answer, *lie_texts]:
normalized = text.strip().casefold()
if not normalized or normalized in seen:
continue
seen.add(normalized)
deduped_answers.append(text.strip())
players_count = Player.objects.filter(session=round_question.session).count()
target_total_answers = max(2, players_count + 1)
fallback_lies: list[str] = []
fallback_seen = set()
for text in round_question.question.fallback_lies.filter(is_active=True).values_list("text", flat=True):
normalized = text.strip().casefold()
if not normalized or normalized in seen or normalized in fallback_seen:
continue
fallback_seen.add(normalized)
fallback_lies.append(text.strip())
if fallback_lies and len(deduped_answers) < target_total_answers:
random.shuffle(fallback_lies)
deduped_answers.extend(fallback_lies[: target_total_answers - len(deduped_answers)])
if len(deduped_answers) < 2:
raise ValueError("not_enough_answers_to_mix")
random.shuffle(deduped_answers)
round_question.mixed_answers = deduped_answers
round_question.save(update_fields=["mixed_answers"])
return deduped_answers
def start_round(session: GameSession, category_slug: str) -> RoundTransitionResult:
try:
category = Category.objects.get(slug=category_slug, is_active=True)
except Category.DoesNotExist:
raise ValueError("category_not_found")
if not Question.objects.filter(category=category, is_active=True).exists():
raise ValueError("category_has_no_questions")
with transaction.atomic():
locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
if locked_session.status != GameSession.Status.LOBBY:
raise ValueError("round_start_invalid_phase")
if RoundConfig.objects.filter(session=locked_session, number=locked_session.current_round).exists():
raise ValueError("round_already_configured")
round_config = RoundConfig(
session=locked_session,
number=locked_session.current_round,
category=category,
)
round_question = select_round_question(locked_session, round_config)
round_config.save()
locked_session.status = GameSession.Status.LIE
locked_session.save(update_fields=["status"])
phase_event = {
"name": "phase.lie_started",
"payload": build_lie_started_payload(locked_session, round_config, round_question),
}
return RoundTransitionResult(
session=locked_session,
round_config=round_config,
round_question=round_question,
should_broadcast=True,
response_payload=build_start_round_response(locked_session, round_config, round_question),
phase_event_name=phase_event["name"],
phase_event_payload=phase_event["payload"],
)
def show_question(session: GameSession) -> RoundTransitionResult:
if session.status != GameSession.Status.LIE:
raise ValueError("show_question_invalid_phase")
try:
round_config = RoundConfig.objects.get(session=session, number=session.current_round)
except RoundConfig.DoesNotExist:
raise ValueError("round_config_missing")
round_question = get_current_round_question(session)
if round_question is None:
round_question = select_round_question(session, round_config)
lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
lie_deadline_iso = lie_deadline_at.isoformat()
phase_event = {
"name": "phase.question_shown",
"payload": build_question_shown_payload(round_question, lie_deadline_iso, round_config.lie_seconds),
}
return RoundTransitionResult(
session=session,
round_config=round_config,
round_question=round_question,
should_broadcast=True,
response_payload=build_question_shown_response(round_question, lie_deadline_iso, round_config.lie_seconds),
phase_event_name=phase_event["name"],
phase_event_payload=phase_event["payload"],
)
def start_next_round(session: GameSession) -> RoundTransitionResult:
with transaction.atomic():
locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
next_round_config = None
round_question = None
should_broadcast = False
phase_event_name = None
phase_event_payload = None
if locked_session.status == GameSession.Status.SCOREBOARD:
previous_round_config = RoundConfig.objects.filter(
session=locked_session,
number=locked_session.current_round,
).select_related("category").first()
if previous_round_config is None:
raise ValueError("round_config_missing")
next_round_number = locked_session.current_round + 1
next_round_config, _created = RoundConfig.objects.get_or_create(
session=locked_session,
number=next_round_number,
defaults={
"category": previous_round_config.category,
"lie_seconds": previous_round_config.lie_seconds,
"guess_seconds": previous_round_config.guess_seconds,
"points_correct": previous_round_config.points_correct,
"points_bluff": previous_round_config.points_bluff,
"started_from_scoreboard": True,
},
)
round_config_update_fields: list[str] = []
if next_round_config.category_id != previous_round_config.category_id:
next_round_config.category = previous_round_config.category
round_config_update_fields.append("category")
if next_round_config.lie_seconds != previous_round_config.lie_seconds:
next_round_config.lie_seconds = previous_round_config.lie_seconds
round_config_update_fields.append("lie_seconds")
if next_round_config.guess_seconds != previous_round_config.guess_seconds:
next_round_config.guess_seconds = previous_round_config.guess_seconds
round_config_update_fields.append("guess_seconds")
if next_round_config.points_correct != previous_round_config.points_correct:
next_round_config.points_correct = previous_round_config.points_correct
round_config_update_fields.append("points_correct")
if next_round_config.points_bluff != previous_round_config.points_bluff:
next_round_config.points_bluff = previous_round_config.points_bluff
round_config_update_fields.append("points_bluff")
if not next_round_config.started_from_scoreboard:
next_round_config.started_from_scoreboard = True
round_config_update_fields.append("started_from_scoreboard")
if round_config_update_fields:
next_round_config.save(update_fields=round_config_update_fields)
locked_session.current_round = next_round_number
round_question = reset_round_question_bootstrap_state(
select_round_question(locked_session, next_round_config, round_number=next_round_number)
)
locked_session.status = GameSession.Status.LIE
locked_session.save(update_fields=["current_round", "status"])
should_broadcast = True
phase_event = build_start_next_round_phase_event(locked_session, next_round_config, round_question)
phase_event_name = phase_event["name"]
phase_event_payload = phase_event["payload"]
elif locked_session.status == GameSession.Status.LIE:
if locked_session.current_round <= 1:
raise ValueError("next_round_invalid_phase")
next_round_config = RoundConfig.objects.filter(
session=locked_session,
number=locked_session.current_round,
).select_related("category").first()
round_question = get_current_round_question(locked_session)
if (
next_round_config is None
or not next_round_config.started_from_scoreboard
or round_question is None
):
raise ValueError("next_round_invalid_phase")
else:
raise ValueError("next_round_invalid_phase")
return RoundTransitionResult(
session=locked_session,
round_config=next_round_config,
round_question=round_question,
should_broadcast=should_broadcast,
response_payload=build_start_next_round_response(
locked_session,
next_round_config,
round_question,
),
phase_event_name=phase_event_name,
phase_event_payload=phase_event_payload,
)
def finish_game(session: GameSession) -> FinishGameResult:
with transaction.atomic():
locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
should_broadcast = False
phase_event_name = None
phase_event_payload = None
if locked_session.status == GameSession.Status.SCOREBOARD:
locked_session.status = GameSession.Status.FINISHED
locked_session.save(update_fields=["status"])
should_broadcast = True
phase_event = build_finish_game_phase_event(locked_session)
phase_event_name = phase_event["name"]
phase_event_payload = phase_event["payload"]
elif locked_session.status != GameSession.Status.FINISHED:
raise ValueError("finish_game_invalid_phase")
return FinishGameResult(
session=locked_session,
should_broadcast=should_broadcast,
response_payload=build_finish_game_response(locked_session),
phase_event_name=phase_event_name,
phase_event_payload=phase_event_payload,
)
def promote_reveal_to_scoreboard(session: GameSession) -> ScoreboardTransitionResult:
if session.status != GameSession.Status.REVEAL:
leaderboard = list(
Player.objects.filter(session=session)
.order_by("-score", "nickname")
.values("id", "nickname", "score")
)
return ScoreboardTransitionResult(
session=session,
leaderboard=leaderboard,
should_broadcast=False,
response_payload=build_reveal_scoreboard_response(session, leaderboard),
)
current_round_question = get_current_round_question(session)
if current_round_question is None:
leaderboard = list(
Player.objects.filter(session=session)
.order_by("-score", "nickname")
.values("id", "nickname", "score")
)
return ScoreboardTransitionResult(
session=session,
leaderboard=leaderboard,
should_broadcast=False,
response_payload=build_reveal_scoreboard_response(session, leaderboard),
)
players_count = Player.objects.filter(session=session).count()
guess_count = Guess.objects.filter(round_question=current_round_question).count()
has_score_events = ScoreEvent.objects.filter(
session=session,
meta__round_question_id=current_round_question.id,
).exists()
reveal_is_resolved = has_score_events or (players_count > 0 and guess_count >= players_count)
if not reveal_is_resolved:
leaderboard = list(
Player.objects.filter(session=session)
.order_by("-score", "nickname")
.values("id", "nickname", "score")
)
return ScoreboardTransitionResult(
session=session,
leaderboard=leaderboard,
should_broadcast=False,
response_payload=build_reveal_scoreboard_response(session, leaderboard),
)
with transaction.atomic():
locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
if locked_session.status != GameSession.Status.REVEAL:
scoreboard_session = locked_session
should_broadcast = False
else:
locked_session.status = GameSession.Status.SCOREBOARD
locked_session.save(update_fields=["status"])
scoreboard_session = locked_session
should_broadcast = True
leaderboard = list(
Player.objects.filter(session=scoreboard_session)
.order_by("-score", "nickname")
.values("id", "nickname", "score")
)
phase_event_name = None
phase_event_payload = None
if should_broadcast:
phase_event = build_scoreboard_phase_event(scoreboard_session, leaderboard)
phase_event_name = phase_event["name"]
phase_event_payload = phase_event["payload"]
return ScoreboardTransitionResult(
session=scoreboard_session,
leaderboard=leaderboard,
should_broadcast=should_broadcast,
response_payload=build_reveal_scoreboard_response(scoreboard_session, leaderboard),
phase_event_name=phase_event_name,
phase_event_payload=phase_event_payload,
)
def resolve_scores(
session: GameSession,
round_question: RoundQuestion,
round_config: RoundConfig,
) -> tuple[list[ScoreEvent], list[dict]]:
guesses = list(round_question.guesses.select_related("player"))
if not guesses:
raise ValueError("no_guesses_submitted")
bluff_counts: dict[int, int] = {}
for guess in guesses:
if guess.fooled_player_id:
bluff_counts[guess.fooled_player_id] = bluff_counts.get(guess.fooled_player_id, 0) + 1
score_events = []
for guess in guesses:
if guess.is_correct:
guess.player.score += round_config.points_correct
guess.player.save(update_fields=["score"])
score_events.append(
ScoreEvent(
session=session,
player=guess.player,
delta=round_config.points_correct,
reason="guess_correct",
meta={"round_question_id": round_question.id, "guess_id": guess.id},
)
)
for player_id, fooled_count in bluff_counts.items():
delta = fooled_count * round_config.points_bluff
player = Player.objects.get(pk=player_id, session=session)
player.score += delta
player.save(update_fields=["score"])
score_events.append(
ScoreEvent(
session=session,
player=player,
delta=delta,
reason="bluff_success",
meta={"round_question_id": round_question.id, "fooled_count": fooled_count},
)
)
ScoreEvent.objects.bulk_create(score_events)
leaderboard = list(
Player.objects.filter(session=session)
.order_by("-score", "nickname")
.values("id", "nickname", "score")
)
return score_events, leaderboard