Content Source:
When creating jobs, listeners, or subscribers to push into the queue, you may start thinking that, once they are dispatched, you're on your own with whatever the queue worker decides to do with your logic.
Well, it’s not that you can’t interact with the queue worker from inside the job, but you usually don’t need to… until you do.
The magic happens because of the InteractsWithQueue trait. When the queued job is pulled from the queue, the CallQueuedHandler (or the CallQueuedListener, for queued listeners) checks whether it uses the InteractsWithQueue trait, and if it does, the framework injects the underlying "job" instance into it.
This "job" instance is like the driver that wraps your real Job class; it holds information about the connection and attempts, among other things.
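The check described above boils down to inspecting the traits a class uses. Here is a minimal plain-PHP sketch of that idea. The names InteractsWithWorker, AwareTask, PlainTask and injectJobIfNecessary are hypothetical stand-ins for illustration, not the framework's own classes, and the real worker does more than this.

```php
<?php

// Hypothetical stand-in for the framework trait; NOT Laravel's own code.
trait InteractsWithWorker
{
    /** @var object|null The low-level queue job that wraps this task. */
    public $job;

    public function setJob(object $job): self
    {
        $this->job = $job;

        return $this;
    }
}

// A task that does not care about the worker.
final class PlainTask
{
}

// A task that wants access to the underlying queue job.
final class AwareTask
{
    use InteractsWithWorker;
}

/**
 * Inject the low-level job only when the instance uses the trait.
 */
function injectJobIfNecessary(object $instance, object $job): object
{
    // class_uses() lists the traits used directly by the instance's class.
    if (in_array(InteractsWithWorker::class, class_uses($instance), true)) {
        $instance->setJob($job);
    }

    return $instance;
}

// Stand-in for the driver-level job pulled from the queue.
$queueJob = new stdClass();
$queueJob->attempts = 1;

$aware = injectJobIfNecessary(new AwareTask(), $queueJob);
$plain = injectJobIfNecessary(new PlainTask(), $queueJob);

var_dump($aware->job === $queueJob);      // bool(true)
var_dump(property_exists($plain, 'job')); // bool(false)
```

Only the class that opts in through the trait receives the job instance. That is why methods such as attempts() or release() are available only when the trait is present.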
Context
We will use a transcoding Job as an example. This job transcodes a podcast audio file into a 192 kbps MP3. Because it runs on the free transcoding queue, it has limited availability.
Checking the Attempts
The first method is called attempts(), and as its name implies, it returns the number of attempts. A Job always starts with one attempt.
This method is meant to be used with others, like fail() or release() (delay). For illustration purposes, each time we retry transcoding a podcast in the free queue, we will notify the user that we're retrying for the nth time, giving them the option to cancel future transcodes.
<?php

namespace App\Jobs;

use App\Podcast;
use Transcoder\Transcoder;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use App\Notifications\PodcastTranscoded;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Notifications\RetryingPodcastTranscode;

class TranscodePodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * The podcast to transcode.
     *
     * @var \App\Podcast
     */
    protected $podcast;

    /**
     * Create a new Transcode Podcast instance.
     *
     * @param  \App\Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the job.
     *
     * @param  \Transcoder\Transcoder  $transcoder
     * @return void
     */
    public function handle(Transcoder $transcoder)
    {
        // Tell the user we're retrying for the nth time
        if ($this->attempts() > 1) {
            $this->podcast->publisher->notify(
                new RetryingPodcastTranscode($this->podcast, $this->attempts())
            );
        }

        $transcoded = $transcoder->setFile($this->podcast)
            ->format('mp3')
            ->bitrate(192)
            ->start();

        // Associate the transcoded podcast to the original podcast.
        $this->podcast->transcode()->associate($transcoded);

        // Notify the publisher of the podcast that their podcast is ready
        $this->podcast->publisher->notify(new PodcastTranscoded($this->podcast));
    }
}
Telling the user that we’re retrying something for the nth time is useful when the logic has failed beforehand, letting the user (or developer) check what went wrong, but of course you can do more than that.
Personally, we prefer to do that after the Job has failed: if it has retries left, we tell them that we're going to retry later.
Deleting the Job
The second method is delete(). As you can guess, it deletes the current Job from the queue.
This can be handy when, for some reason, the job or listener should no longer be processed after it was queued. For example, think about this scenario: the publisher that uploaded the podcast has been deactivated (for a TOS violation, say) before the transcoding occurs, and we should not process the podcast.
We will add that code to the example from before:
<?php

namespace App\Jobs;

use App\Podcast;
use Transcoder\Transcoder;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use App\Notifications\PodcastTranscoded;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Notifications\RetryingPodcastTranscode;

class TranscodePodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * The podcast to transcode.
     *
     * @var \App\Podcast
     */
    protected $podcast;

    /**
     * Create a new Transcode Podcast instance.
     *
     * @param  \App\Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the job.
     *
     * @param  \Transcoder\Transcoder  $transcoder
     * @return void
     */
    public function handle(Transcoder $transcoder)
    {
        // If the publisher has been deactivated, take this job out
        // and stop here so nothing below runs.
        if ($this->podcast->publisher->isDeactivated()) {
            $this->delete();

            return;
        }

        // Tell the user we're retrying for the nth time
        if ($this->attempts() > 1) {
            $this->podcast->publisher->notify(
                new RetryingPodcastTranscode($this->podcast, $this->attempts())
            );
        }

        $transcoded = $transcoder->setFile($this->podcast)
            ->format('mp3')
            ->bitrate(192)
            ->start();

        // Associate the transcoded podcast to the original podcast.
        $this->podcast->transcode()->associate($transcoded);

        // Notify the publisher of the podcast that their podcast is ready
        $this->podcast->publisher->notify(new PodcastTranscoded($this->podcast));
    }
}
If the job depends on models that may have been deleted while it was waiting in the queue, you may want to set $deleteWhenMissingModels to true so the job is discarded quietly instead of trying to process something that is not there.
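As a rough sketch of where that property lives, assuming your Laravel version supports $deleteWhenMissingModels and that the job serializes its model through SerializesModels, the declaration could look like this. The handle() body is left as a placeholder.

```php
<?php

namespace App\Jobs;

use App\Podcast;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class TranscodePodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Discard the job if the serialized podcast no longer exists
     * by the time the worker picks the job up.
     *
     * @var bool
     */
    public $deleteWhenMissingModels = true;

    /**
     * The podcast to transcode.
     *
     * @var \App\Podcast
     */
    protected $podcast;

    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    public function handle()
    {
        // Placeholder: the transcoding logic from the example goes here.
    }
}
```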
Failing the Job
The third method is fail(). This is very handy when you need to fail the logic deliberately, because an empty return statement will mark the Job as completed successfully. You can forcefully fail the queued job, ideally with an exception that explains why. Note that fail() marks the job as failed and removes it from the queue, so the worker will not retry it; if you want another attempt, throw the exception or use release() instead.
This gives you finer control over when the job fails. In any case, you can also use the failed() method, which allows you to perform any cleanup after it fails, like notifying the user or deleting something.
In this example, we will fail the job with a custom exception if the podcast cannot be retrieved from storage for whatever reason, like when the CDN goes down.
<?php

namespace App\Jobs;

use App\Podcast;
use Transcoder\Transcoder;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use App\Exceptions\PodcastUnretrievable;
use App\Notifications\PodcastTranscoded;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Notifications\RetryingPodcastTranscode;

class TranscodePodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * The podcast to transcode.
     *
     * @var \App\Podcast
     */
    protected $podcast;

    /**
     * Create a new Transcode Podcast instance.
     *
     * @param  \App\Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the job.
     *
     * @param  \Transcoder\Transcoder  $transcoder
     * @return void
     */
    public function handle(Transcoder $transcoder)
    {
        // If the publisher has been deactivated, take this job out
        // and stop here so nothing below runs.
        if ($this->podcast->publisher->isDeactivated()) {
            $this->delete();

            return;
        }

        // If the podcast cannot be retrieved from the storage, we will fail miserably.
        if ($this->podcast->fileDoesntExists()) {
            $this->fail(new PodcastUnretrievable($this->podcast));

            return;
        }

        // Tell the user we're retrying for the nth time
        if ($this->attempts() > 1) {
            $this->podcast->publisher->notify(
                new RetryingPodcastTranscode($this->podcast, $this->attempts())
            );
        }

        $transcoded = $transcoder->setFile($this->podcast)
            ->format('mp3')
            ->bitrate(192)
            ->start();

        // Associate the transcoded podcast to the original podcast.
        $this->podcast->transcode()->associate($transcoded);

        // Notify the publisher of the podcast that their podcast is ready
        $this->podcast->publisher->notify(new PodcastTranscoded($this->podcast));
    }
}
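The failed() hook mentioned earlier is a natural place for the cleanup. The following is only a sketch: PodcastTranscodeFailed is a hypothetical notification that does not appear in the original example, and the exact signature of failed() has varied between Laravel versions, so a nullable Throwable is used here as a permissive assumption.

```php
<?php

namespace App\Jobs;

use App\Notifications\PodcastTranscodeFailed; // hypothetical notification class
use App\Podcast;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;

class TranscodePodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * The podcast to transcode.
     *
     * @var \App\Podcast
     */
    protected $podcast;

    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    public function handle()
    {
        // Placeholder: the transcoding logic from the example goes here.
    }

    /**
     * Clean up after the job has been marked as failed.
     *
     * @param  \Throwable|null  $exception
     * @return void
     */
    public function failed(?Throwable $exception = null)
    {
        // Fall back to a generic reason when no exception was provided.
        $reason = $exception ? $exception->getMessage() : 'Unknown error';

        $this->podcast->publisher->notify(
            new PodcastTranscodeFailed($this->podcast, $reason)
        );
    }
}
```

Keeping the notification in failed() rather than next to each fail() call means the publisher is told once, whether the job was failed manually or ran out of attempts.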
Releasing (Delaying) a Job
The fourth method is release(). This is probably the most useful method of the trait, as it lets you push the job back onto the queue to run further into the future. It is commonly used for rate limiting your job.
Apart from rate limiting, you can also use it when something is not available yet but you expect it to be in the near future, which avoids failing the job prematurely.
In this last example, we will delay the transcoding for later: if the transcoder is under heavy usage, we will delay the transcoding for five minutes until the load lowers.
<?php

namespace App\Jobs;

use App\Podcast;
use Transcoder\Transcoder;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use App\Exceptions\PodcastUnretrievable;
use App\Notifications\PodcastTranscoded;
use Illuminate\Queue\InteractsWithQueue;
use App\Notifications\TranscoderHighUsage;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Notifications\RetryingPodcastTranscode;

class TranscodePodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * The podcast to transcode.
     *
     * @var \App\Podcast
     */
    protected $podcast;

    /**
     * Create a new Transcode Podcast instance.
     *
     * @param  \App\Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the job.
     *
     * @param  \Transcoder\Transcoder  $transcoder
     * @return void
     */
    public function handle(Transcoder $transcoder)
    {
        // If the publisher has been deactivated, take this job out
        // and stop here so nothing below runs.
        if ($this->podcast->publisher->isDeactivated()) {
            $this->delete();

            return;
        }

        // If the podcast cannot be retrieved from the storage, we will fail miserably.
        if ($this->podcast->fileDoesntExists()) {
            $this->fail(new PodcastUnretrievable($this->podcast));

            return;
        }

        // If the Transcoder usage is high, we will avoid choking it by delaying the
        // transcoding by 5 minutes. Otherwise we may risk stalling the transcoder
        // process, which will take down all transcoding sub-processes with it.
        if ($transcoder->getLoad()->isHigh()) {
            $delay = 60 * 5;

            $this->podcast->publisher->notify(
                new TranscoderHighUsage($this->podcast, $delay)
            );

            $this->release($delay);

            return;
        }

        // Tell the user we're retrying for the nth time
        if ($this->attempts() > 1) {
            $this->podcast->publisher->notify(
                new RetryingPodcastTranscode($this->podcast, $this->attempts())
            );
        }

        $transcoded = $transcoder->setFile($this->podcast)
            ->format('mp3')
            ->bitrate(192)
            ->start();

        // Associate the transcoded podcast to the original podcast.
        $this->podcast->transcode()->associate($transcoded);

        // Notify the publisher of the podcast that their podcast is ready
        $this->podcast->publisher->notify(new PodcastTranscoded($this->podcast));
    }
}
We could add some magic here: for example, assign a fixed number of slots to the transcoder, and delay the job if all the slots are full.
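To make the slot idea a bit more concrete, here is a minimal plain-PHP sketch of a slot counter. TranscoderSlots and its methods are hypothetical names invented for illustration. The counter lives in memory, so it only works within a single process; a real setup with several workers would need a shared store such as the cache or a database.

```php
<?php

/**
 * Hypothetical in-memory slot counter for the transcoder.
 * Single-process only; shared storage is needed across workers.
 */
final class TranscoderSlots
{
    /** @var int Maximum number of simultaneous transcodes. */
    private $capacity;

    /** @var int Slots currently taken. */
    private $used = 0;

    public function __construct(int $capacity)
    {
        $this->capacity = $capacity;
    }

    /**
     * Take a slot if one is free; return false when all are busy.
     */
    public function tryAcquire(): bool
    {
        if ($this->used >= $this->capacity) {
            return false;
        }

        $this->used++;

        return true;
    }

    /**
     * Give a slot back once the transcode has finished.
     */
    public function free(): void
    {
        $this->used = max(0, $this->used - 1);
    }
}

$slots = new TranscoderSlots(2);

var_dump($slots->tryAcquire()); // bool(true)
var_dump($slots->tryAcquire()); // bool(true)
var_dump($slots->tryAcquire()); // bool(false), all slots are busy

$slots->free();

var_dump($slots->tryAcquire()); // bool(true)
```

Inside handle(), a false result from tryAcquire() would be the point at which to call $this->release($delay) and return, with free() called in a finally block once the transcode is done.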
And that’s pretty much all you can do from inside the queued job.
- medium.com