279 lines
7.4 KiB
PHP
279 lines
7.4 KiB
PHP
<?php
|
|
|
|
namespace Illuminate\Queue;
|
|
|
|
use Aws\DynamoDb\DynamoDbClient;
|
|
use Illuminate\Contracts\Debug\ExceptionHandler;
|
|
use Illuminate\Contracts\Support\DeferrableProvider;
|
|
use Illuminate\Queue\Connectors\BeanstalkdConnector;
|
|
use Illuminate\Queue\Connectors\DatabaseConnector;
|
|
use Illuminate\Queue\Connectors\NullConnector;
|
|
use Illuminate\Queue\Connectors\RedisConnector;
|
|
use Illuminate\Queue\Connectors\SqsConnector;
|
|
use Illuminate\Queue\Connectors\SyncConnector;
|
|
use Illuminate\Queue\Failed\DatabaseFailedJobProvider;
|
|
use Illuminate\Queue\Failed\DynamoDbFailedJobProvider;
|
|
use Illuminate\Queue\Failed\NullFailedJobProvider;
|
|
use Illuminate\Support\Arr;
|
|
use Illuminate\Support\ServiceProvider;
|
|
use Illuminate\Support\Str;
|
|
use Opis\Closure\SerializableClosure;
|
|
|
|
class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
|
|
{
|
|
/**
|
|
* Register the service provider.
|
|
*
|
|
* @return void
|
|
*/
|
|
public function register()
|
|
{
|
|
$this->registerManager();
|
|
$this->registerConnection();
|
|
$this->registerWorker();
|
|
$this->registerListener();
|
|
$this->registerFailedJobServices();
|
|
$this->registerOpisSecurityKey();
|
|
}
|
|
|
|
/**
|
|
* Register the queue manager.
|
|
*
|
|
* @return void
|
|
*/
|
|
protected function registerManager()
|
|
{
|
|
$this->app->singleton('queue', function ($app) {
|
|
// Once we have an instance of the queue manager, we will register the various
|
|
// resolvers for the queue connectors. These connectors are responsible for
|
|
// creating the classes that accept queue configs and instantiate queues.
|
|
return tap(new QueueManager($app), function ($manager) {
|
|
$this->registerConnectors($manager);
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Register the default queue connection binding.
|
|
*
|
|
* @return void
|
|
*/
|
|
protected function registerConnection()
|
|
{
|
|
$this->app->singleton('queue.connection', function ($app) {
|
|
return $app['queue']->connection();
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Register the connectors on the queue manager.
|
|
*
|
|
* @param \Illuminate\Queue\QueueManager $manager
|
|
* @return void
|
|
*/
|
|
public function registerConnectors($manager)
|
|
{
|
|
foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
|
|
$this->{"register{$connector}Connector"}($manager);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Register the Null queue connector.
|
|
*
|
|
* @param \Illuminate\Queue\QueueManager $manager
|
|
* @return void
|
|
*/
|
|
protected function registerNullConnector($manager)
|
|
{
|
|
$manager->addConnector('null', function () {
|
|
return new NullConnector;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Register the Sync queue connector.
|
|
*
|
|
* @param \Illuminate\Queue\QueueManager $manager
|
|
* @return void
|
|
*/
|
|
protected function registerSyncConnector($manager)
|
|
{
|
|
$manager->addConnector('sync', function () {
|
|
return new SyncConnector;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Register the database queue connector.
|
|
*
|
|
* @param \Illuminate\Queue\QueueManager $manager
|
|
* @return void
|
|
*/
|
|
protected function registerDatabaseConnector($manager)
|
|
{
|
|
$manager->addConnector('database', function () {
|
|
return new DatabaseConnector($this->app['db']);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Register the Redis queue connector.
|
|
*
|
|
* @param \Illuminate\Queue\QueueManager $manager
|
|
* @return void
|
|
*/
|
|
protected function registerRedisConnector($manager)
|
|
{
|
|
$manager->addConnector('redis', function () {
|
|
return new RedisConnector($this->app['redis']);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Register the Beanstalkd queue connector.
|
|
*
|
|
* @param \Illuminate\Queue\QueueManager $manager
|
|
* @return void
|
|
*/
|
|
protected function registerBeanstalkdConnector($manager)
|
|
{
|
|
$manager->addConnector('beanstalkd', function () {
|
|
return new BeanstalkdConnector;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Register the Amazon SQS queue connector.
|
|
*
|
|
* @param \Illuminate\Queue\QueueManager $manager
|
|
* @return void
|
|
*/
|
|
protected function registerSqsConnector($manager)
|
|
{
|
|
$manager->addConnector('sqs', function () {
|
|
return new SqsConnector;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Register the queue worker.
|
|
*
|
|
* @return void
|
|
*/
|
|
protected function registerWorker()
|
|
{
|
|
$this->app->singleton('queue.worker', function ($app) {
|
|
$isDownForMaintenance = function () {
|
|
return $this->app->isDownForMaintenance();
|
|
};
|
|
|
|
return new Worker(
|
|
$app['queue'],
|
|
$app['events'],
|
|
$app[ExceptionHandler::class],
|
|
$isDownForMaintenance
|
|
);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Register the queue listener.
|
|
*
|
|
* @return void
|
|
*/
|
|
protected function registerListener()
|
|
{
|
|
$this->app->singleton('queue.listener', function ($app) {
|
|
return new Listener($app->basePath());
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Register the failed job services.
|
|
*
|
|
* @return void
|
|
*/
|
|
protected function registerFailedJobServices()
|
|
{
|
|
$this->app->singleton('queue.failer', function ($app) {
|
|
$config = $app['config']['queue.failed'];
|
|
|
|
if (isset($config['driver']) && $config['driver'] === 'dynamodb') {
|
|
return $this->dynamoFailedJobProvider($config);
|
|
} elseif (isset($config['table'])) {
|
|
return $this->databaseFailedJobProvider($config);
|
|
} else {
|
|
return new NullFailedJobProvider;
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Create a new database failed job provider.
|
|
*
|
|
* @param array $config
|
|
* @return \Illuminate\Queue\Failed\DatabaseFailedJobProvider
|
|
*/
|
|
protected function databaseFailedJobProvider($config)
|
|
{
|
|
return new DatabaseFailedJobProvider(
|
|
$this->app['db'], $config['database'], $config['table']
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Create a new DynamoDb failed job provider.
|
|
*
|
|
* @param array $config
|
|
* @return \Illuminate\Queue\Failed\DynamoDbFailedJobProvider
|
|
*/
|
|
protected function dynamoFailedJobProvider($config)
|
|
{
|
|
$dynamoConfig = [
|
|
'region' => $config['region'],
|
|
'version' => 'latest',
|
|
'endpoint' => $config['endpoint'] ?? null,
|
|
];
|
|
|
|
if (! empty($config['key']) && ! empty($config['secret'])) {
|
|
$dynamoConfig['credentials'] = Arr::only(
|
|
$config, ['key', 'secret', 'token']
|
|
);
|
|
}
|
|
|
|
return new DynamoDbFailedJobProvider(
|
|
new DynamoDbClient($dynamoConfig),
|
|
$this->app['config']['app.name'],
|
|
$config['table']
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Configure Opis Closure signing for security.
|
|
*
|
|
* @return void
|
|
*/
|
|
protected function registerOpisSecurityKey()
|
|
{
|
|
if (Str::startsWith($key = $this->app['config']->get('app.key'), 'base64:')) {
|
|
$key = base64_decode(substr($key, 7));
|
|
}
|
|
|
|
SerializableClosure::setSecretKey($key);
|
|
}
|
|
|
|
/**
|
|
* Get the services provided by the provider.
|
|
*
|
|
* @return array
|
|
*/
|
|
public function provides()
|
|
{
|
|
return [
|
|
'queue', 'queue.worker', 'queue.listener',
|
|
'queue.failer', 'queue.connection',
|
|
];
|
|
}
|
|
}
|