Error flushing tracker with AsyncEmitter in flask + gunicorn app

Hello,

We are running our snowplow collector with a small adapter web service sitting in front of it. The adapter web service is a flask app run in gunicorn that simply calls tracker.track on requests. This is necessary because the adapter sits in front of another message routing service.

I am trying to implement graceful shutdown for this adapter. I want to call tracker.flush before shutting down to flush the tracker’s buffer. I have implemented a signal handler. However, when I try to call tracker.flush() (without passing is_async), I get an error from gevent:

[ERROR] Exception in worker process
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/gunicorn/arbiter.py", line 609, in spawn_worker
    worker.init_process()
  File "/usr/local/lib/python3.10/site-packages/gunicorn/workers/ggevent.py", line 147, in init_process
    super().init_process()
  File "/usr/local/lib/python3.10/site-packages/gunicorn/workers/base.py", line 142, in init_process
    self.run()
  File "/usr/local/lib/python3.10/site-packages/gunicorn/workers/ggevent.py", line 88, in run
    gevent.sleep(1.0)
  File "/usr/local/lib/python3.10/site-packages/gevent/hub.py", line 166, in sleep
    hub.wait(t)
  File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_hub_primitives.py", line 55, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_waiter.py", line 154, in gevent._gevent_c_waiter.Waiter.get
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 65, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_gevent_c_greenlet_primitives.pxd", line 35, in gevent._gevent_c_greenlet_primitives._greenlet_switch
  File "/app/adapter/__init__.py", line 62, in handle_sigusr1
    tracker.flush()
  File "/usr/local/lib/python3.10/site-packages/snowplow_tracker/tracker.py", line 1014, in flush
    emitter.sync_flush()
  File "/usr/local/lib/python3.10/site-packages/snowplow_tracker/emitters.py", line 513, in sync_flush
    self.queue.join()
  File "/usr/local/lib/python3.10/queue.py", line 90, in join
    self.all_tasks_done.wait()
  File "/usr/local/lib/python3.10/threading.py", line 320, in wait
    waiter.acquire()
  File "/usr/local/lib/python3.10/site-packages/gevent/thread.py", line 112, in acquire
    acquired = BoundedSemaphore.acquire(self, blocking, timeout)
  File "src/gevent/_semaphore.py", line 180, in gevent._gevent_c_semaphore.Semaphore.acquire
  File "src/gevent/_semaphore.py", line 249, in gevent._gevent_c_semaphore.Semaphore.acquire
  File "src/gevent/_abstract_linkable.py", line 521, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait
  File "src/gevent/_abstract_linkable.py", line 487, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
  File "src/gevent/_abstract_linkable.py", line 490, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
  File "src/gevent/_abstract_linkable.py", line 442, in gevent._gevent_c_abstract_linkable.AbstractLinkable._AbstractLinkable__wait_to_be_notified
  File "src/gevent/_abstract_linkable.py", line 451, in gevent._gevent_c_abstract_linkable.AbstractLinkable._switch_to_hub
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 64, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 67, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
  File "src/gevent/_greenlet_primitives.py", line 68, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
gevent.exceptions.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback

It seems like this has something to do with a mismatch between snowplow’s asynchonous handling and gunicorn’s/flask’s.

I can call flush(is_async=True) just fine, but I want to wait until the flush has finished before I move on to shutting down other parts of the stack.

Any help would be appreciated.