NAV Navigation
cURL HTTP JavaScript Node.js Ruby Python Java Go

Welcome

We have language bindings in cURL, HTTP, JavaScript, Node.js, Ruby, Python, Java, and Go! You can view code examples in this dark area on the right, and you can switch the programming language of the examples with the tabs above.

Welcome to the Mambu Streaming API documentation! Our streaming API is independent from the existing Mambu API and WebHook infrastructure and allows you to subscribe to constant, real-time updates of activities within Mambu. If your connection is interrupted for any reason, your cursor position is saved so that when you re-connect, you will pick up exactly where you left off. Please read on to learn more about how to use the streaming API and for a full reference of each endpoint.

Java Sample Client

To help get you started quickly, a sample project (in Java) implementing a Streaming API client is available at https://github.com/mambu-gmbh/Streaming-API-Java-Sample-Client.

Using the Streaming API

Creating Subscriptions

Sample Request

curl -v -XPOST "https://demotenant.localhost:8889/api/v1/subscriptions" -H "Content-type: application/json" -d

Body Parameter

{
    "owning_application": "demo-app",
    "event_types": ["mrn.event.demotenant.streamingapi.client_approved"]
}

Request

A Subscription can be created by sending a POST request to the /api/v1/subscriptions resource.

Response

Sample Response

{
  "owning_application": "demo-app",
  "event_types": [
    "mrn.event.demotenant.streamingapi.client_approved"
  ],
  "consumer_group": "default",
  "read_from": "end",
  "id": "0691160a-b519-4595-b85c-a400fc73e963",
  "created_at": "2018-11-18T22:27:29.156Z"
}

The response returns the whole Subscription object that was created, including the server generated id field:

Deleting Subscriptions

Sample Request

curl -X DELETE /api/v1/subscriptions/071569bc-89f2-4b52-8277-6ed9614ffbb3

Request

A Subscription can be removed by sending a DELETE request to the /api/v1/subscriptions/{subscription_id} resource.

Response

Sample Response

{
    "title": "Not Found",
    "status": 404,
    "detail": "Subscription with id \"071569bc-89f2-4b52-8277-6ed9614ffbb3\" does not exist"
}

The possible responses for the delete are:

Consuming Events from a Subscription

Sample Request

curl -v -XGET "https://.demotenant.localhost:8889/api/v1/subscriptions/0691160a-b519-4595-b85c-a400fc73e963/events"

Request

Consuming events is done by sending a GET request to the Subscription’s event resource /api/v1/subscriptions/{subscription-id}/events.

Response

Sample Response

HTTP/1.1 200 OK
X-Mambu-StreamId: 93ae5174-b863-4f8f-ba33-d274854d1f3d
Transfer-Encoding: chunked
{ "cursor": {    "partition": "1",    "offset": "001-0001--1",    "event_type": "mrn.event.demo_tenant.streamingapi.client_approved",    "cursor_token": "1d23d044-daa1-414c-8ec4-a060ba03d77c"  },  "info": {    "debug": "Stream started"  }}
{  "cursor": {    "partition": "1",    "offset": "001-0001-000000000000000000",    "event_type": "mrn.event.demo_tenant.streamingapi.client_approved",    "cursor_token": "e8f6bc5d-65b7-4466-9d95-507da0e79d3c"  },  "events": [    {      "metadata": {        "occurred_at": "2018-11-18T22:50:12.602Z",        "eid": "fde622d5-f975-4786-8e1d-d328c29761f9",        "event_type": "mrn.event.demo_tenant.streamingapi.client_approved",        "partition": "1",        "received_at": "2018-11-18T22:50:12.786Z",        "flow_id": "jOK94UJFdtKsthWOgOJCwuGC",        "version": "1.0.0"      },      "occurred_at": "2018-11-18T22:50:12.602Z",      "body": "Client was approved notification body",      "content_type": "text/plain; charset=UTF-8",      "template_name": "Demo Template 1",      "category": "DATA"    }  ]}
{  "cursor": {    "partition": "1",    "offset": "001-0001-000000000000000000",    "event_type": "mrn.event.demo_tenant.streamingapi.client_approved",    "cursor_token": "5fed7de4-86ec-464f-92f0-5d6103869385"  }}

The response is a stream that groups events into JSON batches separated by an endline (\n) character. The output can be seen in the right panel.

Each batch contains the following fields:

