Files
weirsoe-party-protocol/lobby/views.py
Asger Geel Weirsoee cfffc9934c
All checks were successful
CI / test-and-quality (push) Successful in 1m25s
CI / test-and-quality (pull_request) Successful in 1m25s
feat(ui): add MVP host/player web screens
2026-02-27 22:26:55 +01:00

737 lines
24 KiB
Python

import json
import random
from datetime import timedelta
from django.contrib.auth.decorators import login_required
from django.db import IntegrityError, transaction
from django.http import HttpRequest, JsonResponse
from django.utils import timezone
from django.views.decorators.http import require_GET, require_POST
from fupogfakta.models import (
Category,
GameSession,
Guess,
LieAnswer,
Player,
Question,
RoundConfig,
RoundQuestion,
ScoreEvent,
)
# Characters a session code is built from: uppercase letters and digits with
# I, O, 0 and 1 left out (presumably to avoid look-alike characters).
SESSION_CODE_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
# Number of characters in a generated session code.
SESSION_CODE_LENGTH = 6
# How many random codes _create_unique_session_code tries before giving up.
MAX_CODE_GENERATION_ATTEMPTS = 20
# Session statuses in which join_session still accepts new players.
JOINABLE_STATUSES = {
    GameSession.Status.LOBBY,
    GameSession.Status.LIE,
    GameSession.Status.GUESS,
    GameSession.Status.REVEAL,
}
def _json_body(request: HttpRequest) -> dict:
    """Parse the request body as a JSON object.

    Args:
        request: Incoming request whose raw ``body`` is decoded.

    Returns:
        The decoded JSON object. An empty dict is returned when the body is
        empty, is not valid JSON, is not valid text, or decodes to anything
        other than a JSON object, so callers can always use ``.get``.
    """
    if not request.body:
        return {}
    try:
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for bodies that are not valid UTF-8/16/32.
        parsed = json.loads(request.body)
    except ValueError:
        return {}
    # A top-level list, string or number is valid JSON but not a payload the
    # views can read keys from.
    return parsed if isinstance(parsed, dict) else {}
def _generate_session_code() -> str:
    """Return a random session code.

    The code has SESSION_CODE_LENGTH characters, each drawn from
    SESSION_CODE_ALPHABET. Uniqueness is not checked here.
    """
    # Knowing the code is all a client needs to join or inspect a session, so
    # draw it from the operating system's entropy source instead of the
    # default, predictable Mersenne Twister generator.
    rng = random.SystemRandom()
    return "".join(rng.choices(SESSION_CODE_ALPHABET, k=SESSION_CODE_LENGTH))
def _create_unique_session_code() -> str:
    """Return a session code that no existing GameSession uses.

    Draws up to MAX_CODE_GENERATION_ATTEMPTS random codes and returns the
    first one not found in the database.

    Raises:
        RuntimeError: If every attempt collided with an existing session.
    """
    attempts_left = MAX_CODE_GENERATION_ATTEMPTS
    while attempts_left > 0:
        candidate = _generate_session_code()
        already_used = GameSession.objects.filter(code=candidate).exists()
        if not already_used:
            return candidate
        attempts_left -= 1
    raise RuntimeError("Could not generate unique session code")
@require_POST
@login_required
def create_session(request: HttpRequest) -> JsonResponse:
    """Create a new game session hosted by the logged-in user.

    Returns:
        201 with the new session's code, status, host id and current round.
    """
    new_session = GameSession.objects.create(
        host=request.user,
        code=_create_unique_session_code(),
    )
    session_payload = {
        "code": new_session.code,
        "status": new_session.status,
        "host_id": new_session.host_id,
        "current_round": new_session.current_round,
    }
    return JsonResponse({"session": session_payload}, status=201)
