Messaging
Learn how to use messaging services and the outbox for asynchronous communication.
cds.MessagingService class
Class cds.MessagingService and its subclasses are technical services representing asynchronous messaging channels. They can be used directly as a low-level API, or behind the scenes for higher-level service-to-service eventing.
class cds.MessagingService extends cds.Service
Declaring Events
In your CDS model, you can declare events using the event keyword inside services. Once you have created the messaging section in cds.requires, all modeled events are automatically enabled for messaging.
You can then use the services to emit events (for your own service) or receive events (from external services).
Example:
In your package.json:
{
  "cds": {
    "requires": {
      "ExternalService": {
        "kind": "odata",
        "model": "srv/external/external.cds"
      },
      "messaging": {
        "kind": "enterprise-messaging"
      }
    }
  }
}
In srv/external/external.cds:
service ExternalService {
  event ExternalEvent {
    ID: UUID;
    name: String;
  }
}
In srv/own.cds:
service OwnService {
  event OwnEvent {
    ID: UUID;
    name: String;
  }
}
In srv/own.js:
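const cds = require('@sap/cds')

module.exports = async srv => {
  // connect to the external service declared in cds.requires
  const externalService = await cds.connect.to('ExternalService')
  // forward each incoming ExternalEvent as OwnEvent of this service
  externalService.on('ExternalEvent', async msg => {
    await srv.emit('OwnEvent', msg.data)
  })
}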
Custom Topics with Declared Events
You can assign custom topics to modeled events using the @topic annotation.
TIP
If no annotation is provided, the topic will be set to the fully qualified event name.
Example:
service OwnService {
  @topic: 'my.custom/topic'
  event OwnEvent { ID: UUID; name: String; }
}
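The default described in the tip can be pictured as a small lookup. The following is a minimal sketch, not CAP's implementation: the helper resolveTopic and the shape of its event argument (a plain object with a fully qualified name and an optional '@topic' property) are assumptions made for illustration.

```javascript
// Hypothetical helper, for illustration only: picks the topic for a modeled event.
// `event` is assumed to be a plain object such as
// { name: 'OwnService.OwnEvent', '@topic': 'my.custom/topic' }.
function resolveTopic(event) {
  // an explicit @topic annotation wins ...
  if (event['@topic']) return event['@topic']
  // ... otherwise fall back to the fully qualified event name
  return event.name
}

console.log(resolveTopic({ name: 'OwnService.OwnEvent', '@topic': 'my.custom/topic' })) // → my.custom/topic
console.log(resolveTopic({ name: 'OwnService.OwnEvent' }))                              // → OwnService.OwnEvent
```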
Emitting Events
To send a message to the message broker, you can use the emit method on a transaction for the connected service.
Example:
const messaging = await cds.connect.to('messaging')

// inside a service implementation, where Reviews is the reviews entity
this.after(['CREATE', 'UPDATE', 'DELETE'], 'Reviews', async (_, req) => {
  const { subject } = req.data
  const { rating } = await cds.run(
    SELECT.one(['round(avg(rating),2) as rating'])
      .from(Reviews)
      .where({ subject }))

  // send to a topic
  await messaging.emit('cap/msg/system/review/reviewed',
    { subject, rating })

  // alternative if you want to send custom headers
  // (Node.js lowercases the names of incoming HTTP headers)
  await messaging.emit({ event: 'cap/msg/system/review/reviewed',
    data: { subject, rating },
    headers: { 'X-Correlation-ID': req.headers['x-correlation-id'] }})
})
TIP
The messages are sent once the transaction is successful. By default, an in-memory outbox is used. See Messaging - Outbox for more information.
Receiving Events
To listen to messages from a message broker, you can use the on method on the connected service. This also creates the necessary topic subscriptions.
Example:
const messaging = await cds.connect.to('messaging')

// listen to a topic
messaging.on('cap/msg/system/review/reviewed', msg => {
  const { subject, rating } = msg.data
  return cds.run(UPDATE(Books, subject).with({ rating }))
})
Once all handlers are executed successfully, the message is acknowledged. If one handler throws an error, the message broker will be informed that the message couldn't be consumed properly and might send the message again. To avoid endless cycles, consider catching all errors.
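One way to follow the advice about catching errors is to wrap each handler. The sketch below is plain JavaScript and independent of CAP; the wrapper name catchAll and its onError callback are hypothetical. Keep in mind that a swallowed error means the message counts as consumed, so log or store the failure somewhere.

```javascript
// Hypothetical wrapper: runs a handler and catches any error it throws,
// so that the broker isn't asked to deliver the same message again and again.
function catchAll(handler, onError = err => console.error('message handler failed:', err.message)) {
  return async msg => {
    try {
      return await handler(msg)
    } catch (err) {
      onError(err, msg) // for example, log the error or store the message for later analysis
    }
  }
}

// usage with a connected messaging service would look like:
// messaging.on('cap/msg/system/review/reviewed', catchAll(async msg => { /*...*/ }))
```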
If you want to receive all messages without creating topic subscriptions, you can register on '*'. This is useful when consuming messages from a dead letter queue.
messaging.on('*', async msg => { /*...*/ })
CloudEvents Protocol
CloudEvents is a commonly used specification for describing event data.
An example event looks like this:
{
  "type": "sap.s4.beh.salesorder.v1.SalesOrder.Created.v1",
  "specversion": "1.0",
  "source": "/default/sap.s4.beh/ER9CLNT001",
  "id": "0894ef45-7741-1eea-b7be-ce30f48e9a1d",
  "time": "2020-08-14T06:21:52Z",
  "datacontenttype": "application/json",
  "data": {
    "SalesOrder": "3016329"
  }
}
To help you adhere to this standard, CAP prefills these header fields automatically. To enable this, you need to set the option format: 'cloudevents' in your message broker configuration.
Example:
{
  cds: {
    requires: {
      messaging: {
        kind: 'enterprise-messaging-shared',
        format: 'cloudevents'
      }
    }
  }
}
You can always overwrite the default values.
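The interplay of prefilled defaults and your own values can be sketched as follows. This is an illustration only, not CAP's implementation: the function withCloudEventsDefaults is hypothetical, and the set of defaulted fields is simply taken from the example event above.

```javascript
const { randomUUID } = require('node:crypto')

// Hypothetical helper: fills CloudEvents header fields with defaults,
// while explicitly provided headers take precedence.
function withCloudEventsDefaults(type, source, headers = {}) {
  return {
    type,
    source,
    specversion: '1.0',
    id: randomUUID(),
    time: new Date().toISOString(),
    datacontenttype: 'application/json',
    ...headers // values provided by the caller overwrite the defaults
  }
}

const ceHeaders = withCloudEventsDefaults(
  'sap.s4.beh.salesorder.v1.SalesOrder.Created.v1',
  '/default/sap.s4.beh/ER9CLNT001',
  { id: '0894ef45-7741-1eea-b7be-ce30f48e9a1d' }
)
console.log(ceHeaders.id) // → 0894ef45-7741-1eea-b7be-ce30f48e9a1d
```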
Topic Prefixes
If you want the topics to start with a certain string, you can set a publish and/or a subscribe prefix in your message broker.
Example:
{
  cds: {
    requires: {
      messaging: {
        kind: 'enterprise-messaging-shared',
        publishPrefix: 'default/sap.cap/books/',
        subscribePrefix: 'default/sap.cap/reviews/'
      }
    }
  }
}
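With the configuration above, the prefixes are put in front of the topic names used in your code. A minimal sketch of that effect, assuming plain string concatenation (the helper names outboundTopic and inboundTopic are hypothetical, not CAP APIs):

```javascript
// Prefixes as configured in the example above
const prefixConfig = {
  publishPrefix: 'default/sap.cap/books/',
  subscribePrefix: 'default/sap.cap/reviews/'
}

// Hypothetical helpers, assuming the prefix is prepended as-is
const outboundTopic = topic => prefixConfig.publishPrefix + topic
const inboundTopic = topic => prefixConfig.subscribePrefix + topic

console.log(outboundTopic('modified')) // → default/sap.cap/books/modified
console.log(inboundTopic('reviewed'))  // → default/sap.cap/reviews/reviewed
```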
Topic Manipulations
SAP Event Mesh
If you set format to cloudevents, the following default prefixes are set:
{
  publishPrefix: '$namespace/ce/',
  subscribePrefix: '+/+/+/ce/'
}
In addition, slashes in the event name are replaced by dots, and the source header field is derived from publishPrefix.
Examples:
| publishPrefix | derived source |
|---|---|
| my/own/namespace/ce/ | /my/own/namespace |
| my/own.namespace/-/ce/ | /my/own.namespace |
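The two manipulations can be sketched in a few lines. Both helpers are hypothetical, and the derivation rule in deriveSource is inferred only from the two table rows above (drop the trailing ce segment and any - segment), so the actual rule may differ in other cases.

```javascript
// Hypothetical helpers, inferred from the rule and the two table rows above.

// Replaces slashes in the event name with dots
const toEventType = eventName => eventName.replace(/\//g, '.')

// Derives the source from a publishPrefix: drops the trailing 'ce' segment
// and any '-' segment, then adds a leading slash.
function deriveSource(publishPrefix) {
  const segments = publishPrefix.split('/').filter(Boolean)
  if (segments[segments.length - 1] === 'ce') segments.pop()
  return '/' + segments.filter(segment => segment !== '-').join('/')
}

console.log(deriveSource('my/own/namespace/ce/'))   // → /my/own/namespace
console.log(deriveSource('my/own.namespace/-/ce/')) // → /my/own.namespace
console.log(toEventType('cap/msg/reviewed'))        // → cap.msg.reviewed
```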
Message Brokers
To safely send and receive messages between applications, you need a message broker in-between where you can create queues that listen to topics. All relevant incoming messages are first stored in those queues before they're consumed. This way messages aren't lost when the consuming application isn't available.
In CDS, you can configure one of the available broker services in your requires section.
Following our grow-as-you-go principle, it makes sense to first test your application logic without a message broker and enable it later. Therefore, we provide support for local messaging (if everything runs inside one Node.js process) as well as file-based messaging.
Configuring Message Brokers
You must provide all necessary credentials by binding the message broker to your app.
For local environments, use cds bind in a hybrid setup.
TIP
For local testing use kind: enterprise-messaging-shared to avoid the complexity of HTTP-based messaging.
SAP Event Mesh (Shared)
kind: enterprise-messaging-shared
Use this if you want to communicate using SAP Event Mesh in a shared way.
If you register at least one handler, a queue is automatically created if it doesn't exist yet. Keep in mind that unused queues aren't deleted automatically; you have to delete them manually.
You have the following configuration options:
- queue: An object containing the name property as the name of your queue. Additional properties are described in section QueueP.
- amqp: AMQP client options as described in the @sap/xb-msg-amqp-v100 documentation.
If the queue name isn't specified, it's derived from application_name and the first four characters of application_id of your VCAP_APPLICATION environment variable, as well as the namespace property of your SAP Event Mesh binding in VCAP_SERVICES: {namespace}/{application_name}/{truncated_application_id}. This makes sure that every application has its own queue.
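The naming pattern can be illustrated with a short sketch. The helper deriveQueueName is hypothetical and the sample values are made up; only the pattern itself comes from the description above.

```javascript
// Hypothetical helper following the naming pattern described above:
// {namespace}/{application_name}/{truncated_application_id}
function deriveQueueName(namespace, vcapApplication) {
  const { application_name, application_id } = vcapApplication
  const truncatedId = application_id.slice(0, 4) // first four characters
  return `${namespace}/${application_name}/${truncatedId}`
}

// sample values, made up for illustration
const queueName = deriveQueueName('default/sap.cap/1', {
  application_name: 'bookshop-srv',
  application_id: '1a2b3c4d-0000-0000-0000-000000000000'
})
console.log(queueName) // → default/sap.cap/1/bookshop-srv/1a2b
```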
Example:
{
  "requires": {
    "messaging": {
      "kind": "enterprise-messaging-shared",
      "queue": {
        "name": "my/enterprise/messaging/queue",
        "accessType": "EXCLUSIVE",
        "maxMessageSizeInBytes": 19000000
      },
      "amqp": {
        "incomingSessionWindow": 100
      }
    }
  }
}
❗ Warning
When using enterprise-messaging-shared in a multitenant scenario, only the provider account will have an event bus. There is no tenant isolation.
TIP
You need to install the latest version of the NPM package @sap/xb-msg-amqp-v100.
TIP
For optimal performance, you should set the correct access type. To make sure your server is not flooded with messages, you should set the incoming session window.
SAP Event Mesh
kind: enterprise-messaging
This is the same as enterprise-messaging-shared except that messages are transferred through HTTP. For incoming messages, a webhook is used. Since no permanent AMQP connection is required, this allows for multitenant scenarios.
Compared to enterprise-messaging-shared, you have the following additional configuration option:
- webhook: An object containing the waitingPeriod property as the time in milliseconds until a webhook is created after the application is listening to incoming HTTP requests (default: 5000). Additional properties are described in the Subscription object in SAP Event Mesh - REST APIs Messaging.
Example:
{
  "requires": {
    "messaging": {
      "kind": "enterprise-messaging",
      "queue": {
        "name": "my/enterprise/messaging/queue",
        "accessType": "EXCLUSIVE",
        "maxMessageSizeInBytes": 19000000
      },
      "webhook": {
        "waitingPeriod": 7000
      }
    }
  }
}
If your server is authenticated using XSUAA, you need to grant the scope $XSAPPNAME.emcallback to SAP Event Mesh for it to be able to trigger the handshake and send messages.
In xs-security.json:
{
  ...,
  "scopes": [
    ...,
    {
      "name": "$XSAPPNAME.emcallback",
      "description": "Event Mesh Callback Access",
      "grant-as-authority-to-apps": [
        "$XSSERVICENAME(<SERVICE_NAME_OF_YOUR_EVENT_MESH_INSTANCE>)"
      ]
    }
  ]
}
Make sure to add this to the service descriptor of your SAP Event Mesh instance:
{
  ...,
  "authorities": [
    "$ACCEPT_GRANTED_AUTHORITIES"
  ]
}
WARNING
This will not work in the dev plan of SAP Event Mesh.
WARNING
If you enable the cors middleware, handshake requests from SAP Event Mesh might be intercepted.
Redis PubSub (beta)
WARNING
This is a beta feature. Beta features aren't part of the officially delivered scope that SAP guarantees for future releases.
kind: redis-messaging
Use Redis PubSub as a message broker.
There are no queues:
- Messages are lost when consumers are not available.
- All instances receive the messages independently.
TIP
You need to install the latest version of the NPM package redis.
File Based
kind: file-based-messaging
Don't use this in production; use it only to test your application locally. It creates a file and uses it as a simple message broker.
You can have at most one consuming app per emitted event.
You have the following configuration options:
- file: You can set the file path (default is ~/.cds-msg-box).
Example:
{
  "requires": {
    "messaging": {
      "kind": "file-based-messaging",
      "file": "../msg-box"
    }
  }
}
Local Messaging
kind: local-messaging
You can use local messaging to communicate inside one Node.js process. It's especially useful in your automated tests.
Composite-Messaging
kind: composite-messaging
If you have several messaging services and don't want to mention them explicitly in your code, you can create a composite-messaging service where you can define routes for incoming and outgoing messages. In those routes, you can use glob patterns to match topics (** for any number of any character, * for any number of any character except / and ., ? for a single character).
Example:
{
  "requires": {
    "messaging": {
      "kind": "composite-messaging",
      "routes": {
        "myEnterpriseMessagingReview": ["cap/msg/system/review/*"],
        "myEnterpriseMessagingBook": ["**/book/*"]
      }
    },
    "myEnterpriseMessagingReview": {
      "kind": "enterprise-messaging",
      "queue": {
        "name": "cap/msg/system/review"
      }
    },
    "myEnterpriseMessagingBook": {
      "kind": "enterprise-messaging",
      "queue": {
        "name": "cap/msg/system/book"
      }
    }
  }
}
const cds = require('@sap/cds')

module.exports = async srv => {
  const messaging = await cds.connect.to('messaging')

  messaging.on('book/repository/book/modified', msg => {
    // comes from myEnterpriseMessagingBook
  })

  messaging.on('cap/msg/system/review/reviewed', msg => {
    // comes from myEnterpriseMessagingReview
  })
}
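To get a feeling for how the glob patterns in the routes select a service, here is a self-contained sketch. It is not CAP's implementation: globToRegExp and routeFor are hypothetical helpers, and the assumption that the first matching route wins is made for illustration only.

```javascript
// Translates a glob pattern into a regular expression:
//   **  any number of any character
//   *   any number of any character except '/' and '.'
//   ?   a single character
function globToRegExp(glob) {
  let source = ''
  for (let i = 0; i < glob.length; i++) {
    const ch = glob[i]
    if (ch === '*' && glob[i + 1] === '*') { source += '.*'; i++ }
    else if (ch === '*') source += '[^/.]*'
    else if (ch === '?') source += '.'
    else source += ch.replace(/[.+^${}()|[\]\\]/g, '\\$&') // escape regex metacharacters
  }
  return new RegExp(`^${source}$`)
}

// Hypothetical router: returns the name of the first service whose patterns match the topic
function routeFor(topic, routes) {
  for (const [service, patterns] of Object.entries(routes)) {
    if (patterns.some(pattern => globToRegExp(pattern).test(topic))) return service
  }
}

const routes = {
  myEnterpriseMessagingReview: ['cap/msg/system/review/*'],
  myEnterpriseMessagingBook: ['**/book/*']
}
console.log(routeFor('cap/msg/system/review/reviewed', routes)) // → myEnterpriseMessagingReview
console.log(routeFor('book/repository/book/modified', routes))  // → myEnterpriseMessagingBook
```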
Transactional Outbox
Usually, the emit of messages should be deferred until the main transaction has succeeded. Otherwise, recipients would also receive messages in case of a rollback. To solve this problem, an outbox is used internally to defer the emit of messages until the current transaction succeeds.
In-Memory Outbox (Default)
By default, messages are emitted when the current transaction is successful. Until then, messages are kept in memory. This is similar to the following code if done manually:
cds.context.on('succeeded', () => this.emit(msg))
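The deferral can be simulated without CAP to see the effect of a commit versus a rollback. In this minimal sketch, a plain Node.js EventEmitter stands in for the transaction context; createOutbox and the failed event name are assumptions made for illustration.

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical in-memory outbox: collects messages and hands them to `send`
// only once the surrounding transaction reports success.
function createOutbox(tx, send) {
  const pending = []
  tx.once('succeeded', () => pending.splice(0).forEach(msg => send(msg)))
  tx.once('failed', () => { pending.length = 0 }) // rollback: drop everything
  return { emit: msg => pending.push(msg) }
}

// usage: `tx` stands in for the current transaction context
const tx = new EventEmitter()
const sent = []
const outbox = createOutbox(tx, msg => sent.push(msg))

outbox.emit({ event: 'OwnEvent', data: { ID: 1 } })
console.log(sent.length) // → 0, nothing is sent before the transaction succeeds
tx.emit('succeeded')
console.log(sent.length) // → 1
```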
WARNING
The message is lost if its emit fails; there is no retry mechanism. The app will crash if the error is identified as unrecoverable, for example in SAP Event Mesh if the used topic is forbidden.
Persistent Outbox
Using the persistent outbox, the to-be-emitted message is stored in a database table first. The same database transaction is used as for other operations, therefore transactional consistency is guaranteed.
To enable the persistent outbox globally for all deferrable services (that means cds.MessagingService and cds.AuditLogService), you need to add the service outbox of kind persistent-outbox to the cds.requires section.
{
  "requires": {
    "outbox": {
      "kind": "persistent-outbox"
    }
  }
}
The optional parameters are:
- maxAttempts (default 20): The number of unsuccessful emits until the message is ignored. It will still remain in the database table.
- chunkSize (default 100): The number of messages which are read from the database table in one go.
- storeLastError (default true): Specifies if error information of the last failed emit should be stored in the outbox table.
- parallel (default false): Specifies if messages are sent in parallel (faster but the order isn't guaranteed).
Once the transaction succeeds, the messages are read from the database table and emitted. If an emit was successful, the respective message is deleted from the database table. If not, there will be retries after (exponentially growing) waiting times. After a maximum number of attempts, the message is ignored for processing and remains in the database table which therefore also acts as a dead letter queue. There is only one active message processor per service, tenant and app instance, hence there won't be duplicate emits except in the unlikely case of an app crash right after the emit and before the deletion of the message entry.
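The processing loop described above can be pictured with a simplified simulation. This is a sketch under stated assumptions, not CAP's implementation: the outbox table is modeled as an array, emit is a synchronous stand-in that throws on failure, and the backoff formula in retryDelay is invented for illustration since the actual waiting times aren't specified here.

```javascript
// Hypothetical single processing pass over an outbox table (modeled as an array).
// `emit` is a synchronous stand-in that throws on failure, to keep the sketch short.
function processOutbox(table, emit, { maxAttempts = 20, chunkSize = 100 } = {}) {
  // read one chunk of messages that are still eligible for processing
  const chunk = table.filter(entry => entry.attempts < maxAttempts).slice(0, chunkSize)
  for (const entry of chunk) {
    try {
      emit(entry.msg)
      table.splice(table.indexOf(entry), 1) // success: delete the entry
    } catch (err) {
      entry.attempts++                      // failure: keep the entry for a retry
      entry.lastError = err.message
    }
  }
}

// Assumed backoff formula for illustration; the actual waiting times may differ.
const retryDelay = (attempts, baseMs = 1000) => baseMs * 2 ** attempts

const table = [
  { msg: 'ok', attempts: 0 },
  { msg: 'broken', attempts: 0 },
  { msg: 'given up', attempts: 20 } // already at maxAttempts, stays as a dead letter
]
processOutbox(table, msg => { if (msg === 'broken') throw new Error('emit failed') })
console.log(table.map(entry => entry.msg)) // → [ 'broken', 'given up' ]
console.log(retryDelay(3))                 // → 8000
```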
TIP
Some errors during the emit are identified as unrecoverable, for example in SAP Event Mesh if the used topic is forbidden. The respective message is then updated and the attempts field is set to maxAttempts to prevent further processing. Programming errors crash the server instance and must be fixed.
After adding the outbox service to your package.json, your database model is automatically extended by the entity cds.outbox.Messages, as follows:
using cuid from '@sap/cds/common';

namespace cds.outbox;

entity Messages : cuid {
  timestamp: Timestamp;
  target: String;
  msg: LargeString;
  attempts: Integer default 0;
  partition: Integer default 0;
  lastError: LargeString;
  lastAttemptTimestamp: Timestamp @cds.on.update : $now;
}
TIP
In your CDS model, you can refer to the entity cds.outbox.Messages using the path @sap/cds/srv/outbox, for example to expose it in a service.
WARNING
- Make sure to redeploy your model.
- If the app crashes, another emit for the respective tenant and service is necessary to restart the message processing.
- The user id is stored to recreate the correct context.
To overwrite the outbox configuration for a particular service, you can specify the outbox option.
Example:
{
  "requires": {
    "messaging": {
      "kind": "enterprise-messaging",
      "outbox": {
        "maxAttempts": 10,
        "chunkSize": 10
      }
    }
  }
}
Troubleshooting
Delete Entries in the Outbox Table
To manually delete entries in the table cds.outbox.Messages, you can either expose it in a service or programmatically modify it using the cds.outbox.Messages entity:
const db = await cds.connect.to('db')
const { Messages } = db.entities('cds.outbox')
await DELETE.from(Messages)
Outbox Table Not Found
If the outbox table is not found on the database, this can be caused by insufficient configuration data in package.json.
In case you have overwritten requires.db.model there, make sure to add the outbox model path @sap/cds/srv/outbox:
"requires": {
  "db": { ...
    "model": [..., "@sap/cds/srv/outbox"]
  }
}
The following is only relevant if you're using @sap/cds version < 6.7.0 and you've configured options.model in custom build tasks. Add the model path accordingly:
"build": {
  "tasks": [{ ...
    "options": { "model": [..., "@sap/cds/srv/outbox"] }
  }]
}
Note that model configuration isn't required for CAP projects using the standard project layout that contain the folders db, srv, and app. In this case, you can delete the entire model configuration.
Immediate Emit
To disable deferred emitting for a particular service, you can set the outbox option of your service to false:
{
  "requires": {
    "messaging": {
      "kind": "enterprise-messaging",
      "outbox": false
    }
  }
}