Subscription Cursors

The cursors in the Subscription API have the structure visible to the right -->

Cursor Structure

{
  "partition": "1",
  "offset": "001-0001-000000000000000000",
  "event_type": "mrn.event.demo_tenant.streamingapi.client_approved",
  "cursor_token": "e8f6bc5d-65b7-4466-9d95-507da0e79d3c"
}

The fields are:

Committing Cursors

Sample Request

curl -v -XPOST "https://demotenant.localhost:8889/api/v1/subscriptions/0691160a-b519-4595-b85c-a400fc73e963/cursors"\
  -H "X-Mambu-StreamId: 93ae5174-b863-4f8f-ba33-d274854d1f3d" \
  -H "Content-type: application/json" \
  -d

Body Parameter

{
    "items": [
      {
        "partition": "1",
        "offset": "001-0001-000000000000000000",
        "event_type": "mrn.event.demo_tenant.streamingapi.client_approved",
        "cursor_token": "db4b4c27-f880-4406-a382-b057acf432cd"
      }    ]
  }

Request

Cursors can be committed by posting to Subscription’s cursor resource /api/v1/subscriptions/{subscriptionId}/cursors:

Response

The possible successful responses for a commit are:

The timeout for commit is 60 seconds. If you open the stream, read data, and don’t commit anything for 60 seconds - the stream connection will be closed from the Mambu side. Please note that if there are no events available to send and you get only empty batches - there is no need to commit; Mambu will close the connection only if there is some uncommitted data and no commits happened for 60 seconds.

If the connection is closed for any reason, the client still has 60 seconds to commit the events it received from the moment when the events were sent. After that, the session will be considered closed and it will be no longer be possible to do commits with that X-Mambu-StreamId. If the commit was not done - then the next time you start reading from a subscription you will get data from the last point of your commit, and you will again receive the events you haven’t committed.

Mambu Streaming API Definition v1.0.0

Scroll down for code samples, example requests and responses. Select a language for code samples from the tabs above or the mobile navigation menu.

The purpose of Streaming API is to provide high-performance and reliable mechanism to stream significant amount of data out of Mambu on a constant basis, without posing unnecessary pressure on API and WebHooks infrastructure.

In order to subscribe to Mambu notifications events and consume them, a dedicated API is exposed to be used by clients.

The typical workflow is:

  1. Create a Subscription specifying the topic names you want to read.
  2. Start reading batches of events from the subscription.
  3. Commit the cursors found in the event batches back to Mambu, which will store the offsets.

If the connection is closed, and later restarted, clients will get events from the point the last cursor commit. If you need more than one client for your subscription to distribute the load you can read the subscription with multiple clients, for more details please check the get events endpoint.

Endpoint Definitions

Add Subscription

Code samples

# You can also use wget
curl -X POST /api/v1/subscriptions \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json'
POST /api/v1/subscriptions HTTP/1.1

Content-Type: application/json
Accept: application/json
var headers = {
  'Content-Type':'application/json',
  'Accept':'application/json'

};

$.ajax({
  url: '/api/v1/subscriptions',
  method: 'post',

  headers: headers,
  success: function(data) {
    console.log(JSON.stringify(data));
  }
})

const fetch = require('node-fetch');
const inputBody = '{
  "owning_application": "demo",
  "event_types": [
    "mrn.event.demotenant.streamingapi.client_approved"
  ],
  "consumer_group": "read-product-updates",
  "read_from": "end",
  "initial_cursors": [
    {
      "partition": "1",
      "offset": "001-0001-000000000000000000",
      "event_type": "mrn.event.demotenant.streamingapi.client_approved"
    }
  ]
}';
const headers = {
  'Content-Type':'application/json',
  'Accept':'application/json'

};

fetch('/api/v1/subscriptions',
{
  method: 'POST',
  body: inputBody,
  headers: headers
})
.then(function(res) {
    return res.json();
}).then(function(body) {
    console.log(body);
});

require 'rest-client'
require 'json'

headers = {
  'Content-Type' => 'application/json',
  'Accept' => 'application/json'
}

result = RestClient.post '/api/v1/subscriptions',
  params: {
  }, headers: headers

p JSON.parse(result)

import requests
headers = {
  'Content-Type': 'application/json',
  'Accept': 'application/json'
}

r = requests.post('/api/v1/subscriptions', params={

}, headers = headers)

print r.json()

