Send events in bulk for streaming setup on GCP

Hi, we have a case where a team wants to send events in bulk from their backend for technical reasons.

What is your take on this? I am a bit afraid of data loss: if the autoscaling of the load balancer attached to the web service isn’t fast enough to handle these loads, even for a few microseconds, we could lose some data in the pipeline without being able to see it.

Has anyone else encountered this, or can anyone give me some insight into it? We currently have a GCP setup with the Stream Collector.

/Brian

Hey Brian,

As long as you’re using one of the Snowplow trackers, the risk of data loss is normally minuscule. All of the trackers have an event store, which holds events until the collector responds with success. If the tracker doesn’t get a response, or gets a failure response, it leaves the events in the backlog. Every time the tracker attempts a request, it tries to send all events in the store, so any events that don’t make it the first time will be re-attempted until they do.

If you’re batching very aggressively, there is some chance that a delayed response causes a request to go through without a success response arriving in time. In that scenario you are more likely to get duplicates, but a sensible batch size avoids that.

The one scenario where you would have more risk of data loss is if your environment is transient, so the event store gets dropped before a second attempt. In that scenario we would indeed recommend avoiding batching. I’m guessing the backend in question is a web server or similar, though, so you shouldn’t have issues there.
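To make that concrete, here is a rough sketch of the store-until-acknowledged idea in Scala. It is not any tracker’s actual implementation, just an illustration of the logic: events stay in the store until the collector acknowledges them, and every send attempt includes the whole backlog.

    // Illustration only - not real tracker code.
    import scala.collection.mutable

    class EventStore {
      private val backlog = mutable.Queue.empty[String] // serialised event payloads

      def add(event: String): Unit = backlog.enqueue(event)

      // `send` stands in for the HTTP request to the collector; true means a success response.
      def flush(send: Seq[String] => Boolean): Unit = {
        if (backlog.nonEmpty && send(backlog.toSeq)) {
          // only drop events once the collector has acknowledged them
          backlog.clear()
        }
        // on failure (or no response) the backlog stays intact and is retried on the next flush
      }
    }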

Hope that’s helpful. :slight_smile:

1 Like

Hi, thanks for your reply :slight_smile:

Could you elaborate on what you mean by “all of the trackers have an event store”? I do not quite understand this.

Also, do you have any advice on monitoring/feedback for when data is lost, i.e. when the collector is not receiving the events for some reason?

/Brian

Could you elaborate on what you mean by “all of the trackers have an event store”?

Sure, so what I mean is that all of the Snowplow trackers have an internal database which stores events until they are successfully sent. If a request to the collector fails for any reason, the events still get sent eventually, because they stay in the event store until they make it through successfully.

For example, if you have the JavaScript tracker on a web page and the user’s connection goes down, the events are stored in localStorage and retried until they’re successful.

The only time there is risk of that not happening is if the environment itself crashes or terminates before the events are sent.

Also, do you have any advice on monitoring/feedback for when data is lost, i.e. when the collector is not receiving the events for some reason?

You can monitor response codes from the collector’s load balancer using Stackdriver, or any other log reporting tool (we use Grafana) - that’s generally a good way to keep tabs on the overall health of the collector. But a 4XX or 5XX response, for example, does not represent data loss - those events get retried as I described above.
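For example, on GCP you could keep an eye on elevated error rates with a Cloud Logging (Stackdriver) filter along these lines - the exact resource type depends on which load balancer you are running:

    resource.type="http_load_balancer"
    httpRequest.status>=500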

If something is wrong upstream of the collector - for example, if the network connection of the server where the tracker lives is down - then no, there is no way to monitor those events. The first time the data hits the Snowplow infrastructure is at the collector. So in that scenario you either have access to the environment and can go in and take a look, or you don’t and it’s invisible. However, the only scenario where you actually lose data is when something goes very wrong and e.g. the server crashes - but in that case you likely have bigger problems than the events not making it.

Hi, thanks again for your response!

Does this also apply to the Java tracker? That’s the one we’re having trouble with. Unlike the JavaScript tracker, I can’t seem to find a way to select a storage strategy (e.g. localStorage) - or is this something else?

Ah! My apologies, I steered you wrong here, and I now see where your question comes from!

The Java tracker does not in fact have an event store - that’s the last thing we need to add before we consider it a ‘v1’ fully featured tracker. Apologies for getting that wrong; my thinking was rooted in other use cases, and I didn’t think to check which tracker you were using (and forgot that we still have some work to do on some of the trackers).

What the Java tracker does have is a callback, where you can provide your own function to decide what to do with the event on success and failure. So you could retry on failure using that, and/or you could log the event to some other place and keep track of everything that way.

[edited to add:] A model which I’ve seen work quite well for mission-critical servers is to have a logging client, which lives in a distinct part of the architecture from the main server. It ingests those failure callbacks and implements a log that records them to some file storage, as well as to an in-memory database (e.g. a Redis instance or similar), with a separate tracker in the logging client that instruments the retries. It’s likely overkill for many use cases (and it’s more work than we want it to be), but when you want to move the work away from the important instance, something like that can be very effective.
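For illustration, here is a minimal sketch of that pattern in Scala. The onSuccess/onFailure shape is hypothetical (check your tracker version for the exact callback interface it exposes), but the idea is the same: on failure, hand the events to a backlog (a log file, Redis, or an in-memory queue) and let a separate loop retry them.

    // A sketch of routing failed events into a backlog for later retry.
    // The onSuccess/onFailure shapes are hypothetical, not the tracker's actual callback API.
    import java.util.concurrent.ConcurrentLinkedQueue

    class FailureBacklog[E <: AnyRef] {
      private val backlog = new ConcurrentLinkedQueue[E]()

      def onSuccess(count: Int): Unit = ()                          // events reached the collector
      def onFailure(failed: Seq[E]): Unit =
        failed.foreach(e => backlog.add(e))                         // keep them for a retry loop to drain

      // Run periodically (e.g. from a logging client); `resend` returns true once an event makes it.
      def drain(resend: E => Boolean): Unit = {
        var remaining = backlog.size()
        while (remaining > 0) {
          val event = backlog.poll()
          if (event != null && !resend(event)) backlog.add(event)   // keep it for the next pass
          remaining -= 1
        }
      }
    }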

We have recently expanded the trackers team, with the intention of investing a lot more into them, with a focus on the server-side offering. Adding this feature to the Java tracker - and releasing a v1 - is on our priority list. It’s not my team’s project so I can’t give you timelines, but I’m optimistic it’ll come soon.

Apologies again for so confidently giving you a wrong answer on this one :man_facepalming:.

2 Likes

No worries, thanks for the detailed reply!

So we do have a problem with some data loss at the collector, which is why I’m asking the question. The error we get is the following:

[pool-4-thread-1] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Publishing message to good failed with code GrpcStatusCode{transportCode=DEADLINE_EXCEEDED}: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9.999950601s. [remote_addr=pubsub.googleapis.com/IP_ADDRESS]

I suspect this has to do with the backoff policy being set with too low a limit (currently 10s) in the application.config file for the collector. Would you say this is the issue - that batched loads can momentarily create an overwhelmingly large queue, so the machines are not able to autoscale in time for that specific moment?

I think this one’s slightly outside what I can usefully help with, but I’ll point it out to the team that works more closely with the collector to see if anyone can help you out.

Hi @brajjany, I found this good documentation about that DEADLINE_EXCEEDED exception. From what I read, it is most likely that your collector is trying to publish messages more quickly than they can be sent to PubSub.

There are several troubleshooting ideas in that guide, but here are the ones which I think are relevant for the collector:

  • Try increasing the backoffPolicy.totalBackoff in your configuration file. This corresponds to the total timeout in the pubsub retry settings.
  • Try horizontally scaling your collector. e.g. if you are currently running 1 instance, then try running 2 instances, and use a load balancer to distribute requests across both. Each publisher will therefore need to publish fewer events.
1 Like

Hi @istreeter, and thank you for your response. We have increased backoffPolicy.totalBackoff, but we receive exactly the same error message.

We already have horizontal scaling applied, up to 45 instances.

I was searching through the source code of the stream collector (stream-collector/GooglePubSubSink.scala at release/2.2.0 · snowplow/stream-collector · GitHub) and found the following:

  /** Defaults are used for the rpc configuration, see Publisher.java */
  private def retrySettings(backoffPolicy: GooglePubSubBackoffPolicyConfig): RetrySettings =
    RetrySettings
      .newBuilder()
      .setInitialRetryDelay(Duration.ofMillis(backoffPolicy.minBackoff))
      .setMaxRetryDelay(Duration.ofMillis(backoffPolicy.maxBackoff))
      .setRetryDelayMultiplier(backoffPolicy.multiplier)
      .setTotalTimeout(Duration.ofMillis(backoffPolicy.totalBackoff))
      .setInitialRpcTimeout(Duration.ofSeconds(10))
      .setRpcTimeoutMultiplier(2)
      .setMaxRpcTimeout(Duration.ofSeconds(10))
      .build()

I see that setInitialRpcTimeout and setMaxRpcTimeout are hardcoded to 10 seconds, which suggests it’s not even retrying. This would also explain the error io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9.999950601s.

Do you know if there is a fix for this?

Thanks,
Daniel

Hi @Daniel_Darescu, thanks for sharing details of your setup. Please bear with me while I try to really understand what is going on here.

What value did you use for backoffPolicy.totalBackoff? I found that the maximum allowable value is 9223372036854, and last week I opened this GitHub issue to set that as the new default value. I experimented and found that, with that setting, the collector really did retry publishing events indefinitely.

When you say…

we receive exactly the same error message

…was it specifically this error message?

[pool-4-thread-1] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Publishing message to good failed with code… etc

If you got that specific message, then I agree it looks like the collector is still failing to publish your event. However, if you just got a log message about DEADLINE_EXCEEDED (without the Snowplow-added prefix), then it might be a log message from the underlying publisher, and the publish should still get retried.

I read through the list of status codes, and it seems to say that DEADLINE_EXCEEDED is mostly retryable (automatically), except on rare occasions.

We certainly could bump setInitialRpcTimeout and setMaxRpcTimeout to larger values in a future version of the collector, and, even better, make them configurable. But the intended behaviour is that these should not need to be any higher; instead, the publisher should keep on retrying until the event is published.
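To give a sense of what that might look like, here is a hypothetical variant of the retrySettings method quoted above, with the RPC timeouts driven by configuration instead of the hardcoded 10-second values. The initialRpcTimeout, rpcTimeoutMultiplier and maxRpcTimeout fields are made up for this sketch; they are not existing collector settings:

    /** Hypothetical sketch: same builder as the quoted code, but with the RPC timeouts
      * driven by (made-up) config fields rather than hardcoded 10-second values. */
    private def retrySettings(backoffPolicy: GooglePubSubBackoffPolicyConfig): RetrySettings =
      RetrySettings
        .newBuilder()
        .setInitialRetryDelay(Duration.ofMillis(backoffPolicy.minBackoff))
        .setMaxRetryDelay(Duration.ofMillis(backoffPolicy.maxBackoff))
        .setRetryDelayMultiplier(backoffPolicy.multiplier)
        .setTotalTimeout(Duration.ofMillis(backoffPolicy.totalBackoff))
        .setInitialRpcTimeout(Duration.ofMillis(backoffPolicy.initialRpcTimeout)) // hypothetical field
        .setRpcTimeoutMultiplier(backoffPolicy.rpcTimeoutMultiplier)              // hypothetical field
        .setMaxRpcTimeout(Duration.ofMillis(backoffPolicy.maxRpcTimeout))         // hypothetical field
        .build()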

Finally, thank you again for raising this issue!! If we can find the right fix then it will be helpful for all Snowplow users.

Hi @istreeter,

These are the values used for backoffPolicy

          backoffPolicy {
            minBackoff = 1000
            maxBackoff = 30000
            totalBackoff = 60000 # must be >= 10000
            multiplier = 2
          }

Here is the full error message received today:

2021-11-17 01:35:32.316 EET
scala-stream-collector-dm
[pool-4-thread-1] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Publishing message to good failed with code GrpcStatusCode{transportCode=DEADLINE_EXCEEDED}: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9.999981854s. [remote_addr=pubsub.googleapis.com/142.251.5.95:443]

If I’m not wrong, after updating backoffPolicy we should now only see DEADLINE_EXCEEDED messages in the logs for publishes which have failed after 59.9999xxxxx seconds, right?

Thanks,
Daniel

Hi @Daniel_Darescu, here is my interpretation, based on the documentation for RetrySettings, the config you shared, and your exception.

The collector’s publisher uses a timeout of 10 seconds. Because we use 10 seconds for both the initial timeout and the max timeout, the timeout is fixed (the RpcTimeoutMultiplier is effectively meaningless).

I think the publisher tries and fails to publish the event 5 times in a row, each time allowing 10 seconds (exactly) for the RPC call. It always retries immediately (no delay) because this is a timeout, which is treated differently to an error response.

On the 6th attempt, there is 9.999950601 seconds remaining until the 60 second total timeout is reached. So the publisher makes a 6th RPC call, but this time allows 9.999950601 seconds instead of 10 seconds, which explains the exception message that you shared.

Now to the point!!

I completely agree the collector needs fixing to make these RPC timeouts configurable, and probably with a longer maxRpcTimeout by default.

Until we have made that change, you might still be able to make your situation better by setting backoffPolicy.totalBackoff to an even larger value, e.g. 9223372036854. I hope the publisher will then keep on retrying, until eventually the RPC call takes less than 10 seconds.
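For reference, that would mean a backoffPolicy block like the one you shared, with only totalBackoff changed:

    backoffPolicy {
      minBackoff = 1000
      maxBackoff = 30000
      totalBackoff = 9223372036854 # maximum allowable value, effectively retry for as long as it takes
      multiplier = 2
    }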

One final thought… you mentioned you have 45 collectors, but are you sure the load balancer is configured to share events equally across all of them? If it re-uses TCP connections to the same collector, that might be causing one unlucky publisher to exceed PubSub limits.

1 Like

Thank you for your support @istreeter. Your explanation clarified the issue.