Queueing with cds.queued
Overview
The task queue feature allows you to defer event processing.
A common use case is the outbox pattern, where remote operations are deferred until the main transaction has been successfully committed. This prevents accidental execution of remote calls in case the transaction is rolled back.
Every non-database CAP service can be queued, meaning that event dispatching becomes asynchronous.
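The outbox idea described above can be illustrated without any framework. The following is a minimal in-memory sketch, not CAP's implementation: the `Transaction` class, the `record` dispatcher, and the event name are hypothetical and exist only to show that deferred messages are dispatched on commit and dropped on rollback.

```javascript
// Minimal in-memory illustration of the outbox pattern.
// All names (Transaction, record, remoteCalls) are hypothetical, not CAP APIs.
class Transaction {
  constructor(dispatch) {
    this.dispatch = dispatch // function that performs the "remote call"
    this.outbox = []         // messages deferred until commit
  }

  // Defer the message instead of calling the remote service right away
  emit(event, data) {
    this.outbox.push({ event, data })
  }

  // On commit, hand all deferred messages to the dispatcher
  commit() {
    const pending = this.outbox
    this.outbox = []
    for (const msg of pending) this.dispatch(msg)
  }

  // On rollback, drop the deferred messages; no remote call happens
  rollback() {
    this.outbox = []
  }
}

const remoteCalls = []
const record = msg => remoteCalls.push(msg.event)

const committed = new Transaction(record)
committed.emit('orderCreated', { id: 1 })
committed.commit()

const rolledBack = new Transaction(record)
rolledBack.emit('orderCreated', { id: 2 })
rolledBack.rollback()

console.log(remoteCalls) // [ 'orderCreated' ]
```

Only the message of the committed transaction reaches the dispatcher; the rolled-back one is never sent.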
TIP
The task queue feature can be disabled globally via cds.requires.queue = false.
Queueing a Service
cds.queued (srv)
function cds.queued ( srv: Service, options? ) => QueuedService
Programmatically, you can get the queued service as follows:
const srv = await cds.connect.to('yourService')
const qd_srv = cds.queued(srv)
await qd_srv.emit('someEvent', { some: 'message' }) // asynchronous
await qd_srv.send('someEvent', { some: 'message' }) // asynchronous
await needed
You still need to await these operations because they're asynchronous. In case of a persistent queue, which is the default, messages are stored in the database, within the current transaction.
The cds.queued function can also be called with optional configuration options.
const qd_srv = cds.queued(srv, { maxAttempts: 5 })
The persistent queue can only be used if it is not disabled globally via cds.requires.queue = false, as it requires a dedicated database table.
One-time configuration
Once you have queued a service, you cannot override its configuration options.
For backwards compatibility, cds.outboxed(srv) works as a synonym.
cds.unqueued (srv)
function cds.unqueued ( srv: QueuedService ) => Service
Use this on a queued service to get back to the original service:
const srv = cds.unqueued(qd_srv)
This is useful if your service is outboxed (that is, queued) per configuration.
For backwards compatibility, cds.unboxed(srv) works as a synonym.
Per Configuration
Some services are outboxed by default; these include cds.MessagingService and cds.AuditLogService. You can configure the outbox behavior by specifying the outboxed option in your service configuration.
{
  "requires": {
    "yourService": {
      "kind": "odata",
      "outboxed": {
        "maxAttempts": 5
      }
    }
  }
}
For transactional safety, you're encouraged to use the persistent queue, which is enabled by default.
Persistent Queue (Default)
The persistent queue is the default configuration.
With the persistent queue, the to-be-emitted message is stored in a database table within the current transaction, which guarantees transactional consistency.
You can use the following configuration options:
{
  "requires": {
    "queue": {
      "kind": "persistent-queue",
      "maxAttempts": 20,
      "parallel": true,
      "chunkSize": 10,
      "storeLastError": true,
      "legacyLocking": true,
      "timeout": "1h"
    }
  }
}
The optional parameters are:
- maxAttempts (default 20): The number of unsuccessful emits until the message is considered unprocessable. The message will remain in the database table!
- parallel (default true): Specifies whether messages are sent in parallel (faster, but the order isn't guaranteed).
- chunkSize (default 10): The number of messages that are read from the database table in one go. Only applies if parallel !== false.
- storeLastError (default true): Specifies whether error information of the last failed emit is stored in the tasks table.
- legacyLocking (default true): If set to false, database locks are only used to set the status of the message to processing, which prevents long-held database locks. Although this is the recommended approach, it is incompatible with task runners still on @sap/cds^8.
- timeout (default "1h"): The time after which a message with status === "processing" is considered abandoned and eligible to be processed again. Only applies if legacyLocking === false.
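To make the timeout option more concrete, here is a rough sketch of an "abandoned message" check. It is not the @sap/cds implementation: the function name `isAbandoned` is hypothetical, and the sketch assumes that the timeout is measured from the lastAttemptTimestamp field and has already been converted to milliseconds.

```javascript
// Hypothetical check, assuming the timeout counts from lastAttemptTimestamp.
// How @sap/cds evaluates the timeout internally may differ.
function isAbandoned(message, timeoutMs, now = Date.now()) {
  // Only messages that are currently marked as being processed can be abandoned
  if (message.status !== 'processing') return false
  const lastAttempt = new Date(message.lastAttemptTimestamp).getTime()
  return now - lastAttempt > timeoutMs
}

const ONE_HOUR = 60 * 60 * 1000 // corresponds to the default "1h"
const now = Date.parse('2024-01-01T12:00:00Z')

const stuck = { status: 'processing', lastAttemptTimestamp: '2024-01-01T10:30:00Z' }
const fresh = { status: 'processing', lastAttemptTimestamp: '2024-01-01T11:30:00Z' }

console.log(isAbandoned(stuck, ONE_HOUR, now)) // true
console.log(isAbandoned(fresh, ONE_HOUR, now)) // false
```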
Once the transaction succeeds, the messages are read from the database table and dispatched. If processing was successful, the respective message is deleted from the database table. If processing failed, the system retries the message after exponentially increasing delays. After the maximum number of attempts, the message is no longer processed and remains in the database, which therefore also acts as a dead letter queue. See Managing the Dead Letter Queue to learn how to handle such messages.
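The retry behavior described above can be sketched as follows. This is an illustration only: the base delay of one second and the doubling factor are assumptions, not the values used by @sap/cds, and the helper names `retryDelayMs` and `isDead` are hypothetical.

```javascript
// Hypothetical retry policy: base delay and doubling factor are assumptions.
// `attempts` is the number of failed attempts so far (1 or more).
function retryDelayMs(attempts, baseMs = 1000) {
  return baseMs * 2 ** (attempts - 1)
}

// A message is treated as "dead" once its attempts reach maxAttempts
function isDead(message, maxAttempts = 20) {
  return message.attempts >= maxAttempts
}

console.log([1, 2, 3, 4].map(n => retryDelayMs(n))) // [ 1000, 2000, 4000, 8000 ]
console.log(isDead({ attempts: 19 })) // false
console.log(isDead({ attempts: 20 })) // true
```

The `isDead` condition is the same `attempts >= maxAttempts` comparison that the dead letter queue filter relies on.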
There is only one active message processor per service, tenant, app instance, and message. This ensures that no duplicate emits happen, except in the highly unlikely case of an app crash right after successful processing but before the message could be deleted.
Unrecoverable errors
Some errors during the emit are identified as unrecoverable, for example in SAP Event Mesh if the used topic is forbidden. The respective message is then updated and the attempts field is set to maxAttempts to prevent further processing. Programming errors crash the server instance and must be fixed. To mark your own errors as unrecoverable, you can set unrecoverable = true on the error object.
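As a small sketch of marking your own error as unrecoverable: the validation function `validateOrderEvent` and the `orderId` field are hypothetical, and only the `unrecoverable = true` flag on the error object is taken from the text above.

```javascript
// Hypothetical payload check used inside an event handler.
// Only the `unrecoverable` flag is the documented part.
function validateOrderEvent(data) {
  if (!data || typeof data.orderId !== 'string') {
    const err = new Error('Missing orderId: retrying would not help')
    err.unrecoverable = true // signals that the message must not be retried
    throw err
  }
  return data
}

try {
  validateOrderEvent({})
} catch (err) {
  console.log(err.unrecoverable) // true
}
```

Throwing such an error from the handler of a queued event is meant for cases where no later retry can succeed, such as a structurally invalid payload.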
Your database model is automatically extended by the entity cds.outbox.Messages:
namespace cds.outbox;

entity Messages {
  key ID               : UUID;
      timestamp        : Timestamp;
      target           : String;
      msg              : LargeString;
      attempts         : Integer default 0;
      partition        : Integer default 0;
      lastError        : LargeString;
      lastAttemptTimestamp : Timestamp @cds.on.update: $now;
      status           : String(23);
}
In your CDS model, you can refer to the entity cds.outbox.Messages using the path @sap/cds/srv/outbox, for example to expose it in a service (cf. Managing the Dead Letter Queue).
Known Limitations
- If the app crashes, another emit for the respective tenant and service is necessary to restart the message processing. It can be triggered manually using the flush method.
- The service that handles the queued event must not rely on user roles and attributes, as they are not stored with the message. In other words, asynchronous tasks are always processed in a privileged mode. However, the user ID is stored to re-create the correct context.
Managing the Dead Letter Queue
You can manage the dead letter queue by implementing a service that exposes a read-only projection on entity cds.outbox.Messages as well as bound actions to either revive or delete the respective message.
TIP
See Outbox Dead Letter Queue in the CAP Java documentation for additional considerations while we work on a general outbox guide.
1. Define the Service
using from '@sap/cds/srv/outbox';

@requires: 'internal-user'
service OutboxDeadLetterQueueService {
  @readonly
  entity DeadOutboxMessages as projection on cds.outbox.Messages
    actions {
      action revive();
      action delete();
    };
}
2. Filter for Dead Entries
As maxAttempts is configurable, its value cannot be added as a static filter to projection DeadOutboxMessages, but must be considered programmatically.
const cds = require('@sap/cds')

module.exports = class OutboxDeadLetterQueueService extends cds.ApplicationService {
  async init() {
    this.before('READ', 'DeadOutboxMessages', function (req) {
      const { maxAttempts } = cds.env.requires.outbox
      req.query.where('attempts >= ', maxAttempts)
    })
    await super.init()
  }
}
3. Implement Bound Actions
Finally, entries in the dead letter queue can either be revived by resetting the number of attempts (that is, SET attempts = 0) or deleted.
const cds = require('@sap/cds')

module.exports = class OutboxDeadLetterQueueService extends cds.ApplicationService {
  async init() {
    this.before('READ', 'DeadOutboxMessages', function (req) {
      const { maxAttempts } = cds.env.requires.outbox
      req.query.where('attempts >= ', maxAttempts)
    })
    this.on('revive', 'DeadOutboxMessages', async function (req) {
      await UPDATE(req.subject).set({ attempts: 0 })
    })
    this.on('delete', 'DeadOutboxMessages', async function (req) {
      await DELETE.from(req.subject)
    })
    await super.init()
  }
}
Additional APIs (alpha)
Task Scheduling
You can use the schedule method as a shortcut for cds.queued(srv).send(), with optional scheduling options after and every:
await srv.schedule('someEvent', { some: 'message' })
await srv.schedule('someEvent', { some: 'message' }).after('1h') // after one hour
await srv.schedule('someEvent', { some: 'message' }).every('1h') // every hour after each processing
Task Processing
To manually trigger the message processing, for example if your server is restarted, you can use the flush method.
const srv = await cds.connect.to('yourService')
cds.queued(srv).flush()
Task Callbacks
Once a message has been successfully processed, it triggers the <event>/#succeeded handlers.
srv.after('someEvent/#succeeded', (data, req) => {
  // `data` is the result of the event processor
  console.log('Message successfully processed:', data)
})
Similarly, you can use the <event>/#failed event to handle failed messages (once the maximum retry count is reached).
srv.after('someEvent/#failed', (data, req) => {
  // `data` is the error from the event processor
  console.log('Message could not be processed:', data)
})
Register on specific events
Event handlers have to be registered for these specific events. The * wildcard handler is not called for these.
In-Memory Queue
You can enable the in-memory queue globally with:
{
  "requires": {
    "queue": {
      "kind": "in-memory-queue"
    }
  }
}
Messages are emitted only after the current transaction is successfully committed. Until then, messages are only kept in memory. This is similar to the following code if done manually:
cds.context.on('succeeded', () => this.emit(msg))
No retry mechanism
The message is lost if the emit fails. There's no retry mechanism.
Immediate Emit
To disable deferred emitting for a particular service only, you can set the outboxed option of that service to false:
{
  "requires": {
    "messaging": {
      "kind": "enterprise-messaging",
      "outboxed": false
    }
  }
}
Troubleshooting
Delete Entries in the Messages Table
To manually delete entries in the table cds.outbox.Messages, you can either expose it in a service, see Managing the Dead Letter Queue, or programmatically modify it using the cds.outbox.Messages entity:
const db = await cds.connect.to('db')
await DELETE.from('cds.outbox.Messages')
Messages Table Not Found
If the messages table is not found on the database, this can be caused by insufficient configuration data in package.json.
In case you have overwritten requires.db.model there, make sure to add the outbox model path @sap/cds/srv/outbox:
"requires": {
  "db": { ...
    "model": [..., "@sap/cds/srv/outbox"]
  }
}
The following is only relevant if you're using @sap/cds version < 6.7.0 and you've configured options.model in custom build tasks. Add the model path accordingly:
"build": {
  "tasks": [{ ...
    "options": { "model": [..., "@sap/cds/srv/outbox"] }
  }]
}
Note that model configuration isn't required for CAP projects using the standard project layout with db, srv, and app folders. In this case, you can delete the entire model configuration.