URL obj = new URL("/api/v1/subscriptions");
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
con.setRequestMethod("POST");
int responseCode = con.getResponseCode();
BufferedReader in = new BufferedReader(
    new InputStreamReader(con.getInputStream()));
String inputLine;
StringBuffer response = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
    response.append(inputLine);
}
in.close();
System.out.println(response.toString());

package main

import (
       "bytes"
       "net/http"
)

func main() {

    headers := map[string][]string{
        "Content-Type": []string{"application/json"},
        "Accept": []string{"application/json"},

    }

    data := bytes.NewBuffer([]byte{jsonReq})
    req, err := http.NewRequest("POST", "/api/v1/subscriptions", data)
    req.Header = headers

    client := &http.Client{}
    resp, err := client.Do(req)
    // ...
}

POST /api/v1/subscriptions

This endpoint creates a subscription for EventTypes.

Body parameter

{
  "owning_application": "demo",
  "event_types": [
    "mrn.event.demotenant.streamingapi.client_approved"
  ],
  "consumer_group": "read-product-updates",
  "read_from": "end",
  "initial_cursors": [
    {
      "partition": "1",
      "offset": "001-0001-000000000000000000",
      "event_type": "mrn.event.demotenant.streamingapi.client_approved"
    }
  ]
}

Parameters

Parameter In Type Required Description
body body Subscription true Subscription to create

Example responses

200 Response

{
  "id": "0691160a-b519-4595-b85c-a400fc73e963",
  "owning_application": "demo",
  "event_types": [
    "mrn.event.demotenant.streamingapi.client_approved"
  ],
  "consumer_group": "read-product-updates",
  "created_at": "1996-12-19T16:39:57-08:00",
  "updated_at": "1996-12-19T16:39:57-08:00",
  "read_from": "end",
  "initial_cursors": [
    {
      "partition": "1",
      "offset": "001-0001-000000000000000000",
      "event_type": "mrn.event.demotenant.streamingapi.client_approved"
    }
  ]
}

Responses

Status Meaning Description Schema
200 OK Subscription for such parameters already exists. Returns subscription object that already existed. Subscription
201 Created Subscription was successfuly created. Returns subscription object that was created. Subscription
400 Bad Request Bad Request Problem
422 Unprocessable Entity Unprocessable Entity Problem

Response Headers

Status Header Type Format Description
200 Location string The relative URI for this subscription resource.
201 Location string The relative URI for the created resource.
201 Content-Location string If the Content-Location header is present and the same as the Location header the client can assume it has an up to date representation of the Subscription and a corresponding GET request is not needed.

Delete Subscription

Code samples

# You can also use wget
curl -X DELETE /api/v1/subscriptions/{subscription_id} \
  -H 'Accept: application/json' \
  -H 'apikey: API_KEY'

DELETE /api/v1/subscriptions/{subscription_id} HTTP/1.1

Accept: application/json

var headers = {
  'Accept':'application/json',
  'apikey':'API_KEY'

};

$.ajax({
  url: '/api/v1/subscriptions/{subscription_id}',
  method: 'delete',

  headers: headers,
  success: function(data) {
    console.log(JSON.stringify(data));
  }
})

const fetch = require('node-fetch');

const headers = {
  'Accept':'application/json',
  'apikey':'API_KEY'

};

fetch('/api/v1/subscriptions/{subscription_id}',
{
  method: 'DELETE',

  headers: headers
})
.then(function(res) {
    return res.json();
}).then(function(body) {
    console.log(body);
});

require 'rest-client'
require 'json'

headers = {
  'Accept' => 'application/json',
  'apikey' => 'API_KEY'
}

result = RestClient.delete '/api/v1/subscriptions/{subscription_id}',
  params: {
  }, headers: headers

p JSON.parse(result)

import requests
headers = {
  'Accept': 'application/json',
  'apikey': 'API_KEY'
}

r = requests.delete('/api/v1/subscriptions/{subscription_id}', params={

}, headers = headers)

print r.json()

URL obj = new URL("/api/v1/subscriptions/{subscription_id}");
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
con.setRequestMethod("DELETE");
int responseCode = con.getResponseCode();
BufferedReader in = new BufferedReader(
    new InputStreamReader(con.getInputStream()));
String inputLine;
StringBuffer response = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
    response.append(inputLine);
}
in.close();
System.out.println(response.toString());

package main

import (
       "bytes"
       "net/http"
)

func main() {

    headers := map[string][]string{
        "Accept": []string{"application/json"},
        "apikey": []string{"API_KEY"},

    }

    data := bytes.NewBuffer([]byte{jsonReq})
    req, err := http.NewRequest("DELETE", "/api/v1/subscriptions/{subscription_id}", data)
    req.Header = headers

    client := &http.Client{}
    resp, err := client.Do(req)
    // ...
}

DELETE /api/v1/subscriptions/{subscription_id}

This endpoint deletes an existing subscription for event types.

Parameters

Parameter In Type Required Description
subscription_id path string true Id of subscription

Example responses

404 Response

{
    "title": "Not Found",
    "status": 404,
    "detail": "Subscription with id \"071569bc-89f2-4b52-8277-6ed9614ffbb3\" does not exist"
}

Responses

Status Meaning Description Schema
204 No Content Subscription for the given id has been successfully deleted. Subscription
404 Not Found Subscription for the given id was not found. Subscription

Events

Code samples

# You can also use wget
curl -X GET /api/v1/subscriptions/{subscription_id}/events \
  -H 'Accept: application/json' \
  -H 'X-Flow-Id: string'

GET /api/v1/subscriptions/{subscription_id}/events HTTP/1.1

Accept: application/json
X-Flow-Id: string

var headers = {
  'Accept':'application/json',
  'X-Flow-Id':'string'

};

$.ajax({
  url: '/api/v1/subscriptions/{subscription_id}/events',
  method: 'get',

  headers: headers,
  success: function(data) {
    console.log(JSON.stringify(data));
  }
})

const fetch = require('node-fetch');

const headers = {
  'Accept':'application/json',
  'X-Flow-Id':'string'

};

fetch('/api/v1/subscriptions/{subscription_id}/events',
{
  method: 'GET',

  headers: headers
})
.then(function(res) {
    return res.json();
}).then(function(body) {
    console.log(body);
});

require 'rest-client'
require 'json'

headers = {
  'Accept' => 'application/json',
  'X-Flow-Id' => 'string'
}

result = RestClient.get '/api/v1/subscriptions/{subscription_id}/events',
  params: {
  }, headers: headers

p JSON.parse(result)

import requests
headers = {
  'Accept': 'application/json',
  'X-Flow-Id': 'string'
}

r = requests.get('/api/v1/subscriptions/{subscription_id}/events', params={

}, headers = headers)

print r.json()

URL obj = new URL("/api/v1/subscriptions/{subscription_id}/events");
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
con.setRequestMethod("GET");
int responseCode = con.getResponseCode();
BufferedReader in = new BufferedReader(
    new InputStreamReader(con.getInputStream()));
String inputLine;
StringBuffer response = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
    response.append(inputLine);
}
in.close();
System.out.println(response.toString());

package main

import (
       "bytes"
       "net/http"
)

func main() {

    headers := map[string][]string{
        "Accept": []string{"application/json"},
        "X-Flow-Id": []string{"string"},

    }

    data := bytes.NewBuffer([]byte{jsonReq})
    req, err := http.NewRequest("GET", "/api/v1/subscriptions/{subscription_id}/events", data)
    req.Header = headers

    client := &http.Client{}
    resp, err := client.Do(req)
    // ...
}

GET /api/v1/subscriptions/{subscription_id}/events

Starts a new stream for reading events from this subscription. The data will be automatically rebalanced between streams of one subscription.

Client Rebalancing

Parameters

Parameter In Type Required Description
subscription_id path string(uuid) true Id of subscription.
max_uncommitted_events query integer(int32) false The maximum number of uncommitted events that Mambu will stream before pausing the stream. When in paused state and commit comes - the stream will resume.
batch_limit query integer(int32) false Maximum number of Events in each chunk (and therefore per partition) of the stream.
stream_limit query integer(int32) false Maximum number of Events in this stream (over all partitions being streamed in this connection).
batch_flush_timeout query number(int32) false Maximum time in seconds to wait for the flushing of each chunk (per partition).
stream_timeout query number(int32) false Maximum time in seconds a stream will live before connection is closed by the server.
stream_keep_alive_limit query integer(int32) false Maximum number of empty keep alive batches to get in a row before closing the connection.
commit_timeout query number(int32) false Maximum amount of seconds that Mambu will be waiting for commit after sending a batch to a client.
X-Flow-Id header string false The flow id of the request, which is written into the logs and passed to called services. Helpful for operational troubleshooting and log analysis.

Detailed descriptions

max_uncommitted_events: The maximum number of uncommitted events that Mambu will stream before pausing the stream. When in paused state and commit comes - the stream will resume.

batch_limit: Maximum number of Events in each chunk (and therefore per partition) of the stream.

stream_limit: Maximum number of Events in this stream (over all partitions being streamed in this connection).

batch_flush_timeout: Maximum time in seconds to wait for the flushing of each chunk (per partition).

stream_timeout: Maximum time in seconds a stream will live before connection is closed by the server.

If this timeout is reached, any pending messages (in the sense of stream_limit) will be flushed to the client.

Stream initialization will fail if stream_timeout is lower than batch_flush_timeout.

If the stream_timeout is greater than max value (4200 seconds) - Mambu will treat this as not specifying stream_timeout.

stream_keep_alive_limit: Maximum number of empty keep alive batches to get in a row before closing the connection.

commit_timeout: Maximum amount of seconds that Mambu will be waiting for commit after sending a batch to a client.

In case if commit does not come within this timeout, Mambu will initialize stream termination, no new data will be sent. Partitions from this stream will be assigned to other streams.

Setting commit_timeout to 0 is equal to setting it to the maximum allowed value - 60 seconds.

In case low latency is needed, change the commit_timeout to a lower value (e.g. 5 seconds).

X-Flow-Id: The flow id of the request, which is written into the logs and passed to called services. Helpful for operational troubleshooting and log analysis.

Example responses

200 Response

{
  "cursor": {
    "partition": "1",
    "offset": "001-0001-000000000000000000",
    "event_type": "mrn.event.demotenant.streamingapi.client_approved",
    "cursor_token": "string"
  },
  "info": {},
  "events": [
    {
      "metadata": {
        "eid": "105a76d8-db49-4144-ace7-e683e8f4ba46",
        "event_type": "mrn.event.demotenant.streamingapi.client_approved",
        "occurred_at": "1996-12-19T16:39:57-08:00",
        "content_type": "text/plain; charset=UTF-8",
        "category": "string"
      },
      "body": "Client was modified. Activity type: CLIENT_SET_TO_INACTIVE. Date: 27-11-2018.\n",
      "template_name": "Client activity template"
    }
  ]
}

Responses

Status Meaning Description Schema
200 OK Stream started. Stream format is a continuous series of SubscriptionEventStreamBatchs separated by \n SubscriptionEventStreamBatch
400 Bad Request Bad Request Problem
403 Forbidden Access forbidden Problem
404 Not Found Subscription not found. Problem
409 Conflict Conflict. There are several possible reasons for receiving this status code: 1) There are no empty slots for this subscriptions. The amount of consumers for this subscription already equals the maximum value - the total number of partitions in this subscription. 2) Request to reset subscription cursors is still in progress. Problem

Response Headers

Status Header Type Format Description
200 X-Mambu-StreamId string The id of this stream generated by Mambu. Must be used for committing events that were read by client from this stream.

Cursors

Code samples

# You can also use wget
curl -X POST /api/v1/subscriptions/{subscription_id}/cursors \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -H 'X-Mambu-StreamId: string'

POST /api/v1/subscriptions/{subscription_id}/cursors HTTP/1.1

Content-Type: application/json
Accept: application/json
X-Mambu-StreamId: string

var headers = {
  'Content-Type':'application/json',
  'Accept':'application/json',
  'X-Mambu-StreamId':'string'

};

$.ajax({
  url: '/api/v1/subscriptions/{subscription_id}/cursors',
  method: 'post',

  headers: headers,
  success: function(data) {
    console.log(JSON.stringify(data));
  }
})

const fetch = require('node-fetch');
const inputBody = '{
  "items": [
    {
      "partition": "1",
      "offset": "001-0001-000000000000000000",
      "event_type": "mrn.event.demotenant.streamingapi.client_approved",
      "cursor_token": "string"
    }
  ]
}';
const headers = {
  'Content-Type':'application/json',
  'Accept':'application/json',
  'X-Mambu-StreamId':'string'

};

fetch('/api/v1/subscriptions/{subscription_id}/cursors',
{
  method: 'POST',
  body: inputBody,
  headers: headers
})
.then(function(res) {
    return res.json();
}).then(function(body) {
    console.log(body);
});

