Ambar User Guide
1 Introduction
Ambar is a data streaming service that empowers you to build mission-critical real-time applications in minutes. Instead of producing to and consuming from message brokers, Ambar pulls records from databases and pushes records to application endpoints.
To use Ambar, you provide three things:
- Data Sources: durable storage containing record streams, e.g., an append-only table in your relational database where each row is one record.
- Filters: logical specifications that select subsets of records from data sources.
- Data Destinations: endpoints in your application that will process your record streams.
Ambar provides two convenient methods for providing these things: an API for imperative management, and a Terraform provider for declarative management.
This user guide contains all the necessary information to get started with Ambar.
2 Data Sources
Conceptually, an Ambar data source contains records which will be delivered to data destinations. All records belong to some sequence, which is identified by a property of the record called the sequence id. Records contain an arbitrary number of fields which store application-relevant data. If you are familiar with other messaging platforms, you can think of a sequence as a stream where the sequence id is simply a partitioning key that uniquely identifies that stream.
Individual record sequences must be updated in an append-only fashion. In other words, for a given sequence of records, new records can be appended to that sequence, but existing records must not be removed or modified. Additionally, new records must not be inserted into the middle of an existing sequence. Distinct record sequences may be appended to concurrently.
Depending on the type of data source, Ambar requires certain configuration constraints. These constraints are described for each data source type in the following sections.
2.1 MySQL
For MySQL data sources, Ambar supports sequence ids stored in columns of any type. Ambar also supports sequence ids which are embedded in JSON columns.
The order of records in their containing sequences is inferred using an auto-incrementing column that is shared among all the sequences.
For example, the following table encodes six records distributed across two sequences (a and b),
each of which has a single field.
In this case, the sequence with id a contains records with fields cat, dog, and tiger (in that order), and the sequence with id b contains records with fields chair, table, and table (in that order).
sequence_id | auto_incrementing_column | field_1 |
---|---|---|
a | 1 | cat |
b | 2 | chair |
b | 3 | table |
a | 4 | dog |
a | 5 | tiger |
b | 6 | table |
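For illustration, a table with this shape could be defined as follows. This is a minimal sketch rather than a required schema; the table and column names simply match the example above, and the column types (and the InnoDB engine required below) are assumptions to adapt to your application.
-- Hypothetical append-only source table matching the example above.
CREATE TABLE my_source_table (
    auto_incrementing_column BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
    sequence_id VARCHAR(64) NOT NULL,
    field_1 TEXT NOT NULL
) ENGINE=InnoDB;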
Ambar reads from MySQL both directly through its SQL interface and from the binary log. As such, the binary log must be enabled and configured to use the row-based logging format, and the format must not be changed during operation (it must always remain row-based). Ambar requires binary logs to be retained for at least seven days. The binlog_error_action system variable must be set to ABORT_SERVER (the default setting), the binlog_encryption system variable must be set to OFF, and the sync_binlog and innodb_flush_log_at_trx_commit options must both be set to 1. The source table must support transactions (i.e., it must use the InnoDB engine).
Ambar requires a dedicated user account on the source database. That user account needs to have the REPLICATION CLIENT, REPLICATION SLAVE, and SELECT privileges on the source table. A single user account may be used for multiple data sources.
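The server-side requirements above can be checked with standard MySQL statements, for example (a quick verification sketch; the expected values are the ones listed in this section):
-- Each statement returns the current value of one required setting.
SHOW VARIABLES LIKE 'log_bin';                        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';                  -- expect ROW
SHOW VARIABLES LIKE 'binlog_error_action';            -- expect ABORT_SERVER
SHOW VARIABLES LIKE 'binlog_encryption';              -- expect OFF (MySQL 8.0 only)
SHOW VARIABLES LIKE 'sync_binlog';                    -- expect 1
SHOW VARIABLES LIKE 'innodb_flush_log_at_trx_commit'; -- expect 1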
2.1.1 Version Compatibility
Ambar supports MySQL servers running versions greater than or equal to 5.7.9 (for the 5.7 series), and greater than or equal to 8.0.11 (for the 8.0 series).
2.1.2 Note for Amazon RDS Users
The following configuration items are required:
binlog_format = row
log_bin = /var/lib/mysql/mysql-bin.log
expire_logs_days = 7
On RDS, binary log retention (the equivalent of expire_logs_days) is set with the following command; 168 hours corresponds to the required seven days of retention:
CALL mysql.rds_set_configuration('binlog retention hours', 168);
Creation of a user with the appropriate grants can be performed with the following sequence of commands:
CREATE USER 'projections_user'@'%' IDENTIFIED BY 'PASSWORD_GOES_HERE_CHANGE_ME';
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'projections_user'@'%';
GRANT SELECT ON *.* TO 'projections_user'@'%';
GRANT SELECT ON mysql.rds_heartbeat2 TO 'projections_user'@'%';
GRANT SELECT ON mysql.rds_configuration TO 'projections_user'@'%';
GRANT EXECUTE ON PROCEDURE mysql.rds_kill TO 'projections_user'@'%';
2.1.3 References
For information about the binary log and row based logging, please see the following links: row-based-logging, binary-log.
For information about binary log retention settings, please see the following link: binary-log-retention.
For information about binlog synchronization and InnoDB flushing, please see the following links: binlog-synchronization, InnoDB-flushing.
For information about replication privileges, please see the following link: replication-privileges.
2.2 PostgreSQL
For PostgreSQL data sources, Ambar supports sequence ids stored in columns of any type.
The order of records in their containing sequences is inferred using a serial column that is shared among all the sequences.
For example, the following table encodes six records distributed across two sequences (a and b),
each of which has a single field.
In this case, the sequence with id a contains records with fields cat, dog, and tiger (in that order), and the sequence with id b contains records with fields chair, table, and table (in that order).
sequence_id | serial_column | field_1 |
---|---|---|
a | 1 | cat |
b | 2 | chair |
b | 3 | table |
a | 4 | dog |
a | 5 | tiger |
b | 6 | table |
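For illustration, a table with this shape could be defined as follows (a minimal sketch with assumed names and types; adapt them to your application):
-- Hypothetical append-only source table matching the example above.
CREATE TABLE my_source_table (
    serial_column BIGSERIAL PRIMARY KEY,
    sequence_id TEXT NOT NULL,
    field_1 TEXT NOT NULL
);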
Ambar reads from PostgreSQL both directly through its SQL interface, as well as from the write-ahead log using the logical replication feature. Enabling logical replication requires setting the following configuration options.
The wal_level configuration option must be set to logical. The max_replication_slots configuration option must be set to at least the number of slots needed for normal operation (without Ambar), plus the maximum number of desired data sources, plus 3. The max_wal_senders configuration option must be set to at least the value of max_replication_slots plus the number of physical replicas that are connected concurrently. The wal_sender_timeout configuration option must be set to at least one minute (the default value).
Additionally, a publication must be created for each table that will be used as a data source. The publication must publish all the desired columns of that table, and it must not contain any row filters. The publication must publish all of the DML operations (the default value; this assists in detecting and reporting violations of our safety assumptions).
Ambar requires a dedicated user account on the source database.
That user account needs to have the REPLICATION privilege, and the SELECT privilege on any tables used as data sources.
A single user account may be used for multiple data sources, so long as it has the required privileges for all of them.
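For example, the publication and the dedicated user for a single source table could be created as follows. This is a minimal sketch; my_table, my_publication, ambar_user, and the password are placeholder names.
-- Publication on the source table; by default it publishes all DML operations and has no row filter.
CREATE PUBLICATION my_publication FOR TABLE my_table;
-- Dedicated user with the REPLICATION privilege and SELECT on the source table.
CREATE USER ambar_user WITH REPLICATION PASSWORD 'PASSWORD_GOES_HERE_CHANGE_ME';
GRANT SELECT ON my_table TO ambar_user;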
2.2.1 Version Compatibility
PostgreSQL servers running version 14 or later are supported.
2.2.2 References
For information about configuration for logical replication, see logical-replication-config.
For more information about creating publications, see publication-creation.
For more information about required privileges, see required-privileges.
3 Ambar Internal Data Format
In order to combine and filter records from sources of various types, Ambar transforms ingested records into a common internal format. This format represents records as a flat list of key-value pairs whose keys are strings, and whose values range over the following set of types:
- Boolean
- Unsigned Integer (64 bit)
- Signed Integer (64 bit)
- Real Number (IEEE 754 Double Precision)
- String (UTF-8)
- Byte String
- DateTime (ISO 8601)
- JSON String
- Null
3.1 Source Type Mappings
For each type of supported database, we define a static, lossless mapping between a subset of types in that database and types in our internal data format. If a source type is not listed in the following sections, then it is not currently supported.
3.1.1 MySQL
MySQL Type | Ambar Type |
---|---|
Bool | Boolean |
Serial | Unsigned Integer |
TinyInt, SmallInt, MediumInt, BigInt, Int | Signed Integer |
Float, Double | Real |
UTF8 Char types | String |
Bit, Binary, Varbinary | Byte String |
Blob, TinyBlob, MediumBlob, LongBlob | Byte String |
Json | JSON |
DateTime | DateTime |
Note: UTF8 Char types include: Char, NChar, Varchar, NVarchar, Text, TinyText, MediumText, and LongText columns with an encoding of UTF8MB4, UTF8, or UTF8MB3.
3.1.2 PostgreSQL
PostgreSQL Type | Ambar Type |
---|---|
int2, int4, int8 | Signed Integer |
float4, float8 | Real |
bool | Boolean |
json | JSON |
bytea | Byte String |
timestamp, timestamptz | DateTime |
text | String |
3.2 JSON Encoding for Delivery
When Ambar delivers records to your data destinations, they will be encoded as JSON objects with the following format:
{ "data_source_id": "AMBAR-51f161c7-faef-4b3b-9149-32e1ae8a03af", "data_source_description": "postgres production data source", "data_destination_id": "AMBAR-d7f5c446-1e79-4f5c-82ed-afe3beceb144", "data_destination_description": "concurrent visitors per page", "payload": {...} }
The payload field will contain one top-level member for each of the fields in the record.
Ambar internal types are encoded into JSON elements using the following type mapping:
Ambar Type | JSON Type |
---|---|
Boolean | "true" or "false" |
Unsigned Integer | number |
Signed Integer | number |
Real Number | number |
String | string |
Byte String | string (Base64 encoded) |
DateTime | string |
JSON String | string |
Null | null |
Please note the nature of the type conversion for JSON. JSON fields in data source records will be delivered as JSON strings, rather than being spliced into the result structure.
4 Data Destinations
Conceptually, an Ambar data destination is an HTTP service to which some filtered set of records will be streamed. Each data destination is composed of a set of filters, each of which selects desired records from exactly one data source. Ambar provides an intuitive, convenient, and powerful domain specific language for specifying filters in terms of record fields. Ambar data destinations also adapt dynamically to the available processing power on customer endpoints, ensuring that customers can achieve the highest throughput possible without overloading their systems.
4.1 Filter Domain Specific Language (DSL)
The Ambar filter DSL allows the concise specification of predicates over records. A filter is associated with a single data source. This allows the filter expressions that are passed to the filter creation endpoint to be typechecked against the schema of the associated data source.
The following sections describe the filter DSL. As a notational convention, expressions in the DSL are represented in monospace font.
4.1.1 Literal Expressions
The filter DSL supports the boolean literals true and false. It supports numeric literals in both integer and scientific formats; examples include 3, -4, 5.0, and 3.14e5. It supports string literals delimited by double quotation marks, for example "hello world". Finally, it supports the null value literal, denoted null.
4.1.2 Operators
The filter DSL supports a standard set of logical connective operators: not for logical negation, && for logical conjunction, and || for logical disjunction. It also supports generic equality operators, denoted = and !=, which work for strings and numbers. A set of comparison operators is available for numeric values: <, >, <=, and >=. Two string operators are available: substring(<haystack>, <needle>) and regex_match(<string>, <regex>). The first checks whether the <needle> argument is a substring of the <haystack> argument, and the second checks whether the <string> matches the <regex>. The filter DSL supports extended POSIX regular expressions.
Two lookup operators are available to extract values from the record under test. The first, denoted lookup(<field_name>), takes a string literal as an argument and evaluates to the value of the indicated field in the record under test. There is also a JSON lookup operator, denoted lookup_json(<field_name>, <json_path>), which takes two string literals as arguments, one indicating a field and one indicating a JSON path; it evaluates to the value stored at that path in the JSON string stored under the indicated field name in the record.
Parentheses can be used to alter the default precedence rules, which are standard unless otherwise noted.
4.1.3 JSON field references
Paths into JSON objects are specified as a sequence of dot separated path elements. Each path element is either a field lookup in an object, or an index into an array. Object field lookups are specified using the name of the field, and array indices are specified with natural numbers inside square brackets.
The reserved characters in these paths are: ., [, ], \, and '. When these characters are required outside of their special meaning, they can be escaped with a double backslash: \\.
The path element separator is optional prior to array index path elements.
Single quotes can be optionally used to delimit object indices.
4.1.4 Examples
In the examples that follow, suppose we have a data source which contains records having five fields:
- seqid storing an unsigned integer
- auto_increment storing an unsigned integer
- timestamp storing a datetime value
- value storing a json object
- full_name storing a customer full name
In the JSON encoding for delivery, these records would have a payload value that looks like this:
{ "seqid": 1, "auto_increment": 2, "timestamp": "2024-02-19T18:29:12Z", "value": "{"x": 1, "y": 2}", "full_name": "John Doe" }
The simplest filter is the one which selects all records. That filter is specified using the boolean literal value, as below:
true
A filter which selects only records from the sequence with id 3 would look like:
lookup("seqid") == 3
A filter which selects only records produced on the United States Independence Day, 2024 would look like:
substring(lookup("timestamp"), "2024-07-04")
A filter which selects records whose value column has an x field of at most 10 would look like:
lookup_json("value", ".x") <= 10
A filter which matches records where the full_name field ends with the last name "Doe" would look like:
regex_match(lookup("full_name"), " Doe$")
4.2 Endpoint Expectations
In order to provide our advertised end-to-end correctness guarantees, Ambar requires a few simple conditions to be met by the HTTP endpoints connected to data destinations.
4.2.1 Endpoint Response Format
Ambar requires responses from the endpoint to take on one of three forms depending on whether the associated record was correctly processed. In the event of successful record processing, the endpoint should respond with the following JSON body:
{ "result": { "success": {} } }
If a message cannot be processed successfully, you can request that Ambar either ignore the failure and continue delivering the remainder of the record sequence, or pause delivery of the remainder of the record sequence and attempt redelivery of the failed record at a later time.
To request that the failure be ignored and the remainder of the record sequence be processed, your endpoint should respond with the following JSON body:
{ "result": { "error": { "policy": "keep_going", "class": "keep it going", "description": "keep it going" } } }
To request that delivery be paused and delivery of the failed record re-attempted, your endpoint should respond with the following JSON body:
{ "result": { "error": { "policy": "must_retry", "class": "must retry it", "description": "must retry it" } } }
In either case, the class and description fields can be filled in however you choose.
4.2.2 Endpoint Idempotency
Ambar guarantees that each record sequence in your data sources will be filtered and transmitted in order, at least once, to each of your data destinations. If you require exactly-once processing, your HTTP endpoint will need to deduplicate messages. You must deduplicate using a field or combination of fields that is universally unique, for example a message UUID, or a combination of a data source id and an auto-incrementing column.
Upon deduplicating a record in this way, please issue a success response as described above.
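As an illustration of one possible approach (an assumed design, not a requirement), the sketch below tracks processed records in a relational store using a hypothetical processed_records table keyed by data source id and the record's auto-incrementing value. If the insert is skipped because the key already exists, the delivery is a duplicate: skip reprocessing and still respond with the success body.
-- Hypothetical deduplication table for an endpoint backed by PostgreSQL.
CREATE TABLE processed_records (
    data_source_id TEXT NOT NULL,
    auto_increment BIGINT NOT NULL,
    PRIMARY KEY (data_source_id, auto_increment)
);

-- On each delivery, try to record the key; zero rows inserted means the record
-- was already processed, so skip the work and still respond with success.
INSERT INTO processed_records (data_source_id, auto_increment)
VALUES ('AMBAR-51f161c7-faef-4b3b-9149-32e1ae8a03af', 2)
ON CONFLICT DO NOTHING;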
4.3 Data Destination Adaptive Throughput
Ambar is designed to dynamically adjust the rate at which it sends records to configured destinations in an effort to maximize throughput, without overloading those endpoints. Ambar uses both response types and response times to measure endpoint pressure. No special actions are required on the part of the destination endpoints as long as they conform to the endpoint expectations outlined above.
4.4 Data Destination Warnings
Data destinations are composed of a set of filters which are disjoint with respect to their associated data sources; that is, no two filters in a data destination can be associated with the same data source. This restriction is enforced by Ambar at data destination creation time, and is present to ensure that unintended correctness violations do not occur. A similar situation can arise if multiple data destinations are associated with the same HTTP service. Ambar does not attempt to detect and prevent this situation, so if you use one HTTP service for multiple data destinations, take care to process the sequences independently based on their associated data destination, whose id can be found in the top level of the records we send to you (see 3.2).
5 Ambar User Interfaces
Ambar provides two convenient methods for configuring data sources and destinations: an API, and a Terraform provider. Additionally, Ambar maintains an OpenAPI spec which can be used to generate SDKs in your preferred language.
5.1 API
The Ambar API is accessed through HTTP requests.
All API endpoints take JSON request bodies, and return JSON responses.
Authentication is performed with API keys which are vended as part of the signup process.
The API key must be passed in the HTTP header x-api-key. Every API key is associated with an environment, which is a logical grouping of resources (data sources, filters, and data destinations). All environments are contained entirely in a single region, determined at the time the API key is created. Requests must be made to the appropriate regional endpoint for the API key being used. Those endpoints have the form <region>.api.ambar.cloud, for example euw1.api.ambar.cloud for the euw1 region.
5.1.1 API Endpoints
In the following sections, we list each of the endpoints of the API, along with their associated request formats. For detailed information about their response formats, please refer to our OpenAPI spec.
5.1.1.1 ListResources
The ListResources endpoint is used to describe all of the existing resources in an environment (data sources, data destinations, and filters).
- Verb: GET
- Path: /resource
The request body should be empty.
5.1.1.2 CreateDataSource
The CreateDataSource endpoint is used to create a new data source.
- Verb: POST
- Path: /source
The request body should contain all of the information required to configure this data source. This information differs depending on the type of data source being set up, and is described below for each of our supported data source types.
Below is an example request body for a PostgreSQL data source.
{ "dataSourceType": "postgres", "description": "my postgres data source", "dataSourceConfig": { "hostname": "db.host.com", "hostPort": "5432", "databaseName": "my-db", "tableName": "my-table", "publicationName": "my-publication", "columns": "serial_col,partitioning_col,value", "username": "replication-user", "password": "replication-user-pw", "serialColumn": "serial_col", "partitioningColumn": "partitioning_col", "tlsTerminationOverrideHost": "my.override.host" } }
The dataSourceType field needs to be set to the constant string postgres.
The description field is optional. When provided, it should contain a string describing the data source being created.
The dataSourceConfig field needs to be structured as above, with the following components:
- The hostname field should contain the hostname of the PostgreSQL server which contains the data for this data source.
- The hostPort field should contain the port on which this server is listening.
- The databaseName field should contain the name of the PostgreSQL database that contains the desired table.
- The tableName field should contain the name of the desired table.
- The publicationName field should contain the name of the publication created on this table. Please see the Data Sources section on PostgreSQL for more information.
- The columns field should contain all of the columns that you wish to include in the data source, including those listed in the serialColumn and partitioningColumn fields.
- The username and password fields should contain the authentication details for the replication user created for Ambar. Please see the Data Sources section on PostgreSQL for more information.
- The partitioningColumn field should contain the name of the column which stores the sequence id.
- The serialColumn field should contain the name of the serial column which is used to determine record order.
- The tlsTerminationOverrideHost field is optional, and should only be provided if you have a different host performing TLS termination than the database host.
Below is an example request body for a MySQL data source.
{ "dataSourceType": "mysql", "dataSourceConfig": { "hostname": "db.host.com", "hostPort": "3306", "databaseName": "my_db", "tableName": "my_table", "columns": "incrementing_col,partitioning_col,value", "binlogReplicationServerId": "25039" "username": "replication-user", "password": "replication-user-pw", "incrementingColumn": "incrementing_col", "partitioningColumn": "partitioning_col", "tlsTerminationOverrideHost": "my.override.host" } }
The dataSourceType field needs to contain the constant string mysql.
The dataSourceConfig field needs to be structured as above, with the following components:
- The hostname field should contain the hostname of the MySQL server which contains the data for this data source.
- The hostPort field should contain the port on which this server is listening.
- The databaseName field should contain the name of the MySQL database that contains the desired table.
- The tableName field should contain the name of the desired table.
- The columns field should contain the names of the columns which have record fields you wish to ingest, not including the incrementing and partitioning columns.
- The binlogReplicationServerId field should contain the id of the binary log replication server for this database.
- The username and password fields should contain credentials for the Ambar user you've created. Please see the Data Sources section on MySQL for more information.
- The incrementingColumn field should contain the name of the auto-incrementing column which is used to determine record order.
- The partitioningColumn field should contain the name of the column which stores the sequence id.
- The tlsTerminationOverrideHost field is optional, and should only be provided if you have a different host performing TLS termination than the database host.
5.1.1.3 DescribeDataSource
The DescribeDataSource endpoint is used to describe an existing data source.
- Verb: GET
- Path: /source
The request body for this endpoint requires a JSON object containing the resource ID of the data source you want to describe, as in the following example:
{ "resourceId": "AMBAR-dd5e24ba-f1da-4c96-a02c-e1c2e0407d05" }
5.1.1.4 DeleteDataSource
The DeleteDataSource endpoint is used to delete an existing data source.
- Verb: DELETE
- Path: /source
The request body for this endpoint requires a JSON object containing the resource ID of the data source you want to delete, as in the following example:
{ "resourceId": "AMBAR-dd5e24ba-f1da-4c96-a02c-e1c2e0407d05" }
5.1.1.5 CreateFilter
The CreateFilter endpoint is used to create a new filter.
- Verb: POST
- Path: /filter
Two items are required for configuration of a filter: the associated data source, and the filter DSL expression which selects the desired records. An example is shown below:
{ "dataSourceId", "data-source-1", "description": "my filter (for data-source-1)", "filterContents", "..." }
The description field is optional. When provided, it should contain a string describing the filter being created.
Important: Please note that the filterContents field must be a Base64 encoded filter string as defined in RFC 4648, Section 4.
For more information about the filter DSL, see the Data Destinations section of this manual.
5.1.1.6 DescribeFilter
The DescribeFilter endpoint is used to describe an existing filter.
- Verb: GET
- Path: /filter
The request body for this endpoint requires a JSON object containing the resource ID of the filter you want to describe, as in the following example:
{ "resourceId": "AMBAR-dd5e24ba-f1da-4c96-a02c-e1c2e0407d05" }
5.1.1.7 DeleteFilter
The DeleteFilter endpoint is used to delete an existing filter.
- Verb: DELETE
- Path: /filter
The only required item is the filter ID, as shown in the following example:
{ "resourceId": "AMBAR-ffdbe21f-4888-4059-9503-d312305f2ee8" }
5.1.1.8 CreateDataDestination
The CreateDataDestination endpoint is used to create a new data destination.
- Verb: POST
- Path: /destination
Four items are required for data destination configuration: a list of filters to apply, an HTTP endpoint to which the data will be sent, and a username and password for authentication to that endpoint. An example is shown below:
{ "filterIds": ["Filter-1", "Filter-2"], "description": "my data destination (consuming from filters filter-1 and filter-2)", "destinationEndpoint": "stream.processing.example.com/abc", "username":"guinea", "password":"pig" }
The description field is optional. When provided, it should contain a string describing the destination being created.
5.1.1.9 DescribeDataDestination
The DescribeDataDestination endpoint is used to describe an existing data destination.
- Verb: GET
- Path: /destination
The request body for this endpoint requires a JSON object containing the resource ID of the data destination you want to describe, as in the following example:
{ "resourceId": "AMBAR-dd5e24ba-f1da-4c96-a02c-e1c2e0407d05" }
5.1.1.10 DeleteDataDestination
The DeleteDataDestination endpoint is used to delete an existing data destination.
- Verb: DELETE
- Path: /destination
The only required item is the destination ID, as shown in the following example:
{ "resourceId": "AMBAR-9d29e6fe-7316-479c-9e7b-1b49459efe76" }
5.1.2 References
For the Ambar OpenAPI spec, please see the following link: ambar-openapi-spec.
5.2 Terraform Provider
For our customers who make use of Terraform for infrastructure management, we publish a Terraform provider in the Hashicorp registry which can be used to incorporate Ambar seamlessly in your infrastructure-as-code workflow.
5.2.1 References
For the Ambar Terraform provider, see the following link: ambar-terraform-provider.
For a detailed description of how to configure a data source, see the following link: data-source-config.
For a detailed description of how to configure a data destination, see the following link: data-destination-config.
For a detailed description of how to configure a filter, see the following link: filter-config.
6 Networking Considerations
Connections to external resources by Ambar components will all come from a set of regional static IP addresses. This includes both connections to databases initiated by our data sources, and connections to HTTP services initiated by data destinations. The table below lists these IP addresses for each region. Please ensure that you allow incoming connections from the appropriate set.
6.1 IP Allow List - euw1
- 18.203.122.196
- 34.247.111.202
- 63.32.245.54
- 54.220.2.224
- 54.220.9.121
- 54.73.55.245
- 34.240.153.156
- 52.210.73.243
- 54.155.76.132