Running snowplow-s3-loader in K8s config.hocon configuration error

Hi All,

I’m trying to run snowplow-s3-loader:2.0.0-rc2 on K8s. I’m using version 2.0.0 since I’m using OIDC authentication to AWS, which is not supported in version 1.

When running the container I get this error:

ConfigReaderFailures(ConvertFailure(ExceptionThrown(DecodingFailure(Attempt to decode value on failed cursor, List(DownField(purpose)))),Some(ConfigOrigin(/snowplow/config/config.hocon)),))

The path /snowplow/config/config.hocon is the mount path for the volume.
When running version 2.0.0-rc3, the error message is even more cryptic…

I’m adding my config.hocon file here:

# Default configuration for s3-loader

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"

# Sink is used for sending events which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"

# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = "default"
  secretKey = "default"
}

# Config for NSQ
nsq {
  # Channel name for NSQ source
  # If more than one application is reading from the same NSQ topic at the same time,
  # each of them must have a unique channel name to get all the data from that topic
  channelName = ""

  # Host name for NSQ tools
  host = ""

  # HTTP port for nsqd
  port = 0

  # HTTP port for nsqlookupd
  lookupPort = 0
}

kinesis {
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = "LATEST"

  # Needs to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: Time needs to be specified in UTC.
  # initialTimestamp = ""

  # Maximum number of records to read per GetRecords call
  maxRecords = 500

  region = "us-east-1"

  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = "snowplow-s3-loader"

  ## Optional endpoint url configuration to override aws kinesis endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}

  # Optional override to disable CloudWatch metrics for KCL
  # disableCloudWatch = true
}

streams {
  # Input stream name
  inStreamName = "sp-enrich-good-stream"

  # Stream for events for which the storage process fails
  outStreamName = "sp-s3-loader-bad-stream"

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
      byteLimit = 104857600 # 100mb
      recordLimit = 100000
      timeLimit = 600000 # 10 minutes
    }
}

s3 {
  region = "us-east-1"
  bucket = "my-output-bucket"
  # optional bucket where to store partitioned data
  # partitionedBucket = "{{s3bucket}}/partitioned"

  # optional date format prefix for directory pattern
  # eg: {YYYY}/{MM}/{dd}/{HH}
  # dateFormat = "{{s3DateFormat}}"

  # optional directory structure to use while storing data on s3 (followed by dateFormat config)
  # eg: outputDirectory = "enriched/good/"
  # outputDirectory = "{{s3OutputDirectory}}"

  # optional filename prefix
  # eg: output
  # filenamePrefix = "{{s3DFilenamePrefix}}"

  # Format is one of lzo or gzip
  # Note that you can use gzip only for the enriched data stream.
  format = "gzip"

  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 60000

  ## Optional endpoint url configuration to override aws s3 endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}

# Optional section for tracking endpoints
# monitoring {
#  snowplow{
#    collectorUri = "{{collectorUri}}"
#    collectorPort = 80
#    appId = "{{appName}}"
#    method = "{{method}}"
#  }
# }

If there’s a more verbose way to run the program for debugging, I would very much like to know.

Hi @avi_eshel_ct ,

The issue is that the purpose field is missing from the configuration: 2.0.0 comes with some updates to the configuration format. You can find a full example here.
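For reference, the 2.x configuration is restructured around top-level purpose / input / output / buffer blocks instead of the 1.x source / sink / streams layout. A rough sketch of what that could look like for loading enriched events, reusing the stream and bucket names from your config above (the exact field names and allowed values are from memory, so verify them against the linked example):

{
  "region": "us-east-1"

  # What kind of data the loader is handling, e.g. enriched events
  "purpose": "ENRICHED_EVENTS"

  # Kinesis input (replaces the old kinesis/streams sections)
  "input": {
    "appName": "snowplow-s3-loader"
    "streamName": "sp-enrich-good-stream"
    "position": "LATEST"
    "maxRecords": 500
  }

  "output": {
    # Where successfully processed records are written
    "s3": {
      "path": "s3://my-output-bucket/enriched/"
      "compression": "GZIP"
      "maxTimeout": 60000
    }
    # Kinesis stream for records that could not be stored
    "bad": {
      "streamName": "sp-s3-loader-bad-stream"
    }
  }

  # Same batching semantics as before: flush on size, count or time
  "buffer": {
    "byteLimit": 104857600
    "recordLimit": 100000
    "timeLimit": 600000
  }
}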


I have a similar problem; adding the purpose field didn’t fix the issue.

Please share the entire config.

Hi @pramod.niralakeri ,

Given that you reached the shredding step, I assume that you are not stuck anymore?