@require_POST
def join_session(request: HttpRequest) -> JsonResponse:
    """Add a new player to an existing session.

    Reads ``code`` and ``nickname`` from the JSON body. The code is matched
    upper-cased; the nickname is trimmed and must be 2-40 characters long.

    Returns:
        201 with the created player and the session code/status.
        400 if the code is missing, the nickname length is invalid, or the
            session status is not in JOINABLE_STATUSES.
        404 if no session has that code.
        409 if the nickname is already used in the session
            (case-insensitive).
    """
    payload = _json_body(request)
    code = str(payload.get("code", "")).strip().upper()
    nickname = str(payload.get("nickname", "")).strip()

    if not code:
        return JsonResponse({"error": "Session code is required"}, status=400)
    if not 2 <= len(nickname) <= 40:
        return JsonResponse({"error": "Nickname must be between 2 and 40 characters"}, status=400)

    try:
        session = GameSession.objects.get(code=code)
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    if session.status not in JOINABLE_STATUSES:
        return JsonResponse({"error": "Session is not joinable"}, status=400)
    if Player.objects.filter(session=session, nickname__iexact=nickname).exists():
        return JsonResponse({"error": "Nickname already taken"}, status=409)

    try:
        player = Player.objects.create(session=session, nickname=nickname)
    except IntegrityError:
        # A concurrent join can slip between the exists() check and this
        # insert. If the Player model enforces nickname uniqueness at the
        # database level (not visible from this file), answer with the same
        # 409 instead of an unhandled error.
        return JsonResponse({"error": "Nickname already taken"}, status=409)

    return JsonResponse(
        {
            "player": {
                "id": player.id,
                "nickname": player.nickname,
                "score": player.score,
            },
            "session": {
                "code": session.code,
                "status": session.status,
            },
        },
        status=201,
    )
@require_GET
def session_detail(request: HttpRequest, code: str) -> JsonResponse:
    """Return the public state of a session.

    The payload contains the session summary, its players ordered by
    nickname, and the most recently created question of the current round
    (``None`` when the round has no question yet).

    Returns:
        200 with the session payload, or 404 if no session has that code.
    """
    try:
        session = GameSession.objects.get(code=code.strip().upper())
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    player_rows = session.players.order_by("nickname").values(
        "id",
        "nickname",
        "score",
        "is_connected",
    )
    players = list(player_rows)

    # Highest id first, so the newest question of the round wins.
    latest_round_question = (
        RoundQuestion.objects.select_related("question")
        .filter(session=session, round_number=session.current_round)
        .order_by("-id")
        .first()
    )
    if latest_round_question is None:
        round_question_payload = None
    else:
        round_question_payload = {
            "id": latest_round_question.id,
            "round_number": latest_round_question.round_number,
            "prompt": latest_round_question.question.prompt,
            "shown_at": latest_round_question.shown_at.isoformat(),
        }

    session_payload = {
        "code": session.code,
        "status": session.status,
        "host_id": session.host_id,
        "current_round": session.current_round,
        "players_count": len(players),
    }
    return JsonResponse(
        {
            "session": session_payload,
            "players": players,
            "round_question": round_question_payload,
        }
    )
@require_POST
@login_required
def start_round(request: HttpRequest, code: str) -> JsonResponse:
    """Configure the current round with a category and enter the lie phase.

    Only the session host may call this, and only while the session is in
    the lobby. The status is re-checked under a row lock before the round
    configuration is created.

    Returns:
        201 with the session and round/category summary.
        400 if ``category_slug`` is missing, the session is not in the lobby,
            or the category has no active questions.
        403 if the caller is not the host.
        404 if the session or an active category with that slug is missing.
        409 if the current round already has a configuration.
    """
    body = _json_body(request)
    category_slug = str(body.get("category_slug", "")).strip()
    if not category_slug:
        return JsonResponse({"error": "category_slug is required"}, status=400)

    try:
        session = GameSession.objects.get(code=code.strip().upper())
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    if request.user.id != session.host_id:
        return JsonResponse({"error": "Only host can start round"}, status=403)
    if session.status != GameSession.Status.LOBBY:
        return JsonResponse({"error": "Round can only be started from lobby"}, status=400)

    try:
        category = Category.objects.get(slug=category_slug, is_active=True)
    except Category.DoesNotExist:
        return JsonResponse({"error": "Category not found"}, status=404)

    has_active_questions = Question.objects.filter(category=category, is_active=True).exists()
    if not has_active_questions:
        return JsonResponse({"error": "Category has no active questions"}, status=400)

    with transaction.atomic():
        # Re-read the session under a row lock so the status check and the
        # transition below act on the same state.
        locked = GameSession.objects.select_for_update().get(pk=session.pk)
        if locked.status != GameSession.Status.LOBBY:
            return JsonResponse({"error": "Round can only be started from lobby"}, status=400)

        round_config, was_created = RoundConfig.objects.get_or_create(
            session=locked,
            number=locked.current_round,
            defaults={"category": category},
        )
        if not was_created:
            return JsonResponse({"error": "Round already configured"}, status=409)

        locked.status = GameSession.Status.LIE
        locked.save(update_fields=["status"])

    chosen_category = round_config.category
    return JsonResponse(
        {
            "session": {
                "code": locked.code,
                "status": locked.status,
                "current_round": locked.current_round,
            },
            "round": {
                "number": round_config.number,
                "category": {
                    "slug": chosen_category.slug,
                    "name": chosen_category.name,
                },
            },
        },
        status=201,
    )
