I have set up the Scala Stream Collector with Kafka as the sink, running on an AWS EC2 t2.xlarge instance.
The JavaScript tracker that sends data to the collector is taking more than 200 ms per request. What should I do to reduce the response time? I am expecting a response time below 50 ms.
My requests stay below 200 ms, but well above 50 ms. Then again, I am quite far away from my collector. On the other hand, this should have no influence on page performance as long as your requests are non-blocking, asynchronous ones.
I run my collectors on a dedicated M5-backed Docker cluster, behind an ALB and CloudFront (so I believe a few ms could be cut by not using CF, but I need it for some reasons).
How are your instance CPU credits? I would not risk using a burstable instance type for a production collector, to be honest…
collector {
interface = "0.0.0.0"
interface = ${?COLLECTOR_INTERFACE}
port = 80
port = ${?COLLECTOR_PORT}
# The collector responds to requests with a path that matches the 'vendor/version' protocol.
paths {
# "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
# "/com.acme/redirect" = "/r/tp2"
# "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
}
p3p {
policyRef = "/w3c/p3p.xml"
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}
crossDomain {
enabled = false
# Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
domains = [ "*" ]
domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
# Whether to only grant access to HTTPS or both HTTPS and HTTP sources
secure = true
secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
}
cookie {
enabled = true
#enabled = ${?COLLECTOR_COOKIE_ENABLED}
expiration = 365d # e.g. "365 days"
#expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
# Network cookie name
name = l5_sp
#name = ${?COLLECTOR_COOKIE_NAME}
# The domain is optional and will make the cookie accessible to other
# applications on the domain. Comment out this line to tie cookies to
# the collector's full domain
#domain = ".layerfive.com"
#domain = ${?COLLECTOR_COOKIE_DOMAIN}
secure = false
secure = ${?COLLECTOR_COOKIE_SECURE}
httpOnly = false
httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
}
doNotTrackCookie {
enabled = false
enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
name = lfcookiename
name = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_NAME}
value = lfcookievalue
value = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_VALUE}
}
cookieBounce {
enabled = false
enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
# The name of the request parameter which will be used on redirects checking that third-party
# cookies work.
name = "n3pc"
name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
# Network user id to fallback to when third-party cookies are blocked.
fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
# Optionally, specify the name of the header containing the originating protocol for use in the
# bounce redirect location. Use this if behind a load balancer that performs SSL termination.
# The value of this header must be http or https. Example, if behind an AWS Classic ELB.
forwardedProtocolHeader = "X-Forwarded-Proto"
forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
}
redirectMacro {
enabled = false
enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
# Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
placeholder = "[TOKEN]"
placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
}
rootResponse {
enabled = false
enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
statusCode = 302
statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
# Optional, defaults to empty map
headers = {
Location = "https://127.0.0.1/",
Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
X-Custom = "something"
}
# Optional, defaults to empty string
body = "302, redirecting"
body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
}
# Configuration related to CORS preflight requests
cors {
# The Access-Control-Max-Age response header indicates how long the results of a preflight
# request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
accessControlMaxAge = 5 seconds
accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
}
# Configuration of prometheus http metrics
prometheusMetrics {
# If metrics are enabled then all requests will be logged as prometheus metrics
# and '/metrics' endpoint will return the report about the requests
enabled = false
enabled = ${?COLLECTOR_PROMETHEUS_METRICS_ENABLED}
# Custom buckets for http_request_duration_seconds_bucket duration metric
#durationBucketsInSeconds = [0.1, 3, 10]
#durationbucketsInSeconds = ${?COLLECTOR_PROMETHEUS_METRICS_DURATION_BUCKETS_IN_SECONDS}
}
streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = mytopic_good
good = ${?COLLECTOR_STREAMS_GOOD}
# Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
bad = mytopic_bad
bad = ${?COLLECTOR_STREAMS_BAD}
# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false
useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}
# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, googlepubsub, kafka, nsq, or stdout.
# To use stdout, comment or remove everything in the "collector.streams.sink" section except
# "enabled" which should be set to "stdout".
enabled = kafka
# enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}
# Region where the streams are located
# region = {{kinesisRegion}}
# region = ${?COLLECTOR_STREAMS_SINK_REGION}
## Optional endpoint url configuration to override aws kinesis endpoints,
## this can be used to specify local endpoints when using localstack
# customEndpoint = {{kinesisEndpoint}}
# customEndpoint = ${?COLLECTOR_STREAMS_SINK_CUSTOM_ENDPOINT}
# Thread pool size for Kinesis API requests
# threadPoolSize = 10
# threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}
# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
# aws {
# accessKey = iam
# accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
# secretKey = iam
# secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
# }
# # Minimum and maximum backoff periods, in milliseconds
# backoffPolicy {
# minBackoff = {{minBackoffMillis}}
# minBackoff = ${?COLLECTOR_STREAMS_SINK_MIN_BACKOFF}
# maxBackoff = {{maxBackoffMillis}}
# maxBackoff = ${?COLLECTOR_STREAMS_SINK_MAX_BACKOFF}
# }
# Or Google Pubsub
#googleProjectId = ID
## Minimum, maximum and total backoff periods, in milliseconds
## and multiplier between two backoff
#backoffPolicy {
# minBackoff = {{minBackoffMillis}}
# maxBackoff = {{maxBackoffMillis}}
# totalBackoff = {{totalBackoffMillis}} # must be >= 10000
# multiplier = {{backoffMultiplier}}
#}
# Or Kafka
brokers = "10.175.17.158:9092"
## Number of retries to perform before giving up on sending a record
retries = 0
# The kafka producer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#producerconfigs
# Some values are set to other values from this config by default:
# "bootstrap.servers" -> brokers
# "retries"           -> retries
# "buffer.memory"     -> buffer.byteLimit
# "linger.ms"         -> buffer.timeLimit
producerConf {
acks = all
# "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
# "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
}
# Or NSQ
## Host name for nsqd
#host = "{{nsqHost}}"
## TCP port for nsqd, 4150 by default
#port = {{nsqdPort}}
}
# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 4500000
byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
recordLimit = 500 # Not supported by Kafka; will be ignored
recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
timeLimit = 60000
timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
}
}
}
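The buffer comments above describe a flush-on-any-limit policy: the batch is sent as soon as the record count, the byte size, or the elapsed time hits its limit. A minimal Python sketch of that policy (illustrative only, not the collector's actual code; names and defaults mirror the config above):

```python
import time

class SinkBuffer:
    """Sketch of the collector's buffer policy: flush when recordLimit,
    byteLimit, or timeLimit (milliseconds) is reached, whichever comes first."""

    def __init__(self, byte_limit=4_500_000, record_limit=500, time_limit_ms=60_000):
        self.byte_limit = byte_limit
        self.record_limit = record_limit
        self.time_limit_ms = time_limit_ms
        self.records = []
        self.bytes = 0
        self.last_flush = time.monotonic()

    def append(self, record: bytes):
        """Add a record; return the flushed batch if a limit was hit, else None."""
        self.records.append(record)
        self.bytes += len(record)
        if self._should_flush():
            return self.flush()
        return None

    def _should_flush(self):
        elapsed_ms = (time.monotonic() - self.last_flush) * 1000
        return (len(self.records) >= self.record_limit
                or self.bytes >= self.byte_limit
                or elapsed_ms >= self.time_limit_ms)

    def flush(self):
        batch, self.records, self.bytes = self.records, [], 0
        self.last_flush = time.monotonic()
        return batch  # in the real collector this batch goes to Kafka/Kinesis
```

Note that with the defaults above, a low-traffic collector can hold events for up to 60 s before the sink sees them; this affects sink delivery, not the HTTP response time to the tracker.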
# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
loglevel = OFF # 'OFF' for no logging, 'DEBUG' for all logging.
loglevel = ${?AKKA_LOGLEVEL}
loggers = ["akka.event.slf4j.Slf4jLogger"]
loggers = [${?AKKA_LOGGERS}]
# akka-http is the server the Stream collector uses and has configurable options defined at
# http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
http.server {
# To obtain the hostname in the collector, the 'remote-address' header
# should be set. By default, this is disabled, and enabling it
# adds the 'Remote-Address' header to every request automatically.
remote-address-header = on
remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}
raw-request-uri-header = on
raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}
# Define the maximum request length (the default is 2048)
parsing {
max-uri-length = 32768
max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
uri-parsing-mode = relaxed
uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
}
}
}
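One latency factor worth checking in the config above: with `accessControlMaxAge = 5 seconds`, a browser re-sends a CORS preflight (OPTIONS) round trip for almost every tracker POST, effectively doubling the request count. If preflights show up in your measurements, a longer cache is a cheap win. A sketch of the change (value is illustrative; as the config comment notes, browsers cap it, e.g. Chromium at 10 minutes):

```hocon
cors {
  # Cache preflight results longer so repeat POSTs skip the extra OPTIONS round trip
  accessControlMaxAge = 10 minutes
}
```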
Are you running behind a Load Balancer? Application or Classic?
No, I am not using any load balancer; I am serving directly from the EC2 instance.
How are you running the Collector? Directly via the JAR? Docker container on the EC2 node?
I am running it as a Docker container on the EC2 instance, on which Kafka is also running as a container.
How have you configured the JVM process? How much memory has been allocated to it?
No specific memory assigned; it is using the defaults.
Are you running anything else on this server apart from the Collector?
Yes, Kafka and ZooKeeper as Docker containers.
What amount of traffic are you sending when you see this response time?
When I send a single request it takes around 200-250 ms. When I tested with 200 users, the RPM was 193 and the average response time was 700 ms, which is far too high.
Apart from this, when I used the CloudFront collector it took only 60 ms, which is what I am expecting, but the CF collector does not suit my requirements and I want to use the SSC.
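For load figures like these, percentiles are more informative than the average, since a few slow outliers can dominate the mean. A minimal stdlib-only sketch for measuring them against a collector endpoint (the URL is a placeholder; point it at your own collector, e.g. its pixel endpoint):

```python
import concurrent.futures
import statistics
import time
import urllib.request

def load_test(url, requests=50, workers=10):
    """Fire GET requests concurrently and return latency stats in milliseconds."""
    def one(_):
        t0 = time.monotonic()
        with urllib.request.urlopen(url, timeout=10) as resp:
            resp.read()
        return (time.monotonic() - t0) * 1000

    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as ex:
        latencies = sorted(ex.map(one, range(requests)))

    return {
        "avg_ms": statistics.mean(latencies),
        "p95_ms": latencies[int(0.95 * (len(latencies) - 1))],
        "max_ms": latencies[-1],
    }
```

Comparing avg against p95/max quickly shows whether the 700 ms is a uniform slowdown (everything is slow) or queueing under load (a long tail while the median stays low).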
You’ll definitely want to run your EC2 instances behind a load balancer - this will help improve response time as well as other factors such as scalability of the pipeline.
Docker on EC2 is fine, but it is not recommended to run additional services (such as Kafka) on the same machine, to avoid any contention between services; this is standard microservice-infrastructure practice.
You'll likely want to tune the JVM memory allocation according to your EC2 instance type and any other services running on the machine.
That's very slow, but the latency could come from a wide variety of sources, each of which can be debugged individually:
Network (including DNS, SSL/TLS handshake, and the HTTP(S) request itself) between the client and the collector
Latency in the collector processing the event (should not be high unless there’s some resource contention)
Latency in writing to the Kafka topic (quite likely if Kafka is running on the same machine within a docker container)
Latency in returning a response to the client (e.g., TTFB)
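The network portion of those sources can be separated from server processing with a small timing probe. A stdlib-only sketch (host and path are placeholders; point it at your collector, e.g. a health or pixel endpoint) that splits TCP connect time from request/response time:

```python
import http.client
import time

def probe(host, port=80, path="/health"):
    """Time one request, splitting TCP connect from server processing.
    A high connect_ms points at the network; a high server_ms points at
    the collector or its downstream sink."""
    t0 = time.monotonic()
    conn = http.client.HTTPConnection(host, port, timeout=5)
    conn.connect()                          # TCP handshake completes here
    t_connect = time.monotonic() - t0
    conn.request("GET", path)
    resp = conn.getresponse()
    resp.read()                             # time-to-last-byte of the response
    t_total = time.monotonic() - t0
    conn.close()
    return {
        "connect_ms": t_connect * 1000,
        "server_ms": (t_total - t_connect) * 1000,
        "status": resp.status,
    }
```

Running this from the EC2 instance itself (so connect_ms is near zero) isolates the collector-plus-Kafka share of the 200 ms from the client-to-server network share.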