RQ 是一个基于 Redis 的轻量级任务队列,依赖 Redis >= 2.7.0。RQ 将任务和执行结果通过 pickle 序列化后存储在 Redis 中,在较小规模的应用中可以替代 Celery 来执行异步任务。
一个优点是,RQ 的 worker 不会预先加载任务函数。因此,任务函数更改后,不需要重启 RQ 的 worker。同时,推荐以下基于 RQ 的项目:
- rq-scheduler 基于 RQ 的定时任务;
- Flask-RQ2 在 Flask 中集成 RQ 的扩展;
- rq-dashboard RQ 的 web 监控工具,使用 Flask 开发,可以方便地集成到 Flask 应用中。
RQ 的不足在于,它依赖于 fork()
进程,因此不能在 Windows 系统中使用。
本文基于 RQ 0.12.0 版本。
QuickStart
安装 RQ:
uv add rq
编写任务函数:
import requests
def count_words_at_url(url):
resp = requests.get(url)
return len(resp.text.split())
创建 RQ 队列:
from redis import Redis
from rq import Queue
q = Queue(connection=Redis())
调用任务函数
from my_module import count_words_at_url
result = q.enqueue(count_words_at_url, 'http://python-rq.org')
启动 RQ worker:
$ rq worker
22:52:34 RQ worker 'rq:worker:InvokerPro.10689' started, version 0.12.0
22:52:34 *** Listening on default...
22:52:34 Cleaning registries for queue: default
22:52:34 default: job.count_words_at_url('http://python-rq.org') (c3b8fcab-347f-4529-ab4f-0865407fbaa2)
22:52:34 default: Job OK (c3b8fcab-347f-4529-ab4f-0865407fbaa2)
22:52:34 Result is kept for 500 seconds
获取结果(需要延时一段时间使异步任务执行完成)
import time
for i in range(20):
if result.status != 'finished':
time.sleep(1)
print(result.result)
Queue 队列
任务(job)是一个供 RQ 后台 worker 调用的 Python 函数对象。把该函数和运行参数压入队列的过程称为入队(enqueue)。
新建队列
首先,声明一个任务函数,这里不再赘述。
新建队列(Queue),可以在实例化时根据需要指定 Queue 名称,常见的命名模式是按照优先级命名队列(例如 high
,medium
,low
)。
q = Queue('low', connection=redis_conn)
入队:enqueue 和 enqueue_call
使用 enqueue(f, *args, **kwargs)
方法将任务入队:
q.enqueue(job_func, job_arg1, job_arg2, job_kwarg1=value)
除此之外,使用 enqueue
入队时,可以接收以下参数控制任务执行:
timeout
任务超时时间,超时后将会被标记为failed
状态。默认单位为秒,可以传入整数,或者能够被转换为整数的字符串,例如2
或者'2’
。另外,也可以传入包含时分秒单位的字符串,例如1h
、3m
和10s
。result_ttl
在 Redis 中存储的任务结果的过期时间,过期后任务结果会被删除,默认 500 秒。ttl
任务加入队列后,被取消之前的等待执行时间;超过该时间后任务会被取消执行。如果设置为 -1,任务将永远不会被取消,一直等待。depends_on
指定另一个依赖任务(或者 job ID),依赖任务执行完毕后,当前任务才会入队。job_id
指定 job_id。at_front
将任务放在队列的前面,即优先插队执行。kwargs
和args
存放传入任务函数的关键字参数和可变参数。
举个例子:
q.enqueue_call(
func=count_words_at_url,
args=('http://nvie.com',),
timeout=30
)
另外,在一些场景中,入队任务进程可能无法访问在 worker 中运行的源代码,此时该函数也可以传入字符串:
q.enqueue('my_package.my_module.my_func', 3, 4)
队列使用
这里介绍一些 Queue 实例的其他 method。
获取队列中任务数量:
len(q)
获取队列中任务 job id 列表:
q.job_ids
获取任务实例列表:
q.jobs
根据 job id 获取任务实例:
q.fetch_job('my_id')
删除队列:
q.delete(delete_jobs=True) # delete_jobs=True 时,也会删除队列中的所有任务
RQ 依赖于 pickle 来序列化任务存入 Redis,因此仅适用于 Python 环境。
获取执行结果
当任务入队时,该 queue.enqueue()
方法返回一个 Job
实例。这是一个可以用来检查运行结果的 proxy 对象。
该实例的 result
属性,在任务未完成时返回 None
,在任务完成后返回任务函数的返回值(前提是有返回值)。
@job 装饰器
使用类似于 Celery @task
的任务函数装饰器。(RQ >= 0.3)
from rq.decorators import job
@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
return x + y
job = add.delay(3, 4)
time.sleep(1)
print(job.result)
同步执行
不经过 worker,直接在当前进程中同步阻塞执行任务函数。(RQ >= 0.3.1)
需要在 Queue 实例化时传递参数 is_async=False
。
>>> q = Queue('low', is_async=False, connection=my_redis_conn)
>>> job = q.enqueue(fib, 8)
>>> job.result
21
注意这种情况下,仍然要建立 Redis 连接以存储任务执行状态。
链式执行
在任务入队时传入 depends_on
参数以保证任务链式执行。(RQ >= 0.4.0)
q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue_call(func=send_report, depends_on=report_job)
任务注意事项
- 确保该函数的
__module__
能够被 worker 导入。这意味着无法将__main__
模块中声明的函数作为任务入队。 - 确保 worker 和 worker 生成器共享完全相同的源代码。
- 确保函数调用不依赖于其上下文。不要在任务函数中使用全局变量、Web 应用程序中的 current_user 或者 current_request 对象。当 worker 运行任务函数时,函数所依赖的任何状态都不存在。如果要访问这些信息,应该将这些信息作为参数传递给 worker。
Worker 工作进程
工作进程(worker)是一个通常在后台运行的用于执行阻塞或长时任务的 Python 进程。
启动
RQ worker 基于 fork()
创建新进程。如果不使用 Windows Subsystem for Linux 并在 shell 中运行,那么 RQ 无法在 Windows 系统上执行任务。
启动 Worker
在项目根目录执行:
$ rq worker high normal low
09:07:05 RQ worker 'rq:worker:InvokerPro.11898' started, version 0.12.0
09:07:05 *** Listening on high, normal, low...
09:07:05 Cleaning registries for queue: high
09:07:05 Cleaning registries for queue: normal
09:07:05 Cleaning registries for queue: low
Worker 将会在无限循环中从给定的 Queue 中依次读取任务,因此启动 Worker 时 Queue 参数的顺序很重要,应该让高优先级任务 Queue 排在前面。
一个 Worker 每次只能执行一个任务,不能并发处理。如果要同时执行任务,只需要启动多个 worker 即可。
Burst mode 突发模式
默认情况下,Worker 启动后会立即开始处理任务,处理完成后阻塞等待新任务。
使用 --burst
参数可以让 Worker 以突发模式启动。在此模式下,Worker 会在给定队列清空(即完成所有任务)后退出。
$ rq worker --burst high normal low
09:24:03 RQ worker 'rq:worker:InvokerPro.13525' started, version 0.12.0
09:24:03 *** Listening on high, normal, low...
09:24:03 Cleaning registries for queue: high
09:24:03 Cleaning registries for queue: normal
09:24:03 Cleaning registries for queue: low
09:24:03 RQ worker 'rq:worker:InvokerPro.13525' done, quitting
突发模式可以应用于:
- 定期执行的批量任务,单独开 Worker 执行,在执行完毕后退出;
- 在任务积压时,临时增加 Worker;
启动参数
除 --burst
外,Worker 还支持以下启动参数:
--url
-u
指定 Redis 数据库连接,例如redis://:[email protected]:1234/9
--path
-p
指定 import 路径,可以传多个值;--config
-c
指定配置文件路径;--worker-class
-w
指定使用的 RQ Worker 类;--job-class
-j
指定使用的 RQ Job 类;--queue-class
指定使用的 RQ Queue 类;--connection-class
指定要使用的 Redis 连接类,默认redis.StrictRedis
;--log-format
指定 Worker 日志格式,默认为'%(asctime)s %(message)s'
;--date-format
指定 Worker 日志的日期时间格式,默认为'%H:%M:%S'
;
生命周期
Worker 的生命周期包括几个阶段:
- Boot。加载 Python 环境。
- Birth registration。Worker 将自己注册到系统。
- Start listening。从给定的 Redis 队列中取出任务。若所有队列都为空,如果 Worker 以突发模式运行则 Worker 结束运行,否则阻塞等待任务。
- Prepare job execution。Worker 把要执行的任务状态设置为
busy
,并在StartedJobRegistry
中注册该任务,告知系统准备执行该任务 - Fork a child process。Fork 一个子进程(被称为 work horse),该子进程在故障安全上下文(fail-safe context)中执行任务。
- Process work。子进程执行任务。
- Cleanup job execution。Worker 将任务状态设置为
idle
,将任务结果结果存储到 Redis 中并根据result_ttl
设置过期时间。把任务从StartedJobRegistry
中删除,如果执行成功将任务添加到FinishedJobRegistry
中,如果执行失败将任务添加到FailedQueue
中。 - Loop。从第 3 步开始重复。
提升性能
RQ Worker shell 脚本基本上是一个 fetch-fork-execute 循环。这样做的好处是 RQ 不会泄露内存。但当任务需要进行冗长的设置,或任务都依赖于相同的模块时,每次运行任务都要耗费这一部分时间(因为要在 fork 出新进程后再进行 import)。
可以在 fork 之前就 import 必要的模块,以改进性能。RQ Worker 没有这样的设置项,但你可以在开始 worker loop 之前进行 import。
为此,你要自己实现 Worker 启动脚本,而不是使用 rq worker
。举个例子:
#!/usr/bin/env python
import sys
from rq import Connection, Worker
import library_that_you_want_preloaded
# qs 用于获取 Queue 名
with Connection():
qs = sys.argv[1:] or ['default']
w = Worker(qs)
w.work()
Worker 信息
Worker
名称一般是主机名和当前 PID 的组合,也可以在启动时通过 --name
指定。
Worker
实例的运行时信息存储于 Redis 中,可以使用 rq.Worker.all
查询。注意每次查询都会从 Redis 中取信息构建 Worker
实例,也就是说,每次查询得到的实例不是同一内存对象。
from redis import Redis
from rq import Queue, Worker
redis = Redis()
# 返回当前 Redis 连接中注册的所有 Worker
workers = Worker.all(connection=redis)
# 返回指定 Queue 的所有 Worker(RQ >= 0.10.0)
queue = Queue('queue_name', connection=redis)
workers = Worker.all(queue=queue)
如果只是想得到 Worker 数量,可以使用 rq.Worker.count
方法(RQ >= 0.10.0):
from redis import Redis
from rq import Queue, Worker
redis = Redis()
workers = Worker.count(connection=redis)
queue = Queue('queue_name', connection=redis)
workers = Worker.count(queue=queue)
另外还可以通过 Worker
实例获得一些统计信息(RQ >= 0.9.0)。首先通过 Worker.find_by_key
方法获得 Worker 实例,传参为 Redis key,格式为 rq:worker:<name>
。再通过实例属性查看统计信息。
from rq.worker import Worker
worker = Worker.find_by_key('rq:worker:name')
worker.successful_job_count # 执行成功任务数量
worker.failed_job_count # 执行失败任务数量
worker.total_working_time # 总执行时间
停止 Worker
当 Worker 收到 SIGINT
信号(通过 Ctrl + C)或 SIGTERM
信号(通过 kill)时,会等待当前任务运行结束后,结束任务循环并注册自己的死亡。
如果在等待期间再次收到 SIGINT
或者 SIGTERM
信号,Worker 将会发送 SIGKILL
信号强行中止子进程,但仍然会尝试注册自己的死亡。
使用配置文件
要求 RQ >= 0.3.2。
配置文件需要为 Python 文件,在启动 Worker 时通过 -c
参数指定从哪个模块读取配置。以下是配置文件支持的配置项:
REDIS_URL = 'redis://localhost:6379/1'
# 或者通过以下参数指定 Redis 数据库
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'
# 指定监听的 Queue
QUEUES = ['high', 'normal', 'low']
# Sentry 设置
# The 'sync+' prefix is required for raven: https://github.com/nvie/rq/issues/350#issuecomment-43592410
SENTRY_DSN = 'sync+http://public:[email protected]/1'
# 自定义 Worker 名称
NAME = 'worker-1024'
注意: QUEUES
和 REDIS_PASSWORD
设置是 0.3.3 以后的新设置。
指定配置文件:
rq worker -c settings
自定义 DeathPenalty 类
当任务超时时,Worker 将尝试使用 death_penalty_class
(默认值 UnixSignalDeathPenalty
)提供的方法将其终止。如果您希望尝试以特定应用程序或“更干净”的方式杀死任务,则可以覆盖此项。
DeathPenalty 类使用以下参数构造 BaseDeathPenalty(timeout, JobTimeoutException, job_id=job.id)
自定义异常处理程序
要求 RQ >= 0.5.5。
如果要针对不同类型的作业以不同方式处理错误,或者想自定义 默认的错误处理,可以使用 --exception-handler
指定错误处理类:
$ rq worker --exception-handler 'path.to.my.ErrorHandler'
# Multiple exception handlers is also supported
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'
Result 执行结果
处理结果
如果一个任务有非 None 的返回值,Worker 会把返回值经过 pickle 序列化后,写入到任务在 Redis 中对应记录(key 为 rq:job:[hash]
格式,value 为 Hash 类型)的 result
字段中,默认将会在 500 秒后失效。
将任务入队时返回的 Job
实例是一个代理对象,绑定了任务 ID,以便能从任务执行结果中取到数据。
若 RQ >= 0.3.1 版本,可以在调用 enqueue
和 enqueue_call
入队时,使用 result_ttl
参数指定存储结果删除时间:
- 不设置,使用默认值 500,500 秒后删除;
- 设置为 0,立即删除;
- 设置为 -1,永不删除,此时要注意自己清理 Redis,以免 Redis 无限增长;
- 设置为其他正整数数值 N,在 N 秒后删除;
q.enqueue(foo) # result expires after 500 secs (the default)
q.enqueue(foo, result_ttl=86400) # result expires after 1 day
q.enqueue(foo, result_ttl=0) # result gets deleted immediately
q.enqueue(foo, result_ttl=-1) # result never expires--you should delete jobs manually
文档中的这一段与实际测试结果不符,实际测试没有返回值的情况下也使用默认的 500 秒的超时时间。
Additionally, you can use this for keeping around finished jobs without return values, which would be deleted immediately by default.
q.enqueue(func_without_rv, result_ttl=500) # job kept explicitly
异常处理
任务执行失败时会抛出异常,为了引起足够的注意,失败任务在 Redis 中的记录永不过期。RQ 目前没有可靠的或自动的方法判断某些失败的任务能否安全重试。
失败任务中抛出的异常,会被 Worker 捕获,pickle 序列化后存储到任务在 Redis 中记录的 exc_info
字段中,而 失败任务的 Job
对象则会放入 failed
队列中。
Job
对象本身有一些有用的属性,可以用于辅助检查:
- 任务原始创建时间;
- 最后入队日期;
- 入队始发队列;
- 函数调用的文本描述;
- 异常信息;
可以根据这些信息手动检查和判断问题并可以重新提交任务。
中断处理
当 Worker 进程被以“礼貌的”方式杀死时(Ctrl + C 或 kill
),RQ 会努力不丢失任何任务,会在当前任务处理完成后停止处理新的任务。
但是,Worker 进程也可以通过 kill -9
的方式强行杀死,此时 Worker 不能“优雅地”完成工作,没有时间把任务加入 failed
队列中。因此,强行杀死进程可能会导致异常。
超时处理
默认情况下,任务应该在 180 秒内执行完毕。否则,Worker 会杀死 work horse 进程,并将任务加入到 failed
队列中,表明工作超时。
根据任务需要,我们可以自定义超时时间。
在 Job 维度,入队时使用 timeout
参数设置:
q = Queue()
q.enqueue_call(mytask, args=(foo,), kwargs={'bar': qux}, timeout=600) # 10 mins
在 Queue 维度,新建 Queue 时使用 timeout
参数设置,对队列中所有任务都有效。
# High prio jobs should end in 8 secs, while low prio
# work may take up to 10 mins
high = Queue('high', default_timeout=8) # 8 secs
low = Queue('low', default_timeout=600) # 10 mins
Job 维度的设置项优先级高于 Queue 维度。
Jobs 任务
从 Redis 中获取 Job
所有的任务信息都存储于 Redis 中,可以使用 Job.fetch(id, connection=redis)
方法获取 Job 实例:
from redis import Redis
from rq.job import Job
connection = Redis()
job = Job.fetch('my_job_id', connection=redis)
print('Status: %s' % job.get_status())
该 Job 对象的一些属性包括:
job.status
job.func_name
job.args
job.kwargs
job.result
job.enqueued_at
job.started_at
job.ended_at
job.exc_info
读写当前 Job 实例
由于任务函数是常规的 Python 函数,因此在任务函数中,只能通过 RQ 的 get_current_job
函数获得当前 Job 的实例。
from rq import get_current_job
def add(x, y):
job = get_current_job()
print('Current job: %s' % (job.id,))
return x + y
通过 Job 实例的 meta
属性 和 save_meta()
方法,可以向 Job 实例中写入数据。(RQ >= 0.8.0)
def add(x, y):
job = get_current_job()
job.meta['timestamp'] = time.time()
job.save_meta()
# do more work
time.sleep(1)
return x + y
注意,以上内容只在任务函数中有效。
Job 的等待执行时间
一个任务有两个 TTL:一个用于执行结果,另一个用于任务 Job 本身。后者表示任务在队列中等待多久后会被取消。该 TTL 可以在创建任务或入队时指定:
from rq.job import Job
# 在任务创建时指定
job = Job.create(func=say_hello, ttl=100)
# 在任务入队时指定
job = q.enqueue(count_words_at_url, 'http://nvie.com', ttl=43)
设置为 -1
时任务将一直等待,不会被取消。被取消的 Job 会立即被从 Redis 中立即删除。
执行失败的 Job
如果任务执行失败,Worker 会把任务放入 failed
队列中,同时 Redis 中 Job 实例的 is_failed
属性会被置为 True。使用 get_failed_queue
可以获取所有失败的任务。
from redis import StrictRedis
from rq import push_connection, get_failed_queue, Queue
from rq.job import Job
con = StrictRedis()
push_connection(con)
def div_by_zero(x):
return x / 0
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()
# 获取 failed queue 对象
fq = get_failed_queue()
# 把 job 加入到 failed queue 中
fq.quarantine(job, Exception('Some fake error'))
assert fq.count == 1
# 把 job 重新加入到执行队列中,并从 failed queue 中删除
fq.requeue(job.id)
assert fq.count == 0
assert Queue('fake').count == 1
Monitoring 监控
rq-dashboard
RQ dashboard 是一个单独分发的,轻量级的 Web 前端监控工具,基于 Flask 开发。
安装方式如下:
pip install rq-dashboard
运行:
rq-dashboard
与 Flask 集成
from flask import Flask
import rq_dashboard
app = Flask(__name__)
app.config.from_object(rq_dashboard.default_settings)
app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq")
@app.route("/")
def hello():
return "Hello World!"
if __name__ == "__main__":
app.run()
Console 工具
RQ 自带了 console 监控工具,启动命令为 rq info
:
$ rq info
failed |██ 2
default | 0
2 queues, 2 jobs total
InvokerPro.53243 idle: default
1 workers, 2 queues
Updated: 2018-10-29 23:03:07.478540
查询指定队列
通过 rq info queue1 queue2 …
可以返回指定队列的信息:
$ rq info default
default | 0
1 queues, 0 jobs total
InvokerPro.53243 idle: default
1 workers, 1 queues
Updated: 2018-10-29 23:22:17.640684
按队列展示
默认情况下,rq info
输出活跃 Worker 和它们监听的 Queue。
通过设置 -R
或者 --by-queue
,可以让 RQ 按照队列组织展示,即展示队列和监听队列的 Worker。
$ rq info -R
failed |██ 2
default | 0
2 queues, 2 jobs total
failed: –
default: InvokerPro.53243 (idle)
1 workers, 2 queues
Updated: 2018-10-29 23:33:58.255413
定时轮询
默认情况下,rq info
打印信息后就会退出。可以使用 --interval
参数指定轮询间隔,以不断刷新监控信息。
rq info --interval 1
注意,如果 interval 设置的过低,会加重 Redis 的负载。
Connection 连接
**RQ 维护一个 Redis 连接堆栈,每个 RQ 对象实例在创建时,会使用堆栈最顶层的 Redis 连接。**因此我们可以使用 with 上下文管理器创建连接,并在其中新建 RQ 对象实例。
from rq import Queue, Connection
from redis import Redis
with Connection(Redis()):
q = Queue()
或者新建 RQ 对象实例时显式地指定连接:
from rq import Queue
from redis import Redis
conn = Redis('localhost', 6379)
q = Queue('foo', connection=conn)
多 Redis 连接
使用显式连接实现——准确但乏味:
from rq import Queue
from redis import Redis
conn1 = Redis('localhost', 6379)
conn2 = Redis('remote.host.org', 9836)
q1 = Queue('foo', connection=conn1)
q2 = Queue('bar', connection=conn2)
使用 with 上下文管理器实现:
from rq import Queue, Connection
from redis import Redis
with Connection(Redis('localhost', 6379)):
q1 = Queue('foo')
with Connection(Redis('remote.host.org', 9836)):
q2 = Queue('bar')
q3 = Queue('qux')
assert q1.connection != q2.connection
assert q2.connection != q3.connection
assert q1.connection == q3.connection
push/pop 连接
如果代码不允许使用 with
语句(例如在单元测试中),则可以使用 push_connection()
和 pop_connection()
方法替代上下文管理器。
import unittest
from rq import Queue
from rq import push_connection, pop_connection
class MyTest(unittest.TestCase):
def setUp(self):
# 将新连接压入连接堆栈
push_connection(Redis())
def tearDown(self):
# 从连接堆栈中丢弃连接
pop_connection()
def test_foo(self):
"""Any queues created here use local Redis."""
q = Queue()
...
结合 Sentinel
要使用 redis sentinel,必须在配置文件中指定字典。将此设置与带有自动重启选项的 systemd 或 docker 容器结合使用,以便允许 worker 和 RQ 通过容错连接(fault-tolerant connection)连接 Redis。
SENTINEL: {
'INSTANCES':[
('remote.host1.org', 26379),
('remote.host2.org', 26379),
('remote.host3.org', 26379)
],
'SOCKET_TIMEOUT': None,
'PASSWORD': 'secret',
'DB': 2,
'MASTER_NAME': 'master'
}
Exception 异常处理
任务在发生异常时会执行失败,当 RQ Worker 在后台运行时,你怎么才能知道任务失败了呢?
failed
队列
默认情况下,RQ 会将失败的任务放入到 failed
队列中,包含了它们的异常信息(类型,值,堆栈)。这只能被动保存发生的异常,但不会有任何主动通知。
自定义异常处理
RQ 支持注册自定义异常处理程序(RQ >= 0.3.1)。这样就可以在发生异常时采取其他步骤,或者替换默认地将失败任务发送到 failed
队列的行为。
在创建 Worker 时,使用 exception_handlers
参数指定异常处理程序列表。
from rq.handlers import move_to_failed_queue # RQ 默认的异常处理行为——发送任务到 failed 队列
w = Worker([q], exception_handlers=[my_handler, move_to_failed_queue])
...
异常处理 handler 是一个函数,其参数为:
job
任务对象exc_type
异常类型exc_value
异常值traceback
异常堆栈
def my_handler(job, exc_type, exc_value, traceback):
# do custom things here
# for example, write the exception info to a DB
...
或者使用可变参数定义:job
和 *exc_info
。
def my_handler(job, *exc_info):
# do custom things here
...
链式异常处理
异常处理程序可以决定是否完成处理异常,还是由堆栈中的后续程序继续处理异常。这通过返回值来控制:
- 若返回
True
表示继续,并进入下一个异常处理程序; - 若返回
False
表示停止处理异常; - 若没有返回值,即返回
None
,则认为是True
,继续进入下一个异常处理程序;
如果要替换默认的错误处理行为,错误处理程序应该返回 False
。
def black_hole(job, *exc_info):
return False
Testing 测试
单元测试中的 Worker
许多框架在执行单元测试时使用内存数据库,这些数据库与 RQ 默认使用的 fork()
不太兼容。
因此在单元测试用,应该使用 SimpleWorker
类来避免 fork()
,且建议以突发模式运行。
from redis import Redis
from rq import SimpleWorker, Queue
queue = Queue(connection=Redis())
queue.enqueue(my_long_running_job)
worker = SimpleWorker([queue], connection=queue.connection)
worker.work(burst=True) # Runs enqueued job
# Check for result...
在单元测试中执行任务
另一种解决方案是,在创建队列时使用 is_async=False
参数,使任务在同一个线程中立即执行,而不是将其分配给 Worker。此时不需要启动 RQ worker。
除此之外,还可以使用 FakeStrictRedis 替代 Redis,也不必再启动 Redis 服务器,FakeStrictRedis 的实例可以直接作为连接参数传递给队列。
from fakeredis import FakeStrictRedis
from rq import Queue
queue = Queue(is_async=False, connection=FakeStrictRedis())
job = queue.enqueue(my_long_running_job)
assert job.is_finished