Amazon S3 with Athena

The Objectiv Collector can be configured to store data on Amazon S3 through Snowplow on AWS. AWS Athena can then be configured to query the data on S3.

Essentially there are three steps to setting this up:

  1. Configure the Objectiv Collector to output data to AWS SQS or AWS Kinesis
  2. Configure Snowplow to read data from SQS or Kinesis, and to store that data on S3
  3. Configure Athena to read data from S3

This document details steps 1 and 3. For more information on configuring Snowplow, see the Snowplow documentation.

Set up Objectiv on AWS through Snowplow

note

We assume below that you've already read how to set up Objectiv with Snowplow.

The setup works as follows:

  • Events arrive at the Objectiv Collector, and are validated;
  • Good events are published on the raw topic on Kinesis or SQS (which in turn is processed by Enrich); and
  • Bad (invalid) events are published on the bad topic on Kinesis.
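
To make the flow concrete, the snippet below is a minimal, hypothetical sketch of what publishing a good event to the raw topic amounts to, using boto3 and the example stream name used later in this document. It is an illustration only, not the Collector's actual implementation or message format:

import json
import boto3

# Illustration only: publish a validated event to the raw Kinesis stream.
# 'sp-raw-stream' is the example value of SP_AWS_MESSAGE_TOPIC_RAW used below.
kinesis = boto3.client('kinesis')
event = {'event': 'example-event'}  # placeholder payload
kinesis.put_record(
    StreamName='sp-raw-stream',
    Data=json.dumps(event).encode('utf-8'),
    PartitionKey='example-partition-key',
)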

Starting the Collector

The output topics of the Collector are controlled through environment variables: SP_AWS_MESSAGE_TOPIC_RAW for the raw (good) topic and SP_AWS_MESSAGE_TOPIC_BAD for the bad topic.

The AWS integration uses the boto3 Python library, which means authentication is also provided through that library (as detailed here). The simplest way to make it work is by setting the following environment variables:

  • AWS_ACCESS_KEY_ID - IAM key of the account used to access AWS services;
  • AWS_SECRET_ACCESS_KEY - IAM secret key;
  • AWS_DEFAULT_REGION - optionally, the AWS region in which the Kinesis/SQS resources are deployed.

Once these environment variables have been set, the Objectiv Collector can be started, and the Snowplow AWS output will be enabled.
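
Before starting the Collector, you can optionally sanity-check that boto3 resolves these credentials. The snippet below is a minimal sketch using boto3's STS get_caller_identity call; it is not part of the Collector itself:

import boto3

# Verify that boto3 picks up the credentials and region from the environment.
session = boto3.session.Session()
print('Region:', session.region_name)  # should match AWS_DEFAULT_REGION
identity = session.client('sts').get_caller_identity()
print('Authenticated as:', identity['Arn'])  # the resolved IAM identity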

Using docker-compose

To run this setup in Docker, make sure that the aforementioned environment variables are properly set and available in the container. Also take care that the path to any credentials file is actually available inside the container.

When using docker-compose, the following yaml snippet will do the trick:

objectiv_collector:
  container_name: objectiv_collector
  image: objectiv/backend
  working_dir: /services
  ports:
    - "127.0.0.1:5000:5000"
  environment:
    AWS_ACCESS_KEY_ID: AKIA-some-key
    AWS_SECRET_ACCESS_KEY: some-secret-key
    AWS_DEFAULT_REGION: some-aws-region
    SP_AWS_MESSAGE_TOPIC_RAW: sp-raw-stream
    SP_AWS_MESSAGE_TOPIC_BAD: sp-bad-stream
    OUTPUT_ENABLE_PG: "false"

The important part here is setting the correct environment variables:

  • providing the AWS credentials;
  • providing the Kinesis stream ids.

Running locally

Running the Collector locally in a dev setup is pretty similar:

# setup environment
virtualenv objectiv-venv
source objectiv-venv/bin/activate
pip install -r requirements.in

# start flask app, using SQS queue
cd objectiv_backend
export PYTHONPATH=.:$PYTHONPATH
SP_AWS_MESSAGE_TOPIC_RAW=sp-raw-stream \
SP_AWS_MESSAGE_TOPIC_BAD=sp-bad-stream \
flask run

Test the setup

The Collector will display a message if the Snowplow config is loaded: Enabled Snowplow: AWS pipeline ($SP_AWS_MESSAGE_TOPIC).

This indicates that the Collector will try to push events. If this fails, the logging should hint at what's happening. If there are no errors in the Collector logs, the events should be successfully pushed into the raw topic, to be picked up by Snowplow's enrichment.

To check if the messages have successfully arrived in the queue, please review the monitoring in the AWS console. Events should show up as either PutRecords (Kinesis) or Number of messages received (SQS).
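
Alternatively, for Kinesis you can read back a few records from the raw stream with boto3. The snippet below is a minimal sketch; the stream name is the example value used earlier, and TRIM_HORIZON simply reads from the start of the stream's retention window:

import boto3

# Fetch a handful of records from the raw stream to confirm events arrive.
kinesis = boto3.client('kinesis')
stream = 'sp-raw-stream'
shard_id = kinesis.describe_stream(StreamName=stream)['StreamDescription']['Shards'][0]['ShardId']
iterator = kinesis.get_shard_iterator(
    StreamName=stream,
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON',
)['ShardIterator']
records = kinesis.get_records(ShardIterator=iterator, Limit=10)['Records']
print(f'Fetched {len(records)} record(s) from {stream}')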

Use Objectiv with Athena

To be able to use the data on S3 with Bach and the open model hub, a few steps need to be taken:

  1. Create an Athena table that makes the S3 data queryable.
  2. Perform additional Athena setup: permissions and query-result storage.
  3. Construct the URL to connect with Athena.

Create Athena table

To make the data on S3 queryable through Athena, we must create a table. One can use the CREATE TABLE statement below. Alternatively, one can use AWS Glue with Snowplow's create_glue_table.sh script, as mentioned in this Snowplow blog post.

The SQL statement below assumes that a database has been created and selected already. Make sure to edit the location at the end of the statement, to reflect where you are storing your data.

CREATE EXTERNAL TABLE events(
app_id string,
platform string,
etl_tstamp timestamp,
collector_tstamp timestamp,
dvce_created_tstamp timestamp,
event string,
event_id string,
txn_id int,
name_tracker string,
v_tracker string,
v_collector string,
v_etl string,
user_id string,
user_ipaddress string,
user_fingerprint string,
domain_userid string,
domain_sessionidx int,
network_userid string,
geo_country string,
geo_region string,
geo_city string,
geo_zipcode string,
geo_latitude double,
geo_longitude double,
geo_region_name string,
ip_isp string,
ip_organization string,
ip_domain string,
ip_netspeed string,
page_url string,
page_title string,
page_referrer string,
page_urlscheme string,
page_urlhost string,
page_urlport int,
page_urlpath string,
page_urlquery string,
page_urlfragment string,
refr_urlscheme string,
refr_urlhost string,
refr_urlport int,
refr_urlpath string,
refr_urlquery string,
refr_urlfragment string,
refr_medium string,
refr_source string,
refr_term string,
mkt_medium string,
mkt_source string,
mkt_term string,
mkt_content string,
mkt_campaign string,
contexts string,
se_category string,
se_action string,
se_label string,
se_property string,
se_value double,
unstruct_event string,
tr_orderid string,
tr_affiliation string,
tr_total double,
tr_tax double,
tr_shipping double,
tr_city string,
tr_state string,
tr_country string,
ti_orderid string,
ti_sku string,
ti_name string,
ti_category string,
ti_price double,
ti_quantity int,
pp_xoffset_min int,
pp_xoffset_max int,
pp_yoffset_min int,
pp_yoffset_max int,
useragent string,
br_name string,
br_family string,
br_version string,
br_type string,
br_renderengine string,
br_lang string,
br_features_pdf boolean,
br_features_flash boolean,
br_features_java boolean,
br_features_director boolean,
br_features_quicktime boolean,
br_features_realplayer boolean,
br_features_windowsmedia boolean,
br_features_gears boolean,
br_features_silverlight boolean,
br_cookies boolean,
br_colordepth string,
br_viewwidth int,
br_viewheight int,
os_name string,
os_family string,
os_manufacturer string,
os_timezone string,
dvce_type string,
dvce_ismobile boolean,
dvce_screenwidth int,
dvce_screenheight int,
doc_charset string,
doc_width int,
doc_height int,
tr_currency string,
tr_total_base double,
tr_tax_base double,
tr_shipping_base double,
ti_currency string,
ti_price_base double,
base_currency string,
geo_timezone string,
mkt_clickid string,
mkt_network string,
etl_tags string,
dvce_sent_tstamp timestamp,
refr_domain_userid string,
refr_device_tstamp timestamp,
derived_contexts string,
domain_sessionid string,
derived_tstamp timestamp,
event_vendor string,
event_name string,
event_format string,
event_version string,
event_fingerprint string,
true_tstamp timestamp
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
ESCAPED BY '\\'
LINES TERMINATED BY '\n'
LOCATION 's3://<bucket>/<prefix>/' -- Example: LOCATION 's3://objectiv-production-data/enriched/';
;

-- Column names and types were inspired by Snowplow's create_glue_table.sh
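
Once the table exists, a quick way to verify it is queryable from Python is via the pyathena library (the driver behind the awsathena+rest URLs used below). This is a minimal sketch; the region, database and staging path are example values:

from pyathena import connect

# Run a small test query against the new events table.
conn = connect(
    region_name='eu-west-1',
    schema_name='snowplow_data',
    s3_staging_dir='s3://staging-bucket-name/staging/my_name/',
)
cursor = conn.cursor()
cursor.execute('SELECT collector_tstamp, event_name FROM events LIMIT 10')
for row in cursor.fetchall():
    print(row)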

Additional Athena setup

All Athena query results are written to S3, to a so-called 'S3 staging' location. Therefore, an S3 bucket or prefix should be created for that purpose. This S3 prefix should not overlap with the location of the data.

An account should be set up with the following access (a small boto3 sketch to check the S3 side follows this list):

  • S3:
    • read access to the data, i.e. the LOCATION that the events table gets its data from.
    • write access to the S3 staging prefix.
  • Athena: access to query/create/etc. tables.
  • Glue: access to Glue data catalog.
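
The snippet below is a minimal sketch to sanity-check the S3 part of these permissions with boto3; the bucket and prefix names are the example values used elsewhere in this document, so adjust them to your own setup:

import boto3

s3 = boto3.client('s3')

# Read access to the enriched event data (the table's LOCATION).
resp = s3.list_objects_v2(Bucket='objectiv-production-data', Prefix='enriched/', MaxKeys=5)
print('Data objects visible:', resp.get('KeyCount', 0))

# Write access to the staging prefix used for Athena query results.
s3.put_object(Bucket='staging-bucket-name', Key='staging/my_name/permission_check.txt', Body=b'ok')
print('Staging prefix is writable')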

Construct Athena URL

To use the Objectiv modelhub with Athena from Python, one must simply provide a URL to connect to the database. The general form of a URL for Athena is:

awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/
{schema_name}?s3_staging_dir={s3_staging_dir}

With the following fields set:

  • aws_access_key_id: account key id - optional
  • aws_secret_access_key: secret access key - optional
  • region_name: region, e.g. eu-west-1
  • schema_name: name of database that contains the events table defined earlier
  • s3_staging_dir: path to S3 where query results can be stored

Additionally, there are optional query-string parameters that can be added to the URL:

  • work_group: work-group under which the queries run
  • catalog_name: catalog that contains the table definitions

If the access key fields are not set, then the Athena library will use default mechanisms to get credentials (environment variables, config files, etc.; see the Amazon docs).

All values must be properly escaped. To help with this, the modelhub offers a function that constructs the URL: athena_construct_engine_url().

from modelhub import athena_construct_engine_url

db_url = athena_construct_engine_url(
    aws_access_key_id='access key id',
    aws_secret_access_key='secret',
    region_name='eu-west-1',
    schema_name='snowplow_data',
    s3_staging_dir='s3://staging-bucket-name/staging/my_name/'
)
# Note that db_url contains the secret access key; it must be kept secret!
# Now set db_url as environment variable 'DSN', or pass it directly to modelhub as db_url

# Example usage:
from modelhub import ModelHub
df = ModelHub().get_objectiv_dataframe(db_url=db_url)