Technologies/RabbitMQ/dramatiq.messages.inprogress
RabbitMQRabbitMQMetric

dramatiq.messages.inprogress

Messages currently processing
Dimensions:None

Technical Annotations (39)

Configuration Parameters (8)
--threadsrecommended: Reduce from high values like 40; single thread (--threads=1) works reliably
Controls worker thread count; high values cause consumer thread starvation with CPU-intensive tasks
rabbitmq.heartbeat_timeoutrecommended: Increase from default
Allows longer intervals between heartbeat communication before connection closure
time_limitrecommended: 24 hours
allows actor to run for extended duration without timeout
heartbeat_timeoutrecommended: 300000
increased timeout in RedisBroker constructor to prevent premature worker death detection
dramatiq_queue_prefetchrecommended: 1
limits to one job per worker/process to prevent multiple duplicate executions
worker_threadsrecommended: 8
default per-process limit on concurrent message processing
queue_prefetch
Recommended value for work_queue maxsize to prevent unbounded growth
maxsize
Attribute for queue.PriorityQueue to limit work_queue size
Error Signatures (2)
pika.exceptions.ConnectionWrongStateError: BlockingConnection.add_callback_threadsafe() called on closed or closing connection.exception
Failed to wait for all callbacks to complete. This can happen when the RabbitMQ server is suddenly restarted.log pattern
CLI Commands (5)
dramatiq config.dramatiq --threads 1 --processes 1 --log-file=dramatiq_log${VAR}.txtdiagnostic
dramatiq example --threads=40 --processes=1diagnostic
dramatiq example --threads=1 --processes=1remediation
redis-cli HGETALL "dramatiq:default.msgs"diagnostic
redis-cli SMEMBERS "dramatiq:__acks__.{worker_id}.{queue}"diagnostic
Technical References (24)
work queueconcepttime.sleepcomponentmessage prefetchingconceptworker threadconceptconsumer threadscomponentsys.setswitchintervalcomponentheartbeat intervalconceptdelayed queuecomponent__heartbeats__component__acks__componentdispatch.luacomponentRedisBrokercomponentAsyncIO middlewarecomponentevent loopconceptwork_queuecomponentbroker.consumecomponentworker.pyfile pathRedis brokercomponentAMQPprotocolredis_message_idconceptmessage_idconceptWorkerThreadcomponentdramatiq:default.msgscomponent*.DQcomponent
Related Insights (10)
Worker pool exhaustion from blocking retry delays prevents message processingcritical
Concurrent actor execution overwhelms database and external serviceswarning
Message prefetching causes worker starvation with concurrent taskswarning
Message duplication from RabbitMQ connection closure under CPU-intensive loadcritical
Messages re-queued despite valid heartbeat during idle waitwarning
AsyncIO actors limited by worker thread count causing blockingwarning
Worker memory exhaustion from unbounded work queue with large message backlogscritical
Redis broker bypasses AMQP-style prefetch backpressurewarning
High prefetch limit for delayed messages risks memory exhaustionwarning
Delayed messages duplicated with same message_id but different redis_message_idscritical