require 'rest-client'
require 'json'

headers = {
  'Content-Type' => 'application/json',
  'Accept' => 'application/json',
  'X-Mambu-StreamId' => 'string'
}

result = RestClient.post '/api/v1/subscriptions/{subscription_id}/cursors',
  params: {
  }, headers: headers

p JSON.parse(result)

import requests
headers = {
  'Content-Type': 'application/json',
  'Accept': 'application/json',
  'X-Mambu-StreamId': 'string'
}

r = requests.post('/api/v1/subscriptions/{subscription_id}/cursors', params={

}, headers = headers)

print r.json()

URL obj = new URL("/api/v1/subscriptions/{subscription_id}/cursors");
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
con.setRequestMethod("POST");
int responseCode = con.getResponseCode();
BufferedReader in = new BufferedReader(
    new InputStreamReader(con.getInputStream()));
String inputLine;
StringBuffer response = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
    response.append(inputLine);
}
in.close();
System.out.println(response.toString());

package main

import (
       "bytes"
       "net/http"
)

func main() {

    headers := map[string][]string{
        "Content-Type": []string{"application/json"},
        "Accept": []string{"application/json"},
        "X-Mambu-StreamId": []string{"string"},

    }

    data := bytes.NewBuffer([]byte{jsonReq})
    req, err := http.NewRequest("POST", "/api/v1/subscriptions/{subscription_id}/cursors", data)
    req.Header = headers

    client := &http.Client{}
    resp, err := client.Do(req)
    // ...
}

POST /api/v1/subscriptions/{subscription_id}/cursors

Endpoint for committing offsets of the subscription.

Body parameter

{
  "items": [
    {
      "partition": "1",
      "offset": "001-0001-000000000000000000",
      "event_type": "mrn.event.demotenant.streamingapi.client_approved",
      "cursor_token": "string"
    }
  ]
}

Parameters

Parameter In Type Required Description
subscription_id path string true Id of subscription
X-Mambu-StreamId header string true Id of stream which client uses to read events. It is not possible to make a commit for a terminated or none-existing stream.
body body object false none
» items body [allOf] true List of cursors that the consumer acknowledges to have successfully processed.
»» anonymous body object false none
»»» partition body string true Id of the partition pointed to by this cursor.
»»» offset body string true Offset of the event being pointed to.
»» anonymous body object false none
»»» event_type body string true The name of the event type this partition's events belong to.
»»» cursor_token body string true An opaque value defined by the server.

Detailed descriptions

X-Mambu-StreamId: Id of stream which client uses to read events. It is not possible to make a commit for a terminated or none-existing stream. Also the client can't commit something which was not sent to his stream.

»»» offset: Offset of the event being pointed to. Note that if you want to specify beginning position of a stream with first event at offset N, you should specify offset N-1.

This applies in cases when you create new subscription or reset subscription offsets.

Also for stream start offsets one can use two special values:

Example responses

200 Response

{
  "items": [
    {
      "cursor": {
        "partition": "1",
        "offset": "001-0001-000000000000000000",
        "event_type": "mrn.event.demotenant.streamingapi.client_approved",
        "cursor_token": "string"
      },
      "result": "string"
    }
  ]
}

Responses

Status Meaning Description Schema
200 OK At least one cursor which was tried to be committed is older or equal to already committed one. Array of commit results is returned for this status code. Inline
204 No Content Offsets were committed None
403 Forbidden Access forbidden Problem
404 Not Found Subscription not found Problem
422 Unprocessable Entity Unprocessable Entity Problem

Response Schema

Status Code 200

Name Type Required Restrictions Description
» items [CursorCommitResult] true none list of items which describe commit result for each cursor
»» cursor any true none none

allOf

Name Type Required Restrictions Description
»»» anonymous object false none none
»»»» partition string true none Id of the partition pointed to by this cursor.
»»»» offset string true none Offset of the event being pointed to. Note that if you want to specify beginning position of a stream with first event at offset N, you should specify offset N-1. This applies in cases when you create new subscription or reset subscription offsets. Also for stream start offsets one can use two special values: - begin - read from the oldest available event. - end - read from the most recent offset.

and

Name Type Required Restrictions Description
»»» anonymous object false none none
»»»» event_type string true none The name of the event type this partition's events belong to.
»»»» cursor_token string true none An opaque value defined by the server.

