vendor/symfony/messenger/EventListener/SendFailedMessageForRetryListener.php line 54

  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\EventListener;
  11. use Psr\Container\ContainerInterface;
  12. use Psr\EventDispatcher\EventDispatcherInterface;
  13. use Psr\Log\LoggerInterface;
  14. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  15. use Symfony\Component\Messenger\Envelope;
  16. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  17. use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
  18. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  19. use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
  20. use Symfony\Component\Messenger\Exception\RuntimeException;
  21. use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
  22. use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
  23. use Symfony\Component\Messenger\Stamp\DelayStamp;
  24. use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
  25. use Symfony\Component\Messenger\Stamp\StampInterface;
  26. use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
  27. /**
  28.  * @author Tobias Schultze <http://tobion.de>
  29.  */
  30. class SendFailedMessageForRetryListener implements EventSubscriberInterface
  31. {
  32.     private ContainerInterface $sendersLocator;
  33.     private ContainerInterface $retryStrategyLocator;
  34.     private ?LoggerInterface $logger;
  35.     private ?EventDispatcherInterface $eventDispatcher;
  36.     private int $historySize;
  37.     public function __construct(ContainerInterface $sendersLocatorContainerInterface $retryStrategyLocator, ?LoggerInterface $logger null, ?EventDispatcherInterface $eventDispatcher nullint $historySize 10)
  38.     {
  39.         $this->sendersLocator $sendersLocator;
  40.         $this->retryStrategyLocator $retryStrategyLocator;
  41.         $this->logger $logger;
  42.         $this->eventDispatcher $eventDispatcher;
  43.         $this->historySize $historySize;
  44.     }
  45.     /**
  46.      * @return void
  47.      */
  48.     public function onMessageFailed(WorkerMessageFailedEvent $event)
  49.     {
  50.         $retryStrategy $this->getRetryStrategyForTransport($event->getReceiverName());
  51.         $envelope $event->getEnvelope();
  52.         $throwable $event->getThrowable();
  53.         $message $envelope->getMessage();
  54.         $context = [
  55.             'class' => $message::class,
  56.         ];
  57.         $shouldRetry $retryStrategy && $this->shouldRetry($throwable$envelope$retryStrategy);
  58.         $retryCount RedeliveryStamp::getRetryCountFromEnvelope($envelope);
  59.         if ($shouldRetry) {
  60.             $event->setForRetry();
  61.             ++$retryCount;
  62.             $delay $retryStrategy->getWaitingTime($envelope$throwable);
  63.             $this->logger?->warning('Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"'$context + ['retryCount' => $retryCount'delay' => $delay'error' => $throwable->getMessage(), 'exception' => $throwable]);
  64.             // add the delay and retry stamp info
  65.             $retryEnvelope $this->withLimitedHistory($envelope, new DelayStamp($delay), new RedeliveryStamp($retryCount));
  66.             // re-send the message for retry
  67.             $retryEnvelope $this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);
  68.             $this->eventDispatcher?->dispatch(new WorkerMessageRetriedEvent($retryEnvelope$event->getReceiverName()));
  69.         } else {
  70.             $this->logger?->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"'$context + ['retryCount' => $retryCount'error' => $throwable->getMessage(), 'exception' => $throwable]);
  71.         }
  72.     }
  73.     /**
  74.      * Adds stamps to the envelope by keeping only the First + Last N stamps.
  75.      */
  76.     private function withLimitedHistory(Envelope $envelopeStampInterface ...$stamps): Envelope
  77.     {
  78.         foreach ($stamps as $stamp) {
  79.             $history $envelope->all($stamp::class);
  80.             if (\count($history) < $this->historySize) {
  81.                 $envelope $envelope->with($stamp);
  82.                 continue;
  83.             }
  84.             $history array_merge(
  85.                 [$history[0]],
  86.                 \array_slice($history, -$this->historySize 2),
  87.                 [$stamp]
  88.             );
  89.             $envelope $envelope->withoutAll($stamp::class)->with(...$history);
  90.         }
  91.         return $envelope;
  92.     }
  93.     public static function getSubscribedEvents(): array
  94.     {
  95.         return [
  96.             // must have higher priority than SendFailedMessageToFailureTransportListener
  97.             WorkerMessageFailedEvent::class => ['onMessageFailed'100],
  98.         ];
  99.     }
  100.     private function shouldRetry(\Throwable $eEnvelope $envelopeRetryStrategyInterface $retryStrategy): bool
  101.     {
  102.         if ($e instanceof RecoverableExceptionInterface) {
  103.             return true;
  104.         }
  105.         // if one or more nested Exceptions is an instance of RecoverableExceptionInterface we should retry
  106.         // if ALL nested Exceptions are an instance of UnrecoverableExceptionInterface we should not retry
  107.         if ($e instanceof HandlerFailedException) {
  108.             $shouldNotRetry true;
  109.             foreach ($e->getWrappedExceptions() as $nestedException) {
  110.                 if ($nestedException instanceof RecoverableExceptionInterface) {
  111.                     return true;
  112.                 }
  113.                 if (!$nestedException instanceof UnrecoverableExceptionInterface) {
  114.                     $shouldNotRetry false;
  115.                     break;
  116.                 }
  117.             }
  118.             if ($shouldNotRetry) {
  119.                 return false;
  120.             }
  121.         }
  122.         if ($e instanceof UnrecoverableExceptionInterface) {
  123.             return false;
  124.         }
  125.         return $retryStrategy->isRetryable($envelope$e);
  126.     }
  127.     private function getRetryStrategyForTransport(string $alias): ?RetryStrategyInterface
  128.     {
  129.         if ($this->retryStrategyLocator->has($alias)) {
  130.             return $this->retryStrategyLocator->get($alias);
  131.         }
  132.         return null;
  133.     }
  134.     private function getSenderForTransport(string $alias): SenderInterface
  135.     {
  136.         if ($this->sendersLocator->has($alias)) {
  137.             return $this->sendersLocator->get($alias);
  138.         }
  139.         throw new RuntimeException(sprintf('Could not find sender "%s" based on the same receiver to send the failed message to for retry.'$alias));
  140.     }
  141. }