Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ humanize = ">=2.6.0"
maxminddb = "*"
oauthlib = "*"
prometheus_client = "*"
proxy-protocol = "*"
pyjwt = {version = ">=2.4.0", extras = ["crypto"]}
pyyaml = "*"
sortedcontainers = "*"
Expand Down
37 changes: 9 additions & 28 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from server.health import HealthServer
from server.player_service import PlayerService
from server.profiler import Profiler
from server.protocol import QDataStreamProtocol, SimpleJsonProtocol
from server.timing import datetime_now


Expand Down Expand Up @@ -115,34 +114,16 @@ def done_handler(sig: int, frame):

await instance.start_services()

PROTO_CLASSES = {
QDataStreamProtocol.__name__: QDataStreamProtocol,
SimpleJsonProtocol.__name__: SimpleJsonProtocol
}
for cfg in config.LISTEN:
try:
host = cfg["ADDRESS"]
port = cfg["PORT"]
proto_class_name = cfg["PROTOCOL"]
name = cfg.get("NAME")
proxy = cfg.get("PROXY", False)

proto_class = PROTO_CLASSES[proto_class_name]

await instance.listen(
address=(host, port),
name=name,
protocol_class=proto_class,
proxy=proxy
)
except Exception as e:
raise RuntimeError(f"Error with server instance config: {cfg}") from e

