Hey @Tejas_Behra - I'll walk through how we generally set up ES Loaders, and hopefully there is something in here that can help!
I would also recommend upgrading to the latest 1.0.0 release of the loader.
First up, the different components!
Loader HOCON example:
source = "kinesis"
sink {
good = "elasticsearch"
bad = "kinesis"
}
enabled = "good"
aws {
accessKey = iam
secretKey = iam
}
queue {
enabled = kinesis
initialPosition = "LATEST"
initialTimestamp = ""
maxRecords = 10000
region = "<region>" # e.g. eu-west-1
appName = "<unique app name>" # e.g. com-acme-es-loader-enriched
channelName = ""
host = ""
port = -1
lookupPort = -1
disableCloudWatch = true # Disables custom metrics being published
}
streams {
inStreamName = "<enriched kinesis stream name>"
outStreamName = "<bad kinesis stream name>"
buffer {
byteLimit = 1000000
recordLimit = 500
timeLimit = 250
}
}
elasticsearch {
client {
endpoint = "vpc-<internal endpoint>.es.amazonaws.com"
port = "443"
maxTimeout = "10000"
maxRetries = 6
ssl = true
}
aws {
signing = true
region = "<region>" # e.g. eu-west-1 (region of the Elasticsearch domain)
}
cluster {
name = "<simple name of cluster>"
index = "<good index name>"
documentType = "good"
}
}
Note: The documentType must match the mapping type defined in the Elasticsearch index mapping (the “good” key under “mappings” in the mapping file below). Internally, we generally use “good” or “bad” as our documentTypes.
Example IAM policy for the Loader:
{
"Version" : "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:ListTables",
"dynamodb:BatchWriteItem",
"dynamodb:PutItem",
"dynamodb:CreateTable",
"dynamodb:DescribeTable",
"dynamodb:DeleteItem",
"dynamodb:GetItem",
"dynamodb:Scan",
"dynamodb:Query",
"dynamodb:UpdateItem",
"kinesis:*",
"s3:GetObject",
"cloudwatch:ListMetrics",
"cloudwatch:PutMetricAlarm",
"cloudwatch:PutMetricData",
"ec2:DescribeTags",
"es:DescribeElasticsearchDomains",
"es:ListDomainNames",
"es:DescribeElasticsearchDomain",
"es:ESHttpGet",
"es:ESHttpHead",
"es:ESHttpDelete",
"es:ESHttpPost",
"es:ESHttpPut",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:DescribeLogStreams"
],
"Resource": ["*"]
}
]
}
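Wide-open access policy for the VPC-based AWS Elasticsearch cluster. You can (and should) tighten this up, but it will let you send signed requests to the cluster. Note that because the principal is "*", the domain will also accept unsigned requests (such as the curl command further down), so access is effectively controlled only by the VPC network rules: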
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "es:*",
"Principal": "*",
"Effect": "Allow",
"Resource": "arn:aws:es:${var.aws_region}:${var.account_id}:domain/${var.domain_name}/*"
}
]
}
Creating the index in Elasticsearch
This is the default mapping file we use when setting up an index for loading “enriched” data into Elasticsearch (replace the ${shards} template variable with the number of shards you want):
{
"settings": {
"analysis": {
"analyzer": {
"default": {
"type": "keyword"
}
}
},
"index" : {
"number_of_replicas" : "1",
"number_of_shards" : "${shards}"
}
},
"mappings": {
"good": {
"properties": {
"app_id": {
"type": "keyword"
},
"br_colordepth": {
"type": "keyword"
},
"br_cookies": {
"type": "boolean"
},
"br_family": {
"type": "keyword"
},
"br_features_director": {
"type": "boolean"
},
"br_features_flash": {
"type": "boolean"
},
"br_features_gears": {
"type": "boolean"
},
"br_features_java": {
"type": "boolean"
},
"br_features_pdf": {
"type": "boolean"
},
"br_features_quicktime": {
"type": "boolean"
},
"br_features_realplayer": {
"type": "boolean"
},
"br_features_silverlight": {
"type": "boolean"
},
"br_features_windowsmedia": {
"type": "boolean"
},
"br_lang": {
"type": "keyword"
},
"br_name": {
"type": "keyword"
},
"br_renderengine": {
"type": "keyword"
},
"br_type": {
"type": "keyword"
},
"br_version": {
"type": "keyword"
},
"br_viewheight": {
"type": "long"
},
"br_viewwidth": {
"type": "long"
},
"collector_tstamp": {
"type": "date",
"format": "dateOptionalTime"
},
"doc_charset": {
"type": "keyword"
},
"doc_height": {
"type": "long"
},
"doc_width": {
"type": "long"
},
"domain_sessionid": {
"type": "keyword"
},
"domain_sessionidx": {
"type": "long"
},
"domain_userid": {
"type": "keyword"
},
"dvce_ismobile": {
"type": "boolean"
},
"dvce_screenheight": {
"type": "long"
},
"dvce_screenwidth": {
"type": "long"
},
"dvce_sent_tstamp": {
"type": "date",
"format": "dateOptionalTime"
},
"dvce_tstamp": {
"type": "date",
"format": "dateOptionalTime"
},
"dvce_type": {
"type": "keyword"
},
"etl_tstamp": {
"type": "date",
"format": "dateOptionalTime"
},
"event": {
"type": "keyword"
},
"event_id": {
"type": "keyword"
},
"geo_location": {
"type": "geo_point"
},
"mkt_campaign": {
"type": "keyword"
},
"mkt_content": {
"type": "keyword"
},
"mkt_medium": {
"type": "keyword"
},
"mkt_source": {
"type": "keyword"
},
"mkt_term": {
"type": "keyword"
},
"name_tracker": {
"type": "keyword"
},
"network_userid": {
"type": "keyword"
},
"os_family": {
"type": "keyword"
},
"os_manufacturer": {
"type": "keyword"
},
"os_name": {
"type": "keyword"
},
"os_timezone": {
"type": "keyword"
},
"page_referrer": {
"type": "keyword"
},
"page_title": {
"type": "keyword"
},
"page_url": {
"type": "keyword"
},
"page_urlfragment": {
"type": "keyword"
},
"page_urlhost": {
"type": "keyword"
},
"page_urlpath": {
"type": "keyword"
},
"page_urlport": {
"type": "long"
},
"page_urlquery": {
"type": "keyword"
},
"page_urlscheme": {
"type": "keyword"
},
"platform": {
"type": "keyword"
},
"pp_xoffset_max": {
"type": "long"
},
"pp_xoffset_min": {
"type": "long"
},
"pp_yoffset_max": {
"type": "long"
},
"pp_yoffset_min": {
"type": "long"
},
"refr_medium": {
"type": "keyword"
},
"refr_source": {
"type": "keyword"
},
"refr_term": {
"type": "keyword"
},
"refr_urlfragment": {
"type": "keyword"
},
"refr_urlhost": {
"type": "keyword"
},
"refr_urlpath": {
"type": "keyword"
},
"refr_urlport": {
"type": "long"
},
"refr_urlquery": {
"type": "keyword"
},
"refr_urlscheme": {
"type": "keyword"
},
"se_action": {
"type": "keyword"
},
"se_category": {
"type": "keyword"
},
"se_label": {
"type": "keyword"
},
"user_fingerprint": {
"type": "keyword"
},
"user_id": {
"type": "keyword"
},
"user_ipaddress": {
"type": "keyword"
},
"useragent": {
"type": "keyword"
},
"v_collector": {
"type": "keyword"
},
"v_etl": {
"type": "keyword"
},
"v_tracker": {
"type": "keyword"
},
"dvce_created_tstamp": {
"type": "date",
"format": "dateOptionalTime"
},
"txn_id": {
"type": "long"
},
"geo_country": {
"type": "keyword"
},
"geo_region": {
"type": "keyword"
},
"geo_city": {
"type": "keyword"
},
"geo_zipcode": {
"type": "keyword"
},
"geo_latitude": {
"type": "double"
},
"geo_longitude": {
"type": "double"
},
"geo_region_name": {
"type": "keyword"
},
"ip_isp": {
"type": "keyword"
},
"ip_organization": {
"type": "keyword"
},
"ip_domain": {
"type": "keyword"
},
"ip_netspeed": {
"type": "keyword"
},
"se_property": {
"type": "keyword"
},
"se_value": {
"type": "keyword"
},
"tr_orderid": {
"type": "keyword"
},
"tr_affiliation": {
"type": "keyword"
},
"tr_total": {
"type": "double"
},
"tr_tax": {
"type": "double"
},
"tr_shipping": {
"type": "double"
},
"tr_city": {
"type": "keyword"
},
"tr_state": {
"type": "keyword"
},
"tr_country": {
"type": "keyword"
},
"tr_currency": {
"type": "keyword"
},
"tr_total_base": {
"type": "double"
},
"tr_tax_base": {
"type": "double"
},
"tr_shipping_base": {
"type": "double"
},
"ti_orderid": {
"type": "keyword"
},
"ti_sku": {
"type": "keyword"
},
"ti_name": {
"type": "keyword"
},
"ti_category": {
"type": "keyword"
},
"ti_price": {
"type": "double"
},
"ti_quantity": {
"type": "long"
},
"ti_currency": {
"type": "keyword"
},
"ti_price_base": {
"type": "double"
},
"base_currency": {
"type": "keyword"
},
"geo_timezone": {
"type": "keyword"
},
"mkt_clickid": {
"type": "keyword"
},
"mkt_network": {
"type": "keyword"
},
"etl_tags": {
"type": "keyword"
},
"refr_domain_userid": {
"type": "keyword"
},
"refr_device_tstamp": {
"type": "date",
"format": "dateOptionalTime"
},
"derived_tstamp": {
"type": "date",
"format": "dateOptionalTime"
},
"event_vendor": {
"type": "keyword"
},
"event_name": {
"type": "keyword"
},
"event_format": {
"type": "keyword"
},
"event_version": {
"type": "keyword"
},
"event_fingerprint": {
"type": "keyword"
},
"true_tstamp": {
"type": "date",
"format": "dateOptionalTime"
}
}
}
}
}
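The mapping is inserted into the cluster like so (the ${...} values are template variables to substitute with your own endpoint, index name and mapping file contents):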
curl -XPUT --fail 'https://${var.cluster_endpoint}/${var.index_name_good}?pretty' -H 'Content-Type: application/json' -d '${local.mapping_good}'
Note: index_name_good must be the same index name that you put in the elasticsearch.cluster.index setting of your HOCON config.
At this point you should have the index created in Elasticsearch with the correct mapping and you should be ready to load to the cluster!
In terms of network security, ensure that the AWS Elasticsearch cluster allows inbound TCP traffic on port 443, and that the node running the Elasticsearch Loader allows outbound TCP traffic on port 443. If you can already query the endpoint from the server when SSHed in, this should be fine.
Hope this helps!