This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

There is no retry logic for sending EDUs to application services #11150

Open
anoadragon453 opened this issue Oct 21, 2021 · 2 comments
Labels
A-Application-Service Related to AS support T-Enhancement New features, changes in functionality, improvements in performance, or user-facing enhancements.

Comments

@anoadragon453
Member

anoadragon453 commented Oct 21, 2021

When we send ephemeral events to appservices, we keep track of a stream token per appservice per EDU type. We do this for read receipts and presence updates (but not for typing, those are a little too ephemeral to care about).

When a new read receipt comes through the server, we consider whether that should be sent to any connected application services. If so, we send those read receipts off. We then record the stream token of the read receipt for this appservice, even if we ended up determining that the appservice wasn't interested in the event (so that we wouldn't have to check that again later on).

This system works (although it has scalability concerns), but there is no retry logic. We record the updated stream token even if sending to the application service fails:

elif stream_key == "receipt_key":
    events = await self._handle_receipts(service)
    if events:
        self.scheduler.submit_ephemeral_events_for_as(service, events)
    await self.store.set_type_stream_id_for_appservice(
        service, "read_receipt", new_token
    )

submit_ephemeral_events_for_as kicks off a background task which logs and then ignores exceptions:

try:
    await self.txn_ctrl.send(service, events, ephemeral)
except Exception:
    logger.exception("AS request failed")

The stream token is stored and updated for this appservice immediately after the background task kicks off. It makes sense to do this so that we do not end up with duplicated work while processing subsequent calls, even if sending to the appservice is slow. However, we still need a way of holding off on updating the stored stream ID until a 200 is returned from the appservice. Only then should we mark the appservice as having successfully processed up to that stream token.
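To illustrate the "only advance on success" idea, here is a minimal sketch. It is not Synapse's actual code: `FakeStore`, `send_then_advance` and the `send` callable are hypothetical names introduced for this example, and the real storage and sending APIs differ.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple


class FakeStore:
    """Hypothetical in-memory stand-in for the stream-token storage."""

    def __init__(self) -> None:
        self.tokens: Dict[Tuple[str, str], int] = {}

    async def set_type_stream_id(self, as_id: str, edu_type: str, token: int) -> None:
        self.tokens[(as_id, edu_type)] = token


async def send_then_advance(
    store: FakeStore,
    send: Callable[[str, List[dict]], Awaitable[bool]],
    as_id: str,
    edu_type: str,
    events: List[dict],
    new_token: int,
) -> bool:
    """Persist new_token only if the appservice acknowledged the events."""
    if events:
        try:
            ok = await send(as_id, events)
        except Exception:
            ok = False
        if not ok:
            # Leave the stored token untouched so this range can be retried.
            return False
    # Nothing to send, or the send succeeded: safe to move the token forward.
    await store.set_type_stream_id(as_id, edu_type, new_token)
    return True
```

Note that a sketch like this makes the caller wait for the send to finish before the token moves, which is exactly the cost that updating the token immediately avoids.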

This doesn't matter so much with read receipts or presence updates, but will become much more important when we start passing things like device lists over this channel. A blip in the network will lead to decryption errors down the line.

Note that this behaviour for the current set of supported EDUs is intentional:

async def create_appservice_txn(
    self,
    service: ApplicationService,
    events: List[EventBase],
    ephemeral: List[JsonDict],
) -> AppServiceTransaction:
    """Atomically creates a new transaction for this application service
    with the given list of events. Ephemeral events are NOT persisted to the
    database and are not resent if a transaction is retried.

The stream tokens are stored in the application_services_state table, which has the schema:

synapse=# \d application_services_state
                   Table "public.application_services_state"
         Column         |         Type         | Collation | Nullable | Default 
------------------------+----------------------+-----------+----------+---------
 as_id                  | text                 |           | not null | 
 state                  | character varying(5) |           |          | 
 last_txn               | bigint               |           |          | 
 read_receipt_stream_id | bigint               |           |          | 
 presence_stream_id     | bigint               |           |          | 

Indexes:
    "application_services_state_pkey" PRIMARY KEY, btree (as_id)
anoadragon453 added the T-Enhancement label Oct 21, 2021
anoadragon453 added a commit that referenced this issue Oct 21, 2021
@anoadragon453
Member Author

As a slight clarification, we - perhaps purposefully - don't wait until the transaction has been sent to the application service before updating the stream token for a given service's stream_key (e.g. presence_key). Doing so properly would mean that subsequent requests would need to wait until the previous request had finished before calling get_type_stream_id_for_appservice.

Instead, we should just keep track of failed requests and try them again in the background, backing off if necessary.
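A rough sketch of what "try them again in the background, backing off if necessary" could look like. The function name `retry_with_backoff`, its parameters and the default delays are all assumptions made for illustration, not anything that exists in Synapse.

```python
import asyncio
from typing import Awaitable, Callable, List


async def retry_with_backoff(
    send: Callable[[List[dict]], Awaitable[None]],
    events: List[dict],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Try to send events, doubling the delay after each failure.

    Returns True once a send succeeds, False if every attempt failed.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            await send(events)
            return True
        except Exception:
            if attempt == max_attempts:
                break
            # Wait before the next attempt, capping the exponential growth.
            await sleep(delay)
            delay = min(delay * 2, max_delay)
    return False
```

To keep it off the request path, a caller could schedule it with `asyncio.create_task(retry_with_backoff(send, events))` and record the failed token range if it returns False.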

I'm not sure what we want to do in the case of a certain range of stream tokens failing for >24 hours. Something equivalent to federation catchup I suspect.

@anoadragon453
Member Author

The current implementation of #11215 will only send 100 to-device messages at a time. Sending to-device messages to an AS is triggered by a new to-device message coming in. Thus, if there is a large backlog of to-device messages to send to the AS, it will only receive them gradually, as new to-device messages come in.

We can speed this process up by triggering a new transaction to the AS if we happen to hit this limit - though it is unclear whether this would overload the AS. Needs to be tried in practice I think.
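One possible shape for "trigger another transaction when the limit is hit", as a hedged sketch only. `drain_backlog`, `fetch` and `send` are hypothetical names. `fetch(token, limit)` is assumed to return up to `limit` messages with a `stream_id` greater than `token`, in ascending order.

```python
import asyncio
from typing import Awaitable, Callable, List

BATCH_LIMIT = 100  # the per-transaction limit described above


async def drain_backlog(
    fetch: Callable[[int, int], Awaitable[List[dict]]],
    send: Callable[[List[dict]], Awaitable[None]],
    from_token: int,
    limit: int = BATCH_LIMIT,
) -> int:
    """Send batches until one comes back short of the limit.

    Returns the stream_id of the last message sent (or from_token if none).
    """
    token = from_token
    while True:
        batch = await fetch(token, limit)
        if not batch:
            return token
        await send(batch)
        token = batch[-1]["stream_id"]
        if len(batch) < limit:
            # A short batch means the backlog is exhausted.
            return token
```

Whether back-to-back transactions like this would overload the AS is still an open question. A pause between batches would be an easy thing to add if it does.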