if not instance.contexts:
raise RuntimeError(
"The server was not configured to listen on any ports! Check the "
"config file and try again."
try:
await instance.listen(
address=(config.WS_HOST, config.WS_PORT),
path=config.WS_PATH,
)
except Exception as e:
raise RuntimeError(
f"Error starting WebSocket listener on "
f"{config.WS_HOST}:{config.WS_PORT}{config.WS_PATH}"
) from e

server.metrics.info.info({
"version": info.VERSION,
Expand Down
12 changes: 4 additions & 8 deletions minikube-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ spec:
selector:
app: faf-lobby
ports:
- port: 8001
name: qstream
- port: 8002
name: simplejson
- port: 8003
name: websocket
---
apiVersion: apps/v1
kind: Deployment
Expand Down Expand Up @@ -44,10 +42,8 @@ spec:
name: control
- containerPort: 2000
name: health
- containerPort: 8001
name: qstream
- containerPort: 8002
name: simplejson
- containerPort: 8003
name: websocket
env:
- name: CONFIGURATION_FILE
value: /config/config.yaml
Expand Down
18 changes: 6 additions & 12 deletions server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@
from .oauth_service import OAuthService
from .party_service import PartyService
from .player_service import PlayerService
from .protocol import Protocol, QDataStreamProtocol
from .rating_service.rating_service import RatingService
from .servercontext import ServerContext
from .stats.game_stats_service import GameStatsService
Expand Down Expand Up @@ -258,30 +257,25 @@ async def listen(
self,
address: tuple[str, int],
name: Optional[str] = None,
protocol_class: type[Protocol] = QDataStreamProtocol,
proxy: bool = False,
path: str = "/ws",
) -> ServerContext:
"""
Start listening on a new address.
Start listening for WebSocket connections on a new address.

# Params
- `address`: Tuple indicating the host, port to listen on.
- `name`: String used to identify this context in log messages. The
default is to use the `protocol_class` name.
- `protocol_class`: The protocol class implementation to use.
- `proxy`: Boolean indicating whether or not to use the PROXY protocol.
See: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
- `name`: String used to identify this context in log messages.
- `path`: HTTP path on which to expose the WebSocket endpoint.
"""
if not self.started:
await self.start_services()

ctx = ServerContext(
f"{self.name}[{name or protocol_class.__name__}]",
f"{self.name}[{name or 'WebSocket'}]",
self.connection_factory,
list(self.services.values()),
protocol_class
)
await ctx.listen(*address, proxy=proxy)
await ctx.listen(*address, path=path)

self.contexts.add(ctx)

Expand Down
19 changes: 3 additions & 16 deletions server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,9 @@ def __init__(self):
Change default values here.
"""
self.CONFIGURATION_REFRESH_TIME = 300
self.LISTEN = [
{
"ADDRESS": "",
"PORT": 8001,
"NAME": None,
"PROTOCOL": "QDataStreamProtocol",
"PROXY": False,
},
{
"ADDRESS": "",
"PORT": 8002,
"NAME": None,
"PROTOCOL": "SimpleJsonProtocol",
"PROXY": False
}
]
self.WS_HOST = ""
self.WS_PORT = 8003
self.WS_PATH = "/ws"
self.LOG_LEVEL = "DEBUG"
# Whether or not to use uvloop as a drop-in replacement for asyncio's
# default event loop
Expand Down
4 changes: 3 additions & 1 deletion server/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
from .protocol import DisconnectedError, Protocol
from .qdatastream import QDataStreamProtocol
from .simple_json import SimpleJsonProtocol
from .websocket import WebSocketProtocol

__all__ = (
"DisconnectedError",
"GpgNetClientProtocol",
"GpgNetServerProtocol",
"Protocol",
"QDataStreamProtocol",
"SimpleJsonProtocol"
"SimpleJsonProtocol",
"WebSocketProtocol",
)
88 changes: 88 additions & 0 deletions server/protocol/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""A WebSocket-native wire protocol.

Check notice on line 1 in server/protocol/websocket.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

server/protocol/websocket.py#L1

Multi-line docstring summary should start at the second line (D213)

Each message is sent as exactly one WebSocket text frame containing a single
JSON object. No newline framing — frame boundaries delimit messages.
"""

import asyncio
import contextlib
import json

from aiohttp import WSMsgType, web

Check failure on line 11 in server/protocol/websocket.py

View workflow job for this annotation

GitHub Actions / flake8

'aiohttp.web' imported but unused

Check warning on line 11 in server/protocol/websocket.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

server/protocol/websocket.py#L11

'aiohttp.web' imported but unused (F401)

Check warning on line 11 in server/protocol/websocket.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

server/protocol/websocket.py#L11

Unused web imported from aiohttp
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

import server.metrics as metrics

from .protocol import DisconnectedError, Protocol, json_encoder


class WebSocketProtocol(Protocol):
def __init__(self, ws, owned_session=None):

Check notice on line 19 in server/protocol/websocket.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

server/protocol/websocket.py#L19

Missing docstring in __init__ (D107)
# Intentionally bypass Protocol.__init__: it expects a StreamReader /
# StreamWriter pair, which we do not have here.
self.ws = ws
self._pending: set[asyncio.Task] = set()
self._owned_session = owned_session

@staticmethod
def encode_message(message: dict) -> bytes:
return json_encoder.encode(message).encode()

@staticmethod
def decode_message(data: bytes) -> dict:
return json.loads(data)

def is_connected(self) -> bool:
return not self.ws.closed

async def read_message(self) -> dict:
msg = await self.ws.receive()
if msg.type == WSMsgType.TEXT:
return json.loads(msg.data)
if msg.type == WSMsgType.BINARY:
return json.loads(msg.data)
raise DisconnectedError("WebSocket connection closed")

def write_raw(self, data: bytes) -> None:
metrics.sent_messages.labels(self.__class__.__name__).inc()
if not self.is_connected():
raise DisconnectedError("Protocol is not connected!")

text = data.decode() if isinstance(data, (bytes, bytearray)) else data
task = asyncio.create_task(self.ws.send_str(text))
self._pending.add(task)
task.add_done_callback(self._pending.discard)

def write_message(self, message: dict) -> None:
if not self.is_connected():
raise DisconnectedError("Protocol is not connected!")
self.write_raw(self.encode_message(message))

def write_messages(self, messages: list[dict]) -> None:
metrics.sent_messages.labels(self.__class__.__name__).inc()
if not self.is_connected():
raise DisconnectedError("Protocol is not connected!")
for message in messages:
self.write_raw(self.encode_message(message))
Comment thread
coderabbitai[bot] marked this conversation as resolved.

async def drain(self) -> None:
if not self._pending:
return
try:
await asyncio.gather(*self._pending)
except Exception as e:
await self.close()
raise DisconnectedError("Protocol connection lost!") from e

def abort(self) -> None:
for task in self._pending:
task.cancel()
if not self.ws.closed:
asyncio.create_task(self.ws.close())

async def close(self) -> None:
with contextlib.suppress(Exception):
await self.ws.close()
if self._owned_session is not None:
with contextlib.suppress(Exception):
await self._owned_session.close()
self._owned_session = None
Loading
Loading