Source code for websockets.framing

"""
:mod:`websockets.framing` reads and writes WebSocket frames.

It deals with a single frame at a time. Anything that depends on the sequence
of frames is implemented in :mod:`websockets.protocol`.

See `section 5 of RFC 6455`_.

.. _section 5 of RFC 6455: http://tools.ietf.org/html/rfc6455#section-5

"""

import io
import random
import struct
from typing import Any, Awaitable, Callable, NamedTuple, Optional, Sequence, Tuple

from .exceptions import PayloadTooBig, ProtocolError
from .typing import Data


try:
    from .speedups import apply_mask
except ImportError:  # pragma: no cover
    from .utils import apply_mask


__all__ = [
    "DATA_OPCODES",
    "CTRL_OPCODES",
    "OP_CONT",
    "OP_TEXT",
    "OP_BINARY",
    "OP_CLOSE",
    "OP_PING",
    "OP_PONG",
    "Frame",
    "prepare_data",
    "encode_data",
    "parse_close",
    "serialize_close",
]

DATA_OPCODES = OP_CONT, OP_TEXT, OP_BINARY = 0x00, 0x01, 0x02
CTRL_OPCODES = OP_CLOSE, OP_PING, OP_PONG = 0x08, 0x09, 0x0A

# Close code that are allowed in a close frame.
# Using a list optimizes `code in EXTERNAL_CLOSE_CODES`.
EXTERNAL_CLOSE_CODES = [1000, 1001, 1002, 1003, 1007, 1008, 1009, 1010, 1011]


# Remove FrameData when dropping support for Python < 3.6.1 — the first
# version where NamedTuple supports default values, methods, and docstrings.

# Consider converting to a dataclass when dropping support for Python < 3.7.


class FrameData(NamedTuple):
    fin: bool
    opcode: int
    data: bytes
    rsv1: bool
    rsv2: bool
    rsv3: bool


