python - celery beat 調度如何運行期間動態添加任務?
問題描述
我嘗試過django-celery-beat,在admin后臺添加任務,可以實現動態添加任務但要重啟celery beat才生效,請問,有其他方試嗎?
問題解答
回答1:無法動態添加,必須重啟 beat。
ask 回答過原因了 #3493
回答2:有個思路,你可以考慮,我目前也在嘗試這個方法,處于摸石過河階段。celery是支持定時任務,但是不符合我的需求,我需要像linux下的crontab這樣動態添加定時任務,我也看了django-celery-beat,因為用的是Flask,發現不值得參考實現,所以一直在看文檔搜資料,終于被我找到一種方式,celery的apply_async這個函數非常有用,它有個eta參數,它的簡化使用是countdown,但是eta的威力是很巨大的,因為它只接受datetime對象,比如你給定一個任務在2017-05-02 20:0:0執行,你可以這樣使用:
job.apply_async(args=args, kwarg=kwargs, eta=datetime(2017,5,2,20,0,0))
是不是很少用,假如我有一個任務需要每天晚上八點執行,我可以利用這個eta參數實現。偽代碼如下:
時間規則 = ’每天晚上八點執行’第一次調用任務,先計算最近的執行時間,作為eta的參數,調用apply_async函數,然后第一次任務執行成功,得到上次任務的eta參數值,在天的值上加一,然后把新的執行時間作為eta的參數再次調用apply_async函數,這里省略了很多判斷,自行腦補。循環往復,是不是一直按每天晚上八點執行。
這里有個非常重要的點是如何在任務執行成功的時候計算下一次的執行時間,做法如下
class MyTask(Task): def on_success(self, retval, task_id, args, kwargs):print ’task done: {0}’.format(retval)return super(MyTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo):print ’task fail, reason: {0}’.format(exc)return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)@app.task(base=MyTask)def add(x, y): return x + y
它提供了任務執行成功和失敗的函數,我們只要在此基礎上重寫就可以了,我說的只是最核心的部分,具體怎么做有很多方法,
相關文章:
1. python文檔怎么查看?2. android - NavigationView 的側滑菜單中如何保存新增項(通過程序添加)3. javascript - ios返回不執行js怎么解決?4. javascript - 請問一下vue當中是在什么時候請求數據保存全局變量的?5. 除了 python2 和 python3,ipython notebook 還可以用哪些內核?6. 這段代碼既不提示錯誤也看不到結果,請老師明示錯在哪里,謝謝!7. mysql - 怎么生成這個sql表?8. 提示語法錯誤語法錯誤: unexpected ’abstract’ (T_ABSTRACT)9. tp5 不同控制器中的變量調用問題10. 數據庫 - mysql如何處理數據變化中的事務?
