teal.amq – Message queue related utilities for TeaL#

class teal.amq.AMQExchangeName(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#

Bases: str, Enum

Exchange names for TeaL AMQ messages.

CALLBACKS = 'callbacks'#

Callback messages.

CallbackMessage objects are sent on this exchange, and bound by callback state.

OPENID_CIBA_CALLBACKS = 'openid_ciba_callbacks'#

OpenID CIBA callback messages, for ping/push flows.

OpenIDCIBACallbackMessage objects are sent on this exchange, and bound by authentication request identifier.

POWENS_DOMAIN_WEBHOOKS = 'powens_domain_webhooks'#

Powens domain webhook messages.

PowensWebhookMessage objects are sent on this exchange, and bound by Powens domain.

class teal.amq.AMQHandler(*, connection: AsyncContextManager, channel: AbstractChannel, exchange_prefix: str)#

Bases: object

AMQ Handler for dispatchers and listeners.

async bind_callback_state(state: str, /) None#

Bind a callback state.


state – The state to bind.

async bind_openid_ciba_request_identifier(request_id: str, /) None#

Bind events regarding an OpenID Connect CIBA authentication request.


request_id – The identifier of the OpenID Connect CIBA authentication request to bind events for.

async bind_powens_domain_webhooks(domain: str, /) None#

Bind Powens webhooks for a given Powens domain.


domain – The domain to get events for.

callback: Callable[[Message], Any] | None#

The callback to call with a message.

async get_exchange(name: str, /, *, type_: ExchangeType) AbstractExchange#

Get the exchange, declare if necessary.

  • name – The name of the exchange.

  • type_ – The exchange type.


The exchange.

async get_queue() AbstractQueue#

Get the queue for receiving messages.

classmethod handler_context(*, settings: AMQSettings) AsyncIterator[AMQHandlerType]#

Get a handler in a context.


settings – The settings to base ourselves on.

async process(incoming_message: AbstractIncomingMessage, /) None#

Process an incoming message.


incoming_message – The incoming message to process.

async send(message: Message, /) None#

Send a message on the given context.


message – The message to send.

class teal.amq.AMQSettings(_env_file: str | PathLike | List[str | PathLike] | Tuple[str | PathLike, ...] | None = '<object object>', _env_file_encoding: str | None = None, _env_nested_delimiter: str | None = None, _secrets_dir: str | PathLike | None = None, *, amqp_dsn: AmqpDsn, amq_exchange_prefix: str = '')#

Bases: BaseSettings

RabbitMQ related settings.

amq_exchange_prefix: str#

AMQP exchange name prefix.

This is mostly useful for cases where TeaL shares the same AMQP server with other applications.

amqp_dsn: AmqpDsn#

AMQP connection URI to use.

An example AMQP URI for localhost is the following:

    amqp://guest:guest@localhost:5672

See the RabbitMQ URI Specification for more information.

class teal.amq.CallbackMessage(*, timestamp: datetime = None, url: str, state: str)#

Bases: Message

Body for a callback message in the message queue.

state: str#

State for which the callback is emitted.

url: str#

Resulting callback URL with parameters and fragment.

class teal.amq.Message(*, timestamp: datetime = None)#

Bases: BaseModel

Base message.

timestamp: datetime#

UTC date and time at which the message has been sent.

class teal.amq.OpenIDCIBACallbackMessage(*, timestamp: datetime = None, request_id: str, access_token: str | None = None, push_token: OpenIDCIBAPushToken | None = None, push_error: OpenIDCIBAPushError | None = None)#

Bases: Message

Body for an OpenID CIBA request notification.

access_token: str | None#

The access token used to authenticate the ping/push callback.

push_error: OpenIDCIBAPushError | None#

The error data, in case the callback is a push error.

push_token: OpenIDCIBAPushToken | None#

The token data, in case the callback is a push token.

request_id: str#

Authentication request identifier.

class teal.amq.OpenIDCIBAPushError(*, error: str, error_description: str | None = None)#

Bases: BaseModel

Error data content for an OpenID CIBA Push Error Callback.

error: str#

The error code.

error_description: str | None#

The optional error description.

class teal.amq.OpenIDCIBAPushToken(*, access_token: str, token_type: str, refresh_token: str, expires_at: datetime, id_token: str | None = None)#

Bases: BaseModel

Token data content for an OpenID CIBA Push Callback.

access_token: str#

The provided access token, if relevant.

expires_at: datetime#

The date and time at which the provided token expires.

id_token: str | None#

The OpenID Connect ID token, if provided.

refresh_token: str#

The provided refresh token, if relevant.

token_type: str#

The provided access token type, if relevant.

class teal.amq.PowensHMACSignature(*, signature: str, payload_prefix: str, signature_date: datetime | None = None)#

Bases: BaseModel

HMAC signature data for Powens webhook body.

The signature is computed by using the following data:

        <METHOD> + "." + <ENDPOINT> + "." + <DATE> + "." + <PAYLOAD>

where:

  • METHOD is the HTTP method in uppercase.

  • ENDPOINT is the HTTP request path, e.g. “/my-webhook-listener”

  • DATE is the raw “BI-Signature-Date” header.

  • PAYLOAD is the raw webhook data payload.

payload_prefix: str#

The computed prefix to prepend to the payload when computing the signature.

signature: str#

The computed signature on Powens’ end.

signature_date: datetime | None#

The date and time at which the signature has been produced.

class teal.amq.PowensWebhookMessage(*, timestamp: datetime = None, domain: str, event: str, hmac_signature: PowensHMACSignature | None = None, user_token: str | None = None, payload: str)#

Bases: Message

Body for a Powens webhook message in the message queue.

domain: str#

Fully qualified domain for which the webhook is emitted.

event: str#

Event for which the webhook is emitted.

hmac_signature: PowensHMACSignature | None#

The HMAC signature, if present.

payload: str#

The UTF-8 decoded payload.

user_token: str | None#

User scoped token with which the webhook is authenticated.