[docs]class Frame(FrameData): """ WebSocket frame. :param bool fin: FIN bit :param bool rsv1: RSV1 bit :param bool rsv2: RSV2 bit :param bool rsv3: RSV3 bit :param int opcode: opcode :param bytes data: payload data Only these fields are needed. The MASK bit, payload length and masking-key are handled on the fly by :meth:`read` and :meth:`write`. """ def __new__( cls, fin: bool, opcode: int, data: bytes, rsv1: bool = False, rsv2: bool = False, rsv3: bool = False, ) -> "Frame": return FrameData.__new__(cls, fin, opcode, data, rsv1, rsv2, rsv3)
[docs] @classmethod async def read( cls, reader: Callable[[int], Awaitable[bytes]], *, mask: bool, max_size: Optional[int] = None, extensions: Optional[Sequence["websockets.extensions.base.Extension"]] = None, ) -> "Frame": """ Read a WebSocket frame. :param reader: coroutine that reads exactly the requested number of bytes, unless the end of file is reached :param mask: whether the frame should be masked i.e. whether the read happens on the server side :param max_size: maximum payload size in bytes :param extensions: list of classes with a ``decode()`` method that transforms the frame and return a new frame; extensions are applied in reverse order :raises ~websockets.exceptions.PayloadTooBig: if the frame exceeds ``max_size`` :raises ~websockets.exceptions.ProtocolError: if the frame contains incorrect values """ # Read the header. data = await reader(2) head1, head2 = struct.unpack("!BB", data) # While not Pythonic, this is marginally faster than calling bool(). fin = True if head1 & 0b10000000 else False rsv1 = True if head1 & 0b01000000 else False rsv2 = True if head1 & 0b00100000 else False rsv3 = True if head1 & 0b00010000 else False opcode = head1 & 0b00001111 if (True if head2 & 0b10000000 else False) != mask: raise ProtocolError("incorrect masking") length = head2 & 0b01111111 if length == 126: data = await reader(2) length, = struct.unpack("!H", data) elif length == 127: data = await reader(8) length, = struct.unpack("!Q", data) if max_size is not None and length > max_size: raise PayloadTooBig( f"payload length exceeds size limit ({length} > {max_size} bytes)" ) if mask: mask_bits = await reader(4) # Read the data. data = await reader(length) if mask: data = apply_mask(data, mask_bits) frame = cls(fin, opcode, data, rsv1, rsv2, rsv3) if extensions is None: extensions = [] for extension in reversed(extensions): frame = extension.decode(frame, max_size=max_size) frame.check() return frame
[docs] def write( frame, writer: Callable[[bytes], Any], *, mask: bool, extensions: Optional[Sequence["websockets.extensions.base.Extension"]] = None, ) -> None: """ Write a WebSocket frame. :param frame: frame to write :param writer: function that writes bytes :param mask: whether the frame should be masked i.e. whether the write happens on the client side :param extensions: list of classes with an ``encode()`` method that transform the frame and return a new frame; extensions are applied in order :raises ~websockets.exceptions.ProtocolError: if the frame contains incorrect values """ # The first parameter is called `frame` rather than `self`, # but it's the instance of class to which this method is bound. frame.check() if extensions is None: extensions = [] for extension in extensions: frame = extension.encode(frame) output = io.BytesIO() # Prepare the header. head1 = ( (0b10000000 if frame.fin else 0) | (0b01000000 if frame.rsv1 else 0) | (0b00100000 if frame.rsv2 else 0) | (0b00010000 if frame.rsv3 else 0) | frame.opcode ) head2 = 0b10000000 if mask else 0 length = len(frame.data) if length < 126: output.write(struct.pack("!BB", head1, head2 | length)) elif length < 65536: output.write(struct.pack("!BBH", head1, head2 | 126, length)) else: output.write(struct.pack("!BBQ", head1, head2 | 127, length)) if mask: mask_bits = struct.pack("!I", random.getrandbits(32)) output.write(mask_bits) # Prepare the data. if mask: data = apply_mask(frame.data, mask_bits) else: data = frame.data output.write(data) # Send the frame. # The frame is written in a single call to writer in order to prevent # TCP fragmentation. See #68 for details. This also makes it safe to # send frames concurrently from multiple coroutines. writer(output.getvalue())
[docs] def check(frame) -> None: """ Check that reserved bits and opcode have acceptable values. :raises ~websockets.exceptions.ProtocolError: if a reserved bit or the opcode is invalid """ # The first parameter is called `frame` rather than `self`, # but it's the instance of class to which this method is bound. if frame.rsv1 or frame.rsv2 or frame.rsv3: raise ProtocolError("reserved bits must be 0") if frame.opcode in DATA_OPCODES: return elif frame.opcode in CTRL_OPCODES: if len(frame.data) > 125: raise ProtocolError("control frame too long") if not frame.fin: raise ProtocolError("fragmented control frame") else: raise ProtocolError(f"invalid opcode: {frame.opcode}")
[docs]def prepare_data(data: Data) -> Tuple[int, bytes]: """ Convert a string or byte-like object to an opcode and a bytes-like object. This function is designed for data frames. If ``data`` is a :class:`str`, return ``OP_TEXT`` and a :class:`bytes` object encoding ``data`` in UTF-8. If ``data`` is a bytes-like object, return ``OP_BINARY`` and a bytes-like object. :raises TypeError: if ``data`` doesn't have a supported type """ if isinstance(data, str): return OP_TEXT, data.encode("utf-8") elif isinstance(data, (bytes, bytearray)): return OP_BINARY, data elif isinstance(data, memoryview): if data.c_contiguous: return OP_BINARY, data else: return OP_BINARY, data.tobytes() else: raise TypeError("data must be bytes-like or str")
[docs]def encode_data(data: Data) -> bytes: """ Convert a string or byte-like object to bytes. This function is designed for ping and pong frames. If ``data`` is a :class:`str`, return a :class:`bytes` object encoding ``data`` in UTF-8. If ``data`` is a bytes-like object, return a :class:`bytes` object. :raises TypeError: if ``data`` doesn't have a supported type """ if isinstance(data, str): return data.encode("utf-8") elif isinstance(data, (bytes, bytearray)): return bytes(data) elif isinstance(data, memoryview): return data.tobytes() else: raise TypeError("data must be bytes-like or str")
[docs]def parse_close(data: bytes) -> Tuple[int, str]: """ Parse the payload from a close frame. Return ``(code, reason)``. :raises ~websockets.exceptions.ProtocolError: if data is ill-formed :raises UnicodeDecodeError: if the reason isn't valid UTF-8 """ length = len(data) if length >= 2: code, = struct.unpack("!H", data[:2]) check_close(code) reason = data[2:].decode("utf-8") return code, reason elif length == 0: return 1005, "" else: assert length == 1 raise ProtocolError("close frame too short")
[docs]def serialize_close(code: int, reason: str) -> bytes: """ Serialize the payload for a close frame. This is the reverse of :func:`parse_close`. """ check_close(code) return struct.pack("!H", code) + reason.encode("utf-8")
def check_close(code: int) -> None: """ Check that the close code has an acceptable value for a close frame. :raises ~websockets.exceptions.ProtocolError: if the close code is invalid """ if not (code in EXTERNAL_CLOSE_CODES or 3000 <= code < 5000): raise ProtocolError("invalid status code") # at the bottom to allow circular import, because Extension depends on Frame import websockets.extensions.base # isort:skip # noqa