Ambar User Guide

1 Introduction

Ambar is a data streaming service that empowers you to build mission-critical real-time applications in minutes. Instead of producing to and consuming from message brokers, Ambar pulls records from databases and pushes records to application endpoints.

To use Ambar, you provide three things:

  • Data Sources: durable storage containing record streams, e.g., an append-only table in your relational database where each row is one record.
  • Filters: logical specifications that select subsets of records from data sources.
  • Data Destinations: endpoints in your application that will process your record streams.

Ambar offers two convenient ways to provide these things: an API for imperative management, and a Terraform provider for declarative management.

This user guide contains all the necessary information to get started with Ambar.

2 Data Sources

Conceptually, an Ambar data source contains records which will be delivered to data destinations. All records belong to some sequence which is identified by a property of the record called the sequence id. Records contain an arbitrary number of fields which store application relevant data. If you are familiar with other messaging platforms, you can think of a sequence as a stream where the sequence id is simply a partitioning key that uniquely identifies that stream.

Individual record sequences must be updated in an append-only fashion. In other words, for a given sequence of records, new records can be appended to that sequence, but existing records must not be removed or modified. Additionally, new records must not be inserted into the middle of an existing sequence. Distinct record sequences may be appended to concurrently.

Depending on the type of data source, Ambar requires certain configuration constraints. These constraints are described for each data source type in the following sections.

2.1 MySQL

For MySQL data sources, Ambar supports sequence ids stored in columns of any type. Ambar also supports sequence ids which are embedded in JSON columns.

The order of records in their containing sequences is inferred using an auto-incrementing column that is shared among all the sequences. For example, the following table encodes six records distributed across two sequences (a and b), each of which has a single field. In this case, the sequence with id a contains records with fields cat, dog, and tiger (in that order), and the sequence with id b contains records with fields chair, table, and table (in that order).

sequence_id  auto_incrementing_column  field_1
a            1                         cat
b            2                         chair
b            3                         table
a            4                         dog
a            5                         tiger
b            6                         table
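
A table with this shape might be declared as follows; this is a minimal, hedged sketch in which the table name, column types, and sizes are illustrative:

CREATE TABLE example_records (
  auto_incrementing_column BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  sequence_id              VARCHAR(64)  NOT NULL,
  field_1                  VARCHAR(255) NOT NULL
) ENGINE = InnoDB;

-- Appending a record to the sequence with id 'a':
INSERT INTO example_records (sequence_id, field_1) VALUES ('a', 'cat');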

Ambar reads from MySQL both directly through its SQL interface and from the binary log. The following configuration is therefore required:

  • The binary log must be enabled and configured to use the row-based logging format. The format must not be changed during operation (it must always be row-based).
  • Binary logs must be retained for at least seven days.
  • The binlog_error_action system variable must be set to ABORT_SERVER (this is the default setting).
  • The binlog_encryption system variable must be set to OFF.
  • The sync_binlog and innodb_flush_log_at_trx_commit options must both be set to 1.
  • The source table must support transactions (i.e., it must use the InnoDB engine).
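
The following my.cnf fragment is a minimal sketch collecting these settings for a self-managed server; the path and retention value are illustrative, and binlog_encryption exists only on MySQL 8.0.14 and later:

[mysqld]
log_bin                        = /var/lib/mysql/mysql-bin.log
binlog_format                  = ROW
binlog_error_action            = ABORT_SERVER
binlog_encryption              = OFF       # MySQL 8.0.14+ only
sync_binlog                    = 1
innodb_flush_log_at_trx_commit = 1
binlog_expire_logs_seconds     = 604800    # 7 days; use expire_logs_days = 7 on 5.7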

Ambar requires a dedicated user account on the source database. That user account needs to have the REPLICATION CLIENT, REPLICATION SLAVE, and SELECT privileges on the source table. A single user account may be used for multiple data sources.
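
On a self-managed server, such a user might be created as follows; this is a sketch in which the user name, host pattern, and object names are placeholders:

CREATE USER 'ambar_user'@'%' IDENTIFIED BY 'PASSWORD_GOES_HERE_CHANGE_ME';
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'ambar_user'@'%';
GRANT SELECT ON my_db.my_table TO 'ambar_user'@'%';

An RDS-specific example is given in section 2.1.2.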

2.1.1 Version Compatibility

Ambar supports MySQL servers running versions greater than or equal to 5.7.9 (for the 5.7 series), and greater than or equal to 8.0.11 (for the 8.0 series).

2.1.2 Note for Amazon RDS Users

The following configuration items are required:

binlog_format = row
log_bin = /var/lib/mysql/mysql-bin.log
expire_logs_days = 7

On RDS, binary log retention (the equivalent of expire_logs_days) is set in hours with the following command (168 hours = 7 days):

CALL mysql.rds_set_configuration('binlog retention hours', 168);

Creation of a user with the appropriate grants can be performed with the following sequence of commands:

CREATE USER 'projections_user'@'%' IDENTIFIED BY 'PASSWORD_GOES_HERE_CHANGE_ME';
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'projections_user'@'%';
GRANT SELECT ON *.* TO 'projections_user'@'%';
GRANT SELECT ON mysql.rds_heartbeat2 TO 'projections_user'@'%';
GRANT SELECT ON mysql.rds_configuration TO 'projections_user'@'%';
GRANT EXECUTE ON PROCEDURE mysql.rds_kill TO 'projections_user'@'%';

2.1.3 References

For information about the binary log and row based logging, please see the following links: row-based-logging, binary-log.

For information about binary log retention settings, please see the following link: binary-log-retention.

For information about binlog synchronization and InnoDB flushing, please see the following links: binlog-synchronization, InnoDB-flushing.

For information about replication privileges, please see the following link: replication-privileges.

2.2 PostgreSQL

For PostgreSQL data sources, Ambar supports sequence ids stored in columns of any type.

The order of records in their containing sequences is inferred using a serial column that is shared among all the sequences. For example, the following table encodes six records distributed across two sequences (a and b), each of which has a single field. In this case, the sequence with id a contains records with fields cat, dog, and tiger (in that order), and the sequence with id b contains records with fields chair, table, and table (in that order).

sequence_id  serial_column  field_1
a            1              cat
b            2              chair
b            3              table
a            4              dog
a            5              tiger
b            6              table
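
A table with this shape might be declared as follows; this is a minimal, hedged sketch in which the table name and column types are illustrative:

CREATE TABLE example_records (
  serial_column BIGSERIAL PRIMARY KEY,
  sequence_id   TEXT NOT NULL,
  field_1       TEXT NOT NULL
);

-- Appending a record to the sequence with id 'a':
INSERT INTO example_records (sequence_id, field_1) VALUES ('a', 'cat');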

Ambar reads from PostgreSQL both directly through its SQL interface and from the write-ahead log, using the logical replication feature. Enabling logical replication requires setting the following configuration options.

  • The wal_level configuration option must be set to logical.
  • The max_replication_slots configuration option must be set to at least the number of slots needed for normal operation (without Ambar), plus the maximum number of desired data sources, plus 3.
  • The max_wal_senders configuration option must be set to at least the value of max_replication_slots, plus the number of physical replicas that are connected concurrently.
  • The wal_sender_timeout configuration option must be set to at least one minute (the default value).
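
For example, a postgresql.conf fragment satisfying these requirements for a hypothetical deployment with 2 pre-existing replication slots, up to 5 Ambar data sources, and 2 physical replicas might look like this; size the values for your own deployment:

wal_level = logical
max_replication_slots = 10    # 2 existing + 5 data sources + 3
max_wal_senders = 12          # max_replication_slots + 2 physical replicas
wal_sender_timeout = 60s      # the default; must be at least one minute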

Additionally, a publication must be created for each table that will be used as a data source. The publication must publish all the desired columns of that table, and it must not contain any row filters. The publication must publish all of the DML operations (the default value; this assists in detecting and reporting violations of our safety assumptions).
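
Such a publication might be created as follows; my_publication and my_table are placeholders (they match the CreateDataSource example later in this guide), and the statement relies on the defaults of publishing all columns, all DML operations, and no row filter:

CREATE PUBLICATION my_publication FOR TABLE my_table;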

Ambar requires a dedicated user account on the source database. That user account needs to have the REPLICATION privilege, and the SELECT privilege for any tables used as data sources. A single user account may be used for multiple data sources, so long as it has the required privileges for all of them.
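
Such a user might be created as follows; this is a sketch in which the role name, password, and table name are placeholders:

CREATE ROLE ambar_user WITH LOGIN REPLICATION PASSWORD 'PASSWORD_GOES_HERE_CHANGE_ME';
GRANT SELECT ON my_table TO ambar_user;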

2.2.1 Version Compatibility

PostgreSQL servers greater than or equal to version 14 are supported.

2.2.2 References

For information about configuration for logical replication, see logical-replication-config.

For more information about creating publications, see publication-creation.

For more information about required privileges, see required-privileges.

3 Ambar Internal Data Format

In order to combine and filter records from sources of various types, Ambar transforms ingested records into a common internal format. This format represents records as a flat list of key-value pairs whose keys are strings, and whose values range over the following set of types:

  • Boolean
  • Unsigned Integer (64 bit)
  • Signed Integer (64 bit)
  • Real Number (IEEE 754 Double Precision)
  • String (UTF-8)
  • Byte String
  • DateTime (ISO 8601)
  • JSON String
  • Null

3.1 Source Type Mappings

For each type of supported database, we define a static, lossless mapping between a subset of types in that database and types in our internal data format. If a source type is not listed in the following sections, then it is not currently supported.

3.1.1 MySQL

MySQL Type                                 Ambar Type
Bool                                       Boolean
Serial                                     Unsigned Integer
TinyInt, SmallInt, MediumInt, BigInt, Int  Signed Integer
Float, Double                              Real Number
UTF8 Char types                            String
Bit, Binary, Varbinary                     Byte String
Blob, TinyBlob, MediumBlob, LongBlob       Byte String
Json                                       JSON String
DateTime                                   DateTime

Note: UTF8 Char types include: Char, NChar, Varchar, NVarchar, Text, TinyText, MediumText, and LongText columns with an encoding of UTF8MB4, UTF8, or UTF8MB3.

3.1.2 PostgreSQL

PostgreSQL Type         Ambar Type
int2, int4, int8        Signed Integer
float4, float8          Real Number
bool                    Boolean
json                    JSON String
bytea                   Byte String
timestamp, timestamptz  DateTime
text                    String

3.2 JSON Encoding for Delivery

When Ambar delivers records to your data destinations, they will be encoded as JSON objects with the following format:

{
  "data_source_id": "AMBAR-51f161c7-faef-4b3b-9149-32e1ae8a03af",
  "data_source_description": "postgres production data source",
  "data_destination_id": "AMBAR-d7f5c446-1e79-4f5c-82ed-afe3beceb144",
  "data_destination_description": "concurrent visitors per page",
  "payload": {...}
}

The payload field will contain one top-level member for each of the fields in the record.

Ambar internal types are encoded into JSON elements using the following type mapping:

Ambar Type        JSON Type
Boolean           "true" or "false"
Unsigned Integer  number
Signed Integer    number
Real Number       number
String            string
Byte String       string (Base64 encoded)
DateTime          string
JSON String       string
Null              null

Please note how JSON values are handled: JSON fields in data source records are delivered as JSON strings, rather than being spliced into the result structure.
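
For example, a record whose value field holds the JSON object {"x": 1} and whose (hypothetical) image field holds the raw bytes 0xDEADBEEF would produce a payload along these lines:

{
  "value": "{\"x\": 1}",
  "image": "3q2+7w=="
}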

4 Data Destinations

Conceptually, an Ambar data destination is an HTTP service to which some filtered set of records will be streamed. Each data destination is composed of a set of filters, each of which selects desired records from exactly one data source. Ambar provides an intuitive, convenient, and powerful domain specific language for specifying filters in terms of record fields. Ambar data destinations also adapt dynamically to the available processing power on customer endpoints, ensuring that customers can achieve the highest throughput possible without overloading their systems.

4.1 Filter Domain Specific Language (DSL)

The Ambar filter DSL allows the concise specification of predicates over records. A filter is associated with a single data source. This allows the filter expressions that are passed to the filter creation endpoint to be typechecked against the schema of the associated data source.

The following sections describe the filter DSL. As a notational convention, expressions in the DSL will be represented in monospace font.

4.1.1 Literal Expressions

The filter DSL supports the following boolean literals: true and false.

The filter DSL supports numeric literals in both integer and scientific format. Examples include 3, -4, 5.0, and 3.14e5.

The filter DSL supports string literals, delimited by double quotation marks. For example: "hello world".

Finally, it supports the null value literal, denoted null.

4.1.2 Operators

The filter DSL supports a standard set of logical connective operators: not for logical negation, && for logical conjunction, and || for logical disjunction.

It also supports generic equality operators, denoted = and !=, which work for strings and numbers.

A set of comparison operators is available for numeric values: <, >, <=, >=.

Two string operators are available: substring(<haystack>, <needle>), and regex_match(<string>, <regex>). The first of these checks whether the <needle> argument is a substring of the <haystack> argument, and the second checks whether the <string> matches the <regex>. The filter DSL supports extended POSIX regular expressions.

Two lookup operators are available to extract values from the record under test. The first, denoted lookup(<field_name>), takes a string literal as an argument and evaluates to the value of the indicated field in the record under test.

There is also a JSON lookup operator, denoted lookup_json(<field_name>, <json_path>). This takes two string literals as arguments, one indicating a field, and one indicating a JSON path. It evaluates to the value stored at that path in the JSON string stored under the indicated field name in the record.

Parentheses can be used to alter the default precedence rules, which are standard unless otherwise noted.

4.1.3 JSON field references

Paths into JSON objects are specified as a sequence of dot separated path elements. Each path element is either a field lookup in an object, or an index into an array. Object field lookups are specified using the name of the field, and array indices are specified with natural numbers inside square brackets.

The reserved characters in these paths are: ., [, ], \, and '. When these characters are required outside of their special meaning, they can be escaped with a double backslash: \\. The path element separator is optional prior to array index path elements. Single quotes can optionally be used to delimit object field names.
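
For illustration, the following paths all follow these rules (the object shapes they assume are hypothetical):

  • .x.y selects the y field of the object stored under the top-level x field.
  • .items[0].name selects the name field of the first element of the items array.
  • .'weird.field'.x and .weird\\.field.x both select the x field under a field literally named weird.field, using quoting and escaping respectively.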

4.1.4 Examples

In the examples that follow, suppose we have a data source which contains records having five fields:

  • seqid storing an unsigned integer
  • auto_increment storing an unsigned integer
  • timestamp storing a datetime value
  • value storing a JSON object
  • full_name storing a customer's full name

In the JSON encoding for delivery, these records would have a payload value that looks like this:

{
  "seqid": 1,
  "auto_increment": 2,
  "timestamp": "2024-02-19T18:29:12Z",
  "value": "{"x": 1, "y": 2}",
  "full_name": "John Doe"
}

The simplest filter is the one which selects all records. That filter is specified using the boolean literal true, as below:

true

A filter which selects only records from the sequence with id 3 would look like:

lookup("seqid") == 3

A filter which selects only records produced on the United States Independence Day, 2024 would look like:

substring(lookup("timestamp"), "2024-07-04")

A filter which selects records whose value column has an x field less than or equal to 10 would look like:

lookup_json("value", ".x") <= 10

A filter which matches records where the full_name field contains the last name "Doe" would look like:

regex_match(lookup("full_name"), " Doe$")

4.2 Endpoint Expectations

In order to provide our advertised end-to-end correctness guarantees, Ambar requires a few simple conditions to be met by the HTTP endpoints connected to data destinations.

4.2.1 Endpoint Response Format

Ambar requires responses from the endpoint to take on one of three forms depending on whether the associated record was correctly processed. In the event of successful record processing, the endpoint should respond with the following JSON body:

{
  "result": {
    "success": {}
  }
}

In the event of an inability to successfully process the message, you can request that Ambar either ignore the failure and continue delivering the remainder of the record sequence, or pause delivery of the remainder of the record sequence and attempt redelivery of the failed record at a later time.

To request that the failure be ignored and the remainder of the record sequence be processed, your endpoint should respond with the following JSON body:

{
  "result": {
    "error": {
      "policy": "keep_going", 
      "class": "keep it going", 
      "description": "keep it going"
    }
  }
}

To request that delivery be paused and delivery of the failed record re-attempted, your endpoint should respond with the following JSON body:

{
  "result": {
    "error": {
      "policy": "must_retry", 
      "class": "must retry it", 
      "description": "must retry it"
    }
  }
}

In either case, the class and description fields can be filled in however you choose.

4.2.2 Endpoint Idempotency

Ambar guarantees that each record sequence in your data sources will be filtered and transmitted in order, at least once, to each of your data destinations. If you require exactly-once processing, your HTTP endpoint will need to deduplicate messages. You must deduplicate through a field or combination of fields that is universally unique, for example a message UUID, or a combination of a data source id and an auto-incrementing column.

Upon deduplicating a record in this way, please issue a success response as described above.
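
As one sketch of such deduplication, assuming your endpoint is backed by PostgreSQL, a processed-records table keyed on the deduplication fields can absorb redeliveries; the table and column names are illustrative:

-- Run once: a table recording which records have already been processed.
CREATE TABLE IF NOT EXISTS processed_records (
  data_source_id TEXT   NOT NULL,
  auto_increment BIGINT NOT NULL,
  PRIMARY KEY (data_source_id, auto_increment)
);

-- Run for each delivered record, in the same transaction as your processing logic.
-- If the insert affects zero rows, the record is a duplicate: skip processing
-- and respond with the success body shown above.
INSERT INTO processed_records (data_source_id, auto_increment)
VALUES ('AMBAR-51f161c7-faef-4b3b-9149-32e1ae8a03af', 42)
ON CONFLICT DO NOTHING;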

4.3 Data Destination Adaptive Throughput

Ambar is designed to dynamically adjust the rate at which it sends records to configured destinations in an effort to maximize throughput, without overloading those endpoints. Ambar uses both response types and response times to measure endpoint pressure. No special actions are required on the part of the destination endpoints as long as they conform to the endpoint expectations outlined above.

4.4 Data Destination Warnings

Data destinations are composed of a set of filters which are disjoint with respect to their associated data sources. This means that no two filters in a data destination can be associated with the same data source. This restriction is enforced by Ambar at data destination creation time, and is present to ensure that unintended correctness violations do not occur. A similar situation can arise if multiple data destinations are associated with the same HTTP service. Ambar does not attempt to detect and prevent this situation from occurring, so if you use one HTTP service for multiple data destinations, take care to process the sequences independently based on their associated data destination, the id of which can be found in the top level of the records we send to you (see 3.2).

5 Ambar User Interfaces

Ambar provides two convenient methods for configuring data sources and destinations: an API, and a Terraform provider. Additionally, Ambar maintains an OpenAPI spec which can be used to generate SDKs in your language of choice.

5.1 API

The Ambar API is accessed through HTTP requests. All API endpoints take JSON request bodies, and return JSON responses. Authentication is performed with API keys which are vended as part of the signup process. The API key must be passed in the HTTP header x-api-key.

Every API key is associated with an environment, which is a logical grouping of resources (data sources, filters, and data destinations). Each environment is contained entirely in a single region, determined at the time the API key is created. Requests must be made to the appropriate regional endpoint for the API key being used. Those endpoints have the form <region>.api.ambar.cloud, for example, euw1.api.ambar.cloud for the euw1 region.

5.1.1 API Endpoints

In the following sections, we list each of the endpoints of the API, along with their associated request formats. For detailed information about their response formats, please refer to our OpenAPI spec.

5.1.1.1 ListResources

The ListResources endpoint is used to describe all of the existing resources in an environment (data sources, data destinations, and filters).

  • Verb: GET
  • Path: /resources

The request body should be empty.
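
For example, a ListResources request against the euw1 region might look like the following; the API key shown is a placeholder:

GET /resources HTTP/1.1
Host: euw1.api.ambar.cloud
x-api-key: YOUR_API_KEY_GOES_HERE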

5.1.1.2 CreateDataSource

The CreateDataSource endpoint is used to create a new data source.

  • Verb: POST
  • Path: /source

The request body should contain all of the information required to configure this data source. This information differs depending on the type of data source being set up, and is described below for each of our supported data source types.

Below is an example request body for a PostgreSQL data source.

{
  "dataSourceType": "postgres",
  "dataSourceConfig": {
    "hostname": "db.host.com",
    "hostPort": "5432",
    "databaseName": "my-db",
    "tableName": "my-table",
    "publicationName": "my-publication",
    "columns": "serial_col,partitioning_col,value",
    "username": "replication-user",
    "password": "replication-user-pw",
    "serialColumn": "serial_col",
    "partitioningColumn": "partitioning_col",
    "tlsTerminationOverrideHost": "my.override.host"
  }
}

The dataSourceType field needs to be set to the constant string postgres. The dataSourceConfig field needs to be structured as above, with the following components:

  • The hostname field should contain the hostname of the PostgreSQL server which contains the data for this data source.
  • The hostPort field should contain the port on which this server is listening.
  • The databaseName field should contain the name of the PostgreSQL database that contains the desired table.
  • The tableName field should contain the name of the desired table.
  • The publicationName field should contain the name of the publication created on this table. Please see the DataSources section on PostgreSQL for more information.
  • The columns field should contain all of the columns that you wish to include in the data source, including those listed in the serialColumn and partitioningColumn fields.
  • The username and password fields should contain the authentication details for the replication user created for Ambar. Please see the DataSources section on PostgreSQL for more information.
  • The partitioningColumn field should contain the name of the column which stores the sequence id.
  • The serialColumn field should contain the name of the serial column which is used to determine record order.
  • The tlsTerminationOverrideHost field is optional, and should only be provided if you have a different host performing TLS termination than the database host.

Below is an example request body for a MySQL data source.

{
  "dataSourceType": "mysql",
  "dataSourceConfig": {
    "hostname": "db.host.com",
    "hostPort": "3306",
    "databaseName": "my_db",
    "tableName": "my_table",
    "columns": "incrementing_col,partitioning_col,value",
    "binlogReplicationServerId": "25039"
    "username": "replication-user",
    "password": "replication-user-pw",
    "incrementingColumn": "incrementing_col",
    "partitioningColumn": "partitioning_col",
    "tlsTerminationOverrideHost": "my.override.host"
  }
}

The dataSourceType field needs to contain the constant string mysql. The dataSourceConfig field needs to be structured as above, with the following components:

  • The hostname field should contain the hostname of the MySQL server which contains the data for this data source.
  • The hostPort field should contain the port on which this server is listening.
  • The databaseName field should contain the name of the MySQL database that contains the desired table.
  • The tableName field should contain the name of the desired table.
  • The columns field should contain the names of all the columns whose record fields you wish to ingest, including the incrementing and partitioning columns (as in the example above).
  • The binlogReplicationServerId field should contain a numeric server id for Ambar to use when connecting as a binary log replication client of this database. It must not collide with the server id of any other replication client or server replicating from this database.
  • The username and password fields should contain credentials for the Ambar user you've created. Please see the DataSources section on MySQL for more information.
  • The incrementingColumn field should contain the name of the auto-incrementing column which is used to determine record order.
  • The partitioningColumn field should contain the name of the column which is storing the sequence id.
  • The tlsTerminationOverrideHost field is optional, and should only be provided if you have a different host performing TLS termination than the database host.

5.1.1.3 DeleteDataSource

The DeleteDataSource endpoint is used to delete an existing data source.

  • Verb: DELETE
  • Path: /source

The request body for this endpoint requires a JSON object containing the resource ID of the data source you want to delete, as in the following example:

{
  "resourceId": "AMBAR-dd5e24ba-f1da-4c96-a02c-e1c2e0407d05"
}

5.1.1.4 CreateFilter

The CreateFilter endpoint is used to create a new filter.

  • Verb: POST
  • Path: /filter

Two items are required for configuration of a filter: the associated data source, and the filter DSL expression which selects the desired records. An example is shown below:

{
  "dataSourceId": "data-source-1",
  "filterContents": "..."
}

Important: Please note that the filterContents field must be a Base64 encoded filter string as defined in RFC 4648, Section 4. For more information about the filter DSL, see the Data Destinations section of this manual.
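
For example, the filter expression true (which selects every record) Base64-encodes to dHJ1ZQ==, so a request creating an unfiltered view of a data source might look like:

{
  "dataSourceId": "data-source-1",
  "filterContents": "dHJ1ZQ=="
}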

5.1.1.5 DeleteFilter

The DeleteFilter endpoint is used to delete an existing filter.

  • Verb: DELETE
  • Path: /filter

The only required item is the filter ID, as shown in the following example:

{
  "resourceId": "AMBAR-ffdbe21f-4888-4059-9503-d312305f2ee8"
}

5.1.1.6 CreateDataDestination

The CreateDataDestination endpoint is used to create a new data destination.

  • Verb: POST
  • Path: /destination

Four items are required for data destination configuration: a list of filters to apply, an HTTP endpoint to which the data will be sent, and a username and password for authentication to that endpoint. An example is shown below:

{
    "filterIds": ["Filter-1", "Filter-2"]
    "destinationEndpoint": "stream.processing.example.com/abc",
    "username":"guinea",
    "password":"pig"
}

5.1.1.7 DeleteDataDestination

The DeleteDataDestination endpoint is used to delete an existing data destination.

  • Verb: DELETE
  • Path: /destination

The only required item is the destination ID, as shown in the following example:

{
  "resourceId": "AMBAR-9d29e6fe-7316-479c-9e7b-1b49459efe76"
}

5.1.2 References

For the Ambar OpenAPI spec, please see the following link: ambar-openapi-spec.

5.2 Terraform Provider

For our customers who make use of Terraform for infrastructure management, we publish a Terraform provider in the Hashicorp registry which can be used to incorporate Ambar seamlessly in your infrastructure-as-code workflow.

5.2.1 References

For the Ambar Terraform provider, see the following link: ambar-terraform-provider.

For a detailed description of how to configure a data source, see the following link: data-source-config.

For a detailed description of how to configure a data destination, see the following link: data-destination-config.

For a detailed description of how to configure a filter, see the following link: filter-config.

6 Networking Considerations

Connections to external resources by Ambar components will all come from a set of regional static IP addresses. This includes both connections to databases initiated by our data sources, and connections to HTTP services initiated by data destinations. The table below lists these IP addresses for each region. Please ensure that you allow incoming connections from the appropriate set.

6.1 IP Allow List - euw1

  • 18.203.122.196
  • 34.247.111.202
  • 63.32.245.54
  • 54.220.2.224
  • 54.220.9.121
  • 54.73.55.245
  • 34.240.153.156
  • 52.210.73.243
  • 54.155.76.132

Author: Ambar Cloud Ltd.

Created: 2024-04-18 Thu 09:58