continued

Name Type Required Restrictions Description
»»» result string true none The result of cursor commit. - committed: cursor was successfully committed - outdated: there already was more recent (or the same) cursor committed, so the current one was not committed as it is outdated

Schemas

Event

{
  "metadata": {
    "eid": "105a76d8-db49-4144-ace7-e683e8f4ba46",
    "event_type": "mrn.event.demotenant.streamingapi.client_approved",
    "occurred_at": "1996-12-19T16:39:57-08:00",
    "content_type": "text/plain; charset=UTF-8",
    "category": "string"
  },
  "body": "Client was modified. Activity type: CLIENT_SET_TO_INACTIVE. Date: 27-11-2018.\n",
  "template_name": "Client activity template"
}

Payload of an Event. Usually represents a status transition in a Business process.

Properties

Name Type Required Restrictions Description
metadata EventMetadata true none Metadata for this Event.
body string true none Actual content of the notification.
template_name string true none Name of the notification template.

EventMetadata

{
  "eid": "105a76d8-db49-4144-ace7-e683e8f4ba46",
  "event_type": "mrn.event.demotenant.streamingapi.client_approved",
  "occurred_at": "1996-12-19T16:39:57-08:00",
  "content_type": "text/plain; charset=UTF-8",
  "category": "string"
}

Metadata for this Event.

Properties

Name Type Required Restrictions Description
eid string(uuid) true none Unique identifier of this Event. Consumers MIGHT use this value to assert uniqueness of reception of the Event.
event_type string true none The EventType of this Event.
occurred_at string(RFC 3339 date-time) true none Timestamp of creation of the Event generated by Mambu.
content_type string true none Notification content format.
category string true none Indicates if the content of the notification can be configured in Mambu or it is fixed. Currently only one category is supported: DATA

Enumerated Values

Property Value
content_type text/plain; charset=UTF-8
content_type application/json
content_type application/xml

Problem

{
  "type": "http://httpstatus.es/503",
  "title": "Service Unavailable",
  "status": 503,
  "detail": "Connection to database timed out",
  "instance": "http://example.com"
}

Properties

Name Type Required Restrictions Description
type string(uri) true none An absolute URI that identifies the problem type. When dereferenced, it SHOULD provide human-readable API documentation for the problem type (e.g., using HTML).
title string true none A short, summary of the problem type. Written in English and readable for engineers (usually not suited for non technical stakeholders and not localized).
status integer(int32) true none The HTTP status code generated by the origin server for this occurrence of the problem.
detail string false none A human readable explanation specific to this occurrence of the problem.
instance string(uri) false none An absolute URI that identifies the specific occurrence of the problem. It may or may not yield further information if dereferenced.

StreamInfo

{}

This object contains general information about the stream. Used only for debugging purposes. We recommend logging this object in order to solve connection issues. Clients should not parse this structure.

Properties

None

Cursor

{
  "partition": "1",
  "offset": "001-0001-000000000000000000"
}

Properties

Name Type Required Restrictions Description
partition string true none Id of the partition pointed to by this cursor.
offset string true none Offset of the event being pointed to. Note that if you want to specify beginning position of a stream with first event at offset N, you should specify offset N-1. This applies in cases when you create new subscription or reset subscription offsets. Also for stream start offsets one can use two special values: - begin - read from the oldest available event. - end - read from the most recent offset.

SubscriptionCursor

{
  "partition": "1",
  "offset": "001-0001-000000000000000000",
  "event_type": "mrn.event.demotenant.streamingapi.client_approved",
  "cursor_token": "string"
}

Properties

allOf

Name Type Required Restrictions Description
anonymous Cursor false none none

and

Name Type Required Restrictions Description
anonymous object false none none
» event_type string true none The name of the event type this partition's events belong to.
» cursor_token string true none An opaque value defined by the server.

SubscriptionCursorWithoutToken

{
  "partition": "1",
  "offset": "001-0001-000000000000000000",
  "event_type": "mrn.event.demotenant.streamingapi.client_approved"
}

Properties

allOf

Name Type Required Restrictions Description
anonymous Cursor false none none

and

Name Type Required Restrictions Description
anonymous object false none none
» event_type string true none The name of the event type this partition's events belong to.

CursorCommitResult

