Skip to content

ignore_errors() does not catch the gevent pool's ConcurrentObjectUseError during connection close, causing a Celery worker crash #2463

@JaeHyuckSa

Description

@JaeHyuckSa

Hello!

I use Celery with the gevent pool, Django, and Amazon MQ (RabbitMQ over TLS/AMQPS).

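When one greenlet is blocked reading from the connection's SSL socket and another greenlet calls connection.close() on the same connection, gevent raises ConcurrentObjectUseError, because a gevent socket can only be waited on by one greenlet at a time.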

kombu.common.ignore_errors() only suppresses the exception types listed in the connection's connection_errors and channel_errors (taken from the transport). gevent's ConcurrentObjectUseError is in neither tuple, so it propagates out of ignore_errors(), and Celery treats it as an "Unrecoverable error", crashing the worker.

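Error log. The error first occurs when the consumer's Events bootstep closes its dispatcher connection in start(). It then recurs while the worker is shutting down, this time inside the SSL unwrap() call: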
Traceback (most recent call last):
  File "gevent/ssl.py", line 439, in read
    return self._sslobj.read(nbytes or 1024)
SSLWantReadError: The operation did not complete (read) (_ssl.c:2710)
Traceback (most recent call last):
  File "celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "celery/worker/consumer/consumer.py", line 346, in start
    blueprint.start(self)
  File "celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "celery/worker/consumer/events.py", line 36, in start
    prev = self._close(c)
  File "celery/worker/consumer/events.py", line 62, in _close
    ignore_errors(c, dispatcher.connection.close)
  File "kombu/common.py", line 325, in ignore_errors
    return fun(*args, **kwargs)
  File "kombu/connection.py", line 400, in release
    self._close()
  File "kombu/connection.py", line 366, in _close
    self._do_close_self()
  File "kombu/connection.py", line 359, in _do_close_self
    self.transport.close_connection(self._connection)
  File "kombu/transport/pyamqp.py", line 212, in close_connection
    connection.close()
  File "amqp/connection.py", line 602, in close
    return self.send_method(
  File "amqp/abstract_channel.py", line 79, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "amqp/abstract_channel.py", line 99, in wait
    self.connection.drain_events(timeout=timeout)
  File "amqp/connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
  File "amqp/connection.py", line 531, in blocking_read
    frame = self.transport.read_frame()
  File "amqp/transport.py", line 297, in read_frame
    frame_header = read(7, True)
  File "amqp/transport.py", line 577, in _read
    s = recv(n - len(rbuf))  # see note above
  File "gevent/ssl.py", line 443, in read
    self._wait(self._read_event, timeout_exc=_SSLErrorReadTimeout)
  File "src/gevent/_hub_primitives.py", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
  File "src/gevent/_hub_primitives.py", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
  File "src/gevent/_hub_primitives.py", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0xffff60d49ee0>>
Traceback (most recent call last):
  File "gevent/ssl.py", line 687, in unwrap
    s = shutdown()
SSLWantReadError: The operation did not complete (read) (_ssl.c:2838)
Traceback (most recent call last):
  File "celery", line 10, in <module>
    sys.exit(main())
  File "celery/__main__.py", line 15, in main
    sys.exit(_main())
  File "celery/bin/celery.py", line 227, in main
    return celery(auto_envvar_prefix="CELERY")
  File "click/core.py", line 1485, in __call__
    return self.main(*args, **kwargs)
  File "click/core.py", line 1406, in main
    rv = self.invoke(ctx)
  File "click/core.py", line 1873, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "click/core.py", line 1269, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "click/core.py", line 824, in invoke
    return callback(*args, **kwargs)
  File "click/decorators.py", line 34, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "celery/bin/base.py", line 135, in caller
    return f(ctx, *args, **kwargs)
  File "celery/bin/worker.py", line 367, in worker
    worker.start()
  File "celery/worker/worker.py", line 208, in start
    self.stop(exitcode=EX_FAILURE)
  File "celery/worker/worker.py", line 251, in stop
    self._shutdown(warm=True)
  File "celery/worker/worker.py", line 266, in _shutdown
    self.blueprint.stop(self, terminate=not warm)
  File "celery/bootsteps.py", line 174, in stop
    self.on_stopped()
  File "celery/worker/worker.py", line 162, in on_stopped
    self.consumer.shutdown()
  File "celery/worker/consumer/consumer.py", line 422, in shutdown
    self.blueprint.shutdown(self)
  File "celery/worker/consumer/consumer.py", line 183, in shutdown
    self.send_all(parent, 'shutdown')
  File "celery/bootsteps.py", line 148, in send_all
    fun(parent, *args)
  File "celery/worker/consumer/events.py", line 68, in shutdown
    self._close(c)
  File "celery/worker/consumer/events.py", line 62, in _close
    ignore_errors(c, dispatcher.connection.close)
  File "kombu/common.py", line 325, in ignore_errors
    return fun(*args, **kwargs)
  File "kombu/connection.py", line 400, in release
    self._close()
  File "kombu/connection.py", line 366, in _close
    self._do_close_self()
  File "kombu/connection.py", line 359, in _do_close_self
    self.transport.close_connection(self._connection)
  File "kombu/transport/pyamqp.py", line 212, in close_connection
    connection.close()
  File "amqp/connection.py", line 602, in close
    return self.send_method(
  File "amqp/abstract_channel.py", line 79, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "amqp/abstract_channel.py", line 99, in wait
    self.connection.drain_events(timeout=timeout)
  File "amqp/connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
  File "amqp/connection.py", line 532, in blocking_read
    return self.on_inbound_frame(frame)
  File "amqp/method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "amqp/connection.py", line 538, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
  File "amqp/abstract_channel.py", line 156, in dispatch_method
    listener(*args)
  File "amqp/connection.py", line 696, in _on_close_ok
    self.collect()
  File "amqp/connection.py", line 471, in collect
    self._transport.close()
  File "amqp/transport.py", line 263, in close
    self._shutdown_transport()
  File "amqp/transport.py", line 565, in _shutdown_transport
    self.sock = self.sock.unwrap()
  File "gevent/ssl.py", line 695, in unwrap
    self._wait(self._read_event)
  File "src/gevent/_hub_primitives.py", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
  File "src/gevent/_hub_primitives.py", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
  File "src/gevent/_hub_primitives.py", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0xffff60d49ee0>>

Package Version

  • kombu 5.6.2
  • celery 5.6.2
  • amqp 5.3.1
  • gevent 25.9.1
  • Python 3.14
  • RabbitMQ 3 (AMQPS with TLS)

I created a reproduction repository here: https://github.com/JaeHyuckSa/kombu-gevent-issue

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions