Hello,
We are running our Snowplow collector with a small adapter web service sitting in front of it. The adapter web service is a Flask app run in gunicorn that simply calls tracker.track on each request. This is necessary because the adapter sits in front of another message routing service.
I am trying to implement graceful shutdown for this adapter. I want to call tracker.flush before shutting down to flush the tracker's buffer, so I have implemented a signal handler. However, when I call tracker.flush() (without passing is_async), I get an error from gevent:
[ERROR] Exception in worker process
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/gunicorn/arbiter.py", line 609, in spawn_worker
worker.init_process()
File "/usr/local/lib/python3.10/site-packages/gunicorn/workers/ggevent.py", line 147, in init_process
super().init_process()
File "/usr/local/lib/python3.10/site-packages/gunicorn/workers/base.py", line 142, in init_process
self.run()
File "/usr/local/lib/python3.10/site-packages/gunicorn/workers/ggevent.py", line 88, in run
gevent.sleep(1.0)
File "/usr/local/lib/python3.10/site-packages/gevent/hub.py", line 166, in sleep
hub.wait(t)
File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_hub_primitives.py", line 55, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_waiter.py", line 154, in gevent._gevent_c_waiter.Waiter.get
File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 65, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_gevent_c_greenlet_primitives.pxd", line 35, in gevent._gevent_c_greenlet_primitives._greenlet_switch
File "/app/adapter/__init__.py", line 62, in handle_sigusr1
tracker.flush()
File "/usr/local/lib/python3.10/site-packages/snowplow_tracker/tracker.py", line 1014, in flush
emitter.sync_flush()
File "/usr/local/lib/python3.10/site-packages/snowplow_tracker/emitters.py", line 513, in sync_flush
self.queue.join()
File "/usr/local/lib/python3.10/queue.py", line 90, in join
self.all_tasks_done.wait()
File "/usr/local/lib/python3.10/threading.py", line 320, in wait
waiter.acquire()
File "/usr/local/lib/python3.10/site-packages/gevent/thread.py", line 112, in acquire
acquired = BoundedSemaphore.acquire(self, blocking, timeout)
File "src/gevent/_semaphore.py", line 180, in gevent._gevent_c_semaphore.Semaphore.acquire
File "src/gevent/_semaphore.py", line 249, in gevent._gevent_c_semaphore.Semaphore.acquire
File "src/gevent/_abstract_linkable.py", line 521, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait
File "src/gevent/_abstract_linkable.py", line 487, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
File "src/gevent/_abstract_linkable.py", line 490, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
File "src/gevent/_abstract_linkable.py", line 442, in gevent._gevent_c_abstract_linkable.AbstractLinkable._AbstractLinkable__wait_to_be_notified
File "src/gevent/_abstract_linkable.py", line 451, in gevent._gevent_c_abstract_linkable.AbstractLinkable._switch_to_hub
File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 64, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 67, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
File "src/gevent/_greenlet_primitives.py", line 68, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
gevent.exceptions.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback
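It seems like this has something to do with a mismatch between Snowplow's asynchronous handling and gunicorn's/Flask's: the traceback shows the signal handler running inside a gevent event loop callback, where the blocking queue.join() in sync_flush is not allowed.
I can call flush(is_async=True) just fine, but I want to wait until the flush has finished before I move on to shutting down other parts of the stack.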
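One direction that might avoid the error is to keep the signal handler itself non-blocking and do the synchronous flush somewhere that is allowed to block. The sketch below illustrates only that general pattern with the standard library. FakeTracker, flush_worker, start_flush_worker and the two Event objects are hypothetical names made up for illustration, not part of snowplow_tracker, and this has not been verified under the gunicorn gevent worker.

```python
import queue
import threading


class FakeTracker:
    """Hypothetical stand-in for the real tracker (not the snowplow_tracker API)."""

    def __init__(self):
        self.buffer = queue.Queue()
        self.sent = []

    def track(self, event):
        self.buffer.put(event)

    def flush(self):
        # Drain the buffer synchronously; the real flush() blocks in a similar way.
        while True:
            try:
                self.sent.append(self.buffer.get_nowait())
            except queue.Empty:
                return


shutdown_requested = threading.Event()
flush_done = threading.Event()


def handle_sigusr1(signum, frame):
    # No blocking work here: only record that shutdown was requested.
    shutdown_requested.set()


def flush_worker(tracker):
    # Runs outside the signal handler, so it is free to block.
    shutdown_requested.wait()
    tracker.flush()
    flush_done.set()


def start_flush_worker(tracker):
    worker = threading.Thread(target=flush_worker, args=(tracker,), daemon=True)
    worker.start()
    return worker
```

The rest of the shutdown sequence would then wait on flush_done (ideally with a timeout) before tearing down other parts of the stack. In the real adapter the handler would be registered with signal.signal(signal.SIGUSR1, handle_sigusr1), and under gevent the worker would presumably be a greenlet rather than a thread, which is an assumption rather than something tested here.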
Any help would be appreciated.