Hi ya, I’m running the Scala collector snowplow-stream-collector-kinesis-0.15.0.jar
and I got some errors which crashed the collector.
Questions:
- Is there a best practices way to keep the Scala collector running/restart it? I’m running on EC2 amzn-linux.
- Can anyone understand this error? I can’t seem to understand what’s causing it:
[scala-stream-collector-akka.actor.default-dispatcher-3316] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Writing 47 Thrift records to Kinesis stream snowplow-collector-good
[ERROR] [11/25/2020 16:35:51.616] [scala-stream-collector-akka.actor.default-dispatcher-3316] [akka.actor.ActorSystemImpl(scala-stream-collector)] Error during processing of request: 'Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@36175e36 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@7276936f[Shutting down, pool size = 10, active threads = 10, queued tasks = 1838, completed tasks = 183760767]'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@36175e36 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@7276936f[Shutting down, pool size = 10, active threads = 10, queued tasks = 1838, completed tasks = 183760767]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
at scala.concurrent.impl.Future$.apply(Future.scala:31)
at scala.concurrent.Future$.apply(Future.scala:494)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.multiPut(KinesisSink.scala:260)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.sendBatch(KinesisSink.scala:235)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$EventStorage$.flush(KinesisSink.scala:210)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$EventStorage$.store(KinesisSink.scala:196)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$$anonfun$storeRawEvents$1.apply(KinesisSink.scala:217)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$$anonfun$storeRawEvents$1.apply(KinesisSink.scala:217)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.storeRawEvents(KinesisSink.scala:217)
at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorService.sinkEvent(CollectorService.scala:218)
at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorService.cookie(CollectorService.scala:113)
at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorRoute$$anonfun$collectorRoute$1$$anonfun$apply$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6$$anonfun$apply$7.apply(CollectorRoute.scala:50)
at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorRoute$$anonfun$collectorRoute$1$$anonfun$apply$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$4$$anonfun$apply$5$$anonfun$apply$6$$anonfun$apply$7.apply(CollectorRoute.scala:49)
at akka.http.scaladsl.server.util.ApplyConverterInstances$$anon$1$$anonfun$apply$1.apply(ApplyConverterInstances.scala:14)
at akka.http.scaladsl.server.util.ApplyConverterInstances$$anon$1$$anonfun$apply$1.apply(ApplyConverterInstances.scala:13)
at akka.http.scaladsl.server.ConjunctionMagnet$$anon$2$$anonfun$apply$14$$anonfun$apply$15$$anonfun$apply$16.apply(Directive.scala:162)
at akka.http.scaladsl.server.ConjunctionMagnet$$anon$2$$anonfun$apply$14$$anonfun$apply$15$$anonfun$apply$16.apply(Directive.scala:162)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61)
at akka.http.scaladsl.server.directives.FutureDirectives$$anonfun$onComplete$1$$anonfun$apply$1$$anonfun$apply$2.apply(FutureDirectives.scala:37)
at akka.http.scaladsl.server.directives.FutureDirectives$$anonfun$onComplete$1$$anonfun$apply$1$$anonfun$apply$2.apply(FutureDirectives.scala:37)
at akka.http.scaladsl.util.FastFuture$$anonfun$transformWith$extension0$1.apply(FastFuture.scala:37)
at akka.http.scaladsl.util.FastFuture$$anonfun$transformWith$extension0$1.apply(FastFuture.scala:37)
at akka.http.scaladsl.util.FastFuture$.akka$http$scaladsl$util$FastFuture$$strictTransform$1(FastFuture.scala:41)
at akka.http.scaladsl.util.FastFuture$.transformWith$extension1(FastFuture.scala:45)
at akka.http.scaladsl.util.FastFuture$.transformWith$extension0(FastFuture.scala:37)
at akka.http.scaladsl.server.directives.FutureDirectives$$anonfun$onComplete$1$$anonfun$apply$1.apply(FutureDirectives.scala:37)
at akka.http.scaladsl.server.directives.FutureDirectives$$anonfun$onComplete$1$$anonfun$apply$1.apply(FutureDirectives.scala:35)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResult$1$$anonfun$apply$3.apply(BasicDirectives.scala:61)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRequestContext$1$$anonfun$apply$1.apply(BasicDirectives.scala:43)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRequestContext$1$$anonfun$apply$1.apply(BasicDirectives.scala:43)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$9$$anonfun$apply$10.apply(Directive.scala:93)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:94)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8$$anonfun$apply$11.apply(Directive.scala:93)
at akka.http.scaladsl.util.FastFuture$.akka$http$scaladsl$util$FastFuture$$strictTransform$1(FastFuture.scala:41)
at akka.http.scaladsl.util.FastFuture$.transformWith$extension1(FastFuture.scala:45)
at akka.http.scaladsl.util.FastFuture$.flatMap$extension(FastFuture.scala:26)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:93)
at akka.http.scaladsl.server.Directive$$anonfun$recover$1$$anonfun$apply$8.apply(Directive.scala:90)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:44)
at akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1.apply(RouteConcatenation.scala:42)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResultWith$1$$anonfun$apply$4.apply(BasicDirectives.scala:67)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$mapRouteResultWith$1$$anonfun$apply$4.apply(BasicDirectives.scala:67)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.BasicDirectives$$anonfun$textract$1$$anonfun$apply$6.apply(BasicDirectives.scala:154)
at akka.http.scaladsl.server.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$1.apply(ExecutionDirectives.scala:32)
at akka.http.scaladsl.server.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$1.apply(ExecutionDirectives.scala:28)
at akka.http.scaladsl.server.Route$$anonfun$asyncHandler$1.apply(Route.scala:79)
at akka.http.scaladsl.server.Route$$anonfun$asyncHandler$1.apply(Route.scala:78)
at akka.stream.impl.fusing.MapAsync$$anon$24.onPush(Ops.scala:1169)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:423)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[WARN] [11/25/2020 16:35:51.769] [scala-stream-collector-akka.actor.default-dispatcher-3316] [akka.actor.ActorSystemImpl(scala-stream-collector)] Illegal header: Illegal 'user-agent' header: Invalid input '[', expected 'EOI', product-or-comment, WSP, comment or CRLF (line 1, column 162): Mozilla/5.0 (Linux; Android 10; SM-G965U Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/86.0.4240.198 Mobile Safari/537.36 [FB_IAB/FB4A;FBAV/297.0.0.36.116;]
^
[DEBUG] [11/25/2020 16:35:51.769] [scala-stream-collector-akka.actor.default-dispatcher-3326] [akka://scala-stream-collector/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [11/25/2020 16:35:51.916] [scala-stream-collector-akka.actor.default-dispatcher-3320] [akka://scala-stream-collector/system/IO-TCP/selectors/$a/0] New connection accepted
The above error appeared to have happened several times in succession then the logs just stop.
Any help would be most appreciated!
Patrick