This commit is contained in:
Paolo A
2024-08-13 13:44:16 +00:00
parent 1bbb23088d
commit e796d76612
4001 changed files with 30101 additions and 40075 deletions

36
vendor/laravel/framework/src/Illuminate/Queue/BeanstalkdQueue.php vendored Normal file → Executable file
View File

@@ -44,14 +44,20 @@ class BeanstalkdQueue extends Queue implements QueueContract
* @param string $default
* @param int $timeToRun
* @param int $blockFor
* @param bool $dispatchAfterCommit
* @return void
*/
public function __construct(Pheanstalk $pheanstalk, $default, $timeToRun, $blockFor = 0)
public function __construct(Pheanstalk $pheanstalk,
$default,
$timeToRun,
$blockFor = 0,
$dispatchAfterCommit = false)
{
$this->default = $default;
$this->blockFor = $blockFor;
$this->timeToRun = $timeToRun;
$this->pheanstalk = $pheanstalk;
$this->dispatchAfterCommit = $dispatchAfterCommit;
}
/**
@@ -77,7 +83,15 @@ class BeanstalkdQueue extends Queue implements QueueContract
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushRaw($payload, $queue);
}
);
}
/**
@@ -106,13 +120,19 @@ class BeanstalkdQueue extends Queue implements QueueContract
*/
public function later($delay, $job, $data = '', $queue = null)
{
$pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue));
return $pheanstalk->put(
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
$queue,
$delay,
function ($payload, $queue, $delay) {
return $this->pheanstalk->useTube($this->getQueue($queue))->put(
$payload,
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
);
}
);
}

View File

@@ -3,6 +3,7 @@
namespace Illuminate\Queue;
use Closure;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Queue\ShouldQueue;
@@ -11,15 +12,22 @@ use ReflectionFunction;
class CallQueuedClosure implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* The serializable Closure instance.
*
* @var \Illuminate\Queue\SerializableClosure
* @var \Laravel\SerializableClosure\SerializableClosure
*/
public $closure;
/**
* The callbacks that should be executed on failure.
*
* @var array
*/
public $failureCallbacks = [];
/**
* Indicate if the job should be deleted when models are missing.
*
@@ -30,10 +38,10 @@ class CallQueuedClosure implements ShouldQueue
/**
* Create a new job instance.
*
* @param \Illuminate\Queue\SerializableClosure $closure
* @param \Laravel\SerializableClosure\SerializableClosure $closure
* @return void
*/
public function __construct(SerializableClosure $closure)
public function __construct($closure)
{
$this->closure = $closure;
}
@@ -46,7 +54,7 @@ class CallQueuedClosure implements ShouldQueue
*/
public static function create(Closure $job)
{
return new self(new SerializableClosure($job));
return new self(SerializableClosureFactory::make($job));
}
/**
@@ -57,7 +65,35 @@ class CallQueuedClosure implements ShouldQueue
*/
public function handle(Container $container)
{
$container->call($this->closure->getClosure());
$container->call($this->closure->getClosure(), ['job' => $this]);
}
/**
* Add a callback to be executed if the job fails.
*
* @param callable $callback
* @return $this
*/
public function onFailure($callback)
{
$this->failureCallbacks[] = $callback instanceof Closure
? SerializableClosureFactory::make($callback)
: $callback;
return $this;
}
/**
* Handle a job failure.
*
* @param \Throwable $e
* @return void
*/
public function failed($e)
{
foreach ($this->failureCallbacks as $callback) {
$callback($e);
}
}
/**

View File

@@ -3,12 +3,19 @@
namespace Illuminate\Queue;
use Exception;
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Contracts\Cache\Repository as Cache;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Encryption\Encrypter;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use Illuminate\Database\Eloquent\ModelNotFoundException;
use Illuminate\Pipeline\Pipeline;
use Illuminate\Support\Str;
use ReflectionClass;
use RuntimeException;
class CallQueuedHandler
{
@@ -50,16 +57,25 @@ class CallQueuedHandler
{
try {
$command = $this->setJobInstanceIfNecessary(
$job, unserialize($data['command'])
$job, $this->getCommand($data)
);
} catch (ModelNotFoundException $e) {
return $this->handleModelNotFound($job, $e);
}
if ($command instanceof ShouldBeUniqueUntilProcessing) {
$this->ensureUniqueJobLockIsReleased($command);
}
$this->dispatchThroughMiddleware($job, $command);
if (! $job->isReleased() && ! $command instanceof ShouldBeUniqueUntilProcessing) {
$this->ensureUniqueJobLockIsReleased($command);
}
if (! $job->hasFailed() && ! $job->isReleased()) {
$this->ensureNextJobInChainIsDispatched($command);
$this->ensureSuccessfulBatchJobIsRecorded($command);
}
if (! $job->isDeletedOrReleased()) {
@@ -67,6 +83,27 @@ class CallQueuedHandler
}
}
/**
* Get the command from the given payload.
*
* @param array $data
* @return mixed
*
* @throws \RuntimeException
*/
protected function getCommand(array $data)
{
if (Str::startsWith($data['command'], 'O:')) {
return unserialize($data['command']);
}
if ($this->container->bound(Encrypter::class)) {
return unserialize($this->container[Encrypter::class]->decrypt($data['command']));
}
throw new RuntimeException('Unable to extract job payload.');
}
/**
* Dispatch the given job / command through its specified middleware.
*
@@ -132,6 +169,50 @@ class CallQueuedHandler
}
}
/**
* Ensure the batch is notified of the successful job completion.
*
* @param mixed $command
* @return void
*/
protected function ensureSuccessfulBatchJobIsRecorded($command)
{
$uses = class_uses_recursive($command);
if (! in_array(Batchable::class, $uses) ||
! in_array(InteractsWithQueue::class, $uses) ||
is_null($command->batch())) {
return;
}
$command->batch()->recordSuccessfulJob($command->job->uuid());
}
/**
* Ensure the lock for a unique job is released.
*
* @param mixed $command
* @return void
*/
protected function ensureUniqueJobLockIsReleased($command)
{
if (! $command instanceof ShouldBeUnique) {
return;
}
$uniqueId = method_exists($command, 'uniqueId')
? $command->uniqueId()
: ($command->uniqueId ?? '');
$cache = method_exists($command, 'uniqueVia')
? $command->uniqueVia()
: $this->container->make(Cache::class);
$cache->lock(
'laravel_unique_job:'.get_class($command).$uniqueId
)->forceRelease();
}
/**
* Handle a model not found exception.
*
@@ -163,15 +244,56 @@ class CallQueuedHandler
* The exception that caused the failure will be passed.
*
* @param array $data
* @param \Throwable $e
* @param \Throwable|null $e
* @param string $uuid
* @return void
*/
public function failed(array $data, $e)
public function failed(array $data, $e, string $uuid)
{
$command = unserialize($data['command']);
$command = $this->getCommand($data);
if (! $command instanceof ShouldBeUniqueUntilProcessing) {
$this->ensureUniqueJobLockIsReleased($command);
}
$this->ensureFailedBatchJobIsRecorded($uuid, $command, $e);
$this->ensureChainCatchCallbacksAreInvoked($uuid, $command, $e);
if (method_exists($command, 'failed')) {
$command->failed($e);
}
}
/**
* Ensure the batch is notified of the failed job.
*
* @param string $uuid
* @param mixed $command
* @param \Throwable $e
* @return void
*/
protected function ensureFailedBatchJobIsRecorded(string $uuid, $command, $e)
{
if (! in_array(Batchable::class, class_uses_recursive($command)) ||
is_null($command->batch())) {
return;
}
$command->batch()->recordFailedJob($uuid, $e);
}
/**
* Ensure the chained job catch callbacks are invoked.
*
* @param string $uuid
* @param mixed $command
* @param \Throwable $e
* @return void
*/
protected function ensureChainCatchCallbacksAreInvoked(string $uuid, $command, $e)
{
if (method_exists($command, 'invokeChainCatchCallbacks')) {
$command->invokeChainCatchCallbacks($e);
}
}
}

View File

@@ -20,7 +20,8 @@ class BeanstalkdConnector implements ConnectorInterface
$this->pheanstalk($config),
$config['queue'],
$config['retry_after'] ?? Pheanstalk::DEFAULT_TTR,
$config['block_for'] ?? 0
$config['block_for'] ?? 0,
$config['after_commit'] ?? null
);
}

View File

View File

@@ -37,7 +37,8 @@ class DatabaseConnector implements ConnectorInterface
$this->connections->connection($config['connection'] ?? null),
$config['table'],
$config['queue'],
$config['retry_after'] ?? 60
$config['retry_after'] ?? 60,
$config['after_commit'] ?? null
);
}
}

View File

@@ -46,7 +46,8 @@ class RedisConnector implements ConnectorInterface
$this->redis, $config['queue'],
$config['connection'] ?? $this->connection,
$config['retry_after'] ?? 60,
$config['block_for'] ?? null
$config['block_for'] ?? null,
$config['after_commit'] ?? null
);
}
}

View File

@@ -23,7 +23,11 @@ class SqsConnector implements ConnectorInterface
}
return new SqsQueue(
new SqsClient($config), $config['queue'], $config['prefix'] ?? '', $config['suffix'] ?? ''
new SqsClient($config),
$config['queue'],
$config['prefix'] ?? '',
$config['suffix'] ?? '',
$config['after_commit'] ?? null
);
}

View File

View File

@@ -24,7 +24,7 @@ class ListFailedCommand extends Command
/**
* The table headers for the command.
*
* @var array
* @var string[]
*/
protected $headers = ['ID', 'Connection', 'Queue', 'Class', 'Failed At'];
@@ -66,7 +66,7 @@ class ListFailedCommand extends Command
{
$row = array_values(Arr::except($failed, ['payload', 'exception']));
array_splice($row, 3, 0, $this->extractJobName($failed['payload']));
array_splice($row, 3, 0, $this->extractJobName($failed['payload']) ?: '');
return $row;
}

View File

@@ -15,7 +15,9 @@ class ListenCommand extends Command
*/
protected $signature = 'queue:listen
{connection? : The name of connection}
{--delay=0 : The number of seconds to delay failed jobs}
{--name=default : The name of the worker}
{--delay=0 : The number of seconds to delay failed jobs (Deprecated)}
{--backoff=0 : The number of seconds to wait before retrying a job that encountered an uncaught exception}
{--force : Force the worker to run even in maintenance mode}
{--memory=128 : The memory limit in megabytes}
{--queue= : The queue to listen on}
@@ -91,10 +93,18 @@ class ListenCommand extends Command
*/
protected function gatherOptions()
{
$backoff = $this->hasOption('backoff')
? $this->option('backoff')
: $this->option('delay');
return new ListenerOptions(
$this->option('env'), $this->option('delay'),
$this->option('memory'), $this->option('timeout'),
$this->option('sleep'), $this->option('tries'),
$this->option('name'),
$this->option('env'),
$backoff,
$this->option('memory'),
$this->option('timeout'),
$this->option('sleep'),
$this->option('tries'),
$this->option('force')
);
}

View File

@@ -2,8 +2,13 @@
namespace Illuminate\Queue\Console;
use DateTimeInterface;
use Illuminate\Console\Command;
use Illuminate\Contracts\Encryption\Encrypter;
use Illuminate\Queue\Events\JobRetryRequested;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use RuntimeException;
class RetryCommand extends Command
{
@@ -14,6 +19,7 @@ class RetryCommand extends Command
*/
protected $signature = 'queue:retry
{id?* : The ID of the failed job or "all" to retry all jobs}
{--queue= : Retry all of the failed jobs for the specified queue}
{--range=* : Range of job IDs (numeric) to be retried}';
/**
@@ -36,6 +42,8 @@ class RetryCommand extends Command
if (is_null($job)) {
$this->error("Unable to find failed job with ID [{$id}].");
} else {
$this->laravel['events']->dispatch(new JobRetryRequested($job));
$this->retryJob($job);
$this->info("The failed job [{$id}] has been pushed back onto the queue!");
@@ -58,6 +66,10 @@ class RetryCommand extends Command
return Arr::pluck($this->laravel['queue.failer']->all(), 'id');
}
if ($queue = $this->option('queue')) {
return $this->getJobIdsByQueue($queue);
}
if ($ranges = (array) $this->option('range')) {
$ids = array_merge($ids, $this->getJobIdsByRanges($ranges));
}
@@ -65,6 +77,26 @@ class RetryCommand extends Command
return array_values(array_filter(array_unique($ids)));
}
/**
* Get the job IDs by queue, if applicable.
*
* @param string $queue
* @return array
*/
protected function getJobIdsByQueue($queue)
{
$ids = collect($this->laravel['queue.failer']->all())
->where('queue', $queue)
->pluck('id')
->toArray();
if (count($ids) === 0) {
$this->error("Unable to find failed jobs for queue [{$queue}].");
}
return $ids;
}
/**
* Get the job IDs ranges, if applicable.
*
@@ -93,14 +125,14 @@ class RetryCommand extends Command
protected function retryJob($job)
{
$this->laravel['queue']->connection($job->connection)->pushRaw(
$this->resetAttempts($job->payload), $job->queue
$this->refreshRetryUntil($this->resetAttempts($job->payload)), $job->queue
);
}
/**
* Reset the payload attempts.
*
* Applicable to Redis jobs which store attempts in their payload.
* Applicable to Redis and other jobs which store attempts in their payload.
*
* @param string $payload
* @return string
@@ -115,4 +147,41 @@ class RetryCommand extends Command
return json_encode($payload);
}
/**
* Refresh the "retry until" timestamp for the job.
*
* @param string $payload
* @return string
*
* @throws \RuntimeException
*/
protected function refreshRetryUntil($payload)
{
$payload = json_decode($payload, true);
if (! isset($payload['data']['command'])) {
return json_encode($payload);
}
if (Str::startsWith($payload['data']['command'], 'O:')) {
$instance = unserialize($payload['data']['command']);
} elseif ($this->laravel->bound(Encrypter::class)) {
$instance = unserialize($this->laravel->make(Encrypter::class)->decrypt($payload['data']['command']));
}
if (! isset($instance)) {
throw new RuntimeException('Unable to extract job payload.');
}
if (is_object($instance) && ! $instance instanceof \__PHP_Incomplete_Class && method_exists($instance, 'retryUntil')) {
$retryUntil = $instance->retryUntil();
$payload['retryUntil'] = $retryUntil instanceof DateTimeInterface
? $retryUntil->getTimestamp()
: $retryUntil;
}
return json_encode($payload);
}
}

View File

@@ -21,14 +21,19 @@ class WorkCommand extends Command
*/
protected $signature = 'queue:work
{connection? : The name of the queue connection to work}
{--name=default : The name of the worker}
{--queue= : The names of the queues to work}
{--daemon : Run the worker in daemon mode (Deprecated)}
{--once : Only process the next job on the queue}
{--stop-when-empty : Stop when the queue is empty}
{--delay=0 : The number of seconds to delay failed jobs}
{--delay=0 : The number of seconds to delay failed jobs (Deprecated)}
{--backoff=0 : The number of seconds to wait before retrying a job that encountered an uncaught exception}
{--max-jobs=0 : The number of jobs to process before stopping}
{--max-time=0 : The maximum number of seconds the worker should run}
{--force : Force the worker to run even in maintenance mode}
{--memory=128 : The memory limit in megabytes}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--rest=0 : Number of seconds to rest between jobs}
{--timeout=60 : The number of seconds a child process can run}
{--tries=1 : Number of times to attempt a job before logging it failed}';
@@ -71,7 +76,7 @@ class WorkCommand extends Command
/**
* Execute the console command.
*
* @return void
* @return int|null
*/
public function handle()
{
@@ -92,7 +97,7 @@ class WorkCommand extends Command
// connection being run for the queue operation currently being executed.
$queue = $this->getQueue($connection);
$this->runWorker(
return $this->runWorker(
$connection, $queue
);
}
@@ -102,13 +107,13 @@ class WorkCommand extends Command
*
* @param string $connection
* @param string $queue
* @return array
* @return int|null
*/
protected function runWorker($connection, $queue)
{
$this->worker->setCache($this->cache);
return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
return $this->worker->setName($this->option('name'))
->setCache($this->cache)
->{$this->option('once') ? 'runNextJob' : 'daemon'}(
$connection, $queue, $this->gatherWorkerOptions()
);
}
@@ -121,10 +126,17 @@ class WorkCommand extends Command
protected function gatherWorkerOptions()
{
return new WorkerOptions(
$this->option('delay'), $this->option('memory'),
$this->option('timeout'), $this->option('sleep'),
$this->option('tries'), $this->option('force'),
$this->option('stop-when-empty')
$this->option('name'),
max($this->option('backoff'), $this->option('delay')),
$this->option('memory'),
$this->option('timeout'),
$this->option('sleep'),
$this->option('tries'),
$this->option('force'),
$this->option('stop-when-empty'),
$this->option('max-jobs'),
$this->option('max-time'),
$this->option('rest')
);
}
@@ -196,8 +208,10 @@ class WorkCommand extends Command
protected function logFailedJob(JobFailed $event)
{
$this->laravel['queue.failer']->log(
$event->connectionName, $event->job->getQueue(),
$event->job->getRawBody(), $event->exception
$event->connectionName,
$event->job->getQueue(),
$event->job->getRawBody(),
$event->exception
);
}

View File

@@ -14,7 +14,8 @@ class Create{{tableClassName}}Table extends Migration
public function up()
{
Schema::create('{{table}}', function (Blueprint $table) {
$table->bigIncrements('id');
$table->id();
$table->string('uuid')->unique();
$table->text('connection');
$table->text('queue');
$table->longText('payload');

View File

@@ -2,14 +2,16 @@
namespace Illuminate\Queue;
use Illuminate\Contracts\Queue\ClearableQueue;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Database\Connection;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Queue\Jobs\DatabaseJobRecord;
use Illuminate\Support\Carbon;
use Illuminate\Support\Str;
use PDO;
class DatabaseQueue extends Queue implements QueueContract
class DatabaseQueue extends Queue implements QueueContract, ClearableQueue
{
/**
* The database connection instance.
@@ -46,14 +48,20 @@ class DatabaseQueue extends Queue implements QueueContract
* @param string $table
* @param string $default
* @param int $retryAfter
* @param bool $dispatchAfterCommit
* @return void
*/
public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60)
public function __construct(Connection $database,
$table,
$default = 'default',
$retryAfter = 60,
$dispatchAfterCommit = false)
{
$this->table = $table;
$this->default = $default;
$this->database = $database;
$this->retryAfter = $retryAfter;
$this->dispatchAfterCommit = $dispatchAfterCommit;
}
/**
@@ -79,9 +87,15 @@ class DatabaseQueue extends Queue implements QueueContract
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushToDatabase($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
));
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushToDatabase($queue, $payload);
}
);
}
/**
@@ -108,9 +122,15 @@ class DatabaseQueue extends Queue implements QueueContract
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushToDatabase($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
), $delay);
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
$delay,
function ($payload, $queue, $delay) {
return $this->pushToDatabase($queue, $payload, $delay);
}
);
}
/**
@@ -232,10 +252,16 @@ class DatabaseQueue extends Queue implements QueueContract
protected function getLockForPopping()
{
$databaseEngine = $this->database->getPdo()->getAttribute(PDO::ATTR_DRIVER_NAME);
$databaseVersion = $this->database->getPdo()->getAttribute(PDO::ATTR_SERVER_VERSION);
$databaseVersion = $this->database->getConfig('version') ?? $this->database->getPdo()->getAttribute(PDO::ATTR_SERVER_VERSION);
if ($databaseEngine == 'mysql' && ! strpos($databaseVersion, 'MariaDB') && version_compare($databaseVersion, '8.0.1', '>=') ||
$databaseEngine == 'pgsql' && version_compare($databaseVersion, '9.5', '>=')) {
if (Str::of($databaseVersion)->contains('MariaDB')) {
$databaseEngine = 'mariadb';
$databaseVersion = Str::before(Str::after($databaseVersion, '5.5.5-'), '-');
}
if (($databaseEngine === 'mysql' && version_compare($databaseVersion, '8.0.1', '>=')) ||
($databaseEngine === 'mariadb' && version_compare($databaseVersion, '10.6.0', '>=')) ||
($databaseEngine === 'pgsql' && version_compare($databaseVersion, '9.5', '>='))) {
return 'FOR UPDATE SKIP LOCKED';
}
@@ -321,6 +347,38 @@ class DatabaseQueue extends Queue implements QueueContract
});
}
/**
* Delete a reserved job from the reserved queue and release it.
*
* @param string $queue
* @param \Illuminate\Queue\Jobs\DatabaseJob $job
* @param int $delay
* @return void
*/
public function deleteAndRelease($queue, $job, $delay)
{
$this->database->transaction(function () use ($queue, $job, $delay) {
if ($this->database->table($this->table)->lockForUpdate()->find($job->getJobId())) {
$this->database->table($this->table)->where('id', $job->getJobId())->delete();
}
$this->release($queue, $job->getJobRecord(), $delay);
});
}
/**
* Delete all of the jobs from the queue.
*
* @param string $queue
* @return int
*/
public function clear($queue)
{
return $this->database->table($this->table)
->where('queue', $this->getQueue($queue))
->delete();
}
/**
* Get the queue or return the default.
*

View File

@@ -2,10 +2,11 @@
namespace Illuminate\Queue\Failed;
use DateTimeInterface;
use Illuminate\Database\ConnectionResolverInterface;
use Illuminate\Support\Facades\Date;
class DatabaseFailedJobProvider implements FailedJobProviderInterface
class DatabaseFailedJobProvider implements FailedJobProviderInterface, PrunableFailedJobProvider
{
/**
* The connection resolver implementation.
@@ -105,6 +106,27 @@ class DatabaseFailedJobProvider implements FailedJobProviderInterface
$this->getTable()->delete();
}
/**
* Prune all of the entries older than the given date.
*
* @param \DateTimeInterface $before
* @return int
*/
public function prune(DateTimeInterface $before)
{
$query = $this->getTable()->where('failed_at', '<', $before);
$totalDeleted = 0;
do {
$deleted = $query->take(1000)->delete();
$totalDeleted += $deleted;
} while ($deleted !== 0);
return $totalDeleted;
}
/**
* Get a new query builder instance for the table.
*

View File

@@ -7,7 +7,6 @@ use DateTimeInterface;
use Exception;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\Date;
use Illuminate\Support\Str;
class DynamoDbFailedJobProvider implements FailedJobProviderInterface
{
@@ -58,7 +57,7 @@ class DynamoDbFailedJobProvider implements FailedJobProviderInterface
*/
public function log($connection, $queue, $payload, $exception)
{
$id = (string) Str::orderedUuid();
$id = json_decode($payload, true)['uuid'];
$failedAt = Date::now();
@@ -96,7 +95,9 @@ class DynamoDbFailedJobProvider implements FailedJobProviderInterface
'ScanIndexForward' => false,
]);
return collect($results['Items'])->map(function ($result) {
return collect($results['Items'])->sortByDesc(function ($result) {
return (int) $result['failed_at']['N'];
})->map(function ($result) {
return (object) [
'id' => $result['uuid']['S'],
'connection' => $result['connection']['S'],

View File

View File

@@ -45,15 +45,13 @@ class DatabaseJob extends Job implements JobContract
* Release the job back into the queue.
*
* @param int $delay
* @return mixed
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
$this->delete();
return $this->database->release($this->queue, $this->job, $delay);
$this->database->deleteAndRelease($this->queue, $this, $delay);
}
/**
@@ -97,4 +95,14 @@ class DatabaseJob extends Job implements JobContract
{
return $this->job->payload;
}
/**
* Get the database job record.
*
* @return \Illuminate\Queue\Jobs\DatabaseJobRecord
*/
public function getJobRecord()
{
return $this->job;
}
}

22
vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php vendored Normal file → Executable file
View File

@@ -210,7 +210,7 @@ abstract class Job
[$class, $method] = JobName::parse($payload['job']);
if (method_exists($this->instance = $this->resolve($class), 'failed')) {
$this->instance->failed($payload['data'], $e);
$this->instance->failed($payload['data'], $e, $payload['uuid'] ?? '');
}
}
@@ -266,13 +266,23 @@ abstract class Job
}
/**
* Get the number of seconds to delay a failed job before retrying it.
* Determine if the job should fail when it timeouts.
*
* @return bool
*/
public function shouldFailOnTimeout()
{
return $this->payload()['failOnTimeout'] ?? false;
}
/**
* The number of seconds to wait before retrying a job that encountered an uncaught exception.
*
* @return int|null
*/
public function delaySeconds()
public function backoff()
{
return $this->payload()['delay'] ?? null;
return $this->payload()['backoff'] ?? $this->payload()['delay'] ?? null;
}
/**
@@ -290,9 +300,9 @@ abstract class Job
*
* @return int|null
*/
public function timeoutAt()
public function retryUntil()
{
return $this->payload()['timeoutAt'] ?? null;
return $this->payload()['retryUntil'] ?? $this->payload()['timeoutAt'] ?? null;
}
/**

0
vendor/laravel/framework/src/Illuminate/Queue/Jobs/SqsJob.php vendored Normal file → Executable file
View File

0
vendor/laravel/framework/src/Illuminate/Queue/Jobs/SyncJob.php vendored Normal file → Executable file
View File

3
vendor/laravel/framework/src/Illuminate/Queue/Listener.php vendored Normal file → Executable file
View File

@@ -151,8 +151,9 @@ class Listener
'queue:work',
$connection,
'--once',
"--name={$options->name}",
"--queue={$queue}",
"--delay={$options->delay}",
"--backoff={$options->backoff}",
"--memory={$options->memory}",
"--sleep={$options->sleep}",
"--tries={$options->maxTries}",

View File

@@ -14,8 +14,9 @@ class ListenerOptions extends WorkerOptions
/**
* Create a new listener options instance.
*
* @param string $name
* @param string|null $environment
* @param int $delay
* @param int $backoff
* @param int $memory
* @param int $timeout
* @param int $sleep
@@ -23,10 +24,10 @@ class ListenerOptions extends WorkerOptions
* @param bool $force
* @return void
*/
public function __construct($environment = null, $delay = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 1, $force = false)
public function __construct($name = 'default', $environment = null, $backoff = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 1, $force = false)
{
$this->environment = $environment;
parent::__construct($delay, $memory, $timeout, $sleep, $maxTries, $force);
parent::__construct($name, $backoff, $memory, $timeout, $sleep, $maxTries, $force);
}
}

View File

@@ -24,7 +24,7 @@ LUA;
* Get the Lua script for pushing jobs onto the queue.
*
* KEYS[1] - The queue to push the job onto, for example: queues:foo
* KEYS[2] - The notification list fot the queue we are pushing jobs onto, for example: queues:foo:notify
* KEYS[2] - The notification list for the queue we are pushing jobs onto, for example: queues:foo:notify
* ARGV[1] - The job payload
*
* @return string
@@ -124,6 +124,25 @@ if(next(val) ~= nil) then
end
return val
LUA;
}
/**
* Get the Lua script for removing all jobs from the queue.
*
* KEYS[1] - The name of the primary queue
* KEYS[2] - The name of the "delayed" queue
* KEYS[3] - The name of the "reserved" queue
* KEYS[4] - The name of the "notify" queue
*
* @return string
*/
public static function clear()
{
return <<<'LUA'
local size = redis.call('llen', KEYS[1]) + redis.call('zcard', KEYS[2]) + redis.call('zcard', KEYS[3])
redis.call('del', KEYS[1], KEYS[2], KEYS[3], KEYS[4])
return size
LUA;
}
}

140
vendor/laravel/framework/src/Illuminate/Queue/Queue.php vendored Normal file → Executable file
View File

@@ -5,6 +5,10 @@ namespace Illuminate\Queue;
use Closure;
use DateTimeInterface;
use Illuminate\Container\Container;
use Illuminate\Contracts\Encryption\Encrypter;
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Queue\Events\JobQueued;
use Illuminate\Support\Arr;
use Illuminate\Support\InteractsWithTime;
use Illuminate\Support\Str;
@@ -26,6 +30,13 @@ abstract class Queue
*/
protected $connectionName;
/**
* Indicates that jobs should be dispatched after all database transactions have committed.
*
* @return $this
*/
protected $dispatchAfterCommit;
/**
* The create payload callbacks.
*
@@ -91,7 +102,7 @@ abstract class Queue
$job = CallQueuedClosure::create($job);
}
$payload = json_encode($this->createPayloadArray($job, $queue, $data));
$payload = json_encode($this->createPayloadArray($job, $queue, $data), \JSON_UNESCAPED_UNICODE);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidPayloadException(
@@ -132,20 +143,25 @@ abstract class Queue
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => $job->tries ?? null,
'maxExceptions' => $job->maxExceptions ?? null,
'delay' => $this->getJobRetryDelay($job),
'failOnTimeout' => $job->failOnTimeout ?? false,
'backoff' => $this->getJobBackoff($job),
'timeout' => $job->timeout ?? null,
'timeoutAt' => $this->getJobExpiration($job),
'retryUntil' => $this->getJobExpiration($job),
'data' => [
'commandName' => $job,
'command' => $job,
],
]);
$command = $this->jobShouldBeEncrypted($job) && $this->container->bound(Encrypter::class)
? $this->container[Encrypter::class]->encrypt(serialize(clone $job))
: serialize(clone $job);
return array_merge($payload, [
'data' => [
'data' => array_merge($payload['data'], [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
'command' => $command,
]),
]);
}
@@ -162,21 +178,26 @@ abstract class Queue
}
/**
* Get the retry delay for an object-based queue handler.
* Get the backoff for an object-based queue handler.
*
* @param mixed $job
* @return mixed
*/
public function getJobRetryDelay($job)
public function getJobBackoff($job)
{
if (! method_exists($job, 'retryAfter') && ! isset($job->retryAfter)) {
if (! method_exists($job, 'backoff') && ! isset($job->backoff)) {
return;
}
$delay = $job->retryAfter ?? $job->retryAfter();
if (is_null($backoff = $job->backoff ?? $job->backoff())) {
return;
}
return $delay instanceof DateTimeInterface
? $this->secondsUntil($delay) : $delay;
return collect(Arr::wrap($backoff))
->map(function ($backoff) {
return $backoff instanceof DateTimeInterface
? $this->secondsUntil($backoff) : $backoff;
})->implode(',');
}
/**
@@ -187,16 +208,31 @@ abstract class Queue
*/
public function getJobExpiration($job)
{
if (! method_exists($job, 'retryUntil') && ! isset($job->timeoutAt)) {
if (! method_exists($job, 'retryUntil') && ! isset($job->retryUntil)) {
return;
}
$expiration = $job->timeoutAt ?? $job->retryUntil();
$expiration = $job->retryUntil ?? $job->retryUntil();
return $expiration instanceof DateTimeInterface
? $expiration->getTimestamp() : $expiration;
}
/**
* Determine if the job should be encrypted.
*
* @param object $job
* @return bool
*/
protected function jobShouldBeEncrypted($job)
{
if ($job instanceof ShouldBeEncrypted) {
return true;
}
return isset($job->shouldBeEncrypted) && $job->shouldBeEncrypted;
}
/**
* Create a typical, string based queue payload array.
*
@@ -213,7 +249,8 @@ abstract class Queue
'job' => $job,
'maxTries' => null,
'maxExceptions' => null,
'delay' => null,
'failOnTimeout' => false,
'backoff' => null,
'timeout' => null,
'data' => $data,
]);
@@ -222,7 +259,7 @@ abstract class Queue
/**
* Register a callback to be executed when creating job payloads.
*
* @param callable $callback
* @param callable|null $callback
* @return void
*/
public static function createPayloadUsing($callback)
@@ -254,6 +291,67 @@ abstract class Queue
return $payload;
}
/**
* Enqueue a job using the given callback.
*
* @param \Closure|string|object $job
* @param string $payload
* @param string $queue
* @param \DateTimeInterface|\DateInterval|int|null $delay
* @param callable $callback
* @return mixed
*/
protected function enqueueUsing($job, $payload, $queue, $delay, $callback)
{
if ($this->shouldDispatchAfterCommit($job) &&
$this->container->bound('db.transactions')) {
return $this->container->make('db.transactions')->addCallback(
function () use ($payload, $queue, $delay, $callback, $job) {
return tap($callback($payload, $queue, $delay), function ($jobId) use ($job) {
$this->raiseJobQueuedEvent($jobId, $job);
});
}
);
}
return tap($callback($payload, $queue, $delay), function ($jobId) use ($job) {
$this->raiseJobQueuedEvent($jobId, $job);
});
}
/**
* Determine if the job should be dispatched after all database transactions have committed.
*
* @param \Closure|string|object $job
* @return bool
*/
protected function shouldDispatchAfterCommit($job)
{
if (is_object($job) && isset($job->afterCommit)) {
return $job->afterCommit;
}
if (isset($this->dispatchAfterCommit)) {
return $this->dispatchAfterCommit;
}
return false;
}
/**
* Raise the job queued event.
*
* @param string|int|null $jobId
* @param \Closure|string|object $job
* @return void
*/
protected function raiseJobQueuedEvent($jobId, $job)
{
if ($this->container->bound('events')) {
$this->container['events']->dispatch(new JobQueued($this->connectionName, $jobId, $job));
}
}
/**
* Get the connection name for the queue.
*
@@ -277,6 +375,16 @@ abstract class Queue
return $this;
}
/**
* Get the container instance being used by the connection.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}
/**
* Set the IoC container instance.
*

35
vendor/laravel/framework/src/Illuminate/Queue/QueueManager.php vendored Normal file → Executable file
View File

@@ -148,11 +148,17 @@ class QueueManager implements FactoryContract, MonitorContract
*
* @param string $name
* @return \Illuminate\Contracts\Queue\Queue
*
* @throws \InvalidArgumentException
*/
protected function resolve($name)
{
$config = $this->getConfig($name);
if (is_null($config)) {
throw new InvalidArgumentException("The [{$name}] queue connection has not been configured.");
}
return $this->getConnector($config['driver'])
->connect($config)
->setConnectionName($name);
@@ -203,7 +209,7 @@ class QueueManager implements FactoryContract, MonitorContract
* Get the queue connection configuration.
*
* @param string $name
* @return array
* @return array|null
*/
protected function getConfig($name)
{
@@ -246,6 +252,33 @@ class QueueManager implements FactoryContract, MonitorContract
return $connection ?: $this->getDefaultDriver();
}
/**
* Get the application instance used by the manager.
*
* @return \Illuminate\Contracts\Foundation\Application
*/
public function getApplication()
{
return $this->app;
}
/**
* Set the application instance used by the manager.
*
* @param \Illuminate\Contracts\Foundation\Application $app
* @return $this
*/
public function setApplication($app)
{
$this->app = $app;
foreach ($this->connections as $connection) {
$connection->setContainer($app);
}
return $this;
}
/**
* Dynamically pass calls to the default connection.
*

View File

@@ -12,15 +12,18 @@ use Illuminate\Queue\Connectors\RedisConnector;
use Illuminate\Queue\Connectors\SqsConnector;
use Illuminate\Queue\Connectors\SyncConnector;
use Illuminate\Queue\Failed\DatabaseFailedJobProvider;
use Illuminate\Queue\Failed\DatabaseUuidFailedJobProvider;
use Illuminate\Queue\Failed\DynamoDbFailedJobProvider;
use Illuminate\Queue\Failed\NullFailedJobProvider;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\Facade;
use Illuminate\Support\ServiceProvider;
use Illuminate\Support\Str;
use Opis\Closure\SerializableClosure;
use Laravel\SerializableClosure\SerializableClosure;
class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
{
use SerializesAndRestoresModelIdentifiers;
/**
* Register the service provider.
*
@@ -28,12 +31,37 @@ class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
*/
public function register()
{
$this->configureSerializableClosureUses();
$this->registerManager();
$this->registerConnection();
$this->registerWorker();
$this->registerListener();
$this->registerFailedJobServices();
$this->registerOpisSecurityKey();
}
/**
* Configure serializable closures uses.
*
* @return void
*/
protected function configureSerializableClosureUses()
{
SerializableClosure::transformUseVariablesUsing(function ($data) {
foreach ($data as $key => $value) {
$data[$key] = $this->getSerializedPropertyValue($value);
}
return $data;
});
SerializableClosure::resolveUseVariablesUsing(function ($data) {
foreach ($data as $key => $value) {
$data[$key] = $this->getRestoredPropertyValue($value);
}
return $data;
});
}
/**
@@ -168,11 +196,22 @@ class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
return $this->app->isDownForMaintenance();
};
$resetScope = function () use ($app) {
if (method_exists($app['log']->driver(), 'withoutContext')) {
$app['log']->withoutContext();
}
$app->forgetScopedInstances();
return Facade::clearResolvedInstances();
};
return new Worker(
$app['queue'],
$app['events'],
$app[ExceptionHandler::class],
$isDownForMaintenance
$isDownForMaintenance,
$resetScope
);
});
}
@@ -199,8 +238,15 @@ class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
$this->app->singleton('queue.failer', function ($app) {
$config = $app['config']['queue.failed'];
if (array_key_exists('driver', $config) &&
(is_null($config['driver']) || $config['driver'] === 'null')) {
return new NullFailedJobProvider;
}
if (isset($config['driver']) && $config['driver'] === 'dynamodb') {
return $this->dynamoFailedJobProvider($config);
} elseif (isset($config['driver']) && $config['driver'] === 'database-uuids') {
return $this->databaseUuidFailedJobProvider($config);
} elseif (isset($config['table'])) {
return $this->databaseFailedJobProvider($config);
} else {
@@ -222,6 +268,19 @@ class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
);
}
/**
* Create a new database failed job provider that uses UUIDs as IDs.
*
* @param array $config
* @return \Illuminate\Queue\Failed\DatabaseUuidFailedJobProvider
*/
protected function databaseUuidFailedJobProvider($config)
{
return new DatabaseUuidFailedJobProvider(
$this->app['db'], $config['database'], $config['table']
);
}
/**
* Create a new DynamoDb failed job provider.
*
@@ -249,20 +308,6 @@ class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
);
}
/**
* Configure Opis Closure signing for security.
*
* @return void
*/
protected function registerOpisSecurityKey()
{
if (Str::startsWith($key = $this->app['config']->get('app.key'), 'base64:')) {
$key = base64_decode(substr($key, 7));
}
SerializableClosure::setSecretKey($key);
}
/**
* Get the services provided by the provider.
*
@@ -271,8 +316,11 @@ class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
public function provides()
{
return [
'queue', 'queue.worker', 'queue.listener',
'queue.failer', 'queue.connection',
'queue',
'queue.connection',
'queue.failer',
'queue.listener',
'queue.worker',
];
}
}

View File

@@ -2,12 +2,13 @@
namespace Illuminate\Queue;
use Illuminate\Contracts\Queue\ClearableQueue;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Queue\Jobs\RedisJob;
use Illuminate\Support\Str;
class RedisQueue extends Queue implements QueueContract
class RedisQueue extends Queue implements QueueContract, ClearableQueue
{
/**
* The Redis factory implementation.
@@ -52,15 +53,22 @@ class RedisQueue extends Queue implements QueueContract
* @param string|null $connection
* @param int $retryAfter
* @param int|null $blockFor
* @param bool $dispatchAfterCommit
* @return void
*/
public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = null)
public function __construct(Redis $redis,
$default = 'default',
$connection = null,
$retryAfter = 60,
$blockFor = null,
$dispatchAfterCommit = false)
{
$this->redis = $redis;
$this->default = $default;
$this->blockFor = $blockFor;
$this->connection = $connection;
$this->retryAfter = $retryAfter;
$this->dispatchAfterCommit = $dispatchAfterCommit;
}
/**
@@ -78,6 +86,25 @@ class RedisQueue extends Queue implements QueueContract
);
}
/**
* Push an array of jobs onto the queue.
*
* @param array $jobs
* @param mixed $data
* @param string|null $queue
* @return void
*/
public function bulk($jobs, $data = '', $queue = null)
{
$this->getConnection()->pipeline(function () use ($jobs, $data, $queue) {
$this->getConnection()->transaction(function () use ($jobs, $data, $queue) {
foreach ((array) $jobs as $job) {
$this->push($job, $data, $queue);
}
});
});
}
/**
* Push a new job onto the queue.
*
@@ -88,7 +115,15 @@ class RedisQueue extends Queue implements QueueContract
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushRaw($payload, $queue);
}
);
}
/**
@@ -120,7 +155,15 @@ class RedisQueue extends Queue implements QueueContract
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue);
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
$delay,
function ($payload, $queue, $delay) {
return $this->laterRaw($delay, $payload, $queue);
}
);
}
/**
@@ -166,11 +209,7 @@ class RedisQueue extends Queue implements QueueContract
{
$this->migrate($prefixed = $this->getQueue($queue));
if (empty($nextJob = $this->retrieveNextJob($prefixed))) {
return;
}
[$job, $reserved] = $nextJob;
[$job, $reserved] = $this->retrieveNextJob($prefixed);
if ($reserved) {
return new RedisJob(
@@ -267,6 +306,22 @@ class RedisQueue extends Queue implements QueueContract
);
}
/**
* Delete all of the jobs from the queue.
*
* @param string $queue
* @return int
*/
public function clear($queue)
{
$queue = $this->getQueue($queue);
return $this->getConnection()->eval(
LuaScripts::clear(), 4, $queue, $queue.':delayed',
$queue.':reserved', $queue.':notify'
);
}
/**
* Get a random ID string.
*

View File

@@ -4,6 +4,9 @@ namespace Illuminate\Queue;
use Opis\Closure\SerializableClosure as OpisSerializableClosure;
/**
* @deprecated This class will be removed in Laravel 9.
*/
class SerializableClosure extends OpisSerializableClosure
{
use SerializesAndRestoresModelIdentifiers;
@@ -11,7 +14,7 @@ class SerializableClosure extends OpisSerializableClosure
/**
* Transform the use variables before serialization.
*
* @param array $data The Closure's use variables
* @param array $data
* @return array
*/
protected function transformUseVariables($data)
@@ -26,7 +29,7 @@ class SerializableClosure extends OpisSerializableClosure
/**
* Resolve the use variables after unserialization.
*
* @param array $data The Closure's transformed use variables
* @param array $data
* @return array
*/
protected function resolveUseVariables($data)

View File

@@ -91,7 +91,7 @@ trait SerializesModels
* Restore the model after serialization.
*
* @param array $values
* @return array
* @return void
*/
public function __unserialize(array $values)
{
@@ -122,8 +122,6 @@ trait SerializesModels
$this, $this->getRestoredPropertyValue($values[$name])
);
}
return $values;
}
/**

74
vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php vendored Normal file → Executable file
View File

@@ -3,11 +3,12 @@
namespace Illuminate\Queue;
use Aws\Sqs\SqsClient;
use Illuminate\Contracts\Queue\ClearableQueue;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Jobs\SqsJob;
use Illuminate\Support\Str;
class SqsQueue extends Queue implements QueueContract
class SqsQueue extends Queue implements QueueContract, ClearableQueue
{
/**
* The Amazon SQS instance.
@@ -44,14 +45,20 @@ class SqsQueue extends Queue implements QueueContract
* @param string $default
* @param string $prefix
* @param string $suffix
* @param bool $dispatchAfterCommit
* @return void
*/
public function __construct(SqsClient $sqs, $default, $prefix = '', $suffix = '')
public function __construct(SqsClient $sqs,
$default,
$prefix = '',
$suffix = '',
$dispatchAfterCommit = false)
{
$this->sqs = $sqs;
$this->prefix = $prefix;
$this->default = $default;
$this->suffix = $suffix;
$this->dispatchAfterCommit = $dispatchAfterCommit;
}
/**
@@ -82,7 +89,15 @@ class SqsQueue extends Queue implements QueueContract
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $queue ?: $this->default, $data), $queue);
return $this->enqueueUsing(
$job,
$this->createPayload($job, $queue ?: $this->default, $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushRaw($payload, $queue);
}
);
}
/**
@@ -111,11 +126,19 @@ class SqsQueue extends Queue implements QueueContract
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data),
'DelaySeconds' => $this->secondsUntil($delay),
])->get('MessageId');
return $this->enqueueUsing(
$job,
$this->createPayload($job, $queue ?: $this->default, $data),
$queue,
$delay,
function ($payload, $queue, $delay) {
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $payload,
'DelaySeconds' => $this->secondsUntil($delay),
])->get('MessageId');
}
);
}
/**
@@ -139,6 +162,21 @@ class SqsQueue extends Queue implements QueueContract
}
}
/**
* Delete all of the jobs from the queue.
*
* @param string $queue
* @return int
*/
public function clear($queue)
{
return tap($this->size($queue), function () use ($queue) {
$this->sqs->purgeQueue([
'QueueUrl' => $this->getQueue($queue),
]);
});
}
/**
* Get the queue or return the default.
*
@@ -150,10 +188,28 @@ class SqsQueue extends Queue implements QueueContract
$queue = $queue ?: $this->default;
return filter_var($queue, FILTER_VALIDATE_URL) === false
? rtrim($this->prefix, '/').'/'.Str::finish($queue, $this->suffix)
? $this->suffixQueue($queue, $this->suffix)
: $queue;
}
/**
* Add the given suffix to the given queue name.
*
* @param string $queue
* @param string $suffix
* @return string
*/
protected function suffixQueue($queue, $suffix = '')
{
if (Str::endsWith($queue, '.fifo')) {
$queue = Str::beforeLast($queue, '.fifo');
return rtrim($this->prefix, '/').'/'.Str::finish($queue, $suffix).'.fifo';
}
return rtrim($this->prefix, '/').'/'.Str::finish($queue, $this->suffix);
}
/**
* Get the underlying SQS instance.
*

2
vendor/laravel/framework/src/Illuminate/Queue/SyncQueue.php vendored Normal file → Executable file
View File

@@ -105,7 +105,7 @@ class SyncQueue extends Queue implements QueueContract
/**
* Handle an exception that occurred while processing a job.
*
* @param \Illuminate\Queue\Jobs\Job $queueJob
* @param \Illuminate\Contracts\Queue\Job $queueJob
* @param \Throwable $e
* @return void
*

View File

@@ -19,6 +19,17 @@ class Worker
{
use DetectsLostConnections;
const EXIT_SUCCESS = 0;
const EXIT_ERROR = 1;
const EXIT_MEMORY_LIMIT = 12;
/**
* The name of the worker.
*
* @var string
*/
protected $name;
/**
* The queue manager instance.
*
@@ -54,6 +65,13 @@ class Worker
*/
protected $isDownForMaintenance;
/**
* The callback used to reset the application's scope.
*
* @var callable
*/
protected $resetScope;
/**
* Indicates if the worker should exit.
*
@@ -68,6 +86,13 @@ class Worker
*/
public $paused = false;
/**
* The callbacks used to pop jobs from queues.
*
* @var callable[]
*/
protected static $popCallbacks = [];
/**
* Create a new queue worker.
*
@@ -75,17 +100,20 @@ class Worker
* @param \Illuminate\Contracts\Events\Dispatcher $events
* @param \Illuminate\Contracts\Debug\ExceptionHandler $exceptions
* @param callable $isDownForMaintenance
* @param callable|null $resetScope
* @return void
*/
public function __construct(QueueManager $manager,
Dispatcher $events,
ExceptionHandler $exceptions,
callable $isDownForMaintenance)
callable $isDownForMaintenance,
callable $resetScope = null)
{
$this->events = $events;
$this->manager = $manager;
$this->exceptions = $exceptions;
$this->isDownForMaintenance = $isDownForMaintenance;
$this->resetScope = $resetScope;
}
/**
@@ -94,26 +122,36 @@ class Worker
* @param string $connectionName
* @param string $queue
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
* @return int
*/
public function daemon($connectionName, $queue, WorkerOptions $options)
{
if ($this->supportsAsyncSignals()) {
if ($supportsAsyncSignals = $this->supportsAsyncSignals()) {
$this->listenForSignals();
}
$lastRestart = $this->getTimestampOfLastQueueRestart();
[$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];
while (true) {
// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);
$status = $this->pauseWorker($options, $lastRestart);
if (! is_null($status)) {
return $this->stop($status);
}
continue;
}
if (isset($this->resetScope)) {
($this->resetScope)();
}
// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
@@ -121,7 +159,7 @@ class Worker
$this->manager->connection($connectionName), $queue
);
if ($this->supportsAsyncSignals()) {
if ($supportsAsyncSignals) {
$this->registerTimeoutHandler($job, $options);
}
@@ -129,19 +167,31 @@ class Worker
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job) {
$jobsProcessed++;
$this->runJob($job, $connectionName, $options);
if ($options->rest > 0) {
$this->sleep($options->rest);
}
} else {
$this->sleep($options->sleep);
}
if ($this->supportsAsyncSignals()) {
if ($supportsAsyncSignals) {
$this->resetTimeoutHandler();
}
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart, $job);
$status = $this->stopIfNecessary(
$options, $lastRestart, $startTime, $jobsProcessed, $job
);
if (! is_null($status)) {
return $this->stop($status);
}
}
}
@@ -160,11 +210,19 @@ class Worker
pcntl_signal(SIGALRM, function () use ($job, $options) {
if ($job) {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$job->getConnectionName(), $job, (int) $options->maxTries, $this->maxAttemptsExceededException($job)
$job->getConnectionName(), $job, (int) $options->maxTries, $e = $this->maxAttemptsExceededException($job)
);
$this->markJobAsFailedIfWillExceedMaxExceptions(
$job->getConnectionName(), $job, $e
);
$this->markJobAsFailedIfItShouldFailOnTimeout(
$job->getConnectionName(), $job, $e
);
}
$this->kill(1);
$this->kill(static::EXIT_ERROR);
});
pcntl_alarm(
@@ -214,33 +272,39 @@ class Worker
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
* @return void
* @return int|null
*/
protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
$this->sleep($options->sleep > 0 ? $options->sleep : 1);
$this->stopIfNecessary($options, $lastRestart);
return $this->stopIfNecessary($options, $lastRestart);
}
/**
* Stop the process if necessary.
* Determine the exit code to stop the process if necessary.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
* @param int $startTime
* @param int $jobsProcessed
* @param mixed $job
* @return void
* @return int|null
*/
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $startTime = 0, $jobsProcessed = 0, $job = null)
{
if ($this->shouldQuit) {
$this->stop();
return static::EXIT_SUCCESS;
} elseif ($this->memoryExceeded($options->memory)) {
$this->stop(12);
return static::EXIT_MEMORY_LIMIT;
} elseif ($this->queueShouldRestart($lastRestart)) {
$this->stop();
return static::EXIT_SUCCESS;
} elseif ($options->stopWhenEmpty && is_null($job)) {
$this->stop();
return static::EXIT_SUCCESS;
} elseif ($options->maxTime && hrtime(true) / 1e9 - $startTime >= $options->maxTime) {
return static::EXIT_SUCCESS;
} elseif ($options->maxJobs && $jobsProcessed >= $options->maxJobs) {
return static::EXIT_SUCCESS;
}
}
@@ -277,9 +341,17 @@ class Worker
*/
protected function getNextJob($connection, $queue)
{
$popJobCallback = function ($queue) use ($connection) {
return $connection->pop($queue);
};
try {
if (isset(static::$popCallbacks[$this->name])) {
return (static::$popCallbacks[$this->name])($popJobCallback, $queue);
}
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
if (! is_null($job = $popJobCallback($queue))) {
return $job;
}
}
@@ -396,11 +468,7 @@ class Worker
// so it is not lost entirely. This'll let the job be retried at a later time by
// another listener (or this same one). We will re-throw this exception after.
if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) {
$job->release(
method_exists($job, 'delaySeconds') && ! is_null($job->delaySeconds())
? $job->delaySeconds()
: $options->delay
);
$job->release($this->calculateBackoff($job, $options));
}
}
@@ -423,13 +491,13 @@ class Worker
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
$timeoutAt = $job->timeoutAt();
$retryUntil = $job->retryUntil();
if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) {
if ($retryUntil && Carbon::now()->getTimestamp() <= $retryUntil) {
return;
}
if (! $timeoutAt && ($maxTries === 0 || $job->attempts() <= $maxTries)) {
if (! $retryUntil && ($maxTries === 0 || $job->attempts() <= $maxTries)) {
return;
}
@@ -451,11 +519,11 @@ class Worker
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
if ($job->retryUntil() && $job->retryUntil() <= Carbon::now()->getTimestamp()) {
$this->failJob($job, $e);
}
if ($maxTries > 0 && $job->attempts() >= $maxTries) {
if (! $job->retryUntil() && $maxTries > 0 && $job->attempts() >= $maxTries) {
$this->failJob($job, $e);
}
}
@@ -486,6 +554,21 @@ class Worker
}
}
/**
* Mark the given job as failed if it should fail on timeouts.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Throwable $e
* @return void
*/
protected function markJobAsFailedIfItShouldFailOnTimeout($connectionName, $job, Throwable $e)
{
if (method_exists($job, 'shouldFailOnTimeout') ? $job->shouldFailOnTimeout() : false) {
$this->failJob($job, $e);
}
}
/**
* Mark the given job as failed and raise the relevant event.
*
@@ -498,6 +581,25 @@ class Worker
return $job->fail($e);
}
/**
* Calculate the backoff for the given job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return int
*/
protected function calculateBackoff($job, WorkerOptions $options)
{
$backoff = explode(
',',
method_exists($job, 'backoff') && ! is_null($job->backoff())
? $job->backoff()
: $options->backoff
);
return (int) ($backoff[$job->attempts() - 1] ?? last($backoff));
}
/**
* Raise the before queue job event.
*
@@ -611,20 +713,20 @@ class Worker
* Stop listening and bail out of the script.
*
* @param int $status
* @return void
* @return int
*/
public function stop($status = 0)
{
$this->events->dispatch(new WorkerStopping($status));
exit($status);
return $status;
}
/**
* Kill the process.
*
* @param int $status
* @return void
* @return never
*/
public function kill($status = 0)
{
@@ -669,11 +771,42 @@ class Worker
* Set the cache repository implementation.
*
* @param \Illuminate\Contracts\Cache\Repository $cache
* @return void
* @return $this
*/
public function setCache(CacheContract $cache)
{
$this->cache = $cache;
return $this;
}
/**
* Set the name of the worker.
*
* @param string $name
* @return $this
*/
public function setName($name)
{
$this->name = $name;
return $this;
}
/**
* Register a callback to be executed to pick jobs.
*
* @param string $workerName
* @param callable $callback
* @return void
*/
public static function popUsing($workerName, $callback)
{
if (is_null($callback)) {
unset(static::$popCallbacks[$workerName]);
} else {
static::$popCallbacks[$workerName] = $callback;
}
}
/**

View File

@@ -5,11 +5,18 @@ namespace Illuminate\Queue;
class WorkerOptions
{
/**
* The number of seconds before a released job will be available.
* The name of the worker.
*
* @var int
*/
public $delay;
public $name;
/**
* The number of seconds to wait before retrying a job that encountered an uncaught exception.
*
* @var int
*/
public $backoff;
/**
* The maximum amount of RAM the worker may consume.
@@ -32,6 +39,13 @@ class WorkerOptions
*/
public $sleep;
/**
* The number of seconds to rest between jobs.
*
* @var int
*/
public $rest;
/**
* The maximum amount of times a job may be attempted.
*
@@ -47,32 +61,55 @@ class WorkerOptions
public $force;
/**
* Indicates if the worker should stop when queue is empty.
* Indicates if the worker should stop when the queue is empty.
*
* @var bool
*/
public $stopWhenEmpty;
/**
* The maximum number of jobs to run.
*
* @var int
*/
public $maxJobs;
/**
* The maximum number of seconds a worker may live.
*
* @var int
*/
public $maxTime;
/**
* Create a new worker options instance.
*
* @param int $delay
* @param string $name
* @param int $backoff
* @param int $memory
* @param int $timeout
* @param int $sleep
* @param int $maxTries
* @param bool $force
* @param bool $stopWhenEmpty
* @param int $maxJobs
* @param int $maxTime
* @param int $rest
* @return void
*/
public function __construct($delay = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 1, $force = false, $stopWhenEmpty = false)
public function __construct($name = 'default', $backoff = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 1,
$force = false, $stopWhenEmpty = false, $maxJobs = 0, $maxTime = 0, $rest = 0)
{
$this->delay = $delay;
$this->name = $name;
$this->backoff = $backoff;
$this->sleep = $sleep;
$this->rest = $rest;
$this->force = $force;
$this->memory = $memory;
$this->timeout = $timeout;
$this->maxTries = $maxTries;
$this->stopWhenEmpty = $stopWhenEmpty;
$this->maxJobs = $maxJobs;
$this->maxTime = $maxTime;
}
}

View File

@@ -14,18 +14,20 @@
}
],
"require": {
"php": "^7.2.5|^8.0",
"php": "^7.3|^8.0",
"ext-json": "*",
"illuminate/console": "^7.0",
"illuminate/container": "^7.0",
"illuminate/contracts": "^7.0",
"illuminate/database": "^7.0",
"illuminate/filesystem": "^7.0",
"illuminate/pipeline": "^7.0",
"illuminate/support": "^7.0",
"illuminate/collections": "^8.0",
"illuminate/console": "^8.0",
"illuminate/container": "^8.0",
"illuminate/contracts": "^8.0",
"illuminate/database": "^8.0",
"illuminate/filesystem": "^8.0",
"illuminate/pipeline": "^8.0",
"illuminate/support": "^8.0",
"laravel/serializable-closure": "^1.0",
"opis/closure": "^3.6",
"ramsey/uuid": "^3.7|^4.0",
"symfony/process": "^5.0"
"ramsey/uuid": "^4.2.2",
"symfony/process": "^5.4"
},
"autoload": {
"psr-4": {
@@ -34,14 +36,14 @@
},
"extra": {
"branch-alias": {
"dev-master": "7.x-dev"
"dev-master": "8.x-dev"
}
},
"suggest": {
"ext-pcntl": "Required to use all features of the queue worker.",
"ext-posix": "Required to use all features of the queue worker.",
"aws/aws-sdk-php": "Required to use the SQS queue driver and DynamoDb failed job storage (^3.155).",
"illuminate/redis": "Required to use the Redis queue driver (^7.0).",
"aws/aws-sdk-php": "Required to use the SQS queue driver and DynamoDb failed job storage (^3.198.1).",
"illuminate/redis": "Required to use the Redis queue driver (^8.0).",
"pda/pheanstalk": "Required to use the Beanstalk queue driver (^4.0)."
},
"config": {