Files
weirsoe-party-protocol/lobby/views.py
dev-bot d080f05661
All checks were successful
CI / test-and-quality (push) Successful in 4m0s
CI / test-and-quality (pull_request) Successful in 4m2s
fix(ci): retain lobby payload ownership export
2026-03-18 04:17:17 +00:00

892 lines
29 KiB
Python

from datetime import timedelta
import json
import random
from django.contrib.auth.decorators import login_required
from django.db import IntegrityError, transaction
from django.http import HttpRequest, JsonResponse
from django.utils import timezone
from django.views.decorators.http import require_GET, require_POST
from fupogfakta.models import GameSession, Guess, LieAnswer, Player, RoundConfig, RoundQuestion, ScoreEvent
from fupogfakta.payloads import (
build_leaderboard as _build_leaderboard,
build_reveal_payload as _build_reveal_payload,
build_scoreboard_phase_event as _build_scoreboard_phase_event,
build_session_detail_gameplay_payload as _build_session_detail_gameplay_payload,
)
from fupogfakta.services import (
finish_game as _finish_game,
get_current_round_question as _get_current_round_question,
prepare_mixed_answers as _prepare_mixed_answers,
promote_reveal_to_scoreboard as _promote_reveal_to_scoreboard,
resolve_scores as _resolve_scores,
select_round_question as _select_round_question,
show_question as _show_question,
start_next_round as _start_next_round,
start_round as _start_round,
)
from realtime.broadcast import sync_broadcast_phase_event
from .i18n import api_error
# Referencing these two imports in a tuple keeps them in use inside this module
# even though none of the views below call them directly.
# NOTE(review): presumably retained so other code can still reach them through
# this module — confirm against callers before removing.
_GAMEPLAY_SERVICE_OWNERSHIP_EXPORTS = (
    _select_round_question,
    _build_scoreboard_phase_event,
)
# Characters a generated session code is drawn from (the set omits I, O, 0 and 1).
SESSION_CODE_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
# Number of characters in a generated session code.
SESSION_CODE_LENGTH = 6
# Upper bound on random draws before _create_unique_session_code gives up.
MAX_CODE_GENERATION_ATTEMPTS = 20
# Session phases in which join_session still accepts new players.
JOINABLE_STATUSES = {
    GameSession.Status.LOBBY,
    GameSession.Status.LIE,
    GameSession.Status.GUESS,
    GameSession.Status.REVEAL,
    GameSession.Status.SCOREBOARD,
}
def _json_body(request: HttpRequest) -> dict:
if not request.body:
return {}
try:
return json.loads(request.body)
except json.JSONDecodeError:
return {}
def _generate_session_code() -> str:
    """Return a random code of SESSION_CODE_LENGTH characters from SESSION_CODE_ALPHABET."""
    picked_chars = random.choices(SESSION_CODE_ALPHABET, k=SESSION_CODE_LENGTH)
    return "".join(picked_chars)
def _normalize_session_code(code: str) -> str:
return code.strip().upper()
def _create_unique_session_code() -> str:
    """Generate a session code that no existing GameSession uses.

    Draws up to MAX_CODE_GENERATION_ATTEMPTS random codes and returns the first
    one not found in the database.

    Raises:
        RuntimeError: If every attempt produced a code that is already taken.
    """
    attempts_left = MAX_CODE_GENERATION_ATTEMPTS
    while attempts_left > 0:
        attempts_left -= 1
        candidate = _generate_session_code()
        already_taken = GameSession.objects.filter(code=candidate).exists()
        if already_taken:
            continue
        return candidate
    raise RuntimeError("Could not generate unique session code")
