Collector on ECS/Fargate cannot access Kinesis

Hi,

We are deploying the snowplow collector on fargate and we get the following error when we try to connect to the kinesis streams it seems that we do not have the right credentials. In the config we specify IAM as a way to authenticate:

[main] WARN com.amazonaws.http.AmazonHttpClient - SSL Certificate checking for endpoints has been explicitly disabled.
Exception in thread “main” com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:172)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1225)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:801)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:751)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:875)
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:846)
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:887)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$.streamExists(KinesisSink.scala:125)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$.$anonfun$createAndInitialize$2(KinesisSink.scala:52)
at scala.util.Either.flatMap(Either.scala:341)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$.createAndInitialize(KinesisSink.scala:50)
at com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$.$anonfun$main$2(KinesisCollector.scala:38)
at scala.util.Either.flatMap(Either.scala:341)
at com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$.main(KinesisCollector.scala:30)
at com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector.main(KinesisCollector.scala)
Caused by: java.net.ConnectException: Invalid argument (connect failed)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:113)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:82)
at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:197)
at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
… 25 more

We are using the following credential settings in the config:

  ```
  aws {
    accessKey = iam
    secretKey = iam
  }
  ```

And we’re attaching a role to our ecs_task_execution that has the AmazonKinesisFullAccess policy attached to it:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "*"
        }
    ]
}
1 Like

Hey @dadasami,

I ran into exactly the same problem. :wink: I solved it by setting the credentials settings to ‘default’. ‘iam’ works for deployment directly on EC2 instances but apparently does not work when running as an ECS Fargate task.
Futhermore, you need to attach the policy to a task_role which is different from the task_execution role.

The task_role is the IAM role that allows your Amazon ECS container task to make calls to other AWS services. whereas the task_execution_role is the role the Amazon ECS container agent and the Docker daemon can assume in order to pull images from ECR.

In addition you could restrict your kinesis policy further. The collector does not require full access:

{
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord"
      ],
      "Resource": [
        "${collect_stream_good}",
        "${collect_stream_bad}"
      ]
    }
  ]

I hope that helps. Cheers!

2 Likes

Hello @dadasami,

You should set the aws credentials to default / default.

I don’t have the code in front of me but as far as I remember this is because when using iam on fargate there is no such thing as an EC2 instance profile (which is were credentials are read from).

So basically going through the default credential provider chain should fix your issue.

Let me know if you have more issues because I run the full real-time pipeline on fargate :slight_smile:

2 Likes

Hi @mgloel and @AcidFlow,

Thank you both for pointing the issue out so precisely. Setting the aws credentials to default was indeed the solution. :slightly_smiling_face:

@AcidFlow I would definitely get back to you in case I face more problems. Thanks a lot for the generous offer :pray:

Hi all,

I set the credentials to default but still get this error:
“Exception in thread “main” java.lang.IllegalArgumentException: Kinesis stream snowplow-collected-good-events-stream doesn’t exist”

I created two streams and they are active.

What does the complete error log of your collector module look like? Do you get a credentials error?
Have you checked the policies that are attached to your ecs task, i.e. does your task_role has a policy attached to it that grants access to these kinesis streams?

Hi Mgloel,

This is my error log.

I ran the collector in an EC2 instance. Could you please tell me how to config appropriate policy.
Thank you so much.