At its recent re:Invent, Amazon announced the availability of Athena, which lets you query data in S3 buckets using standard SQL. I decided to give it a try to analyze bad row data in our S3 buckets.
Using Athena is very easy - you just create a new database and point its data source at your S3 bucket. You choose the data format (I used JSON) and define your columns. I set up 3 columns following the format of the bad rows files:
line
error
timestamp
Once you have this set up you can begin to run your queries. Following the guidelines in the Snowplow documentation for filtering out “expected” bad rows, I used the following query, which returned a list of the errors I need to worry about and the count of each one:
SELECT error, count(*)
FROM badrows
WHERE line NOT LIKE '%/crossdomain.xml%'
  AND line NOT LIKE '%OPTIONS%'
GROUP BY error
Once I had this info I just removed the group by and did a simple select for the lines that were giving me a large number of errors.
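For illustration, that follow-up query might look something like the sketch below. The error string is a placeholder rather than a real value from my data, and the LIMIT is only an assumption to keep the result small.

```sql
-- Sketch: list the raw lines behind one specific error.
-- Replace the placeholder with an error value copied from the grouped results.
SELECT line
FROM badrows
WHERE error = '<error text from the grouped results>'
  AND line NOT LIKE '%/crossdomain.xml%'
  AND line NOT LIKE '%OPTIONS%'
LIMIT 100;
```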
The results from these queries came back very fast and you only get charged by Amazon when you run a query. Unlike other methods for searching through bad row data there is no setup of any other AWS resources, and no need to keep a resource online (and incur ongoing costs).
When using Athena you are billed by the amount of data scanned in the query. To reduce costs, I copied a few days of data from my Snowplow bad rows bucket into a new bucket and set that up as my datasource - this way I didn’t need to scan my entire bad rows history each time (and this also speeds up the query).
This is really cool @dwaxman. We’ve also been looking at Athena recently to query archived Redshift data with the idea of migrating older data out of Redshift (expensive store) and into S3 (significantly cheaper).
One thing that makes a massive difference when you have a large number of columns, as you do in atomic.events, is to use a columnar format rather than straight CSV. From the testing we’ve done so far we’re seeing compression ratios of 8-10x when using gzipped Parquet files over CSV as text. This also results in lower-cost queries (less data scanned) and lower S3 costs.
For those interested in querying Snowplow bad rows using Athena, here’s the Hive DDL to create a bad_rows table:
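As a rough sketch of the conversion step, and assuming an Athena version that supports CREATE TABLE AS SELECT, a CSV-backed table can be rewritten as Parquet from within Athena itself. The table names events_csv and events_parquet and the S3 path are hypothetical placeholders, not names from our setup.

```sql
-- Sketch: rewrite a CSV-backed table as Parquet using CTAS.
-- events_csv, events_parquet and the S3 path are hypothetical.
CREATE TABLE events_parquet
WITH (
  format = 'PARQUET',
  external_location = 's3://<your bucket here>/path/to/events-parquet/'
) AS
SELECT *
FROM events_csv;
```

Queries that select only a few columns from the Parquet table should then scan far less data than the same queries against the CSV table.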
CREATE EXTERNAL TABLE IF NOT EXISTS bad_rows (
line string,
errors array<struct<level: string, message: string>>,
failure_tstamp string
) PARTITIONED BY (
run string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION 's3://<your bucket here>/path/to/bad-rows/'
Unfortunately failure_tstamp is not in a Hive-compatible format so it has to be parsed as string, but can be later converted to a timestamp using UDFs.
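In Athena itself the conversion can be done with Presto's built-in date functions rather than a UDF. A minimal sketch, assuming failure_tstamp holds ISO 8601 strings such as 2017-01-01T12:34:56.789Z (worth checking against a few rows of your own data first):

```sql
-- Sketch: parse failure_tstamp into a proper timestamp at query time.
-- Assumes the column holds ISO 8601 strings; the run filter is illustrative.
SELECT
  from_iso8601_timestamp(failure_tstamp) AS failed_at,
  line
FROM bad_rows
WHERE run > '2017-01-01'
LIMIT 10;
```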
We created two separate tables, one for enriched and one for shredded. Filtering on the run column limits the amount of data Athena touches, reducing the cost of the query. Being mindful of partitions and the amount of data read is even more important with events stored as text files. As @mike mentioned, it is strongly recommended that you store data compressed and in Parquet or some other columnar format, as it will reduce costs by anywhere between 30% and 90%. We use Spark 2.x’s default, Snappy-compressed Parquet.
@dwaxman @rgabo @mike thanks so much for all the insight! I’m only getting a chance to start playing with Athena now - so very late to the party … However, the AWS guys have done a great job of keeping it very simple to get started with. (Nothing like firing up Hive on EMR back in the day…)
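A partitioned table returns no rows until its partitions have been registered. The sketch below assumes the folders under the table location follow the Hive-style run=<timestamp> naming; if yours do not, each partition has to be added with ALTER TABLE ... ADD PARTITION instead. The date range is only an example, and the two statements are run as separate queries.

```sql
-- Sketch: register all run=... folders found under the table location.
MSCK REPAIR TABLE bad_rows;

-- Run as a separate query: restrict the scan to a range of runs so that
-- Athena only reads the matching partitions.
SELECT count(*) AS bad_row_count
FROM bad_rows
WHERE run >= '2017-01-01'
  AND run < '2017-02-01';
```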
One question - what’s the most elegant way of checking that none of the messages in the errors array contain a particular value? I create my table as per @rgabo’s DDL:
CREATE EXTERNAL TABLE IF NOT EXISTS bad_rows (
line string,
errors array<struct<level: string, message: string>>,
failure_tstamp string
) PARTITIONED BY (
run string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION 's3://<your bucket here>/path/to/bad-rows/'
I then want to fetch 10 lines of data that do not contain 'Unrecognized event':
SELECT
errors[1].message,
line
FROM bad_rows
WHERE run > '2017-01-01'
AND position('Unrecognized event' in errors[1].message) = 0
AND position('Request path' in errors[1].message) = 0
AND position('Field [vp]' in errors[1].message) = 0
LIMIT 10;
The trouble with the above is that I’m only checking (and returning) the first message in the errors array. Athena doesn’t support Lambda functions or UDFs. How can I search the whole errors array and return lines where the message fields do or do not contain a particular string?