def _maybe_promote_reveal_to_scoreboard(session: GameSession) -> GameSession:
    """Run the reveal-to-scoreboard promotion and broadcast it when requested.

    Delegates to the service function, emits the phase event only if the
    returned transition asks for a broadcast, and hands back the session
    carried by the transition.
    """
    outcome = _promote_reveal_to_scoreboard(session)
    if not outcome.should_broadcast:
        return outcome.session
    sync_broadcast_phase_event(
        outcome.session.code,
        outcome.phase_event_name,
        outcome.phase_event_payload,
    )
    return outcome.session
@require_POST
@login_required
def create_session(request: HttpRequest) -> JsonResponse:
    """Create a game session hosted by the logged-in user.

    Responds with HTTP 201 and the new session's code, status, host id and
    current round.
    """
    fresh_code = _create_unique_session_code()
    new_session = GameSession.objects.create(host=request.user, code=fresh_code)
    session_summary = {
        "code": new_session.code,
        "status": new_session.status,
        "host_id": new_session.host_id,
        "current_round": new_session.current_round,
    }
    return JsonResponse({"session": session_summary}, status=201)
@require_POST
def join_session(request: HttpRequest) -> JsonResponse:
    """Add a player with the requested nickname to an existing session.

    Reads ``code`` and ``nickname`` from the JSON body. Error responses:
    400 ``session_code_required`` / ``nickname_invalid`` / ``session_not_joinable``,
    404 ``session_not_found``, 409 ``nickname_taken``. On success responds with
    HTTP 201 containing the new player (including its session token) and a
    short session summary.
    """
    body = _json_body(request)
    code = _normalize_session_code(str(body.get("code", "")))
    nickname = str(body.get("nickname", "")).strip()

    if not code:
        return api_error(request, code="session_code_required", status=400)

    # Nicknames must be between 2 and 40 characters after trimming.
    if not 2 <= len(nickname) <= 40:
        return api_error(request, code="nickname_invalid", status=400)

    try:
        session = GameSession.objects.get(code=code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.status not in JOINABLE_STATUSES:
        return api_error(request, code="session_not_joinable", status=400)

    # Nickname uniqueness within a session is checked case-insensitively.
    nickname_in_use = Player.objects.filter(session=session, nickname__iexact=nickname).exists()
    if nickname_in_use:
        return api_error(request, code="nickname_taken", status=409)

    new_player = Player.objects.create(session=session, nickname=nickname)
    player_info = {
        "id": new_player.id,
        "nickname": new_player.nickname,
        "session_token": new_player.session_token,
        "score": new_player.score,
    }
    session_info = {
        "code": session.code,
        "status": session.status,
    }
    return JsonResponse({"player": player_info, "session": session_info}, status=201)
@require_GET
def session_detail(request: HttpRequest, code: str) -> JsonResponse:
    """Return the session summary, its players and the gameplay payload.

    Responds 404 ``session_not_found`` for an unknown code. The player list is
    read first; the session is then passed through the reveal-to-scoreboard
    promotion helper before the gameplay payload is built.
    """
    try:
        session = GameSession.objects.get(code=_normalize_session_code(code))
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    roster_query = session.players.order_by("nickname").values(
        "id", "nickname", "score", "is_connected"
    )
    roster = list(roster_query)
    roster_size = len(roster)

    session = _maybe_promote_reveal_to_scoreboard(session)
    active_round_question = _get_current_round_question(session)
    gameplay_payload = _build_session_detail_gameplay_payload(
        session,
        current_round_question=active_round_question,
        players_count=roster_size,
    )

    session_summary = {
        "code": session.code,
        "status": session.status,
        "host_id": session.host_id,
        "current_round": session.current_round,
        "players_count": roster_size,
    }
    return JsonResponse({"session": session_summary, "players": roster, **gameplay_payload})
@require_POST
@login_required
def start_round(request: HttpRequest, code: str) -> JsonResponse:
    """Start a round in the given category; only the session host may do so.

    Reads ``category_slug`` from the JSON body. Error responses:
    400 ``category_slug_required``, 404 ``session_not_found``,
    403 ``host_only_start_round``, and whatever code the service raises as a
    ``ValueError`` (404 for ``category_not_found``, 409 for
    ``round_already_configured``, otherwise 400). On success the phase event is
    broadcast and the transition's response payload is returned with HTTP 201.
    """
    body = _json_body(request)
    category_slug = str(body.get("category_slug", "")).strip()
    if not category_slug:
        return api_error(request, code="category_slug_required", status=400)

    try:
        session = GameSession.objects.get(code=_normalize_session_code(code))
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.host_id != request.user.id:
        return api_error(request, code="host_only_start_round", status=403)

    try:
        transition = _start_round(session, category_slug)
    except ValueError as exc:
        # The service signals failures by raising ValueError with the error code as message.
        error_code = str(exc)
        status_by_error = {
            "category_not_found": 404,
            "round_already_configured": 409,
        }
        return api_error(request, code=error_code, status=status_by_error.get(error_code, 400))

    sync_broadcast_phase_event(
        transition.session.code,
        transition.phase_event_name,
        transition.phase_event_payload,
    )
    return JsonResponse(transition.response_payload, status=201)
@require_POST
@login_required
def show_question(request: HttpRequest, code: str) -> JsonResponse:
    """Show the question for the session; only the session host may do so.

    Error responses: 404 ``session_not_found``, 403 ``host_only_show_question``,
    400 with the code raised by the service as ``ValueError``. On success the
    phase event is broadcast and the transition's response payload is returned
    with HTTP 201.
    """
    try:
        session = GameSession.objects.get(code=_normalize_session_code(code))
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    requester_is_host = session.host_id == request.user.id
    if not requester_is_host:
        return api_error(request, code="host_only_show_question", status=403)

    try:
        transition = _show_question(session)
    except ValueError as exc:
        return api_error(request, code=str(exc), status=400)

    sync_broadcast_phase_event(
        transition.session.code,
        transition.phase_event_name,
        transition.phase_event_payload,
    )
    return JsonResponse(transition.response_payload, status=201)
@require_POST
def submit_lie(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Record a player's lie for the current round question.

    The player authenticates with ``player_id`` and ``session_token`` from the
    JSON body; ``text`` carries the lie (1-255 characters after trimming).

    Args:
        request: Incoming POST request with a JSON body.
        code: Session code from the URL (normalized before lookup).
        round_question_id: Primary key of the round question being answered.

    Returns:
        HTTP 201 with the stored lie, the lie deadline, the session summary,
        phase-transition counters and, when this lie completed the set, the
        mixed answers. Error responses: 400 ``player_id_required`` /
        ``session_token_required`` / ``lie_text_invalid`` /
        ``lie_submission_invalid_phase`` / ``round_config_missing`` /
        ``lie_submission_closed`` (or the code raised while preparing mixed
        answers), 403 ``invalid_player_session_token``, 404
        ``session_not_found`` / ``player_not_found_in_session`` /
        ``round_question_not_found``, 409 ``lie_already_submitted``.
    """
    payload = _json_body(request)
    session_code = _normalize_session_code(code)
    player_id = payload.get("player_id")
    session_token = str(payload.get("session_token", "")).strip()
    lie_text = str(payload.get("text", "")).strip()
    if not player_id:
        return api_error(request, code="player_id_required", status=400)
    if not session_token:
        return api_error(request, code="session_token_required", status=400)
    if not lie_text or len(lie_text) > 255:
        return api_error(request, code="lie_text_invalid", status=400)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)
    if session.status != GameSession.Status.LIE:
        return api_error(request, code="lie_submission_invalid_phase", status=400)
    try:
        player = Player.objects.get(pk=player_id, session=session)
    except (Player.DoesNotExist, TypeError, ValueError):
        # player_id arrives unvalidated from the JSON body. A value the
        # primary-key field cannot coerce (e.g. "abc" or a list) makes the
        # lookup raise ValueError/TypeError; no such player can exist, so
        # answer 404 instead of letting the request fail with a server error.
        # NOTE(review): assumes a numeric primary key on Player — confirm.
        return api_error(request, code="player_not_found_in_session", status=404)
    if player.session_token != session_token:
        return api_error(request, code="invalid_player_session_token", status=403)
    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return api_error(request, code="round_question_not_found", status=404)
    try:
        round_config = RoundConfig.objects.get(session=session, number=round_question.round_number)
    except RoundConfig.DoesNotExist:
        return api_error(request, code="round_config_missing", status=400)
    # The lie window opens when the question was shown and lasts lie_seconds.
    lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
    if timezone.now() > lie_deadline_at:
        return api_error(request, code="lie_submission_closed", status=400)
    try:
        lie = LieAnswer.objects.create(round_question=round_question, player=player, text=lie_text)
    except IntegrityError:
        return api_error(request, code="lie_already_submitted", status=409)
    players_count = Player.objects.filter(session=session).count()
    lie_count = LieAnswer.objects.filter(round_question=round_question).count()
    session_status = session.status
    mixed_answers_payload = None
    # Once every player in the session has lied, advance straight to the guess phase.
    if players_count > 0 and lie_count >= players_count:
        try:
            mixed_answers = _prepare_mixed_answers(round_question)
        except ValueError as exc:
            return api_error(request, code=str(exc), status=400)
        session.status = GameSession.Status.GUESS
        session.save(update_fields=["status"])
        session_status = session.status
        mixed_answers_payload = [{"text": text} for text in mixed_answers]
        sync_broadcast_phase_event(
            session.code,
            "phase.guess_started",
            {
                "round_question_id": round_question.id,
                "answers": mixed_answers_payload,
                "guess_seconds": round_config.guess_seconds,
            },
        )
    return JsonResponse(
        {
            "lie": {
                "id": lie.id,
                "player_id": player.id,
                "round_question_id": round_question.id,
                "text": lie.text,
                "created_at": lie.created_at.isoformat(),
            },
            "window": {
                "lie_deadline_at": lie_deadline_at.isoformat(),
            },
            "session": {
                "code": session.code,
                "status": session_status,
                "current_round": session.current_round,
            },
            "phase_transition": {
                "current_phase": session_status,
                "lies_submitted": lie_count,
                "players_expected": players_count,
                "auto_advanced": session_status == GameSession.Status.GUESS,
            },
            "answers": mixed_answers_payload,
        },
        status=201,
    )
@require_POST
@login_required
def mix_answers(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Prepare the mixed answers for a round question and open the guess phase.

    Host only. Error responses: 404 ``session_not_found`` /
    ``round_question_not_found``, 403 ``host_only_mix_answers``, 400
    ``mix_answers_invalid_phase`` or the code raised while preparing answers.
    Under a row lock the session is moved from LIE to GUESS if it was still in
    LIE; afterwards ``phase.guess_started`` is broadcast and the answers are
    returned.
    """
    mixable_phases = {GameSession.Status.LIE, GameSession.Status.GUESS}

    try:
        session = GameSession.objects.get(code=_normalize_session_code(code))
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.host_id != request.user.id:
        return api_error(request, code="host_only_mix_answers", status=403)

    if session.status not in mixable_phases:
        return api_error(request, code="mix_answers_invalid_phase", status=400)

    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return api_error(request, code="round_question_not_found", status=404)

    with transaction.atomic():
        # Re-read the session under a row lock and re-validate the phase.
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        if locked_session.status not in mixable_phases:
            return api_error(request, code="mix_answers_invalid_phase", status=400)

        locked_round_question = RoundQuestion.objects.select_for_update().get(pk=round_question.pk)
        try:
            deduped_answers = _prepare_mixed_answers(locked_round_question)
        except ValueError as exc:
            return api_error(request, code=str(exc), status=400)

        still_collecting_lies = locked_session.status == GameSession.Status.LIE
        if still_collecting_lies:
            locked_session.status = GameSession.Status.GUESS
            locked_session.save(update_fields=["status"])

    # guess_seconds is optional in the event: None when the round has no config.
    try:
        guess_seconds = RoundConfig.objects.get(
            session=session,
            number=session.current_round,
        ).guess_seconds
    except RoundConfig.DoesNotExist:
        guess_seconds = None

    event_payload = {
        "round_question_id": round_question.id,
        "answers": [{"text": answer} for answer in deduped_answers],
        "guess_seconds": guess_seconds,
    }
    sync_broadcast_phase_event(session.code, "phase.guess_started", event_payload)

    response_body = {
        "session": {
            "code": session.code,
            "status": GameSession.Status.GUESS,
            "current_round": session.current_round,
        },
        "round_question": {
            "id": round_question.id,
            "round_number": round_question.round_number,
        },
        "answers": [{"text": answer} for answer in deduped_answers],
    }
    return JsonResponse(response_body)
@require_POST
def submit_guess(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Record a player's guess and resolve the round once everyone has guessed.

    The player authenticates with ``player_id`` and ``session_token`` from the
    JSON body; ``selected_text`` is the chosen answer (1-255 characters after
    trimming) and must match the correct answer or one of the submitted lies,
    compared case-insensitively.

    Args:
        request: Incoming POST request with a JSON body.
        code: Session code from the URL (normalized before lookup).
        round_question_id: Primary key of the round question being answered.

    Returns:
        HTTP 201 with the stored guess, the guess deadline, the session
        summary, phase-transition counters and, when all players have guessed,
        the reveal payload and leaderboard. Error responses: 400
        ``player_id_required`` / ``session_token_required`` /
        ``selected_text_invalid`` / ``guess_submission_invalid_phase`` /
        ``round_config_missing`` / ``guess_submission_closed`` /
        ``selected_answer_invalid``, 403 ``invalid_player_session_token``, 404
        ``session_not_found`` / ``player_not_found_in_session`` /
        ``round_question_not_found``, 409 ``guess_already_submitted``.
    """
    payload = _json_body(request)
    session_code = _normalize_session_code(code)
    player_id = payload.get("player_id")
    session_token = str(payload.get("session_token", "")).strip()
    selected_text = str(payload.get("selected_text", "")).strip()
    if not player_id:
        return api_error(request, code="player_id_required", status=400)
    if not session_token:
        return api_error(request, code="session_token_required", status=400)
    if not selected_text or len(selected_text) > 255:
        return api_error(request, code="selected_text_invalid", status=400)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)
    if session.status != GameSession.Status.GUESS:
        return api_error(request, code="guess_submission_invalid_phase", status=400)
    try:
        player = Player.objects.get(pk=player_id, session=session)
    except (Player.DoesNotExist, TypeError, ValueError):
        # player_id arrives unvalidated from the JSON body. A value the
        # primary-key field cannot coerce (e.g. "abc" or a list) makes the
        # lookup raise ValueError/TypeError; no such player can exist, so
        # answer 404 instead of letting the request fail with a server error.
        # NOTE(review): assumes a numeric primary key on Player — confirm.
        return api_error(request, code="player_not_found_in_session", status=404)
    if player.session_token != session_token:
        return api_error(request, code="invalid_player_session_token", status=403)
    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return api_error(request, code="round_question_not_found", status=404)
    try:
        round_config = RoundConfig.objects.get(session=session, number=round_question.round_number)
    except RoundConfig.DoesNotExist:
        return api_error(request, code="round_config_missing", status=400)
    # The guess window closes lie_seconds + guess_seconds after the question was shown.
    guess_deadline_at = round_question.shown_at + timedelta(
        seconds=round_config.lie_seconds + round_config.guess_seconds
    )
    if timezone.now() > guess_deadline_at:
        return api_error(request, code="guess_submission_closed", status=400)
    # Selectable answers: the correct answer plus every non-blank lie, case-folded.
    allowed_answers = {
        round_question.correct_answer.strip().casefold(),
        *(
            text.strip().casefold()
            for text in round_question.lies.values_list("text", flat=True)
            if text.strip()
        ),
    }
    selected_normalized = selected_text.casefold()
    if selected_normalized not in allowed_answers:
        return api_error(request, code="selected_answer_invalid", status=400)
    correct_normalized = round_question.correct_answer.strip().casefold()
    fooled_player_id = None
    if selected_normalized != correct_normalized:
        # A wrong pick credits the author of the first lie whose text matches.
        fooled_player_id = (
            round_question.lies.filter(text__iexact=selected_text).values_list("player_id", flat=True).first()
        )
    try:
        guess = Guess.objects.create(
            round_question=round_question,
            player=player,
            selected_text=selected_text,
            is_correct=selected_normalized == correct_normalized,
            fooled_player_id=fooled_player_id,
        )
    except IntegrityError:
        return api_error(request, code="guess_already_submitted", status=409)
    players_count = Player.objects.filter(session=session).count()
    guess_count = Guess.objects.filter(round_question=round_question).count()
    session_status = session.status
    reveal_payload = None
    leaderboard = None
    # Once every player in the session has guessed, resolve scores and reveal.
    if players_count > 0 and guess_count >= players_count:
        score_events = []
        should_broadcast_scores = False
        with transaction.atomic():
            # Lock the session row so scores are resolved only once.
            locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
            if locked_session.status == GameSession.Status.GUESS:
                already_calculated = ScoreEvent.objects.filter(
                    session=locked_session,
                    meta__round_question_id=round_question.id,
                ).exists()
                if not already_calculated:
                    score_events, leaderboard = _resolve_scores(locked_session, round_question, round_config)
                    should_broadcast_scores = True
                else:
                    # Scores already exist for this question: reuse them, no new broadcast.
                    score_events = list(
                        ScoreEvent.objects.filter(
                            session=locked_session,
                            meta__round_question_id=round_question.id,
                        ).select_related("player")
                    )
                    leaderboard = _build_leaderboard(locked_session)
                locked_session.status = GameSession.Status.REVEAL
                locked_session.save(update_fields=["status"])
            elif locked_session.status == GameSession.Status.REVEAL:
                # Another request already advanced the phase; just report its results.
                score_events = list(
                    ScoreEvent.objects.filter(
                        session=locked_session,
                        meta__round_question_id=round_question.id,
                    ).select_related("player")
                )
                leaderboard = _build_leaderboard(locked_session)
            session_status = locked_session.status
        reveal_payload = _build_reveal_payload(round_question)
        if should_broadcast_scores:
            score_deltas = [
                {"player_id": ev.player_id, "delta": ev.delta, "reason": ev.reason}
                for ev in score_events
            ]
            sync_broadcast_phase_event(
                session.code,
                "phase.scores_calculated",
                {
                    "round_question_id": round_question.id,
                    "score_deltas": score_deltas,
                    "leaderboard": list(leaderboard),
                },
            )
    return JsonResponse(
        {
            "guess": {
                "id": guess.id,
                "player_id": player.id,
                "round_question_id": round_question.id,
                "selected_text": guess.selected_text,
                "is_correct": guess.is_correct,
                "fooled_player_id": guess.fooled_player_id,
                "created_at": guess.created_at.isoformat(),
            },
            "window": {
                "guess_deadline_at": guess_deadline_at.isoformat(),
            },
            "session": {
                "code": session.code,
                "status": session_status,
                "current_round": session.current_round,
            },
            "phase_transition": {
                "current_phase": session_status,
                "guesses_submitted": guess_count,
                "players_expected": players_count,
                "auto_advanced": session_status == GameSession.Status.REVEAL,
            },
            "reveal": reveal_payload,
            "leaderboard": leaderboard,
        },
        status=201,
    )
@require_GET
@login_required
def reveal_scoreboard(request: HttpRequest, code: str) -> JsonResponse:
    """Return the scoreboard payload for the host, promoting the reveal phase first.

    Error responses: 404 ``session_not_found``, 403
    ``host_only_view_scoreboard``, 400 ``scoreboard_invalid_phase`` when the
    session is in neither SCOREBOARD nor FINISHED after the promotion attempt.
    """
    try:
        session = GameSession.objects.get(code=_normalize_session_code(code))
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    if session.host_id != request.user.id:
        return api_error(request, code="host_only_view_scoreboard", status=403)

    transition = _promote_reveal_to_scoreboard(session)
    if transition.should_broadcast:
        sync_broadcast_phase_event(
            transition.session.code,
            transition.phase_event_name,
            transition.phase_event_payload,
        )

    session = transition.session
    scoreboard_phases = {GameSession.Status.SCOREBOARD, GameSession.Status.FINISHED}
    if session.status in scoreboard_phases:
        return JsonResponse(transition.response_payload)
    return api_error(request, code="scoreboard_invalid_phase", status=400)
@require_POST
@login_required
def start_next_round(request: HttpRequest, code: str) -> JsonResponse:
    """Advance the session to its next round; only the session host may do so.

    Error responses: 404 ``session_not_found``, 403
    ``host_only_start_next_round``, 400 with the code raised by the service as
    ``ValueError``. The phase event is broadcast only when the transition asks
    for it; the transition's response payload is returned.
    """
    try:
        session = GameSession.objects.get(code=_normalize_session_code(code))
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    requester_is_host = session.host_id == request.user.id
    if not requester_is_host:
        return api_error(request, code="host_only_start_next_round", status=403)

    try:
        outcome = _start_next_round(session)
    except ValueError as exc:
        return api_error(request, code=str(exc), status=400)

    if outcome.should_broadcast:
        sync_broadcast_phase_event(
            outcome.session.code,
            outcome.phase_event_name,
            outcome.phase_event_payload,
        )
    return JsonResponse(outcome.response_payload)
@require_POST
@login_required
def finish_game(request: HttpRequest, code: str) -> JsonResponse:
    """Finish the game for a session; only the session host may do so.

    Error responses: 404 ``session_not_found``, 403 ``host_only_finish_game``,
    400 with the code raised by the service as ``ValueError``. The phase event
    is broadcast only when the transition asks for it; the transition's
    response payload is returned.
    """
    try:
        session = GameSession.objects.get(code=_normalize_session_code(code))
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)

    requester_is_host = session.host_id == request.user.id
    if not requester_is_host:
        return api_error(request, code="host_only_finish_game", status=403)

    try:
        outcome = _finish_game(session)
    except ValueError as exc:
        return api_error(request, code=str(exc), status=400)

    if outcome.should_broadcast:
        sync_broadcast_phase_event(
            outcome.session.code,
            outcome.phase_event_name,
            outcome.phase_event_payload,
        )
    return JsonResponse(outcome.response_payload)
@require_POST
@login_required
def calculate_scores(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Score the guesses for a round question and move the session to REVEAL.

    Host only. Awards ``points_correct`` to each player whose guess is correct
    and ``points_bluff`` per fooled guesser to each lie's author, stores one
    ScoreEvent per award, then broadcasts ``phase.scores_calculated``.

    Error responses: 404 ``session_not_found`` / ``round_question_not_found``,
    403 ``host_only_calculate_scores``, 409 ``scores_already_calculated``,
    400 ``calculate_scores_invalid_phase`` / ``round_config_missing`` /
    ``no_guesses_submitted``. On success returns the session summary, the
    round question, the reveal payload, the number of events created and the
    leaderboard.
    """
    session_code = _normalize_session_code(code)
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return api_error(request, code="session_not_found", status=404)
    if session.host_id != request.user.id:
        return api_error(request, code="host_only_calculate_scores", status=403)
    # Refuse to score the same question twice: any ScoreEvent tagged with this
    # round question id means scoring already ran.
    already_calculated = ScoreEvent.objects.filter(
        session=session,
        meta__round_question_id=round_question_id,
    ).exists()
    if already_calculated:
        return api_error(request, code="scores_already_calculated", status=409)
    if session.status != GameSession.Status.GUESS:
        return api_error(request, code="calculate_scores_invalid_phase", status=400)
    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return api_error(request, code="round_question_not_found", status=404)
    try:
        round_config = RoundConfig.objects.get(session=session, number=round_question.round_number)
    except RoundConfig.DoesNotExist:
        return api_error(request, code="round_config_missing", status=400)
    # NOTE(review): the guesses are read before the session row is locked below,
    # so a guess stored in between would not be scored — confirm this is acceptable.
    guesses = list(round_question.guesses.select_related("player"))
    if not guesses:
        return api_error(request, code="no_guesses_submitted", status=400)
    # Count, per lie author, how many guessers picked that author's lie.
    bluff_counts = {}
    for guess in guesses:
        if guess.fooled_player_id:
            bluff_counts[guess.fooled_player_id] = bluff_counts.get(guess.fooled_player_id, 0) + 1
    with transaction.atomic():
        # Lock the session row and re-check the phase so a concurrent request
        # that already advanced the session makes this one bail out.
        locked_session = GameSession.objects.select_for_update().get(pk=session.pk)
        if locked_session.status != GameSession.Status.GUESS:
            return api_error(request, code="calculate_scores_invalid_phase", status=400)
        score_events = []
        # Award points for every correct guess.
        for guess in guesses:
            if guess.is_correct:
                guess.player.score += round_config.points_correct
                guess.player.save(update_fields=["score"])
                score_events.append(
                    ScoreEvent(
                        session=session,
                        player=guess.player,
                        delta=round_config.points_correct,
                        reason="guess_correct",
                        meta={"round_question_id": round_question.id, "guess_id": guess.id},
                    )
                )
        # Award bluff points; the player is re-fetched so the increment builds
        # on any correct-guess points saved just above.
        for player_id, fooled_count in bluff_counts.items():
            delta = fooled_count * round_config.points_bluff
            player = Player.objects.get(pk=player_id, session=session)
            player.score += delta
            player.save(update_fields=["score"])
            score_events.append(
                ScoreEvent(
                    session=session,
                    player=player,
                    delta=delta,
                    reason="bluff_success",
                    meta={"round_question_id": round_question.id, "fooled_count": fooled_count},
                )
            )
        ScoreEvent.objects.bulk_create(score_events)
        locked_session.status = GameSession.Status.REVEAL
        locked_session.save(update_fields=["status"])
    # Leaderboard: highest score first, ties ordered by nickname.
    leaderboard = list(
        Player.objects.filter(session=session)
        .order_by("-score", "nickname")
        .values("id", "nickname", "score")
    )
    score_deltas = [
        {"player_id": ev.player_id, "delta": ev.delta, "reason": ev.reason}
        for ev in score_events
    ]
    sync_broadcast_phase_event(
        session.code,
        "phase.scores_calculated",
        {
            "round_question_id": round_question.id,
            "score_deltas": score_deltas,
            "leaderboard": list(leaderboard),
        },
    )
    return JsonResponse(
        {
            "session": {
                "code": session.code,
                "status": GameSession.Status.REVEAL,
                "current_round": session.current_round,
            },
            "round_question": {
                "id": round_question.id,
                "round_number": round_question.round_number,
            },
            "reveal": _build_reveal_payload(round_question),
            "events_created": len(score_events),
            "leaderboard": leaderboard,
        }
    )