@require_POST
@login_required
def show_question(request: HttpRequest, code: str) -> JsonResponse:
    """Pick and show a random unused question for the current round.

    Only the session host may call this, and only during the lie phase. The
    question is chosen among active questions of the round's category that
    this session has not used yet.

    Returns:
        201 with the round question, its lie deadline and ``lie_seconds``.
        400 if the session is not in the lie phase, the round has no
            configuration, or the category has no unused active questions.
        403 if the caller is not the host.
        404 if no session has that code.
        409 if a question was already shown for this round.
    """
    session_code = code.strip().upper()
    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    if session.host_id != request.user.id:
        return JsonResponse({"error": "Only host can show question"}, status=403)
    if session.status != GameSession.Status.LIE:
        return JsonResponse({"error": "Question can only be shown in lie phase"}, status=400)

    with transaction.atomic():
        # Lock the session row, as the other phase-changing views do, so two
        # concurrent requests cannot both pass the "already shown" check and
        # create two questions for the same round.
        session = GameSession.objects.select_for_update().get(pk=session.pk)
        if session.status != GameSession.Status.LIE:
            return JsonResponse({"error": "Question can only be shown in lie phase"}, status=400)

        try:
            round_config = RoundConfig.objects.get(session=session, number=session.current_round)
        except RoundConfig.DoesNotExist:
            return JsonResponse({"error": "Round config missing"}, status=400)

        if RoundQuestion.objects.filter(session=session, round_number=session.current_round).exists():
            return JsonResponse({"error": "Question already shown for this round"}, status=409)

        used_question_ids = RoundQuestion.objects.filter(session=session).values_list("question_id", flat=True)
        # Materialise the candidates once; the list serves both the emptiness
        # check and the random pick.
        candidates = list(
            Question.objects.filter(
                category=round_config.category,
                is_active=True,
            ).exclude(pk__in=used_question_ids)
        )
        if not candidates:
            return JsonResponse({"error": "No available questions in category"}, status=400)

        question = random.choice(candidates)
        round_question = RoundQuestion.objects.create(
            session=session,
            round_number=session.current_round,
            question=question,
            # Copy the answer onto the round question row at show time.
            correct_answer=question.correct_answer,
        )

    lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
    return JsonResponse(
        {
            "round_question": {
                "id": round_question.id,
                "prompt": question.prompt,
                "round_number": round_question.round_number,
                "shown_at": round_question.shown_at.isoformat(),
                "lie_deadline_at": lie_deadline_at.isoformat(),
            },
            "config": {
                "lie_seconds": round_config.lie_seconds,
            },
        },
        status=201,
    )
