Files
weirsoe-party-protocol/realtime/broadcast.py
Asger Geel Weirsoee 17234de5d1
Some checks failed
CI / test-and-quality (push) Failing after 11s
CI / test-and-quality (pull_request) Failing after 12s
Merge main into PR #291 and resolve scoreboard phase conflicts
2026-03-15 09:34:14 +00:00

36 lines
1.3 KiB
Python

from asgiref.sync import async_to_sync
from channels.exceptions import InvalidChannelLayerError
from channels.layers import get_channel_layer
try:
from redis.exceptions import ConnectionError as RedisConnectionError
except Exception: # pragma: no cover - optional dependency in local/test runtimes
RedisConnectionError = RuntimeError
async def broadcast_phase_event(session_code: str, event_type: str, payload: dict) -> None:
"""Send a phase event to all WebSocket clients connected to a game session."""
try:
channel_layer = get_channel_layer()
if channel_layer is None:
return
group_name = f"game_{session_code.upper()}"
await channel_layer.group_send(
group_name,
{
"type": "phase.event",
"event_type": event_type,
"payload": payload,
},
)
except (InvalidChannelLayerError, RedisConnectionError):
return
def sync_broadcast_phase_event(session_code: str, event_type: str, payload: dict) -> None:
"""Sync wrapper for calling broadcast_phase_event from synchronous Django views."""
try:
async_to_sync(broadcast_phase_event)(session_code, event_type, payload)
except (InvalidChannelLayerError, RedisConnectionError):
return