src/Platform/QueueBundle/Service/AsyncQueueService.php line 396

Open in your IDE?
  1. <?php
  2. namespace Platform\QueueBundle\Service;
  3. use Platform\QueueBundle\Model\AsyncMessage;
  4. use Stomp\Client;
  5. use Stomp\Network\Connection;
  6. use Stomp\StatefulStomp;
  7. use Stomp\Transport\Frame;
  8. use Stomp\Transport\Message;
  9. use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
  10. /**
  11.  * Class AsyncQueueService
  12.  * @package Platform\QueueBundle\Service
  13.  */
  14. final class AsyncQueueService
  15. {
  16.     private const DEFAULT_QUEUE '/queue/campussuite/worker';
  17.     private const READ_TIMEOUT 60;
  18.     private const SUBSCRIPTION_LIMIT 1;
  19.     private const CONNECT_MAX_RETRIES 10;
  20.     /**
  21.      * @var array
  22.      */
  23.     protected array $config;
  24.     /**
  25.      * @var StatefulStomp
  26.      */
  27.     protected StatefulStomp $stomp;
  28.     /**
  29.      * @var array|int[]
  30.      */
  31.     protected array $subscriptions = [];
  32.     /**
  33.      * @var array|Frame[]
  34.      */
  35.     protected array $frames = [];
  36.     /**
  37.      * @param ParameterBagInterface $params
  38.      */
  39.     public function __construct(ParameterBagInterface $params)
  40.     {
  41.         $this->config = [
  42.             'broker' => sprintf(
  43.                 'failover://(%s)',
  44.                 implode(','array_map(
  45.                     static function (string $broker) {
  46.                         return str_replace('stomp+'''$broker);
  47.                     },
  48.                     explode(','$params->get('app.mq.default.broker'))
  49.                 ))
  50.             ),
  51.             'user' => $params->get('app.mq.default.user'),
  52.             'password' => $params->get('app.mq.default.password'),
  53.         ];
  54.     }
  55.     /**
  56.      * @param string|null $suffix
  57.      * @return string
  58.      */
  59.     public static function qname(?string $suffix null): string
  60.     {
  61.         if ( ! empty($suffix)) {
  62.             return sprintf(
  63.                 '%s/%s',
  64.                 self::DEFAULT_QUEUE,
  65.                 trim($suffix'/')
  66.             );
  67.         }
  68.         return self::DEFAULT_QUEUE;
  69.     }
  70.     /**
  71.      * @return $this
  72.      */
  73.     public function connect(): self
  74.     {
  75.         // if we don't have a stomp object yet, we need to make one
  76.         if (empty($this->stomp)) {
  77.             // create the client, make it stateful
  78.             $connection = new Connection(
  79.                 $this->config['broker'],
  80.                 30
  81.             );
  82.             $connection->setReadTimeout(120);
  83.             $client = new Client($connection);
  84.             $client->setLogin(
  85.                 $this->config['user'],
  86.                 $this->config['password']
  87.             );
  88.             $this->stomp = new StatefulStomp($client);
  89.         }
  90.         // if we are not connected, attempt to connect
  91.         if ( ! $this->stomp->getClient()->isConnected()) {
  92.             // try to connect multiple times
  93.             // aws mq tends to "sleep" on us when not used for a while
  94.             $exception null;
  95.             $counter 0;
  96.             while ( ! $this->stomp->getClient()->isConnected() && $counter self::CONNECT_MAX_RETRIES) {
  97.                 // wrapping in a try clause because of the issue with aws mq "sleeping"
  98.                 try {
  99.                     // do connect
  100.                     $this->stomp->getClient()->connect();
  101.                 } catch (\Exception $e) {
  102.                     // we had an error, increment the counter so that we don't attempt to connect forever
  103.                     $counter++;
  104.                     // track the last connection exception
  105.                     $exception $e;
  106.                     // sleep for a bit to wait for it to come online
  107.                     sleep(1);
  108.                 }
  109.             }
  110.             // check again if we have a connection
  111.             // throw the last error if not
  112.             if ( ! $this->stomp->getClient()->isConnected()) {
  113.                 throw $exception ?: new \Exception('Could not connect to MQ server.');
  114.             }
  115.             // TODO: we may be reconnecting, if so, resubscribe to the queues if we were subscribed already?
  116.         }
  117.         return $this;
  118.     }
  119.     /**
  120.      * @return $this
  121.      */
  122.     public function close(): self
  123.     {
  124.         // if we were in the middle of processing a frame and forgot to ack or nack, nack it
  125.         if ( ! empty($this->frames)) {
  126.             // TODO: allow auto-nack option for outstanding frames?
  127.             throw new \RuntimeException('Cannot close the connection, there are outstanding frames.');
  128.         }
  129.         // if we are subscribed, properly unsubscribe first
  130.         foreach ($this->subscriptions as $subscription) {
  131.             $this->unsubscribe($subscription);
  132.         }
  133.         if ( ! empty($this->subscriptions)) {
  134.             // TODO: allow auto-unsubscribe option?
  135.             throw new \RuntimeException('Cannot close the connection, there are still active subscriptions.');
  136.         }
  137.         // disconnect the client
  138.         $this->stomp->getClient()->disconnect(true);
  139.         return $this;
  140.     }
  141.     /**
  142.      * @return bool|Frame
  143.      */
  144.     public function listen(?int $timeout null)
  145.     {
  146.         // check if we are even subscribed
  147.         if (empty($this->subscriptions)) {
  148.             throw new \RuntimeException('Cannot listen for messages, no subscriptions have been made.');
  149.         }
  150.         // ensure connection
  151.         $this->connect();
  152.         // read from the queue; this may block for some time...
  153.         if ( ! $timeout) {
  154.             $this->stomp->getClient()->getConnection()->setReadTimeout(self::READ_TIMEOUT);
  155.         } else {
  156.             $this->stomp->getClient()->getConnection()->setReadTimeout(0, ($timeout 1000));
  157.         }
  158.         $frame $this->stomp->read();
  159.         // make sure we got a proper result back
  160.         // if we did, track it
  161.         if ($frame instanceof Frame) {
  162.             $this->frames[$frame->getMessageId()] = $frame;
  163.         }
  164.         // send back the result, may be a frame or a boolean
  165.         return $frame;
  166.     }
  167.     /**
  168.      * @param Frame $frame
  169.      * @return $this
  170.      */
  171.     public function ack(Frame $frame): self
  172.     {
  173.         // check that we haven't already processed a frame
  174.         if ( ! isset($this->frames[$frame->getMessageId()])) {
  175.             throw new \RuntimeException(sprintf(
  176.                 'Frame #%s has already been handled.',
  177.                 $frame->getMessageId()
  178.             ));
  179.         }
  180.         // ensure connection
  181.         $this->connect();
  182.         // do the ack
  183.         $this->stomp->ack($frame);
  184.         // clear the frame from tracking
  185.         unset($this->frames[$frame->getMessageId()]);
  186.         return $this;
  187.     }
  188.     /**
  189.      * @param Frame $frame
  190.      * @return $this
  191.      */
  192.     public function nack(Frame $frame): self
  193.     {
  194.         // check that we haven't already processed a frame
  195.         if ( ! isset($this->frames[$frame->getMessageId()])) {
  196.             throw new \RuntimeException(sprintf(
  197.                 'Frame #%s has already been handled.',
  198.                 $frame->getMessageId()
  199.             ));
  200.         }
  201.         // ensure connection
  202.         $this->connect();
  203.         // do the nack
  204.         $this->stomp->nack($frame);
  205.         // clear the frame from tracking
  206.         unset($this->frames[$frame->getMessageId()]);
  207.         return $this;
  208.     }
  209.     /**
  210.      * @param string|null $queue
  211.      * @return int
  212.      */
  213.     public function subscribe(?string $queue): int
  214.     {
  215.         // default the queue in case none is given
  216.         if (empty($queue)) {
  217.             $queue self::qname();
  218.         }
  219.         // make sure we have proper queue
  220.         if ( ! str_starts_with($queue'/queue')) {
  221.             throw new \RuntimeException('Queue name is not properly formatted; must being with "/queue".');
  222.         }
  223.         // make sure we are not above our limit
  224.         if (count($this->subscriptions) > self::SUBSCRIPTION_LIMIT) {
  225.             throw new \RuntimeException(sprintf(
  226.                 'Cannot subscribe to queue "%s", doing so will exceed max subscription limit of %s.',
  227.                 $queue,
  228.                 self::SUBSCRIPTION_LIMIT
  229.             ));
  230.         }
  231.         // ensure connection
  232.         $this->connect();
  233.         // make the subscription and save of the id of the subscription
  234.         return $this->subscriptions[] = $this->stomp->subscribe(
  235.             $queue,
  236.             null,
  237.             'client-individual',
  238.         );
  239.     }
  240.     /**
  241.      * @param int $id
  242.      * @return $this
  243.      */
  244.     public function unsubscribe(int $id): self
  245.     {
  246.         // make sure this subscription is legit
  247.         if ( ! in_array($id$this->subscriptionstrue)) {
  248.             throw new \RuntimeException(sprintf(
  249.                 'Subscription #%s has not been setup.',
  250.                 $id
  251.             ));
  252.         }
  253.         // ensure connection
  254.         $this->connect();
  255.         // do the unsub
  256.         $this->stomp->unsubscribe($id);
  257.         // clear the tracked subscription id
  258.         $this->subscriptions = [
  259.             ...array_slice($this->subscriptions0array_search($id$this->subscriptionstrue)),
  260.             ...array_slice($this->subscriptions, (array_search($id$this->subscriptionstrue) + 1)),
  261.         ];
  262.         return $this;
  263.     }
  264.     /**
  265.      * @param string|null $queue
  266.      * @param array|AsyncMessage[] $messages
  267.      * @param bool $transactional
  268.      * @return array|Message[]
  269.      */
  270.     public function queue(?string $queue, array $messagesbool $transactional true): array
  271.     {
  272.         // default the queue in case none is given
  273.         if (empty($queue)) {
  274.             $queue self::qname();
  275.         }
  276.         // make sure we have proper queue
  277.         if ( ! str_starts_with($queue'/queue')) {
  278.             throw new \RuntimeException('Queue name is not properly formatted; must being with "/queue".');
  279.         }
  280.         // quit if no messages to send
  281.         if (empty($messages)) {
  282.             return [];
  283.         }
  284.         // ensure connection
  285.         $this->connect();
  286.         // holder for messages
  287.         $msgs = [];
  288.         // detect errors
  289.         try {
  290.             // start transaction if doing that
  291.             if ($transactional) {
  292.                 $this->stomp->begin();
  293.             }
  294.             // loop over each message and send
  295.             foreach ($messages as $message) {
  296.                 $result $this->stomp->send(
  297.                     $queue,
  298.                     $msgs[] = $message->stomp()
  299.                 );
  300.                 if ( ! $result) {
  301.                     throw new \RuntimeException(sprintf(
  302.                         'Sending of a single message to queue "%s" in a transaction failed.',
  303.                         $queue
  304.                     ));
  305.                 }
  306.             }
  307.             // commit transaction if doing that
  308.             if ($transactional) {
  309.                 $this->stomp->commit();
  310.             }
  311.         } catch (\Exception $e) {
  312.             // rollback transaction if doing that
  313.             if ($transactional) {
  314.                 $this->stomp->abort();
  315.             }
  316.             // rethrow the error
  317.             throw $e;
  318.         }
  319.         return $msgs;
  320.     }
  321.     /**
  322.      * @param string|null $queue
  323.      * @param AsyncMessage $message
  324.      * @return Message
  325.      */
  326.     public function send(?string $queueAsyncMessage $message): Message
  327.     {
  328.         // default the queue in case none is given
  329.         if (empty($queue)) {
  330.             $queue self::qname();
  331.         }
  332.         // make sure we have proper queue
  333.         if ( ! str_starts_with($queue'/queue')) {
  334.             throw new \RuntimeException('Queue name is not properly formatted; must being with "/queue".');
  335.         }
  336.         // ensure connection
  337.         $this->connect();
  338.         // send the stomp formatted message
  339.         $result $this->stomp->send(
  340.             $queue,
  341.             $msg $message->stomp()
  342.         );
  343.         // make sure it was successful
  344.         if ( ! $result) {
  345.             throw new \RuntimeException(sprintf(
  346.                 'Sending of a single message to queue "%s" failed.',
  347.                 $queue
  348.             ));
  349.         }
  350.         return $msg;
  351.     }
  352. }