{
  "cursor": {
    "partition": "1",
    "offset": "001-0001-000000000000000000",
    "event_type": "mrn.event.demotenant.streamingapi.client_approved",
    "cursor_token": "string"
  },
  "result": "string"
}

The result of single cursor commit. Holds a cursor itself and a result value.

Properties

Name Type Required Restrictions Description
cursor SubscriptionCursor true none none
result string true none The result of cursor commit. - committed: cursor was successfully committed - outdated: there already was more recent (or the same) cursor committed, so the current one was not committed as it is outdated

SubscriptionEventStreamBatch

{
  "cursor": {
    "partition": "1",
    "offset": "001-0001-000000000000000000",
    "event_type": "mrn.event.demotenant.streamingapi.client_approved",
    "cursor_token": "string"
  },
  "info": {},
  "events": [
    {
      "metadata": {
        "eid": "105a76d8-db49-4144-ace7-e683e8f4ba46",
        "event_type": "mrn.event.demotenant.streamingapi.client_approved",
        "occurred_at": "1996-12-19T16:39:57-08:00",
        "content_type": "text/plain; charset=UTF-8",
        "category": "string"
      },
      "body": "Client was modified. Activity type: CLIENT_SET_TO_INACTIVE. Date: 27-11-2018.\n",
      "template_name": "Client activity template"
    }
  ]
}

One chunk of events in a stream. A batch consists of an array of Events plus a Cursor pointing to the offset of the last Event in the stream.

The size of the array of Event is limited by the parameters used to initialize a Stream.

Sequential batches might present repeated cursors if no new events have arrived.

Properties

Name Type Required Restrictions Description
cursor SubscriptionCursor true none none
info StreamInfo false none This object contains general information about the stream. Used only for debugging purposes. We recommend logging this object in order to solve connection issues. Clients should not parse this structure.
events [Event] false none [Payload of an Event. Usually represents a status transition in a Business process.]

Subscription

{
  "id": "0691160a-b519-4595-b85c-a400fc73e963",
  "owning_application": "demo",
  "event_types": [
    "mrn.event.demotenant.streamingapi.client_approved"
  ],
  "consumer_group": "read-product-updates",
  "created_at": "1996-12-19T16:39:57-08:00",
  "updated_at": "1996-12-19T16:39:57-08:00",
  "read_from": "end",
  "initial_cursors": [
    {
      "partition": "1",
      "offset": "001-0001-000000000000000000",
      "event_type": "mrn.event.demotenant.streamingapi.client_approved"
    }
  ]
}

Subscription is a high level consumption unit. Subscriptions allow applications to easily scale the number of clients by managing consumed event offsets and distributing load between instances.

The key properties that identify subscription are owning_application, event_types and consumer_group.

It's not possible to have two different subscriptions with these properties being the same.

Properties

Name Type Required Restrictions Description
id string false read-only Id of subscription that was created. Is generated by Mambu, should not be specified when creating a subscription.
owning_application string true none The id of application owning the subscription.
event_types [string] true none EventTypes to subscribe to. The order is not important. Subscriptions that differ only by the order of EventTypes will be considered the same and will have the same id. The size of event_types list is limited by total number of partitions within these event types. Default limit for partition count is 100.
consumer_group string false none The value describing the use case of this subscription. In general that is an additional identifier used to differ subscriptions having the same owning_application and event_types.
created_at string(RFC 3339 date-time) false read-only Timestamp of creation of the subscription. This is generated by Mambu. It should not be specified when creating subscription and sending it may result in a client error.
updated_at string(RFC 3339 date-time) false read-only Timestamp of last update of the subscription. This is generated by Mambu. It should not be specified when creating subscription and sending it may result in a client error. Its initial value is same as created_at.
read_from string false none Position to start reading events from. Currently supported values: - begin - read from the oldest available event. - end - read from the most recent offset. - cursors - read from cursors provided in initial_cursors property. Applied when the client starts reading from a subscription.
initial_cursors [SubscriptionCursorWithoutToken] false none List of cursors to start reading from. This property is required when read_from = cursors. The initial cursors should cover all partitions of subscription. Clients will get events starting from next offset positions.

Document History

2019-09-20

Added information about deleting subscriptions.

2019-03-21

Added /api/v1/ prefix to endpoints.

2019-01-25

Text edits for clarity and heading changes for ease of navigation.

2019-01-23

Document is created from the most recent swagger definition of the streaming API, V1.0.0.