Batching Jobs in Laravel

Background processing is essential in most backend systems. Whatever you’re building, your application almost always needs a background worker. Do users need to register in your app? The confirmation email should be sent as a background job. Does your app need to import data from third parties? A dedicated import job is the best tool for this. If your users require reports, a background job is again the right tool for the task.

Note: This article assumes that you are already familiar with queues, workers, and jobs, and that you know how to dispatch jobs.

Before the Batching Feature

Before Laravel 8, there was no built-in support for batching jobs. If you wanted to group jobs into batches and keep track of them, you had to generate a unique ID for each batch (for example, batch-123) and include it as a property in your jobs. Each job would then query a batch table in the database using this ID, allowing you to run whatever tracking logic was necessary.

Job Batches

You might be wondering how this is implemented. A critical question that came to my mind when this feature was introduced was: does the batch know about the jobs, or do the jobs know about the batch?

According to the implementation, it’s the jobs that know about the batch. Let’s take a look at how to start a batch.

// wherePublishDateToday() is assumed to be a local query scope on the Article model.
$jobs = Article::query()
    ->wherePublishDateToday()
    ->get()
    ->map(function ($article) {
        return new PublishArticleJob($article);
    });

Bus::batch($jobs->all())
    ->name('publish-articles')
    ->dispatch();

In our blog system, a daily command fetches all articles scheduled to be published that day. It then dispatches a PublishArticleJob for each of these articles in a single batch. This way, we can easily check later whether the entire batch completed successfully.

I’m using Laravel Horizon to illustrate some details about the batch we just created.

Batch details in Horizon

You’ll notice the unique Batch ID, along with statistics on how many jobs are still pending, have failed, or have been completed.

The PublishArticleJob may randomly throw an exception. I did this to demonstrate what a batch with failures might look like.

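// Namespaces below assume the default Laravel application layout.
namespace App\Jobs;

use App\Models\Article;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class PublishArticleJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels, Batchable;

    public function __construct(public Article $article)
    {
        //
    }

    public function handle(): void
    {
        // Fail roughly 20% of the time to simulate a flaky job.
        // rand() works on integers, so rand(0.0, 1.0) could only return 0 or 1.
        if (random_int(1, 100) > 80) {
            $this->fail();
        }
    }
}

Batch with failures in Horizon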

Now, let’s take a look at the job_batches table in the database.

Batch details in the database

The batch contains information about any failed jobs, so you have the option to retry these jobs directly from the UI button on Horizon or through an API call, depending on your configuration.
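The figures Horizon displays can also be read in code. The sketch below assumes it runs inside a Laravel application and that you kept the batch ID returned when the batch was dispatched. describeBatch is a hypothetical helper name; Bus::findBatch and the Batch properties and methods it reads come from the framework.

```php
<?php

use Illuminate\Support\Facades\Bus;

/**
 * Hypothetical helper: summarize a batch by its ID.
 * Returns null when no batch with that ID exists.
 */
function describeBatch(string $batchId): ?array
{
    $batch = Bus::findBatch($batchId);

    if ($batch === null) {
        return null;
    }

    return [
        'name' => $batch->name,
        'total' => $batch->totalJobs,
        'pending' => $batch->pendingJobs,
        'failed' => $batch->failedJobs,
        'failed_ids' => $batch->failedJobIds,
        'processed' => $batch->processedJobs(), // total minus pending
        'progress' => $batch->progress(),       // percentage, 0 to 100
        'finished' => $batch->finished(),
    ];
}
```

Failed jobs of a batch can also be retried from the command line with php artisan queue:retry-batch followed by the batch ID.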

Next, let’s dive deeper and see how all of this works under the hood.

A Deep Dive into the Batch Component

The ideal starting point for our deep dive into this component is the Bus::batch method.

namespace Illuminate\Bus;

class Dispatcher implements QueueingDispatcher
{
    public function batch($jobs)
    {
        return new PendingBatch($this->container, Collection::wrap($jobs));
    }
}

The pattern of using Pending objects is prevalent in various parts of Laravel. I’ll write more about this topic very soon.

Now, let’s explore what the PendingBatch contains.

namespace Illuminate\Bus;

class PendingBatch
{
    public function __construct(Container $container, Collection $jobs)
    {
        $this->container = $container;
        $this->jobs = $jobs;
    }

    // many methods
}

In its constructor, the PendingBatch takes a container instance and a collection of jobs.

Now, let’s find out what happens when we call the dispatch method on a PendingBatch instance.

public function dispatch()
{
    $repository = $this->container->make(BatchRepository::class);

    try {
        $batch = $repository->store($this);

        $batch = $batch->add($this->jobs);
    } catch (Throwable $e) {
        if (isset($batch)) {
            $repository->delete($batch->id);
        }

        throw $e;
    }

    $this->container->make(EventDispatcher::class)->dispatch(
        new BatchDispatched($batch)
    );

    return $batch;
}

The dispatch method begins by retrieving the BatchRepository to save the batch data.

Inside a try block, it attempts to persist the batch and then associates all jobs with it by calling $batch->add($this->jobs). If anything throws after the batch has been stored, the batch record is deleted and the exception is rethrown.

Let’s examine what the store method of the repository does. First of all, Laravel provides two implementations of the BatchRepository interface:

  • Illuminate\Bus\DatabaseBatchRepository (the default)
  • Illuminate\Support\Testing\Fakes\BatchRepositoryFake (primarily used for testing)

Let’s dive into the ‘store’ method of the DatabaseBatchRepository.

public function store(PendingBatch $batch)
{
    $id = (string) Str::orderedUuid();

    $this->connection->table($this->table)->insert([
        'id' => $id,
        'name' => $batch->name,
        'total_jobs' => 0,
        'pending_jobs' => 0,
        'failed_jobs' => 0,
        'failed_job_ids' => '[]',
        'options' => $this->serialize($batch->options),
        'created_at' => time(),
        'cancelled_at' => null,
        'finished_at' => null,
    ]);

    return $this->find($id);
}

It generates a time-ordered UUID and saves the batch information into a database table. After that, it calls find($id) to return a Batch object.

public function find(string $batchId)
{
    $batch = $this->connection->table($this->table)
                        ->useWritePdo()
                        ->where('id', $batchId)
                        ->first();

    if ($batch) {
        return $this->toBatch($batch);
    }
}

Take note of the useWritePdo() call. It forces the query to run on the write connection, so the freshly inserted batch row is found even when your database uses separate read and write connections and the read replicas lag behind.

I’m going to skip the toBatch($batch) part because it simply transforms the row returned by first() into a Batch object.

Now, let’s return to the dispatch method.

public function dispatch()
{
    $repository = $this->container->make(BatchRepository::class);

    try {
        $batch = $repository->store($this);

        $batch = $batch->add($this->jobs);
    } catch (Throwable $e) {
        if (isset($batch)) {
            $repository->delete($batch->id);
        }

        throw $e;
    }

    $this->container->make(EventDispatcher::class)->dispatch(
        new BatchDispatched($batch)
    );

    return $batch;
}

Let’s see what $batch->add($this->jobs) does:

namespace Illuminate\Bus;

class Batch implements Arrayable, JsonSerializable
{
    public function add($jobs)
    {
        $count = 0;

        $jobs = Collection::wrap($jobs)->map(function ($job) use (&$count) {
            $job = $job instanceof Closure ? CallQueuedClosure::create($job) : $job;

            if (is_array($job)) {
                $count += count($job);

                return with($this->prepareBatchedChain($job), function ($chain) {
                    return $chain->first()
                            ->allOnQueue($this->options['queue'] ?? null)
                            ->allOnConnection($this->options['connection'] ?? null)
                            ->chain($chain->slice(1)->values()->all());
                });
            } else {
                $job->withBatchId($this->id);

                $count++;
            }

            return $job;
        });

        $this->repository->transaction(function () use ($jobs, $count) {
            $this->repository->incrementTotalJobs($this->id, $count);

            $this->queue->connection($this->options['connection'] ?? null)->bulk(
                $jobs->all(),
                $data = '',
                $this->options['queue'] ?? null
            );
        });

        return $this->fresh();
    }
}

To understand the add method, let me explain how it can be used.

If you want to dispatch multiple jobs in a single batch and run them concurrently, particularly if jobs are targeting different queues, passing an array of jobs to the Bus::batch([$job1, $job2]) function works perfectly.

However, if you have multiple jobs that need to be run in a specific sequence (known as job chaining), you must pass them to the batch method within a sub-array. This ensures they are executed in the correct order.

Bus::batch([
    $job1,
    [
        $job2,
        $job3,
    ],
    $job4
]);

So Job1, Job2, and Job4 can run concurrently, while Job3 won’t start until Job2 has been processed.

Now, let’s get back to the add method.

$jobs = Collection::wrap($jobs)->map(function ($job) use (&$count) {
    $job = $job instanceof Closure ? CallQueuedClosure::create($job) : $job;

    if (is_array($job)) {
        $count += count($job);

        return with($this->prepareBatchedChain($job), function ($chain) {
            return $chain->first()
                    ->allOnQueue($this->options['queue'] ?? null)
                    ->allOnConnection($this->options['connection'] ?? null)
                    ->chain($chain->slice(1)->values()->all());
        });
    } else {
        $job->withBatchId($this->id);

        $count++;
    }

    return $job;
});

The is_array($job) check identifies a job chain: if the entry is an array, it is a chain (like Job2 and Job3 in our example).

The prepareBatchedChain method assigns the batch ID to every job in the chain.

protected function prepareBatchedChain(array $chain)
{
    return collect($chain)->map(function ($job) {
        $job = $job instanceof Closure ? CallQueuedClosure::create($job) : $job;

        return $job->withBatchId($this->id);
    });
}

Next, within the add() method’s with callback, it selects the first job in the array, sets allOnQueue and allOnConnection, and then calls the ->chain() method for the remaining jobs.

return with($this->prepareBatchedChain($job), function ($chain) {
    return $chain->first()
            ->allOnQueue($this->options['queue'] ?? null)
            ->allOnConnection($this->options['connection'] ?? null)
            ->chain($chain->slice(1)->values()->all());
});

Now, returning to the add method. If the $job isn’t part of a chain, the method simply sets the batch ID on it.

$jobs = Collection::wrap($jobs)->map(function ($job) use (&$count) {
    $job = $job instanceof Closure ? CallQueuedClosure::create($job) : $job;

    if (is_array($job)) {
        // chained jobs
    } else {
        $job->withBatchId($this->id);

        $count++;
    }

    return $job;
});

The withBatchId($id) method sets the public property batchId in the job class.

Next, let’s move on with the rest of the add method.

public function add($jobs)
{
    $count = 0;

    $jobs = Collection::wrap($jobs)->map(function ($job) use (&$count) {
        // trimmed for brevity
    });

    $this->repository->transaction(function () use ($jobs, $count) {
        $this->repository->incrementTotalJobs($this->id, $count);

        $this->queue->connection($this->options['connection'] ?? null)->bulk(
            $jobs->all(),
            $data = '',
            $this->options['queue'] ?? null
        );
    });

    return $this->fresh();
}

After preparing the jobs, whether they’re chained or not, Laravel opens a transaction and increments the batch’s total job count by $count (incrementTotalJobs raises pending_jobs by the same amount). Within the same transaction, it then bulk-pushes all the prepared jobs onto the queue.
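To make the counting concrete, here is a minimal plain-PHP sketch of the same bookkeeping, with no Laravel involved. Strings stand in for job objects and a nested array stands in for a chain; summarizeBatchInput is a hypothetical name used only for illustration. It shows that every job in a chain counts toward the batch total, while only the head of each chain is pushed as a queue entry.

```php
<?php

/**
 * Hypothetical illustration of the counting done in Batch::add().
 * Returns how many jobs are counted and how many entries are pushed.
 */
function summarizeBatchInput(array $jobs): array
{
    $count = 0;

    foreach ($jobs as $job) {
        // A sub-array is a chain: each of its jobs counts toward the total.
        $count += is_array($job) ? count($job) : 1;
    }

    return [
        'counted' => $count,      // what total_jobs is incremented by
        'pushed' => count($jobs), // one queue entry per top-level item
    ];
}

$summary = summarizeBatchInput(['job1', ['job2', 'job3'], 'job4']);

echo "counted: {$summary['counted']}, pushed: {$summary['pushed']}", PHP_EOL;
// prints "counted: 4, pushed: 3"
```

The remaining jobs of a chain are not pushed up front; they travel with the chain’s first job and are dispatched one after another as the chain progresses.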

How Job Changes Reflect on a Batch

In the previous section, we saw how Bus::batch prepares the jobs and dispatches them to the queue. But what happens after that?

To understand how the batch gets updated, we need to start by looking at how the Batchable trait is used.

The Batchable trait must be included in any job that we want to batch:

class PublishArticleJob implements ShouldQueue
{
    use Dispatchable,
        InteractsWithQueue,
        Queueable,
        Batchable;

    //
}

So, if we examine the usages of the Batchable trait (for example, with ‘Find Usages’ in PhpStorm), we see it referenced in several places.

One of these usages is in Illuminate\Queue\CallQueuedHandler, a class that appears to be exactly what we’re looking for.

One of the methods that references Batchable is ensureSuccessfulBatchJobIsRecorded. From the name, we can infer that it is called when a batched job has been processed successfully:

protected function ensureSuccessfulBatchJobIsRecorded($command)
{
    $uses = class_uses_recursive($command);

    if (! in_array(Batchable::class, $uses) ||
        ! in_array(InteractsWithQueue::class, $uses)) {
        return;
    }

    if ($batch = $command->batch()) {
        $batch->recordSuccessfulJob($command->job->uuid());
    }
}

The method begins by recursively collecting all the traits used by the job. It verifies that the job is batchable and interacts with the queue. Then, it fetches the batch object and calls recordSuccessfulJob, passing in the job’s UUID.
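The recursive lookup matters because PHP’s built-in class_uses() only reports traits declared directly on a class. The plain-PHP sketch below illustrates this with stand-in traits and classes (the Demo names are not Laravel’s) and a hypothetical helper, traitsIncludingParents, that walks up the parent classes. It is a simplification: Laravel’s class_uses_recursive() also descends into traits used by other traits.

```php
<?php

// Stand-in traits and classes for illustration only; they are not Laravel's.
trait DemoBatchable {}
trait DemoInteractsWithQueue {}

abstract class DemoBaseJob
{
    use DemoInteractsWithQueue;
}

class DemoPublishJob extends DemoBaseJob
{
    use DemoBatchable;
}

/**
 * Hypothetical helper: collect traits from a class and all of its parents.
 * It does not descend into traits that are used by other traits.
 */
function traitsIncludingParents(string $class): array
{
    $traits = [];

    do {
        $traits += class_uses($class);
    } while ($class = get_parent_class($class));

    return $traits;
}

$direct = class_uses(DemoPublishJob::class);
$all = traitsIncludingParents(DemoPublishJob::class);

var_dump(isset($direct['DemoInteractsWithQueue'])); // bool(false)
var_dump(isset($all['DemoInteractsWithQueue']));    // bool(true)
var_dump(isset($all['DemoBatchable']));             // bool(true)
```

A job that inherits InteractsWithQueue from a base class would be missed by a direct class_uses() check, which is why a recursive lookup is the safer choice here.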

Now, let’s take a look at the recordSuccessfulJob method:

public function recordSuccessfulJob(string $jobId)
{
    $counts = $this->decrementPendingJobs($jobId);

    if ($counts->pendingJobs === 0) {
        $this->repository->markAsFinished($this->id);
    }

    if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) {
        $batch = $this->fresh();

        collect($this->options['then'])->each(function ($handler) use ($batch) {
            $this->invokeHandlerCallback($handler, $batch);
        });
    }

    if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
        $batch = $this->fresh();

        collect($this->options['finally'])->each(function ($handler) use ($batch) {
            $this->invokeHandlerCallback($handler, $batch);
        });
    }
}

The method begins by reducing the number of pending jobs. If it’s the last job, it marks the batch as finished. It then carries out a few additional checks to run the Then callback and the Finally callback.
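For context, these callbacks are registered on the pending batch before it is dispatched. The following is a minimal sketch, assuming it runs inside a Laravel application and that $jobs is an array of batchable jobs such as PublishArticleJob instances; the log messages are purely illustrative.

```php
<?php

use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Log;

// $jobs is assumed to be an array of jobs that use the Batchable trait.
$batch = Bus::batch($jobs)
    ->then(function (Batch $batch) {
        // Runs when pending_jobs reaches zero, i.e. every job succeeded.
        Log::info("Batch {$batch->id} completed successfully");
    })
    ->catch(function (Batch $batch, Throwable $e) {
        // Runs when the first failing job in the batch is detected.
        Log::warning("Batch {$batch->id} has a failure: {$e->getMessage()}");
    })
    ->finally(function (Batch $batch) {
        // Runs once every job has been attempted, whether it succeeded or failed.
        Log::info("Batch {$batch->id} finished executing");
    })
    ->name('publish-articles')
    ->dispatch();
```

The closures are serialized and stored in the batch’s options column, so they should not rely on $this.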

I want to introduce you to something that might be new. Let’s dive into decrementPendingJobs and see how it operates.

public function decrementPendingJobs(string $jobId)
{
    return $this->repository->decrementPendingJobs($this->id, $jobId);
}

Let’s quickly jump into the repository’s method:

public function decrementPendingJobs(string $batchId, string $jobId)
{
    $values = $this->updateAtomicValues($batchId, function ($batch) use ($jobId) {
        return [
            'pending_jobs' => $batch->pending_jobs - 1,
            'failed_jobs' => $batch->failed_jobs,
            'failed_job_ids' => json_encode(array_values(array_diff((array) json_decode($batch->failed_job_ids, true), [$jobId]))),
        ];
    });

    return new UpdatedBatchJobCounts(
        $values['pending_jobs'],
        $values['failed_jobs']
    );
}

While it may seem like a standard DB::table('xyz')->update(); operation, what does updateAtomicValues actually mean?

protected function updateAtomicValues(string $batchId, Closure $callback)
{
    return $this->connection->transaction(function () use ($batchId, $callback) {
        $batch = $this->connection->table($this->table)->where('id', $batchId)
                    ->lockForUpdate()
                    ->first();

        return is_null($batch) ? [] : tap($callback($batch), function ($values) use ($batchId) {
            $this->connection->table($this->table)->where('id', $batchId)->update($values);
        });
    });
}

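lockForUpdate is a form of pessimistic locking. Once the batch row has been selected inside the transaction, no other transaction can modify it, or select it with a lock of its own, until the current transaction commits. Because every counter update goes through this same locking read, concurrent updates to a batch are serialized.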

This guarantees that the count of pending, finished, and failed jobs will always be correct, as there’s no room for race conditions.
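To see what the lock protects against, consider two workers finishing their jobs at the same moment. The plain-PHP simulation below involves no database and no real concurrency; it only replays the two possible orderings of the read and write steps by hand, with an array standing in for the batch row.

```php
<?php

// Each "worker" does what decrementPendingJobs does:
// read the row, compute the new value, write it back.

// Without a lock, both workers may read before either one writes.
$row = ['pending_jobs' => 2];
$seenByA = $row['pending_jobs'];     // worker A reads 2
$seenByB = $row['pending_jobs'];     // worker B reads 2 as well
$row['pending_jobs'] = $seenByA - 1; // A writes 1
$row['pending_jobs'] = $seenByB - 1; // B writes 1, overwriting A's update
$withoutLock = $row['pending_jobs'];

// With a row lock, B's locking read waits until A's transaction commits,
// so B sees the value that A wrote.
$row = ['pending_jobs' => 2];
$seenByA = $row['pending_jobs'];     // A locks the row and reads 2
$row['pending_jobs'] = $seenByA - 1; // A writes 1 and commits
$seenByB = $row['pending_jobs'];     // B acquires the lock and reads 1
$row['pending_jobs'] = $seenByB - 1; // B writes 0
$withLock = $row['pending_jobs'];

echo "without lock: {$withoutLock}, with lock: {$withLock}", PHP_EOL;
// prints "without lock: 1, with lock: 0"
```

In the unlocked ordering the batch would never reach zero pending jobs, so it would never be marked as finished and its then callbacks would never run.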

You can learn more about lockForUpdate in the article Understanding lockForUpdate and sharedLock in Laravel, which explains it in detail.

Similarly to recordSuccessfulJob, there is a recordFailedJob method. This does much the same, but instead, it increases the count of failed jobs and logs its ID into the failed_job_ids column.

Conclusion

Together, we’ve delved into how Laravel’s Batch component operates under the hood. We’ve seen how a batch is created and updated, and how the processing of jobs influences the batch. At any given moment, you can check the status of the jobs inside the batch and take actions using the then and finally callbacks.

If you have any questions, please don’t hesitate to leave them in the comments section below.

Happy coding!
