Files
weirsoe-party-protocol/realtime/broadcast.py
Asger Geel Weirsoee be38fe6ac2
All checks were successful
CI / test-and-quality (pull_request) Successful in 2m58s
CI / test-and-quality (push) Successful in 2m59s
fix(realtime): tolerate missing scoreboard channel layer
2026-03-15 09:08:13 +00:00

36 lines
1.2 KiB
Python

from asgiref.sync import async_to_sync
from channels.exceptions import InvalidChannelLayerError
from channels.layers import get_channel_layer
from redis.exceptions import ConnectionError as RedisConnectionError
async def broadcast_phase_event(session_code: str, event_type: str, payload: dict) -> None:
"""Send a phase event to all WebSocket clients connected to a game session."""
try:
channel_layer = get_channel_layer()
except InvalidChannelLayerError:
return
if channel_layer is None:
return
group_name = f"game_{session_code.upper()}"
try:
await channel_layer.group_send(
group_name,
{
"type": "phase_event",
"payload": {"type": event_type, **payload},
},
)
except (InvalidChannelLayerError, RedisConnectionError):
return
def sync_broadcast_phase_event(session_code: str, event_type: str, payload: dict) -> None:
"""Sync wrapper for calling broadcast_phase_event from synchronous Django views."""
try:
async_to_sync(broadcast_phase_event)(session_code, event_type, payload)
except (InvalidChannelLayerError, RedisConnectionError):
return