teal.websocket_dispatcher.utils – Utilities for the TeaL websocket dispatcher
- class teal.websocket_dispatcher.utils.BasicAuthBackend
Bases: AuthenticationBackend
Basic authentication backend, for decoding HTTP Basic credentials.
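As an illustration of what decoding HTTP Basic credentials involves, here is a minimal, framework-independent sketch. The function name `decode_basic_credentials` is hypothetical and is not part of this module; the actual backend may validate or reject credentials differently.

```python
import base64
import binascii
from typing import Optional, Tuple


def decode_basic_credentials(header_value: str) -> Optional[Tuple[str, str]]:
    """Decode the value of an ``Authorization: Basic ...`` header.

    Hypothetical helper for illustration only. Returns ``(username, password)``,
    or ``None`` if the value is not a well-formed Basic credential.
    """
    scheme, _, encoded = header_value.partition(" ")
    if scheme.lower() != "basic" or not encoded:
        return None

    try:
        # validate=True rejects characters outside the base64 alphabet.
        decoded = base64.b64decode(encoded.strip(), validate=True).decode("utf-8")
    except (binascii.Error, UnicodeDecodeError):
        return None

    # Only the first colon separates the fields; the password may contain colons.
    username, separator, password = decoded.partition(":")
    if not separator:
        return None
    return username, password
```

A backend built on this would typically treat a `None` result as an unauthenticated request, or raise an authentication error, depending on the framework in use.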
- class teal.websocket_dispatcher.utils.WebsocketDispatcher(*, amq_handler: AMQHandler)
Bases: object
Main class for dispatching websocket event pushes.
- amq_handler
- async bind_callback_state(websocket: WebSocket, /, *, state: str) -> None
Bind a websocket to a callback state.
- Parameters:
websocket – The websocket to bind.
state – The state to bind to.
- async bind_openid_ciba_request_identifier(websocket: WebSocket, /, *, request_id: str) -> None
Bind a websocket to OpenID CIBA callbacks for a request identifier.
- Parameters:
websocket – The websocket to bind.
request_id – The request identifier to bind for.
- async bind_powens_domain(websocket: WebSocket, /, *, domain: str) -> None
Bind a websocket to Powens webhook calls for a domain.
- Parameters:
websocket – The websocket to bind.
domain – The Powens domain to bind for.
- callback_state_by_websocket: defaultdict[WebSocket, set[str]]
Mapping from each websocket to the set of callback states bound to it.
- classmethod dispatcher_context(*, settings: Settings) -> AsyncIterator[WebsocketDispatcherType]
Get a dispatcher in a context.
- Parameters:
settings – The settings to use.
- openid_ciba_request_id_by_websocket: defaultdict[WebSocket, set[str]]
Mapping from each websocket to the set of OpenID CIBA request identifiers bound to it.
- powens_domain_by_websocket: defaultdict[WebSocket, set[str]]
Mapping from each websocket to the set of Powens domains whose webhooks are sent to it.
- async unbind_all(websocket: WebSocket, /) -> None
Unbind everything from a websocket.
- Parameters:
websocket – The websocket to unbind.
- websocket_by_callback_state: defaultdict[str, set[WebSocket]]
Mapping from each callback state to the set of websockets bound to it.
- websocket_by_openid_ciba_request_id: defaultdict[str, set[WebSocket]]
Mapping from each OpenID CIBA request identifier to the set of websockets that should receive its callbacks.
- websocket_by_powens_domain: defaultdict[str, set[WebSocket]]
Mapping from each Powens domain to the set of websockets that should receive its webhooks.
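The paired `*_by_websocket` and `websocket_by_*` attributes above suggest a two-way index: one direction answers "which websockets should receive this event?", the other makes `unbind_all` cheap when a connection closes. The following is a minimal synchronous sketch of that pattern for a single kind of key. The class `BindingIndex` and its method `targets` are hypothetical, plain objects stand in for websockets, and the real dispatcher's async methods and AMQ handling are not modeled.

```python
from collections import defaultdict
from typing import DefaultDict, Hashable, Set


class BindingIndex:
    """Hypothetical two-way index between connections and string keys."""

    def __init__(self) -> None:
        # connection -> keys bound to it (compare callback_state_by_websocket)
        self.keys_by_conn: DefaultDict[Hashable, Set[str]] = defaultdict(set)
        # key -> connections bound to it (compare websocket_by_callback_state)
        self.conns_by_key: DefaultDict[str, Set[Hashable]] = defaultdict(set)

    def bind(self, conn: Hashable, *, key: str) -> None:
        """Record the binding in both directions."""
        self.keys_by_conn[conn].add(key)
        self.conns_by_key[key].add(conn)

    def unbind_all(self, conn: Hashable) -> None:
        """Remove every binding for a connection, dropping keys left empty."""
        for key in self.keys_by_conn.pop(conn, set()):
            conns = self.conns_by_key.get(key)
            if conns is None:
                continue
            conns.discard(conn)
            if not conns:
                del self.conns_by_key[key]

    def targets(self, key: str) -> Set[Hashable]:
        """Return a copy of the connections bound to a key.

        Uses .get() so that a lookup never creates an empty defaultdict entry.
        """
        return set(self.conns_by_key.get(key, ()))


index = BindingIndex()
ws_a, ws_b = object(), object()  # stand-ins for WebSocket instances
index.bind(ws_a, key="state-1")
index.bind(ws_b, key="state-1")
index.bind(ws_a, key="state-2")
index.unbind_all(ws_a)  # e.g. when ws_a disconnects
```

After the final call, only `ws_b` remains bound to `"state-1"`, and `"state-2"` no longer appears in the key index. A dispatcher following this pattern would keep one such pair of mappings per event source (callback states, OpenID CIBA request identifiers, Powens domains).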