@require_POST
def submit_lie(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Store a player's lie for the current round question.

    Reads ``player_id`` and ``text`` from the JSON body. The lie is accepted
    only during the lie phase and before ``shown_at + lie_seconds``.

    Returns:
        201 with the stored lie and the lie deadline.
        400 if ``player_id`` is missing or malformed, the text is empty or
            longer than 255 characters, the session is not in the lie phase,
            the round has no configuration, or the deadline has passed.
        404 if the session, the player in that session, or the round
            question is not found.
        409 if creating the lie violates a database constraint (reported as
            a duplicate submission).
    """
    payload = _json_body(request)
    session_code = code.strip().upper()
    player_id = payload.get("player_id")
    lie_text = str(payload.get("text", "")).strip()

    if not player_id:
        return JsonResponse({"error": "player_id is required"}, status=400)
    if not lie_text or len(lie_text) > 255:
        return JsonResponse({"error": "text must be between 1 and 255 characters"}, status=400)

    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    if session.status != GameSession.Status.LIE:
        return JsonResponse({"error": "Lie submission is only allowed in lie phase"}, status=400)

    try:
        player = Player.objects.get(pk=player_id, session=session)
    except (TypeError, ValueError):
        # player_id comes straight from the client; a value the primary key
        # field cannot convert (e.g. "abc" or a list) is a bad request, not a
        # server error.
        return JsonResponse({"error": "player_id is invalid"}, status=400)
    except Player.DoesNotExist:
        return JsonResponse({"error": "Player not found in session"}, status=404)

    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return JsonResponse({"error": "Round question not found"}, status=404)

    try:
        round_config = RoundConfig.objects.get(session=session, number=round_question.round_number)
    except RoundConfig.DoesNotExist:
        return JsonResponse({"error": "Round config missing"}, status=400)

    lie_deadline_at = round_question.shown_at + timedelta(seconds=round_config.lie_seconds)
    if timezone.now() > lie_deadline_at:
        return JsonResponse({"error": "Lie submission window has closed"}, status=400)

    try:
        lie = LieAnswer.objects.create(round_question=round_question, player=player, text=lie_text)
    except IntegrityError:
        return JsonResponse({"error": "Lie already submitted for this player"}, status=409)

    return JsonResponse(
        {
            "lie": {
                "id": lie.id,
                "player_id": player.id,
                "round_question_id": round_question.id,
                "text": lie.text,
                "created_at": lie.created_at.isoformat(),
            },
            "window": {
                "lie_deadline_at": lie_deadline_at.isoformat(),
            },
        },
        status=201,
    )
@require_POST
@login_required
def mix_answers(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Shuffle the correct answer with the submitted lies and open guessing.

    Only the session host may call this, during the lie phase. Answers are
    trimmed and de-duplicated case-insensitively (the first occurrence is
    kept, with the correct answer considered first), then shuffled. The
    session moves to the guess phase under a row lock.

    Returns:
        200 with the session summary, round question and shuffled answers.
        400 if the session is not in the lie phase or fewer than two distinct
            answers exist.
        403 if the caller is not the host.
        404 if the session or round question is not found.
    """
    try:
        session = GameSession.objects.get(code=code.strip().upper())
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    if request.user.id != session.host_id:
        return JsonResponse({"error": "Only host can mix answers"}, status=403)
    if session.status != GameSession.Status.LIE:
        return JsonResponse({"error": "Answers can only be mixed in lie phase"}, status=400)

    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return JsonResponse({"error": "Round question not found"}, status=404)

    # Keyed by the case-folded text; dict insertion order keeps the first
    # spelling seen for each distinct answer.
    unique_answers = {}
    candidates = (round_question.correct_answer, *round_question.lies.values_list("text", flat=True))
    for candidate in candidates:
        trimmed = candidate.strip()
        key = trimmed.casefold()
        if key:
            unique_answers.setdefault(key, trimmed)

    mixed_answers = list(unique_answers.values())
    if len(mixed_answers) < 2:
        return JsonResponse({"error": "Not enough answers to mix"}, status=400)
    random.shuffle(mixed_answers)

    with transaction.atomic():
        locked = GameSession.objects.select_for_update().get(pk=session.pk)
        if locked.status != GameSession.Status.LIE:
            return JsonResponse({"error": "Answers can only be mixed in lie phase"}, status=400)
        locked.status = GameSession.Status.GUESS
        locked.save(update_fields=["status"])

    answers_payload = [{"text": answer} for answer in mixed_answers]
    return JsonResponse(
        {
            "session": {
                "code": session.code,
                "status": GameSession.Status.GUESS,
                "current_round": session.current_round,
            },
            "round_question": {
                "id": round_question.id,
                "round_number": round_question.round_number,
            },
            "answers": answers_payload,
        }
    )
@require_POST
def submit_guess(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Store a player's guess for the current round question.

    Reads ``player_id`` and ``selected_text`` from the JSON body. The guess
    is accepted only during the guess phase and before
    ``shown_at + lie_seconds + guess_seconds``. The selected text must match
    (trimmed, case-folded) the correct answer or one of the submitted lies.

    Returns:
        201 with the stored guess and the guess deadline.
        400 if ``player_id`` is missing or malformed, the selected text is
            empty or longer than 255 characters, the session is not in the
            guess phase, the round has no configuration, the deadline has
            passed, or the text is not one of the round's answers.
        404 if the session, the player in that session, or the round
            question is not found.
        409 if creating the guess violates a database constraint (reported
            as a duplicate submission).
    """
    payload = _json_body(request)
    session_code = code.strip().upper()
    player_id = payload.get("player_id")
    selected_text = str(payload.get("selected_text", "")).strip()

    if not player_id:
        return JsonResponse({"error": "player_id is required"}, status=400)
    if not selected_text or len(selected_text) > 255:
        return JsonResponse({"error": "selected_text must be between 1 and 255 characters"}, status=400)

    try:
        session = GameSession.objects.get(code=session_code)
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    if session.status != GameSession.Status.GUESS:
        return JsonResponse({"error": "Guess submission is only allowed in guess phase"}, status=400)

    try:
        player = Player.objects.get(pk=player_id, session=session)
    except (TypeError, ValueError):
        # player_id comes straight from the client; a value the primary key
        # field cannot convert (e.g. "abc" or a list) is a bad request, not a
        # server error.
        return JsonResponse({"error": "player_id is invalid"}, status=400)
    except Player.DoesNotExist:
        return JsonResponse({"error": "Player not found in session"}, status=404)

    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return JsonResponse({"error": "Round question not found"}, status=404)

    try:
        round_config = RoundConfig.objects.get(session=session, number=round_question.round_number)
    except RoundConfig.DoesNotExist:
        return JsonResponse({"error": "Round config missing"}, status=400)

    # The guess window starts when the lie window ends, so both durations are
    # added to the time the question was shown.
    guess_deadline_at = round_question.shown_at + timedelta(
        seconds=round_config.lie_seconds + round_config.guess_seconds
    )
    if timezone.now() > guess_deadline_at:
        return JsonResponse({"error": "Guess submission window has closed"}, status=400)

    correct_normalized = round_question.correct_answer.strip().casefold()
    allowed_answers = {
        correct_normalized,
        *(
            text.strip().casefold()
            for text in round_question.lies.values_list("text", flat=True)
            if text.strip()
        ),
    }
    selected_normalized = selected_text.casefold()
    if selected_normalized not in allowed_answers:
        return JsonResponse({"error": "Selected answer is not part of this round"}, status=400)

    is_correct = selected_normalized == correct_normalized
    fooled_player_id = None
    if not is_correct:
        # NOTE(review): nothing here stops a player from selecting their own
        # lie, in which case fooled_player_id is their own id - confirm this
        # is intended. When several players submitted the same text, the
        # query has no ordering, so which author is credited is up to the
        # database.
        fooled_player_id = (
            round_question.lies.filter(text__iexact=selected_text).values_list("player_id", flat=True).first()
        )

    try:
        guess = Guess.objects.create(
            round_question=round_question,
            player=player,
            selected_text=selected_text,
            is_correct=is_correct,
            fooled_player_id=fooled_player_id,
        )
    except IntegrityError:
        return JsonResponse({"error": "Guess already submitted for this player"}, status=409)

    return JsonResponse(
        {
            "guess": {
                "id": guess.id,
                "player_id": player.id,
                "round_question_id": round_question.id,
                "selected_text": guess.selected_text,
                "is_correct": guess.is_correct,
                "fooled_player_id": guess.fooled_player_id,
                "created_at": guess.created_at.isoformat(),
            },
            "window": {
                "guess_deadline_at": guess_deadline_at.isoformat(),
            },
        },
        status=201,
    )
@require_GET
@login_required
def reveal_scoreboard(request: HttpRequest, code: str) -> JsonResponse:
    """Return the leaderboard of a session that is in the reveal phase.

    Only the session host may call this. Players are ordered by descending
    score, then by nickname.

    Returns:
        200 with the session summary and leaderboard.
        400 if the session is not in the reveal phase.
        403 if the caller is not the host.
        404 if no session has that code.
    """
    try:
        session = GameSession.objects.get(code=code.strip().upper())
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    if request.user.id != session.host_id:
        return JsonResponse({"error": "Only host can view scoreboard"}, status=403)
    if session.status != GameSession.Status.REVEAL:
        return JsonResponse({"error": "Scoreboard is only available in reveal phase"}, status=400)

    ranked_players = Player.objects.filter(session=session).order_by("-score", "nickname")
    leaderboard = list(ranked_players.values("id", "nickname", "score"))

    session_payload = {
        "code": session.code,
        "status": session.status,
        "current_round": session.current_round,
    }
    return JsonResponse({"session": session_payload, "leaderboard": leaderboard})
@require_POST
@login_required
def start_next_round(request: HttpRequest, code: str) -> JsonResponse:
    """Advance the session to the next round and return it to the lobby.

    Only the session host may call this. Under a row lock, the round counter
    is incremented and the status is set back to lobby, provided the session
    is in the reveal phase.

    Returns:
        200 with the session code, lobby status and new round number.
        400 if the session is not in the reveal phase.
        403 if the caller is not the host.
        404 if no session has that code.
    """
    try:
        session = GameSession.objects.get(code=code.strip().upper())
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    if request.user.id != session.host_id:
        return JsonResponse({"error": "Only host can start next round"}, status=403)

    with transaction.atomic():
        locked = GameSession.objects.select_for_update().get(pk=session.pk)
        if locked.status != GameSession.Status.REVEAL:
            return JsonResponse({"error": "Next round can only start from reveal phase"}, status=400)
        locked.current_round = locked.current_round + 1
        locked.status = GameSession.Status.LOBBY
        locked.save(update_fields=["current_round", "status"])

    session_payload = {
        "code": session.code,
        "status": GameSession.Status.LOBBY,
        "current_round": locked.current_round,
    }
    return JsonResponse({"session": session_payload})
@require_POST
@login_required
def finish_game(request: HttpRequest, code: str) -> JsonResponse:
    """Mark the session as finished and return the final standings.

    Only the session host may call this. The status change happens under a
    row lock and only from the reveal phase. The winner is the first entry of
    the leaderboard (highest score, ties broken by nickname), or ``None``
    when the session has no players.

    Returns:
        200 with the session summary, winner and leaderboard.
        400 if the session is not in the reveal phase.
        403 if the caller is not the host.
        404 if no session has that code.
    """
    try:
        session = GameSession.objects.get(code=code.strip().upper())
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    if request.user.id != session.host_id:
        return JsonResponse({"error": "Only host can finish game"}, status=403)

    with transaction.atomic():
        locked = GameSession.objects.select_for_update().get(pk=session.pk)
        if locked.status != GameSession.Status.REVEAL:
            return JsonResponse({"error": "Game can only be finished from reveal phase"}, status=400)
        locked.status = GameSession.Status.FINISHED
        locked.save(update_fields=["status"])

    ranked_players = Player.objects.filter(session=session).order_by("-score", "nickname")
    leaderboard = list(ranked_players.values("id", "nickname", "score"))
    if leaderboard:
        winner = leaderboard[0]
    else:
        winner = None

    session_payload = {
        "code": session.code,
        "status": GameSession.Status.FINISHED,
        "current_round": session.current_round,
    }
    return JsonResponse(
        {
            "session": session_payload,
            "winner": winner,
            "leaderboard": leaderboard,
        }
    )
@require_POST
@login_required
def calculate_scores(request: HttpRequest, code: str, round_question_id: int) -> JsonResponse:
    """Award points for a round question and move the session to reveal.

    Only the session host may call this, during the guess phase. Each correct
    guess earns its player ``points_correct``; each lie author earns
    ``points_bluff`` per guess that recorded them as ``fooled_player_id``.
    Score changes, ScoreEvent rows and the status change are written inside
    one transaction that holds a lock on the session row.

    Returns:
        200 with the session summary, round question, number of score events
            created and the leaderboard.
        400 if the session is not in the guess phase, the round has no
            configuration, or no guesses were submitted.
        403 if the caller is not the host.
        404 if the session or round question is not found.
        409 if score events already exist for this round question.
    """
    try:
        session = GameSession.objects.get(code=code.strip().upper())
    except GameSession.DoesNotExist:
        return JsonResponse({"error": "Session not found"}, status=404)

    if request.user.id != session.host_id:
        return JsonResponse({"error": "Only host can calculate scores"}, status=403)

    # Existing score events tagged with this round question mean it was
    # already scored; this is reported before the phase check.
    prior_events = ScoreEvent.objects.filter(
        session=session,
        meta__round_question_id=round_question_id,
    )
    if prior_events.exists():
        return JsonResponse({"error": "Scores already calculated for this round question"}, status=409)

    if session.status != GameSession.Status.GUESS:
        return JsonResponse({"error": "Scores can only be calculated in guess phase"}, status=400)

    try:
        round_question = RoundQuestion.objects.get(
            pk=round_question_id,
            session=session,
            round_number=session.current_round,
        )
    except RoundQuestion.DoesNotExist:
        return JsonResponse({"error": "Round question not found"}, status=404)

    try:
        round_config = RoundConfig.objects.get(session=session, number=round_question.round_number)
    except RoundConfig.DoesNotExist:
        return JsonResponse({"error": "Round config missing"}, status=400)

    guesses = list(round_question.guesses.select_related("player"))
    if not guesses:
        return JsonResponse({"error": "No guesses submitted for this round question"}, status=400)

    # Number of guessers each lie author fooled, keyed by the author's id.
    fooled_by_author = {}
    for guess in guesses:
        author_id = guess.fooled_player_id
        if not author_id:
            continue
        fooled_by_author[author_id] = fooled_by_author.get(author_id, 0) + 1

    with transaction.atomic():
        locked = GameSession.objects.select_for_update().get(pk=session.pk)
        if locked.status != GameSession.Status.GUESS:
            return JsonResponse({"error": "Scores can only be calculated in guess phase"}, status=400)

        score_events = []

        correct_points = round_config.points_correct
        for guess in guesses:
            if not guess.is_correct:
                continue
            guesser = guess.player
            guesser.score += correct_points
            guesser.save(update_fields=["score"])
            score_events.append(
                ScoreEvent(
                    session=session,
                    player=guesser,
                    delta=correct_points,
                    reason="guess_correct",
                    meta={"round_question_id": round_question.id, "guess_id": guess.id},
                )
            )

        for author_id, fooled_count in fooled_by_author.items():
            bluff_points = fooled_count * round_config.points_bluff
            # Fetched fresh, so it includes any correct-guess points saved in
            # the loop above.
            author = Player.objects.get(pk=author_id, session=session)
            author.score += bluff_points
            author.save(update_fields=["score"])
            score_events.append(
                ScoreEvent(
                    session=session,
                    player=author,
                    delta=bluff_points,
                    reason="bluff_success",
                    meta={"round_question_id": round_question.id, "fooled_count": fooled_count},
                )
            )

        ScoreEvent.objects.bulk_create(score_events)
        locked.status = GameSession.Status.REVEAL
        locked.save(update_fields=["status"])

    ranked_players = Player.objects.filter(session=session).order_by("-score", "nickname")
    leaderboard = list(ranked_players.values("id", "nickname", "score"))

    return JsonResponse(
        {
            "session": {
                "code": session.code,
                "status": GameSession.Status.REVEAL,
                "current_round": session.current_round,
            },
            "round_question": {
                "id": round_question.id,
                "round_number": round_question.round_number,
            },
            "events_created": len(score_events),
            "leaderboard": leaderboard,
        }
    )