Amazon S3 with Athena
Essentially there are three steps to setting this up:
- Configure the Objectiv Collector to output data to AWS SQS or AWS Kinesis
- Configure Snowplow to read data from SQS or Kinesis, and to store that data on S3
- Configure Athena to read data from S3
This document details steps 1 and 3. For more information on configuring Snowplow, see the Snowplow documentation.
Set up Objectiv on AWS through Snowplow
We assume below that you've already read how to set up Objectiv with Snowplow.
The setup works as follows:
- Events arrive at the Objectiv Collector, and are validated;
- Good events are published on the `raw` topic on Kinesis or SQS (which in turn is processed by Enrich); and
- Bad (invalid) events are published on the `bad` topic on Kinesis.
Starting the Collector
The output topics of the Collector are controlled through environment variables:
- `SP_AWS_MESSAGE_TOPIC_RAW` - this can be either the id of a Kinesis stream (e.g. `sp-raw-stream`) or a URL to an SQS queue (e.g. `https://sqs.someregion.amazonaws.com/123455/sp-raw-queue`);
- `SP_AWS_MESSAGE_TOPIC_BAD` - this should be the id of the Kinesis bad stream (e.g. `sp-bad-stream`).
The AWS integration uses the boto3 Python library, which means authentication is also provided through that library (as detailed here). The simplest way to make it work is by setting the following environment variables:
- `AWS_ACCESS_KEY_ID` - IAM key of the account used to access AWS services;
- `AWS_SECRET_ACCESS_KEY` - IAM secret key;
- `AWS_DEFAULT_REGION` - optionally, the AWS region in which the Kinesis/SQS resources are deployed.
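For illustration, a minimal shell setup could look as follows; all values below are placeholders and should be replaced with your own stream/queue ids and credentials:
# Collector output topics (use your own Kinesis stream id / SQS queue URL)
export SP_AWS_MESSAGE_TOPIC_RAW=https://sqs.someregion.amazonaws.com/123455/sp-raw-queue
export SP_AWS_MESSAGE_TOPIC_BAD=sp-bad-stream
# credentials picked up by boto3 (placeholder values)
export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...
export AWS_DEFAULT_REGION=eu-west-1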
Once these environment variables have been set, the Objectiv Collector can be started, and the Snowplow AWS output will be enabled.
To run this setup in docker, make sure that the aforementioned environment variables are properly set and available in the container. Also take care that the path to the credentials is actually available in the container.
When using docker-compose, a YAML snippet along the following lines will do the trick.
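Note that this is a sketch: the service name, image and port below are assumptions and should match your actual setup, and the credential values are placeholders.
# sketch of a docker-compose service definition for the Collector
objectiv_collector:
  image: objectiv/backend   # assumed image name
  ports:
    - "5000:5000"           # assumed Collector port
  environment:
    SP_AWS_MESSAGE_TOPIC_RAW: https://sqs.someregion.amazonaws.com/123455/sp-raw-queue
    SP_AWS_MESSAGE_TOPIC_BAD: sp-bad-stream
    AWS_ACCESS_KEY_ID: AKIA...
    AWS_SECRET_ACCESS_KEY: ...
    AWS_DEFAULT_REGION: eu-west-1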
The important part here is setting the correct env:
- providing the AWS credentials;
- providing the Kinesis stream ids.
Running the Collector locally, in a dev setup, is pretty similar:
# setup environment
pip install -r requirements.in
# start flask app, using SQS queue for the raw topic (the exact entrypoint is assumed here and may differ for your checkout)
SP_AWS_MESSAGE_TOPIC_RAW=https://sqs.someregion.amazonaws.com/123455/sp-raw-queue \
SP_AWS_MESSAGE_TOPIC_BAD=sp-bad-stream \
flask run
Test the setup
The Collector will display a message if the Snowplow config is loaded:
Enabled Snowplow: AWS pipeline
This indicates that the Collector will try to push events. If pushing fails, the logs should hint at what is happening. If there are no errors in the Collector logs, the events should be successfully pushed into the raw topic, to be picked up by Snowplow's enrichment.
To check if the messages have successfully arrived in the queue, review the monitoring in the AWS console. Events should show up as either `PutRecords` (Kinesis) or `Number of messages received` (SQS).
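Alternatively, a quick check can be done with the AWS CLI; a sketch, assuming the example stream and queue names used above:
# Kinesis: confirm the (assumed) raw stream exists and is active
aws kinesis describe-stream-summary --stream-name sp-raw-stream
# SQS: approximate number of messages on the (assumed) raw queue
aws sqs get-queue-attributes \
  --queue-url https://sqs.someregion.amazonaws.com/123455/sp-raw-queue \
  --attribute-names ApproximateNumberOfMessages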
Use Objectiv with Athena
- Create an Athena table that makes the S3 data queryable.
- Perform additional Athena setup: permissions and query-result storage.
- Construct the URL to connect with Athena.
Create Athena table
To make the data on S3 queryable through Athena, we must create a table. One can use the below create table statement. Alternatively, one can use AWS Glue with Snowplow's create_glue_table.sh script that is mentioned in this Snowplow blog post.
The SQL statement below assumes that a database has been created and selected already. Make sure to edit the location at the end of the statement, to reflect where you are storing your data.
CREATE EXTERNAL TABLE events(
  -- add the column definitions for the Snowplow enriched event format here
  ...
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  ESCAPED BY '\\'
  LINES TERMINATED BY '\n'
LOCATION 's3://<bucket>/<prefix>/' -- Example: LOCATION 's3://objectiv-production-data/enriched/';
-- Column names and types were inspired by Snowplow's create_glue_table.sh
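Once the table exists, a quick sanity check can be run from the Athena query editor, for example:
-- should return some enriched events, provided data has already landed on S3
SELECT * FROM events LIMIT 10;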
Additional Athena setup
All Athena query results are written to S3, to the so-called 'S3 staging' location. Therefore, an S3 bucket or prefix should be created that can be used for that purpose. This S3 prefix should not overlap with the location of the data.
An account should be set up with the following access:
- read access to the data, i.e. the S3 location that the `events` table gets its data from;
- write access to the S3 staging prefix;
- Athena: access to query/create/etc. tables;
- Glue: access to the Glue data catalog.
Construct Athena URL
To use the Objectiv modelhub with Athena from Python, one simply provides a URL to connect to the database. The general form of a URL for Athena is:
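A sketch of that general form, assuming the PyAthena SQLAlchemy dialect (`awsathena+rest`); the placeholders in braces correspond to the fields listed below:
awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}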
With the following fields set:
- `aws_access_key_id`: account key id - optional
- `aws_secret_access_key`: secret access key - optional
- `region_name`: region, e.g. `eu-west-1`
- `schema_name`: name of the database that contains the `events` table defined earlier
- `s3_staging_dir`: path to S3 where query results can be stored
Additionally, there are optional query string parameters that can be added to the URL:
- `work_group`: work-group under which the queries run
- `catalog_name`: catalog that contains the table definitions
If the access key fields are not set, then the Athena library will use its default mechanisms to get credentials (environment variables, config files, etc.; see the Amazon docs).
All values must be properly escaped. To help with this, the modelhub offers a function that constructs the URL:
from modelhub import athena_construct_engine_url
# parameter names follow the fields listed above; all values below are placeholders
db_url = athena_construct_engine_url(
    aws_access_key_id='access key id',
    aws_secret_access_key='secret access key',
    region_name='eu-west-1',
    schema_name='your_database',
    s3_staging_dir='s3://<bucket>/<staging-prefix>/'
)
# note that db_url contains the secret access key, this must be kept secret!
# Now set db_url as environment variable 'DSN', or pass it directly to modelhub as db_url
# Example usage:
from modelhub import ModelHub
df = ModelHub().get_objectiv_dataframe(db_url=db_url)