S3 loader Monitoring

Hi,
I am trying to add the monitoring.metrics.statsd configuration to s3loader-2.2.5, deployed in a Kubernetes container.
config.hocon

{
  # Optional, but recommended
  "region": ${envregion}
  # Options are: RAW, ENRICHED_EVENTS, SELF_DESCRIBING
  # RAW simply sinks data 1:1
  # ENRICHED_EVENTS work with monitoring.statsd to report metrics (identical to RAW otherwise)
  # SELF_DESCRIBING partitions self-describing data (such as JSON) by its schema
  "purpose": "ENRICHED_EVENTS"
  # Input Stream config
  "input": {
    # Kinesis Client Lib app name (corresponds to DynamoDB table name)
    "appName": "snowplow-s3-loader-kinesis"
    # Kinesis stream name
    "streamName": ${enrich_good}
    # Options are: LATEST, TRIM_HORIZON, AT_TIMESTAMP
    "position": "LATEST"
    # Max batch size to pull from Kinesis
    "maxRecords": 10000
  }

  "output": {
    "s3": {
      # Full path to output data
      "path": ${s3path}
      # Partitioning format; Optional
      # Valid substitutions are {vendor}, {schema}, {format}, {model} for self-describing jsons
      # and {yy}, {mm}, {dd}, {hh} for year, month, day, hour
      partitionFormat: "year={yy}"/"month={mm}"
      # Prefix for all file names; Optional
      "filenamePrefix": ${prefix}
      # Maximum Timeout that the application is allowed to fail for, e.g. in case of S3 outage
      "maxTimeout": 2000
      # Output format; Options: GZIP, LZO
      "compression": "GZIP"
    }

    # Kinesis Stream to output failures
    "bad": {
      "streamName": ${enrich_bad}
    }
  }

  # Flush control. A first limit the KCL worker hits will trigger flushing
  "buffer": {
    # Maximum bytes to read before flushing
    "byteLimit": 67108864
    # Maximum records to read before flushing
    "recordLimit": 20000
    # Maximum time between flushes
    "timeLimit": 600000
  }

  # Optional
  "monitoring": {
    "metrics": {
      # StatsD-powered metrics; Optional
      "statsd": {
        "hostname": "localhost",
        "port": 8125,
        "tags": {
          # It can resolve environment variables
          # "worker": ${HOST}
        }
        # Optional
        "prefix": ${log_prefix}
      }
    }
  }
}

No UDP port is open in the container.

Output of netstat -a in the loader container:

/snowplow-loader # netstat -a
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State
tcp        0      0 clickstream-etl-s3loader-5f758b5588-bcgbm:57178 ec2-3-227-250-206.compute-1.amazonaws.com:https ESTABLISHED
tcp        0      0 clickstream-etl-s3loader-5f758b5588-bcgbm:39132 52.94.0.104:https       ESTABLISHED
Active UNIX domain sockets (servers and established)
Proto RefCnt Flags       Type       State         I-Node Path
unix  2      [ ]         STREAM     CONNECTED     24179739
unix  2      [ ]         STREAM     CONNECTED     24179748

I would also like to know whether there is any way to write s3loader metrics directly to stdout, as the enricher does.

enricher_config.hocon

{
  "input": {
    "type": "Kinesis"

    # Optional. Name of the application which the KCL daemon should assume
    "appName": "snowplow-enrich-kinesis"

    # Name of the Kinesis stream to read from
    "streamName": ${collector_good}

    # Optional. Region where the Kinesis stream is located
    # This field is optional if it can be resolved with AWS region provider chain.
    # It checks places like env variables, system properties, AWS profile file.
    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
    "region": ${envregion}

    # Optional, set the initial position to consume the Kinesis stream
    # Must be TRIM_HORIZON, LATEST or AT_TIMESTAMP
    # LATEST: most recent data.
    # TRIM_HORIZON: oldest available data.
    # AT_TIMESTAMP: start from the record at or after the specified timestamp
    "initialPosition": {
      "type": "TRIM_HORIZON"
    }
    # "initialPosition": {
    #   "type": "AT_TIMESTAMP"
    #   "timestamp": "2020-07-17T10:00:00Z" # Required for AT_TIMESTAMP
    # }

    # Optional, set the mode for retrieving records.
    "retrievalMode": {
      "type": "Polling"

      # Maximum size of a batch returned by a call to getRecords.
      # Records are checkpointed after a batch has been fully processed,
      # thus the smaller maxRecords, the more often records can be checkpointed
      # into DynamoDb, but possibly reducing the throughput.
      "maxRecords": 10000
    }
    # "retrievalMode": {
    #   "type": "FanOut"
    # }

    # Optional. Size of the internal buffer used when reading messages from Kinesis,
    # each buffer holding up to maxRecords from above
    "bufferSize": 3

    # Optional. Settings for backoff policy for checkpointing.
    # Records are checkpointed after all the records of the same chunk have been enriched
    "checkpointBackoff": {
      "minBackoff": 100 milliseconds
      "maxBackoff": 10 seconds
      "maxRetries": 10
    }

    # Optional, endpoint url configuration to override aws kinesis endpoints
    # Can be used to specify local endpoint when using localstack
    # "customEndpoint": "http://localhost:4566"

    # Optional, endpoint url configuration to override aws dynamodb endpoint for Kinesis checkpoints lease table
    # Can be used to specify local endpoint when using localstack
    #"dynamodbCustomEndpoint": "http://dynamodb:us-east-1:180648733583:table/snowplow-enrich-kinesis:8000"

    # Optional, endpoint url configuration to override aws cloudwatch endpoint for metrics
    # Can be used to specify local endpoint when using localstack
    # "cloudwatchCustomEndpoint": "http://localhost:4582"
  }

  "output": {
    "good": {
      "streamName": ${enrich_good}
	  
      # Optional. Limits the number of events in a single PutRecords request.
      # Several requests are made in parallel
      # Maximum allowed: 500
      "recordLimit": 500

      # Optional. Limits the number of bytes in a single PutRecords request,
      # including records and partition keys.
      # Several requests are made in parallel
      # Maximum allowed: 5 MB
      "byteLimit": 5242880
    }

    "bad": {
      "streamName": ${enrich_bad}
	  
      # Optional. Limits the number of events in a single PutRecords request.
      # Several requests are made in parallel
      # Maximum allowed: 500
      "recordLimit": 500

      # Optional. Limits the number of bytes in a single PutRecords request,
      # including records and partition keys.
      # Several requests are made in parallel
      # Maximum allowed: 5 MB
      "byteLimit": 5242880
    }
  }
  
  # Optional. Concurrency of the app
  "concurrency" : {
    # Number of events that can get enriched at the same time within a chunk
    "enrich": 256
    # Number of chunks that can get sunk at the same time
    # WARNING: if greater than 1, records can get checkpointed before they are sunk
    "sink": 1
  }

  #monitoring stdout
  "monitoring": {
    "metrics": {
      "stdout": {
        "period": "1 minute"
        "prefix": ${log_prefix}
      }
    }
  }

}

Hi @Sreenath, no, the S3 loader cannot be configured to print metrics to stdout. But I agree with you that this would be a nice improvement to the loader. I opened a GitHub issue to look into this, so hopefully we can include it in a future release.

@istreeter
I added statsd monitoring to the loader config, but no UDP port is created. Can you check the s3loader config above and advise whether any changes are needed?

Any updates on this?

Hi @Sreenath, the config you shared is correct, but there might be a misunderstanding about what this config does. The S3 loader does not open a UDP port for listening. So if you run netstat -a then you should not expect to see a UDP port listed.

Instead, the loader sends metrics to the specified UDP port. Or in other words, it expects that you have manually launched a statsd-compatible server, which is listening on port 8125.

So your next question might be: how can you run a server that listens on port 8125? Well…

  • The statsd GitHub repo has some tools for running statsd servers.
  • Datadog is a monitoring tool with support for receiving statsd metrics.
  • Or in theory you could build your own custom tool to listen on port 8125. After all, the statsd protocol is very simple to parse.
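
As a rough illustration of how little parsing is involved, here is a minimal Python sketch that splits a statsd line of the form name:value|type, optionally followed by |#tag:value pairs (a Datadog-style extension). The function names parse_statsd_line and parse_statsd_datagram are made up for this example, it assumes numeric values, and it does not reflect the exact metric names the loader emits.

```python
def parse_statsd_line(line):
    """Parse one statsd line such as 'name:value|type|#k:v,k2:v2'."""
    # Everything before the first ':' is the metric name.
    name, _, rest = line.strip().partition(":")
    fields = rest.split("|")
    metric = {
        "name": name,
        "value": float(fields[0]),  # assumption: the value is numeric
        "type": fields[1] if len(fields) > 1 else None,
        "tags": {},
    }
    # Optional trailing sections; only '#'-prefixed tag sections are handled.
    for extra in fields[2:]:
        if extra.startswith("#"):
            for tag in extra[1:].split(","):
                key, _, val = tag.partition(":")
                metric["tags"][key] = val
    return metric


def parse_statsd_datagram(data):
    """A single UDP datagram may carry several newline-separated metrics."""
    text = data.decode("utf-8", errors="replace")
    return [parse_statsd_line(l) for l in text.splitlines() if l.strip()]
```

A receiver could call parse_statsd_datagram on the bytes returned by recvfrom and then print or forward each parsed metric.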

@istreeter
Hi,
My objective is to bring these monitoring metrics to stdout, since s3loader does not provide a stdout option like the enricher does. I used the statsd monitoring config and ran a simple Python script which listens on the UDP port and prints each metric to stdout. The same strategy worked for the collector, but it is not working for s3loader.

Python Code used:

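# Script to push monitoring metrics from UDP to stdout
import socket

UDP_IP = "0.0.0.0"   # listen on all interfaces
UDP_PORT = 8125      # must match monitoring.metrics.statsd.port
BUFFER_SIZE = 65535  # large enough for any UDP datagram, so nothing is truncated

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))

while True:
    data, addr = sock.recvfrom(BUFFER_SIZE)
    output = data.decode()
    # Flush so the line is not held in a buffer when stdout is not a terminal
    print(output, flush=True)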

Note: I am running this python script in the same container in which s3loader resides

Hi @Sreenath, in theory the approach should work. Actually, the script you shared is a good example of how Snowplow users could write a statsd receiver to handle these metrics.

However, the S3 loader Docker image does not contain Python, so I don’t see how it is possible to run this script inside the same container. Maybe you could check how you are running this script, and confirm whether it is definitely running? If you run netstat -a, can you see that the Python script has opened a UDP port for listening?
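
One way to check the receiver independently of the loader is to send it a hand-written datagram. The sketch below assumes the receiver listens on UDP port 8125 on localhost, as in the config above; the helper name send_test_metric and the metric name test.metric are illustrative only.

```python
import socket


def send_test_metric(host="127.0.0.1", port=8125, payload=b"test.metric:1|c"):
    """Send one statsd-style datagram and return the number of bytes sent."""
    # UDP is connectionless: sendto succeeds even if nothing is listening,
    # so success here only means the datagram left this process.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(payload, (host, port))
```

If the receiver script is running and bound to the expected port, calling send_test_metric() from the same container should make it print test.metric:1|c. If nothing appears, the problem is likely on the receiving side rather than in the loader.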

@istreeter
Instead of using the Snowplow Docker image, we created our own Dockerfile and run the respective jar files, so we have added Python to our container.
Collector Dockerfile

FROM alpine:3.15.0
ARG envname
ARG envregion

#Kinesis Streams Region
ENV envregion ${envregion}

#snowplow collector version
ENV collector_version=2.8.2

#Kinesis Streams
ENV collector_good=collector-good-${envname}
ENV collector_bad=collector-bad-${envname}

#SQS queue
ENV sqs_queue=dead-letter-queue-${envname}

#log
ENV log_prefix=${envname}-snowplow.collector.

#Install openjdk-11
RUN apk update && \
    apk add openjdk11 python3 py3-pip wget && \
    pip3 install awscli ;

#Collector port
EXPOSE 8080/tcp

COPY . /snowplow-collector
WORKDIR /snowplow-collector

RUN wget https://github.com/snowplow/stream-collector/releases/download/$collector_version/snowplow-stream-collector-kinesis-$collector_version.jar

CMD source start.sh

In the start.sh file we have shell commands that run the Python script for log_push and the collector jar. I hope that makes it clear.