src/Cms/CoreBundle/Service/OneRosterService.php line 187

Open in your IDE?
  1. <?php
  2. namespace Cms\CoreBundle\Service;
  3. use App\Doctrine\ORM\Filters\DiscardableFilter;
  4. use App\Util\Errors;
  5. use Cms\CoreBundle\Entity\AbstractOneRosterEntity;
  6. use Cms\CoreBundle\Entity\OneRosterJob;
  7. use Cms\CoreBundle\Entity\OneRosterLog;
  8. use Cms\CoreBundle\Entity\OneRosterSync;
  9. use Cms\CoreBundle\Events\OneRosterEvents;
  10. use Cms\CoreBundle\Model\Interfaces\OneRosterable\OneRosterableInterface;
  11. use Cms\CoreBundle\Service\OneRoster\AbstractOneRosterApi;
  12. use Cms\CoreBundle\Util\DateTimeUtils;
  13. use Cms\CoreBundle\Util\Doctrine\EntityManager;
  14. use DateTime;
  15. use DateTimeZone;
  16. use Doctrine\DBAL\Types\Types;
  17. use Platform\QueueBundle\Model\AsyncMessage;
  18. use Platform\QueueBundle\Service\AsyncQueueService;
  19. use Ramsey\Uuid\Uuid;
  20. /**
  21.  * Class OneRosterService
  22.  * @package Cms\CoreBundle\Service
  23.  */
  24. final class OneRosterService
  25. {
  26.     // DI
  27.     protected EntityManager $em;
  28.     protected AsyncQueueService $async;
  29.     /**
  30.      * Holder for the API instances.
  31.      * Each sync gets their own API instance, so need to track them separately.
  32.      *
  33.      * @var array|AbstractOneRosterApi[]
  34.      */
  35.     protected array $apis = [];
  36.     /**
  37.      * @param EntityManager $em
  38.      * @param AsyncQueueService $async
  39.      */
  40.     public function __construct(EntityManager $emAsyncQueueService $async)
  41.     {
  42.         $this->em $em;
  43.         $this->async $async;
  44.     }
  45.     /**
  46.      * @param OneRosterSync|OneRosterJob $thing
  47.      * @return AbstractOneRosterApi
  48.      */
  49.     public function api($thing): AbstractOneRosterApi
  50.     {
  51.         // get sync if we are just a job
  52.         if ($thing instanceof OneRosterJob) {
  53.             $thing $thing->getSync();
  54.         }
  55.         // validate input
  56.         if ( ! $thing instanceof OneRosterSync) {
  57.             throw new \Exception();
  58.         }
  59.         // see if cached
  60.         $index $this->index($thing);
  61.         if ( ! array_key_exists($index$this->apis)) {
  62.             // not cached, make a new instance
  63.             $this->apis[$index] = AbstractOneRosterApi::factory($thing);
  64.         }
  65.         // attempt to connect and return the api service
  66.         return $this->apis[$index]->connect();
  67.     }
  68.     /**
  69.      * @param OneRosterSync $sync
  70.      * @return string
  71.      */
  72.     protected function index(OneRosterSync $sync): string
  73.     {
  74.         return sprintf(
  75.             '%s:%s:%s',
  76.             $sync->getId(),
  77.             $sync->getApiTokenTokenType(),
  78.             $sync->getApiTokenAccessToken()
  79.         );
  80.     }
  81.     /**
  82.      * Takes data to generate a CRC code for.
  83.      * This is used to better fine-tune change tracking in data syncs.
  84.      *
  85.      * @param array $data
  86.      * @return int
  87.      */
  88.     public function checksum(array $data): int
  89.     {
  90.         // fields that we don't want to include in the calculation
  91.         static $ignore = [
  92.             'dateLastModified',
  93.         ];
  94.         // get rid of ignored fields
  95.         foreach ($ignore as $filter) {
  96.             if (array_key_exists($filter$data)) {
  97.                 unset($data[$filter]);
  98.             }
  99.         }
  100.         // generate the check string to pass into the crc function
  101.         // simply just serialize the data into a json string
  102.         // finally calc the crc
  103.         return crc32(json_encode($data));
  104.     }
  105.     /**
  106.      * Used to trigger all runnable syncs in the system.
  107.      *
  108.      * @return array|OneRosterJob[]
  109.      */
  110.     public function run(): array
  111.     {
  112.         // get all the syncs that are able to tun
  113.         $syncs $this->em->getRepository(OneRosterSync::class)->findRunnable();
  114.         // sync each and save off their jobs
  115.         $jobs = [];
  116.         foreach ($syncs as $sync) {
  117.             $jobs[] = $this->queue($sync);
  118.         }
  119.         return $jobs;
  120.     }
  121.     /**
  122.      * @param OneRosterJob $job
  123.      */
  124.     public function validate(OneRosterJob $job): void
  125.     {
  126.         // TODO
  127.         return;
  128.     }
  129.     /**
  130.      * @param OneRosterSync $sync
  131.      * @param int $phase
  132.      * @return OneRosterJob|null
  133.      */
  134.     public function queue(OneRosterSync $syncint $phase OneRosterJob::PHASES__INIT): ?OneRosterJob
  135.     {
  136.         // TODO: make sure a job can be created, like there is not already a running job?
  137.         // generate a job for this new sync
  138.         $this->em->save(
  139.             $job = (new OneRosterJob())
  140.                 ->setSync($sync)
  141.                 ->setStartPhase($phase)
  142.         );
  143.         if ($sync->getSchedule()) {
  144.             // ensure that the next run date here is at least some number of hours past the current date/time
  145.             $sync
  146.                 ->setNextRunAt(true, (new DateTime())->modify('+2 hours'), new DateTimeZone('UTC'));
  147.         }
  148.         $this->em->save(
  149.             $sync
  150.                 ->setSemaphore(null)
  151.                 ->setLastRunAt(new DateTime()),
  152.         );
  153.         try {
  154.             // make sure the job is legit
  155.             $this->validate($job);
  156.             // init
  157.             $this->orchestrateJobInit($job$phase);
  158.             // start a transaction
  159.             // this should be mq-safe because a lock on the table should prevent subsequent updates from firing until the lock is released
  160.             $this->em->transactional(
  161.                 function () use ($job$phase) {
  162.                     // fin
  163.                     // doing the fin tracking first because if we did the reverse order, this method failing would mess things up as the mq tasks would be committed already
  164.                     $this->orchestrateJobFin($job);
  165.                     // simply generate a message to start the chain of tasks that will ultimately send the notifications
  166.                     $this->async->send(
  167.                         null,
  168.                         new AsyncMessage(
  169.                             $job,
  170.                             OneRosterEvents::PHASES[$phase],
  171.                             [
  172.                                 'job' => $job->getId(),
  173.                             ],
  174.                             AsyncMessage::PRIORITY__NORMAL,
  175.                         )
  176.                     );
  177.                 }
  178.             );
  179.         } catch (\Exception $e) {
  180.             $this->orchestrateJobError($job$e);
  181.         }
  182.         return $job;
  183.     }
  184.     /**
  185.      * Used when a job has been pulled and is started to be synced.
  186.      *
  187.      * @param OneRosterJob $job
  188.      * @param int $phase
  189.      * @return bool
  190.      * @internal
  191.      */
  192.     protected function orchestrateJobInit(OneRosterJob $jobint $phase): bool
  193.     {
  194.         return boolval($this->em->createQueryBuilder()
  195.             ->update(OneRosterJob::class, 'job')
  196.             // job should be marked as sending
  197.             ->set('job.status'OneRosterJob::STATUSES__RUNNING)
  198.             // set the init phase as ready
  199.             ->set('job.phasesReady''BIT_OR(job.phasesReady, :phase)')
  200.             ->setParameter('phase'$phase)
  201.             // IMPORTANT: must always filter by job id since we are doing a bulk update query!
  202.             ->andWhere('job.id = :job')
  203.             ->setParameter('job'$job->getId())
  204.             ->getQuery()
  205.             ->execute()
  206.         );
  207.     }
  208.     /**
  209.      * @param OneRosterJob $job
  210.      * @return bool
  211.      * @internal
  212.      */
  213.     protected function orchestrateJobFin(OneRosterJob $job): bool
  214.     {
  215.         return boolval(
  216.             $this->em->createQueryBuilder()
  217.                 ->update(OneRosterJob::class, 'job')
  218.                 // we have added a single task to the mq
  219.                 ->set('job.semaphore''(job.semaphore + 1)')
  220.                 // IMPORTANT: must always filter by job id since we are doing a bulk update query!
  221.                 ->andWhere('job.id = :job')
  222.                 ->setParameter('job'$job->getId())
  223.                 ->getQuery()
  224.                 ->execute()
  225.         );
  226.     }
  227.     /**
  228.      * @param OneRosterJob $job
  229.      * @param \Throwable $e
  230.      * @return bool
  231.      * @internal
  232.      */
  233.     protected function orchestrateJobError(OneRosterJob $job\Throwable $e): bool
  234.     {
  235.         return boolval($this->em->createQueryBuilder()
  236.             ->update(OneRosterJob::class, 'job')
  237.             // this is a hard error that should stop the entire job, update the status
  238.             ->set('job.status'':status')
  239.             ->setParameter('status'OneRosterJob::STATUSES__ERROR)
  240.             // set the primary error details
  241.             ->set('job.error'':error')
  242.             ->setParameter('error'Errors::jsonSerialize($e), Types::JSON)
  243.             // IMPORTANT: must always filter by job id since we are doing a bulk update query!
  244.             ->andWhere('job.id = :job')
  245.             ->setParameter('job'$job->getId())
  246.             ->getQuery()
  247.             ->execute()
  248.         );
  249.     }
  250.     /**
  251.      * Used when a phase for a job is started to be processed.
  252.      *
  253.      * @param OneRosterJob $job
  254.      * @param int $phase
  255.      * @return bool
  256.      * @internal
  257.      */
  258.     public function orchestratePhaseInit(OneRosterJob $jobint $phase): bool
  259.     {
  260.         return boolval(
  261.             $this->em->createQueryBuilder()
  262.                 ->update(OneRosterJob::class, 'job')
  263.                 // flag this channel as having started
  264.                 ->set('job.phasesRunning''BIT_OR(job.phasesRunning, :phase)')
  265.                 ->setParameter('phase'$phase)
  266.                 // set the first activity timestamp if needed
  267.                 ->set('job.firstActivityAt''CASE WHEN job.firstActivityAt IS NULL THEN NOW() ELSE job.firstActivityAt END')
  268.                 // set the last activity timestamp
  269.                 ->set('job.lastActivityAt''NOW()')
  270.                 // IMPORTANT: must always filter by job id since we are doing a bulk update query!
  271.                 ->andWhere('job.id = :job')
  272.                 ->setParameter('job'$job->getId())
  273.                 ->getQuery()
  274.                 ->execute()
  275.         );
  276.     }
  277.     /**
  278.      * Used when a channel processor has determined the subset of tasks that are needed to fulfill the channels.
  279.      * This can be a set of social accounts that need posted to, departments needing alerts, or various transactional contact types.
  280.      *
  281.      * @param OneRosterJob $job
  282.      * @param int $count
  283.      * @return bool
  284.      * @internal
  285.      */
  286.     public function orchestratePhaseChange(OneRosterJob $jobint $count): bool
  287.     {
  288.         return boolval($this->em->createQueryBuilder()
  289.             ->update(OneRosterJob::class, 'job')
  290.             // fiddle with the semaphore
  291.             ->set('job.semaphore''(job.semaphore + :sem)')
  292.             ->setParameter('sem'$count)
  293.             // set the last activity timestamp
  294.             ->set('job.lastActivityAt''NOW()')
  295.             // IMPORTANT: must always filter by job id since we are doing a bulk update query!
  296.             ->andWhere('job.id = :job')
  297.             ->setParameter('job'$job->getId())
  298.             ->getQuery()
  299.             ->execute()
  300.         );
  301.     }
  302.     /**
  303.      * Used when a channel processor has determined the subset of tasks that are needed to fulfill the channels.
  304.      * This can be a set of social accounts that need posted to, departments needing alerts, or various transactional contact types.
  305.      *
  306.      * @param OneRosterJob $job
  307.      * @param int $count
  308.      * @return bool
  309.      * @internal
  310.      */
  311.     public function orchestratePhaseRevert(OneRosterJob $jobint $count): bool
  312.     {
  313.         return boolval($this->em->createQueryBuilder()
  314.             ->update(OneRosterJob::class, 'job')
  315.             // fiddle with the semaphore, since we are undoing we should substract
  316.             ->set('job.semaphore''(job.semaphore - :sem)')
  317.             ->setParameter('sem'$count)
  318.             // set the last activity timestamp
  319.             ->set('job.lastActivityAt''NOW()')
  320.             // IMPORTANT: must always filter by job id since we are doing a bulk update query!
  321.             ->andWhere('job.id = :job')
  322.             ->setParameter('job'$job->getId())
  323.             ->getQuery()
  324.             ->execute()
  325.         );
  326.     }
  327.     /**
  328.      * Used when a channel processor has determined the subset of tasks that are needed to fulfill the channels.
  329.      * This can be a set of social accounts that need posted to, departments needing alerts, or various transactional contact types.
  330.      *
  331.      * @param OneRosterJob $job
  332.      * @param int $phase
  333.      * @return bool
  334.      * @internal
  335.      */
  336.     public function orchestratePhaseFin(OneRosterJob $jobint $phase): bool
  337.     {
  338.         return boolval($this->em->createQueryBuilder()
  339.             ->update(OneRosterJob::class, 'job')
  340.             // set the last activity timestamp
  341.             ->set('job.lastActivityAt''NOW()')
  342.             // flag this channel as having finished
  343.             ->set('job.phasesComplete''CASE WHEN (job.semaphore + :sem) = 0 THEN BIT_OR(job.phasesComplete, :phase) ELSE job.phasesComplete END')
  344.             ->setParameter('phase'$phase)
  345.             // remove the current running flag
  346.             ->set('job.phasesRunning''CASE WHEN (job.semaphore + :sem) = 0 THEN BIT_AND(job.phasesRunning, BIT_NOT(:phase)) ELSE job.phasesRunning END')
  347.             // we are now done, so can dec the semaphore
  348.             // only do this if we are not triggering another phase after us
  349.             // IMPORTANT: this comes after the above checks as modifying this earlier can lead to weird behavior because the queries reference a changed value
  350.             ->set('job.semaphore''(job.semaphore + :sem)')
  351.             ->setParameter('sem', -1)
  352.             // IMPORTANT: must always filter by job id since we are doing a bulk update query!
  353.             ->andWhere('job.id = :job')
  354.             ->setParameter('job'$job->getId())
  355.             ->getQuery()
  356.             ->execute()
  357.         );
  358.     }
  359.     /**
  360.      * Used when a channel processor has determined the subset of tasks that are needed to fulfill the channels.
  361.      * This can be a set of social accounts that need posted to, departments needing alerts, or various transactional contact types.
  362.      *
  363.      * @param OneRosterJob $job
  364.      * @param int $phase
  365.      * @param int|null $next
  366.      * @return bool
  367.      * @internal
  368.      */
  369.     public function orchestratePhaseConditionalInit(OneRosterJob $jobint $phase, ?int $next): bool
  370.     {
  371.         if ( ! $next) {
  372.             return false;
  373.         }
  374.         return boolval($this->em->createQueryBuilder()
  375.             ->update(OneRosterJob::class, 'job')
  376.             // go ahead and increment the semaphore as we should be throwing a new mq message in the mix
  377.             ->set('job.semaphore''CASE WHEN BIT_AND(job.phasesComplete, :phase) > 0 AND BIT_AND(job.phasesReady, :next) = 0 THEN (job.semaphore + 1) ELSE job.semaphore END')
  378.             ->setParameter('phase'$phase)
  379.             ->setParameter('next'$next)
  380.             // update the phases ready if it is applicable
  381.             // this should return zero rows changed if another thread has already handled this
  382.             // IMPORTANT: this comes after the above checks as modifying this earlier can lead to weird behavior because the queries reference a changed value
  383.             ->set('job.phasesReady''CASE WHEN BIT_AND(job.phasesComplete, :phase) > 0 AND BIT_AND(job.phasesReady, :next) = 0 THEN BIT_OR(job.phasesReady, :next) ELSE job.phasesReady END')
  384.             // IMPORTANT: must always filter by job id since we are doing a bulk update query!
  385.             ->andWhere('job.id = :job')
  386.             ->setParameter('job'$job->getId())
  387.             ->getQuery()
  388.             ->execute()
  389.         );
  390.     }
  391.     /**
  392.      * @param OneRosterJob $job
  393.      * @param int $phase
  394.      * @param string $event
  395.      * @param string $type
  396.      * @param string|AbstractOneRosterEntity $entity
  397.      * @param \Throwable $issue
  398.      */
  399.     public function logIssue(OneRosterJob $jobint $phasestring $eventstring $type$entity\Throwable $issue): void
  400.     {
  401.         // generate the object here so we can re-use it
  402.         $log = (new OneRosterLog())
  403.             ->setJob($job)
  404.             ->setPhase($phase)
  405.             ->setEvent($event)
  406.             ->setType($type)
  407.             ->setExternalId(($entity instanceof AbstractOneRosterEntity) ? $entity->getSourcedId() : $entity)
  408.             ->setMessage($issue->getMessage() ?: null)
  409.             ->setError(Errors::jsonSerialize($issue));
  410.         // catch errors in an attempt to force the log entry with raw sql
  411.         // this is more complex and harder to maintain, but helps to track really pesky errors
  412.         try {
  413.             // try to save, if the em is closed this will fail...
  414.             $this->em->save($log);
  415.         } catch (\Exception $e) {
  416.             // likely the entity manager was closed, attempt to do a raw sql insert...
  417.             try {
  418.                 $metadata $this->em->getClassMetadata(OneRosterLog::class);
  419.                 $now DateTimeUtils::now()->format('Y-m-d H:i:s');
  420.                 $params array_filter([
  421.                     $metadata->getColumnName('tenant') => $job->getTenant()->getId(),
  422.                     // TODO: do we use v1 or v4 uuids?
  423.                     $metadata->getColumnName('uid') => Uuid::uuid1()->toString(),
  424.                     $metadata->getColumnName('createdAt') => $now,
  425.                     $metadata->getColumnName('touchedAt') => $now,
  426.                     $metadata->getColumnName('job') => $log->getJob()->getId(),
  427.                     $metadata->getColumnName('phase') => $log->getPhase(),
  428.                     $metadata->getColumnName('event') => $log->getEvent(),
  429.                     $metadata->getColumnName('type') => $log->getType(),
  430.                     $metadata->getColumnName('externalId') => $log->getExternalId(),
  431.                     $metadata->getColumnName('message') => $log->getMessage(),
  432.                     $metadata->getColumnName('error') => json_encode($log->getError() ?: []),
  433.                 ]);
  434.                 $params array_filter($params);
  435.                 if (json_last_error() !== JSON_ERROR_NONE) {
  436.                     throw new \Exception();
  437.                 }
  438.                 $this->em->getConnection()->executeStatement(
  439.                     sprintf(
  440.                         'INSERT INTO `%s` (`%s`) VALUES (%s)',
  441.                         $metadata->getTableName(),
  442.                         implode('`, `'array_keys($params)),
  443.                         implode(', 'array_pad([], count($params), '?'))
  444.                     ),
  445.                     array_combine(
  446.                         range(1count($params)),
  447.                         array_values($params)
  448.                     )
  449.                 );
  450.             } catch (\Exception $ee) {
  451.                 // noop
  452.                 // at this point we'll just let it be; fail silently so that things will continue on...
  453.             }
  454.         }
  455.     }
  456.     /**
  457.      * @param OneRosterSync $oneRosterSync
  458.      * @return void
  459.      */
  460.     public function delete(OneRosterSync $oneRosterSync): void
  461.     {
  462.         $this->em->wrapInTransaction(
  463.             function (EntityManager $em) use ($oneRosterSync) {
  464.                 $em->getFilters()->disable(DiscardableFilter::FILTER);
  465.                 $tenant $oneRosterSync->getTenant();
  466.                 // delete oneRosterSync entity
  467.                 $em->delete($oneRosterSync);
  468.                 // clean up all OneRosterable entities
  469.                 $entityClasses = [];
  470.                 $metas $em->getMetadataFactory()->getAllMetadata();
  471.                 foreach ($metas as $meta) {
  472.                     $entityClasses[] = $meta->getName();
  473.                 }
  474.                 foreach ($entityClasses as $entityClass) {
  475.                     $reflect = new \ReflectionClass($entityClass);
  476.                     if ( ! $reflect->isAbstract() && $reflect->implementsInterface(OneRosterableInterface::class)) {
  477.                         $this->em->createQueryBuilder()
  478.                             ->update($entityClass'e')
  479.                             ->set('e.onerosterId'':onerosterId')
  480.                             ->set('e.onerosterArchived'':onerosterArchived')
  481.                             ->andWhere('e.tenant = :tenant')
  482.                             ->setParameter('onerosterId'null)
  483.                             ->setParameter('onerosterArchived'false)
  484.                             ->setParameter('tenant'$tenant)
  485.                             ->getQuery()
  486.                             ->execute();
  487.                     }
  488.                 }
  489.                 $em->getFilters()->enable(DiscardableFilter::FILTER);
  490.             }
  491.         );
  492.     }
  493. }