Laravel實(shí)現(xiàn)隊(duì)列的示例代碼
目錄
- 一:隊(duì)列配置
- 1:隊(duì)列相關(guān)配置
- 2:不同隊(duì)列依賴
- 二:創(chuàng)建隊(duì)列任務(wù)
- 三:任務(wù)分發(fā)
- 1:默認(rèn)分發(fā)
- 2:延時(shí)分發(fā)
- 3:指定隊(duì)列分發(fā)
- 4:指定驅(qū)動(dòng)分發(fā)
- 5:指定驅(qū)動(dòng)和隊(duì)列分發(fā)
- 四:任務(wù)處理
- 五:失敗任務(wù)處理
- 六:使用Supervisor管理隊(duì)列
- 1:Supervisor安裝
- 2:配置Supervisor
- 3:啟動(dòng)Supervisor
- 補(bǔ)充
一:隊(duì)列配置
隊(duì)列的配置文件放置在config/queue.php文件中,laravel框架中支持的隊(duì)列驅(qū)動(dòng)有:sync, database, beanstalkd, sqs, redis,null對應(yīng)著:同步(本地使用)驅(qū)動(dòng),數(shù)據(jù)庫驅(qū)動(dòng),beanstalkd ,Amazon SQS ,redis,null 隊(duì)列驅(qū)動(dòng)用于那些放棄隊(duì)列的任務(wù)
1:隊(duì)列相關(guān)配置
(1):隊(duì)列驅(qū)動(dòng)配置
"default" => env("QUEUE_DRIVER", "sync"),//隊(duì)列驅(qū)動(dòng)設(shè)置
(2):不同驅(qū)動(dòng)相關(guān)配置
"connections" => [ syns驅(qū)動(dòng)配置 "sync" => [ "driver" => "sync", ], 數(shù)據(jù)庫驅(qū)動(dòng)配置 "database" => [ "driver" => "database", "table" => "jobs",//數(shù)據(jù)庫驅(qū)動(dòng)配置使用的數(shù)據(jù)庫 "queue" => "default", "retry_after" => 90,//指定了任務(wù)最多處理多少秒后就被當(dāng)做失敗重試,比如說,如果這個(gè)選項(xiàng)設(shè)置為 90,那么當(dāng)這個(gè)任務(wù)持續(xù)執(zhí)行了 90 秒而沒有被刪除,那么它將被釋放回隊(duì)列 ], //beanstalkd驅(qū)動(dòng)配置 "beanstalkd" => [ "driver" => "beanstalkd", "host" => "localhost",//使用beanstalkd驅(qū)動(dòng)地址 "queue" => "default", "retry_after" => 90,//指定了任務(wù)最多處理多少秒后就被當(dāng)做失敗重試,比如說,如果這個(gè)選項(xiàng)設(shè)置為 90,那么當(dāng)這個(gè)任務(wù)持續(xù)執(zhí)行了 90 秒而沒有被刪除,那么它將被釋放回隊(duì)列 "block_for" => 0, ], //sqs驅(qū)動(dòng)配置 "sqs" => [ "driver" => "sqs", "key" => env("AWS_ACCESS_KEY_ID"), "secret" => env("AWS_SECRET_ACCESS_KEY"), "prefix" => env("SQS_PREFIX", "https://sqs.us-east-1.amazonaws.com/your-account-id"), "queue" => env("SQS_QUEUE", "your-queue-name"), "region" => env("AWS_DEFAULT_REGION", "us-east-1"), ], //redis驅(qū)動(dòng)配置 "redis" => [ "driver" => "redis", "connection" => "default",//使用哪個(gè)連接的redis,redis配置是在config/database.php文件中 "queue" => env("REDIS_QUEUE", "default"), "retry_after" => 90, "block_for" => null, ], ],
2:不同隊(duì)列依賴
(1):數(shù)據(jù)庫驅(qū)動(dòng)
使用數(shù)據(jù)庫驅(qū)動(dòng)需要生成一個(gè)隊(duì)列驅(qū)動(dòng)表
php artisan queue:table php artisan migrate
執(zhí)行上面的命令之后會(huì)發(fā)現(xiàn)數(shù)據(jù)庫中會(huì)增加一個(gè)jobs表
(2):redis驅(qū)動(dòng)
使用redis驅(qū)動(dòng)需要安裝一個(gè)predis/predis 拓展
composer require predis/predis
(3):Amazon SQS驅(qū)動(dòng)
使用Amazon SQS驅(qū)動(dòng)時(shí)需要安裝aws/aws-sdk-php拓展
composer require aws/aws-sdk-php
(4):Beanstalkd驅(qū)動(dòng)
使用Beanstalkd驅(qū)動(dòng)需要安裝pda/pheanstalk拓展
composer require pda/pheanstalk
二:創(chuàng)建隊(duì)列任務(wù)
php artisan make:job TestJobs
執(zhí)行上面的命令創(chuàng)建一個(gè)隊(duì)列任務(wù)類,這時(shí)候會(huì)發(fā)現(xiàn)在app/jobs目錄下生成一個(gè)TestJobs.php文件
簡單的隊(duì)列任務(wù)類實(shí)例:
<?php namespace App\Jobs; use App\Models\blog\User; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; class TestJobs implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; /** * 任務(wù)最大嘗試次數(shù)。 * * @var int */ public $tries = 5; /** * 任務(wù)運(yùn)行的超時(shí)時(shí)間。 * 指定了 Laravel 隊(duì)列處理器最多執(zhí)行多長時(shí)間后就應(yīng)該被關(guān)閉掉 * --timeout 應(yīng)該永遠(yuǎn)都要比 retry_after 短至少幾秒鐘的時(shí)間。這樣就能保證任務(wù)進(jìn)程總能在失敗重試前就被殺死了。 * 如果你的 --timeout 選項(xiàng)大于 retry_after 配置選項(xiàng),你的任務(wù)可能被執(zhí)行兩次 * * @var int */ public $timeout = 120; public $info; /** * Create a new job instance. * * @return void */ public function __construct($info) { // $this->info = $info; } /** * Execute the job. * * @return void */ public function handle() { // $user = new User(); $user->user_no = $this->info["user_no"]; $user->user_name = $this->info["user_name"]; $user->save(); } }
三:任務(wù)分發(fā)
1:默認(rèn)分發(fā)
$info = [ "user_no"=>"006", "user_name"=>"testName" ]; TestJobs::dispatch($info);
2:延時(shí)分發(fā)
TestJobs::dispatch($info)->delay(Carbon::now()->addMinutes(10));//表示延時(shí)十分鐘分發(fā)任務(wù)
3:指定隊(duì)列分發(fā)
TestJobs::dispatch($info)->onQueue("processing");//表示使用默認(rèn)驅(qū)動(dòng)的processing隊(duì)列
4:指定驅(qū)動(dòng)分發(fā)
TestJobs::dispatch($info)->onConnection("redis");//使用redis驅(qū)動(dòng)的默認(rèn)隊(duì)列
5:指定驅(qū)動(dòng)和隊(duì)列分發(fā)
TestJobs::dispatch($info)->onConnection("redis")->onQueue("processing");//使用redis驅(qū)動(dòng)的processing隊(duì)列
四:任務(wù)處理
php artisan queue:work
執(zhí)行后我們會(huì)發(fā)現(xiàn)user表中發(fā)現(xiàn)多了一條user_no為006,user_name為testName數(shù)據(jù),但是如果你指定了驅(qū)動(dòng)和隊(duì)列的話,這時(shí)候執(zhí)行php artisan queue:work,你就會(huì)發(fā)現(xiàn)數(shù)據(jù)庫中沒有數(shù)據(jù)加進(jìn)去,這是因?yàn)閜hp artisan queue:work命令是對默認(rèn)驅(qū)動(dòng)和'default'隊(duì)列監(jiān)聽,這時(shí)候就要使用:
php artisan queue:work redis --queue="processing" //redis表示指定驅(qū)動(dòng) processing表示指定隊(duì)列
五:失敗任務(wù)處理
php artisan queue:failed-table php artisan migrate
執(zhí)行上面命令后會(huì)在數(shù)據(jù)庫中增加一張failed_jobs表,專門用于存儲失敗的任務(wù)信息,在TestJobs類中添加一個(gè)failed方法處理失敗隊(duì)列
/** * 要處理的失敗任務(wù)。 * * @param Exception $exception * @return void */ public function failed(Exception $exception) { // 給用戶發(fā)送失敗通知,等等... }
如果你想要注冊一個(gè)只要當(dāng)隊(duì)列任務(wù)失敗時(shí)就會(huì)被調(diào)用的事件,我們可以在 Laravel 的 app/Providers/AppServiceProvider.php文件中對這個(gè)事件附加一個(gè)回調(diào)函數(shù)即可
/** * 啟動(dòng)任意應(yīng)用程序的服務(wù)。 * * @return void */ public function boot() { Queue::failing(function (JobFailed $event) { // $event->connectionName // $event->job // $event->exception }); }
六:使用Supervisor管理隊(duì)列
一旦使用queue:work 命令,它將一直運(yùn)行,直到你手動(dòng)停止或者你關(guān)閉控制臺,如果你想要讓queue:work 命令永久在后臺運(yùn)行,這時(shí)候可以使用進(jìn)程監(jiān)控工具Supervisor來實(shí)現(xiàn)永久在后臺運(yùn)行
1:Supervisor安裝
Supervisor安裝可以參考文末補(bǔ)充內(nèi)容
2:配置Supervisor
(1):配置supervisord.conf
在/etc/supervisord.conf文件的最后一行增加
files = supervisord.d/*.ini
(2):隊(duì)列進(jìn)程配置
在/etc/supervisord.d/目錄下創(chuàng)建一個(gè).ini文件
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work database --sleep=3 --tries=3 #/home/forge/app.com為項(xiàng)目地址
autostart=true
autorestart=true
user=forge
numprocs=8 #numprocs 命令會(huì)要求 Supervisor 運(yùn)行并監(jiān)控 8 個(gè) queue:work 進(jìn)程
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
3:啟動(dòng)Supervisor
systemctl start supervisord.service
supervisorctl update
supervisorctl reload
supervisorctl start laravel-worker
補(bǔ)充
Linux下使用supervisor實(shí)現(xiàn)進(jìn)程管理
最近在linux下寫了一個(gè)腳本需要在linux后臺一直運(yùn)行,這里我使用了supervisor來實(shí)現(xiàn)腳本進(jìn)程管理
supervisor安裝
1:yum安裝
yum install supervisor
2:pip安裝
pip install supervisor
supervisor常用命令
supervisorctl status//查看所有進(jìn)程的狀態(tài) supervisorctl stop xx //停止指定進(jìn)程(all為所有進(jìn)程) supervisorctl start xx //啟動(dòng)指定進(jìn)程(all為所有進(jìn)程) supervisorctl restart //重啟 supervisorctl update//配置文件修改后使用該命令加載新的配置 supervisorctl reload//重新啟動(dòng)配置中的所有程序 systemctl start supervisord.service //啟動(dòng)supervisor并加載默認(rèn)配置文件 systemctl enable supervisord.service //將supervisor加入開機(jī)啟動(dòng)項(xiàng)
將指定命令加入進(jìn)程管理實(shí)例
1:supervisor配置
supervisor配置文件:/etc/supervisord.conf
子進(jìn)程配置文件路徑:/etc/supervisord.d/ (子進(jìn)程的配置文件為ini格式)
我們增加一個(gè)命令到進(jìn)程中只需要在子進(jìn)程配置文件目錄下創(chuàng)建一個(gè)ini進(jìn)程文件進(jìn)行配置即可
例:
vim /etc/supervisord.d/test.ini
在test.ini文件中加入如下命令:
[program:test] #項(xiàng)目進(jìn)程名稱
dircetory=/XXX #進(jìn)程目錄
command=XXX #進(jìn)程命令
autostart = true #在supervisord啟動(dòng)的時(shí)候是否自動(dòng)啟動(dòng)
autorestart=false #程序退出后是否自動(dòng)重啟
#日志輸出
stderr_logfile=/tmp/client_stderr.log
stdout_logfile=/tmp/client_stdout.log
user=www #腳本運(yùn)行的用戶身份
2:將test進(jìn)程加入進(jìn)程管理
systemctl start supervisord.service
supervisorctl update
supervisorctl reload
supervisorctl start test
根據(jù)如上布置就可以實(shí)現(xiàn)將指定腳本加入進(jìn)程管理
以上就是Laravel實(shí)現(xiàn)隊(duì)列的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Laravel隊(duì)列的資料請關(guān)注其它相關(guān)文章!
