Skip to content
Search

    Messaging

    Learn details about using messaging services and outbox for asynchronous communications.

    cds.MessagingService class

    Class cds.MessagingService and subclasses thereof are technical services representing asynchronous messaging channels. They can be used directly/low-level, or behind the scenes on higher-level service-to-service eventing.

    class cds.MessagingService extends cds.Service

    Declaring Events

    In your CDS model, you can model events using the event keyword inside services. Once you created the messaging section in cds.requires, all modeled events are automatically enabled for messaging.

    You can then use the services to emit events (for your own service) or receive events (for external services).

    Example:

    In your package.json:

    {
      "cds": {
        "requires": {
          "ExternalService": {
            "kind": "odata",
            "model": "srv/external/external.cds"
          },
          "messaging": {
            "kind": "enterprise-messaging"
          }
        }
      }
    }
    

    In srv/external/external.cds:

    service ExternalService {
        event ExternalEvent {
            ID: UUID;
            name: String;
        }
    }
    

    In srv/own.cds:

    service OwnService {
        event OwnEvent {
            ID: UUID;
            name: String;
        }
    }
    

    In srv/own.js:

    module.exports = async srv => {
      const externalService = await cds.connect.to('ExternalService')
      externalService.on('ExternalEvent', async msg => {
        await srv.emit('OwnEvent', msg.data)
      })
    }
    

    Custom Topics with Declared Events

    You can specify topics to modeled events using the @topic annotation.

    If no annotation is provided, the topic will be set to the fully qualified event name.

    Example:

    service OwnService {
        @topic: 'my.custom/topic'
        event OwnEvent { ID: UUID; name: String; }
    }
    

    Emitting Events

    To send a message to the message broker, you can use the emit method on a transaction for the connected service.

    Example:

    const messaging = await cds.connect.to('messaging')
    
    this.after(['CREATE', 'UPDATE', 'DELETE'], 'Reviews', async (_, req) => {
      const { subject } = req.data
      const { rating } = await cds.run(
        SELECT.one(['round(avg(rating),2) as rating'])
        .from(Reviews)
        .where({ subject }))
    
      // send to a topic
      await messaging.emit('cap/msg/system/review/reviewed',
       { subject, rating })
    
      // alternative if you want to send custom headers
      await messaging.emit({ event: 'cap/msg/system/review/reviewed',
        data: { subject, rating },
        headers: { 'X-Correlation-ID': req.headers['X-Correlation-ID'] }})
    })
    

    The messages are sent once the transaction is successful. Per default, an in-memory outbox is used.See Messaging - Outbox for more information.

    Receiving Events

    To listen to messages from a message broker, you can use the on method on the connected service.

    Example:

    const messaging = await cds.connect.to('messaging')
    
    // listen to a topic
    messaging.on('cap/msg/system/review/reviewed', msg => {
      const { subject, rating } = msg.data
      return cds.run(UPDATE(Books, subject).with({ rating }))
    })
    

    Once all handlers are executed successfully, the message is acknowledged. If one handler throws an error, the message broker will be informed that the message couldn’t be consumed properly and might send the message again. To avoid endless cycles, consider catching all errors.

    CloudEvents Protocol

    CloudEvents is a commonly used specification for describing event data.

    An example event looks like this:

    {
      "type": "sap.s4.beh.salesorder.v1.SalesOrder.Created.v1",
      "specversion": "1.0",
      "source": "/default/sap.s4.beh/ER9CLNT001",
      "id": "0894ef45-7741-1eea-b7be-ce30f48e9a1d",
      "time": "2020-08-14T06:21:52Z",
      "datacontenttype": "application/json",
      "data": {
        "SalesOrder":"3016329"
      }
    }
    

    To help you adhere to this standard, CAP prefills these header fields automatically. To enable this, you need to set the option format: 'cloudevents' in your message broker.

    Example:

    {
      cds: {
        requires: {
          messaging: {
            kind: 'enterprise-messaging-shared',
            format: 'cloudevents'
          }
        }
      }
    }
    

    You can always overwrite the default values.

    Topic Prefixes

    If you want the topics to start with a certain string, you can set a publish and/or a subscribe prefix in your message broker.

    Example:

    {
      cds: {
        requires: {
          messaging: {
            kind: 'enterprise-messaging-shared',
            publishPrefix: 'default/sap.cap/books/',
            subscribePrefix: 'default/sap.cap/reviews/'
          }
        }
      }
    }
    

    Topic Manipulations

    SAP Event Mesh

    If you specify your format to be cloudevents, the following default prefixes are set:

    {
      publishPrefix: '$namespace/ce/',
      subscribePrefix: '+/+/+/ce/'
    }
    

    In addition to that, slashes in the event name are replaced by dots and the source header field is derived based on publishPrefix.

    Examples:

    publishPrefix derived source
    my/own/namespace/ce/ /my/own/namespace
    my/own.namespace/-/ce/ /my/own.namespace

    Message Brokers

    To safely send and receive messages between applications, you need a message broker in-between where you can create queues that listen to topics. All relevant incoming messages are first stored in those queues before they’re consumed. This way messages aren’t lost when the consuming application isn’t available.

    In CDS, you can configure one of the available broker services in your requires section.

    According to our grow as you go principle, it makes sense to first test your application logic without a message broker and enable it later. Therefore, we provide support for local messaging (if everything is inside one Node.js process) as well as file-based messaging.

    Configuring Message Brokers

    You must provide all necessary credentials by binding the message broker to your app.

    For local environments, use cds bind in a hybrid setup.

    For local testing use kind: enterprise-messaging-shared to avoid the complexity of HTTP-based messaging.

    SAP Event Mesh (Shared)

    kind: enterprise-messaging-shared

    Use this if you want to communicate using SAP Event Mesh in a shared way.

    If you register at least one handler, a queue will automatically be created if not yet existent. Keep in mind that unused queues aren’t automatically deleted, this has to be done manually.

    You have the following configuration options:

    • queue: An object containing the name property as the name of your queue, additional properties are described in section QueueP.
    • amqp: AQMP client options as described in the @sap/xb-msg-amqp-v100 documentation

    If the queue name isn’t specified, it’s derived from application_name and the first four characters of application_id of your VCAP_APPLICATION environmental variable, as well as the namespace property of your SAP Event Mesh binding in VCAP_SERVICES: {namespace}/{application_name}/{truncated_application_id}. This makes sure that every application has its own queue.

    Example:

    {
        "requires": {
            "messaging": {
                "kind": "enterprise-messaging-shared",
                "queue": {
                   "name": "my/enterprise/messaging/queue",
                   "accessType": "EXCLUSIVE",
                   "maxMessageSizeInBytes": 19000000
                },
                "amqp": {
                  "incomingSessionWindow": 100
                }
            }
        }
    }
    

    You need to install the latest version of @sap/xb-msg-amqp-v100.

    ❗ Warning When using enterprise-messaging-shared in a multitenancy scenario, only the provider account will have an event bus. There is no tenant isolation.

    SAP Event Mesh

    kind: enterprise-messaging

    This is the same as enterprise-messaging-shared except that messages are transferred through HTTP. For incoming messages, a webhook is used. Since no permanent AMQP connection is required, this allows for multitenant scenarios.

    Compared to enterprise-messaging-shared you have the additional configuration option:

    • webhook: An object containing the waitingPeriod property as the time in milliseconds until a webhook is created after the application is listening to incoming HTTP requests (default: 5000). Additional properties are described in the Subscription object in SAP Event Mesh - REST APIs Messaging.

    Example:

    {
        "requires": {
            "messaging": {
                "kind": "enterprise-messaging",
                "queue": {
                   "name": "my/enterprise/messaging/queue",
                   "accessType": "EXCLUSIVE",
                   "maxMessageSizeInBytes": 19000000
                },
                "webhook": {
                  "waitingPeriod": 7000
                }
            }
        }
    }
    
    

    If your server is authenticated using XSUAA, you need to grant the scope $XSAPPNAME.emcallback to SAP Event Mesh for it to be able to trigger the handshake and send messages.

    In xs-security.json:

    {
      ...,
      "scopes": [
        ...,
        {
          "name": "$XSAPPNAME.emcallback",
          "description": "Event Mesh Callback Access",
          "grant-as-authority-to-apps": [
            "$XSSERVICENAME(<SERVICE_NAME_OF_YOUR_EVENT_MESH_INSTANCE>)"
          ]
        }
      ]
    }
    

    Make sure to add this to the service descriptor of your SAP Event Mesh instance:

    {
      ...,
      "authorities": [
        "$ACCEPT_GRANTED_AUTHORITIES"
      ]
    }
    

    This will not work in the dev plan of SAP Event Mesh.

    If you enable the cors middleware, handshake requests from SAP Event Mesh might be intercepted.

    File Based

    kind: file-based-messaging

    Don’t use this in production, only if you want to test your application locally. It creates a file and uses it as a simple message broker.

    You can have at most one consuming app per emitted event.

    You have the following configuration options in the credentials section:

    • file: You can set the file path (default is ~/.cds-msg-box).

    Example:

    {
        "requires": {
            "messaging": {
                "kind": "file-based-messaging",
                "credentials": {
                  "file": "../msg-box"
                }
            }
        }
    }
    

    Local Messaging

    kind: local-messaging

    You can use local messaging to communicate inside one Node.js process. It’s especially useful in your automated tests.

    Composite-Messaging

    kind: composite-messaging

    If you have several messaging services and don’t want to mention them explicitly in your code, you can create a composite-messaging service where you can define routes for incoming and outgoing messages. In those routes, you can use glob patterns to match topics (** for any number of any character, * for any number of any character except / and ., ? for a single character).

    Example:

    {
      "requires": {
        "messaging": {
          "kind": "composite-messaging",
          "routes": {
            "myEnterpriseMessagingReview": ["cap/msg/system/review/*"],
            "myEnterpriseMessagingBook": ["**/book/*"]
          }
        },
        "myEnterpriseMessagingReview": {
          "kind": "enterprise-messaging",
          "queue": {
            "name": "cap/msg/system/review"
          }
        },
        "myEnterpriseMessagingBook": {
          "kind": "enterprise-messaging",
          "queue": {
            "name": "cap/msg/system/book"
          }
        }
      }
    }
    
    module.exports = async srv => {
      const messaging = await cds.connect.to('messaging')
    
      messaging.on('book/repository/book/modified', msg => {
        // comes from myEnterpriseMessagingBook
      })
    
      messaging.on('cap/msg/system/review/reviewed', msg => {
        // comes from myEnterpriseMessagingReview
      })
    }
    

    Outbox

    Usually the emit of messages (for cds.MessagingService and cds.AuditLogService) should be delayed until the main transaction succeeded. Otherwise recipients will also receive messages in case of a rollback. To solve this problem, an outbox is used internally to defer the emit of messages until the success of the current transaction.

    In-Memory Outbox (Default)

    Per default, messages are emitted when the current transaction is successful. Until then, messages are kept in memory. This is similar to the following code if done manually:

    cds.context.on('succeeded', () => this.emit(msg))
    

    The message is lost if its emit fails, there is no retry mechanism. The app will crash if the error is identified as unrecoverable, for example in SAP Event Mesh if the used topic is forbidden.

    Immediate Emit

    To disable deferred emitting for a particular service, you can set the outbox option of your service to false:

    {
      "requires": {
        "messaging": {
          "kind": "enterprise-messaging",
          "outbox": false
        }
      }
    }