<?php
namespace Cms\CoreBundle\Service;
use App\Doctrine\ORM\Filters\DiscardableFilter;
use App\Util\Errors;
use Cms\CoreBundle\Entity\AbstractOneRosterEntity;
use Cms\CoreBundle\Entity\OneRosterJob;
use Cms\CoreBundle\Entity\OneRosterLog;
use Cms\CoreBundle\Entity\OneRosterSync;
use Cms\CoreBundle\Events\OneRosterEvents;
use Cms\CoreBundle\Model\Interfaces\OneRosterable\OneRosterableInterface;
use Cms\CoreBundle\Service\OneRoster\AbstractOneRosterApi;
use Cms\CoreBundle\Util\DateTimeUtils;
use Cms\CoreBundle\Util\Doctrine\EntityManager;
use DateTime;
use DateTimeZone;
use Doctrine\DBAL\Types\Types;
use Platform\QueueBundle\Model\AsyncMessage;
use Platform\QueueBundle\Service\AsyncQueueService;
use Ramsey\Uuid\Uuid;
/**
* Class OneRosterService
* @package Cms\CoreBundle\Service
*/
final class OneRosterService
{
// Injected dependencies (assigned once in the constructor).
// Entity manager used for saving entities and for the bulk DQL update queries below.
protected EntityManager $em;
// Queue service used to publish the message that starts a job's phase chain.
protected AsyncQueueService $async;
/**
* Cache of API instances, keyed by the string produced by index().
* Each sync gets its own API instance, so they need to be tracked separately.
*
* @var array|AbstractOneRosterApi[]
*/
protected array $apis = [];
/**
 * Stores the collaborators this service relies on.
 *
 * @param EntityManager $em Entity manager used for persistence and bulk update queries.
 * @param AsyncQueueService $async Queue service used to publish job messages.
 */
public function __construct(EntityManager $em, AsyncQueueService $async)
{
    $this->async = $async;
    $this->em = $em;
}
/**
 * Returns the API instance for a sync (or for the sync that owns the given job).
 *
 * Instances are cached under the key produced by index(), so repeated calls for the
 * same sync/token combination reuse one instance. connect() is invoked on every call
 * and its result is what gets returned.
 *
 * @param OneRosterSync|OneRosterJob $thing
 * @return AbstractOneRosterApi
 * @throws \InvalidArgumentException When $thing does not resolve to a OneRosterSync.
 */
public function api($thing): AbstractOneRosterApi
{
    // a job is only a pointer to its sync; resolve it first
    $sync = ($thing instanceof OneRosterJob) ? $thing->getSync() : $thing;

    // validate input; InvalidArgumentException is still an \Exception for existing callers
    if ( ! $sync instanceof OneRosterSync) {
        throw new \InvalidArgumentException(sprintf(
            'Expected an instance of %s or %s, got %s.',
            OneRosterSync::class,
            OneRosterJob::class,
            is_object($thing) ? get_class($thing) : gettype($thing)
        ));
    }

    // lazily create the instance the first time this sync/token combination is seen
    $index = $this->index($sync);
    if ( ! array_key_exists($index, $this->apis)) {
        $this->apis[$index] = AbstractOneRosterApi::factory($sync);
    }

    // attempt to connect and return the api service
    return $this->apis[$index]->connect();
}
/**
 * Builds the cache key used by api() for a sync.
 *
 * The key is the sync id, token type and access token joined with colons, so a
 * change in any of the three yields a different key.
 *
 * @param OneRosterSync $sync
 * @return string
 */
protected function index(OneRosterSync $sync): string
{
    $parts = [
        $sync->getId(),
        $sync->getApiTokenTokenType(),
        $sync->getApiTokenAccessToken(),
    ];

    return implode(':', $parts);
}
/**
 * Generates a CRC32 code for the given data.
 * This is used to better fine-tune change tracking in data syncs.
 *
 * Fields listed in the ignore list are removed before hashing, so they never
 * influence the result.
 *
 * @param array $data
 * @return int
 */
public function checksum(array $data): int
{
    // fields that we don't want to include in the calculation
    static $ignore = [
        'dateLastModified',
    ];

    // get rid of ignored fields (unset is a no-op for keys that are not present)
    foreach ($ignore as $field) {
        unset($data[$field]);
    }

    // serialize the data into a json string to feed the crc function
    $encoded = json_encode($data);
    if ($encoded === false) {
        // json_encode fails on e.g. invalid UTF-8; without a fallback every such payload
        // would hash to crc32('') === 0 and changes in it would never be detected
        $encoded = serialize($data);
    }

    return crc32($encoded);
}
/**
 * Queues a job for every sync the repository reports as runnable.
 *
 * @return array|OneRosterJob[] One entry per runnable sync, in the order they were returned.
 */
public function run(): array
{
    $queued = [];

    // ask the repository for all the syncs that are able to run and queue each of them
    $repository = $this->em->getRepository(OneRosterSync::class);
    foreach ($repository->findRunnable() as $runnable) {
        $queued[] = $this->queue($runnable);
    }

    return $queued;
}
/**
 * Placeholder for job validation; currently accepts every job.
 *
 * @param OneRosterJob $job
 */
public function validate(OneRosterJob $job): void
{
    // TODO: no validation rules are implemented yet
}
/**
* Creates a job for the given sync, marks it as running and publishes the queue message for its start phase.
*
* The job and the sync are saved first. Validation, the status update, the semaphore
* increment and the message publish are then attempted; any \Exception raised there is
* recorded on the job through orchestrateJobError() instead of being propagated.
*
* @param OneRosterSync $sync
* @param int $phase Phase to start at; stored as the job's start phase and used as the key into OneRosterEvents::PHASES.
* @return OneRosterJob|null The created job (this implementation returns it even when an error was recorded).
*/
public function queue(OneRosterSync $sync, int $phase = OneRosterJob::PHASES__INIT): ?OneRosterJob
{
// TODO: make sure a job can be created, like there is not already a running job?
// generate a job for this new sync
$this->em->save(
$job = (new OneRosterJob())
->setSync($sync)
->setStartPhase($phase)
);
if ($sync->getSchedule()) {
// ensure that the next run date here is at least some number of hours past the current date/time
// NOTE(review): the UTC timezone is passed as a third argument to setNextRunAt(), not to the DateTime; confirm that is intended
$sync
->setNextRunAt(true, (new DateTime())->modify('+2 hours'), new DateTimeZone('UTC'));
}
// clear the sync's semaphore and stamp the run time
$this->em->save(
$sync
->setSemaphore(null)
->setLastRunAt(new DateTime()),
);
try {
// make sure the job is legit
$this->validate($job);
// init
$this->orchestrateJobInit($job, $phase);
// start a transaction
// this should be mq-safe because a lock on the table should prevent subsequent updates from firing until the lock is released
$this->em->transactional(
function () use ($job, $phase) {
// fin
// doing the fin tracking first because if we did the reverse order, this method failing would mess things up as the mq tasks would be committed already
$this->orchestrateJobFin($job);
// publish the single message that starts the chain of tasks for the requested phase
$this->async->send(
null,
new AsyncMessage(
$job,
OneRosterEvents::PHASES[$phase],
[
'job' => $job->getId(),
],
AsyncMessage::PRIORITY__NORMAL,
)
);
}
);
} catch (\Exception $e) {
// record the failure on the job; only \Exception is handled here, an \Error would propagate
$this->orchestrateJobError($job, $e);
}
return $job;
}
/**
 * Marks a freshly queued job as running and flags its start phase as ready.
 *
 * @param OneRosterJob $job
 * @param int $phase Phase bit to OR into the job's phasesReady mask.
 * @return bool Boolean cast of the update query's result.
 * @internal
 */
protected function orchestrateJobInit(OneRosterJob $job, int $phase): bool
{
    return boolval($this->em->createQueryBuilder()
        ->update(OneRosterJob::class, 'job')
        // job should be marked as running; bound as a parameter like orchestrateJobError() does
        // instead of inlining the constant into the DQL string
        ->set('job.status', ':status')
        ->setParameter('status', OneRosterJob::STATUSES__RUNNING)
        // set the init phase as ready
        ->set('job.phasesReady', 'BIT_OR(job.phasesReady, :phase)')
        ->setParameter('phase', $phase)
        // IMPORTANT: must always filter by job id since we are doing a bulk update query!
        ->andWhere('job.id = :job')
        ->setParameter('job', $job->getId())
        ->getQuery()
        ->execute()
    );
}
/**
 * Increments the job's semaphore by one to account for a single queued message.
 *
 * @param OneRosterJob $job
 * @return bool Boolean cast of the update query's result.
 * @internal
 */
protected function orchestrateJobFin(OneRosterJob $job): bool
{
    $query = $this->em->createQueryBuilder()
        ->update(OneRosterJob::class, 'job')
        // one task is being added to the mq
        ->set('job.semaphore', '(job.semaphore + 1)')
        // IMPORTANT: a bulk update must always be restricted to this job's id!
        ->andWhere('job.id = :job')
        ->setParameter('job', $job->getId())
        ->getQuery();

    return (bool) $query->execute();
}
/**
 * Flags the job as errored and stores the serialized throwable on it.
 *
 * @param OneRosterJob $job
 * @param \Throwable $e The failure to record; serialized through Errors::jsonSerialize().
 * @return bool Boolean cast of the update query's result.
 * @internal
 */
protected function orchestrateJobError(OneRosterJob $job, \Throwable $e): bool
{
    $query = $this->em->createQueryBuilder()
        ->update(OneRosterJob::class, 'job')
        // a hard error stops the entire job, so the status is switched to the error state
        ->set('job.status', ':status')
        ->setParameter('status', OneRosterJob::STATUSES__ERROR)
        // keep the primary error details, bound as a JSON typed parameter
        ->set('job.error', ':error')
        ->setParameter('error', Errors::jsonSerialize($e), Types::JSON)
        // IMPORTANT: a bulk update must always be restricted to this job's id!
        ->andWhere('job.id = :job')
        ->setParameter('job', $job->getId())
        ->getQuery();

    return (bool) $query->execute();
}
/**
 * Flags a phase of the job as running and refreshes the job's activity timestamps.
 *
 * The first-activity timestamp is only written while it is still NULL; the
 * last-activity timestamp is always set to the current database time.
 *
 * @param OneRosterJob $job
 * @param int $phase Phase bit to OR into the job's phasesRunning mask.
 * @return bool Boolean cast of the update query's result.
 * @internal
 */
public function orchestratePhaseInit(OneRosterJob $job, int $phase): bool
{
    $query = $this->em->createQueryBuilder()
        ->update(OneRosterJob::class, 'job')
        // mark this phase as started
        ->set('job.phasesRunning', 'BIT_OR(job.phasesRunning, :phase)')
        // keep an existing first activity timestamp, otherwise stamp it now
        ->set('job.firstActivityAt', 'CASE WHEN job.firstActivityAt IS NULL THEN NOW() ELSE job.firstActivityAt END')
        // the last activity timestamp is always refreshed
        ->set('job.lastActivityAt', 'NOW()')
        // IMPORTANT: a bulk update must always be restricted to this job's id!
        ->andWhere('job.id = :job')
        ->setParameter('phase', $phase)
        ->setParameter('job', $job->getId())
        ->getQuery();

    return (bool) $query->execute();
}
/**
 * Adds $count to the job's semaphore and refreshes its last-activity timestamp.
 *
 * Presumably called once a phase processor knows how many additional queue tasks it
 * is scheduling; confirm against callers.
 *
 * @param OneRosterJob $job
 * @param int $count Amount added to the semaphore.
 * @return bool Boolean cast of the update query's result.
 * @internal
 */
public function orchestratePhaseChange(OneRosterJob $job, int $count): bool
{
    $query = $this->em->createQueryBuilder()
        ->update(OneRosterJob::class, 'job')
        // raise the semaphore by the requested amount
        ->set('job.semaphore', '(job.semaphore + :sem)')
        // refresh the last activity timestamp
        ->set('job.lastActivityAt', 'NOW()')
        // IMPORTANT: a bulk update must always be restricted to this job's id!
        ->andWhere('job.id = :job')
        ->setParameter('sem', $count)
        ->setParameter('job', $job->getId())
        ->getQuery();

    return (bool) $query->execute();
}
/**
 * Subtracts $count from the job's semaphore and refreshes its last-activity timestamp.
 *
 * This is the inverse of orchestratePhaseChange().
 *
 * @param OneRosterJob $job
 * @param int $count Amount subtracted from the semaphore.
 * @return bool Boolean cast of the update query's result.
 * @internal
 */
public function orchestratePhaseRevert(OneRosterJob $job, int $count): bool
{
    $query = $this->em->createQueryBuilder()
        ->update(OneRosterJob::class, 'job')
        // we are undoing an earlier increase, so subtract from the semaphore
        ->set('job.semaphore', '(job.semaphore - :sem)')
        // refresh the last activity timestamp
        ->set('job.lastActivityAt', 'NOW()')
        // IMPORTANT: a bulk update must always be restricted to this job's id!
        ->andWhere('job.id = :job')
        ->setParameter('sem', $count)
        ->setParameter('job', $job->getId())
        ->getQuery();

    return (bool) $query->execute();
}
/**
* Used when a task of a phase has finished: decrements the job's semaphore by one and,
* when that decrement brings the semaphore to zero, moves the phase from running to complete.
*
* The completion checks are expressed as (job.semaphore + :sem) = 0 with :sem bound to -1,
* i.e. they test the value the semaphore will have after this update.
*
* @param OneRosterJob $job
* @param int $phase Phase bit to set in phasesComplete and clear from phasesRunning.
* @return bool Boolean cast of the update query's result.
* @internal
*/
public function orchestratePhaseFin(OneRosterJob $job, int $phase): bool
{
return boolval($this->em->createQueryBuilder()
->update(OneRosterJob::class, 'job')
// set the last activity timestamp
->set('job.lastActivityAt', 'NOW()')
// flag this phase as having finished, but only when the semaphore is about to reach zero
->set('job.phasesComplete', 'CASE WHEN (job.semaphore + :sem) = 0 THEN BIT_OR(job.phasesComplete, :phase) ELSE job.phasesComplete END')
->setParameter('phase', $phase)
// remove the current running flag under the same condition
->set('job.phasesRunning', 'CASE WHEN (job.semaphore + :sem) = 0 THEN BIT_AND(job.phasesRunning, BIT_NOT(:phase)) ELSE job.phasesRunning END')
// we are now done, so can dec the semaphore
// IMPORTANT: this comes after the above checks as modifying this earlier can lead to weird behavior because the queries reference a changed value
->set('job.semaphore', '(job.semaphore + :sem)')
->setParameter('sem', -1)
// IMPORTANT: must always filter by job id since we are doing a bulk update query!
->andWhere('job.id = :job')
->setParameter('job', $job->getId())
->getQuery()
->execute()
);
}
/**
* Marks the next phase as ready, and reserves a semaphore slot for it, once the current phase is complete.
*
* Both columns are only changed when the $phase bit is set in phasesComplete and the
* $next bit is not yet set in phasesReady; otherwise they are written back unchanged.
*
* @param OneRosterJob $job
* @param int $phase Phase that must already be flagged complete.
* @param int|null $next Phase to flag as ready; when null or 0 nothing is executed.
* @return bool False when $next is falsy, otherwise the boolean cast of the update query's result.
* @internal
*/
public function orchestratePhaseConditionalInit(OneRosterJob $job, int $phase, ?int $next): bool
{
// nothing to start after this phase
if ( ! $next) {
return false;
}
return boolval($this->em->createQueryBuilder()
->update(OneRosterJob::class, 'job')
// go ahead and increment the semaphore as we should be throwing a new mq message in the mix
->set('job.semaphore', 'CASE WHEN BIT_AND(job.phasesComplete, :phase) > 0 AND BIT_AND(job.phasesReady, :next) = 0 THEN (job.semaphore + 1) ELSE job.semaphore END')
->setParameter('phase', $phase)
->setParameter('next', $next)
// update the phases ready if it is applicable
// this should return zero rows changed if another thread has already handled this
// IMPORTANT: this comes after the above checks as modifying this earlier can lead to weird behavior because the queries reference a changed value
->set('job.phasesReady', 'CASE WHEN BIT_AND(job.phasesComplete, :phase) > 0 AND BIT_AND(job.phasesReady, :next) = 0 THEN BIT_OR(job.phasesReady, :next) ELSE job.phasesReady END')
// IMPORTANT: must always filter by job id since we are doing a bulk update query!
->andWhere('job.id = :job')
->setParameter('job', $job->getId())
->getQuery()
->execute()
);
}
/**
 * Records an issue that occurred while processing a job.
 *
 * The log entry is saved through the entity manager first. If that throws (for example
 * because the entity manager was closed by an earlier failure), a raw SQL insert built
 * from the entity's column metadata is attempted instead. A failure of that fallback is
 * swallowed on purpose so that logging can never interrupt the sync itself.
 *
 * @param OneRosterJob $job
 * @param int $phase
 * @param string $event
 * @param string $type
 * @param string|AbstractOneRosterEntity $entity Either the external id itself or an entity whose sourced id is used.
 * @param \Throwable $issue
 */
public function logIssue(OneRosterJob $job, int $phase, string $event, string $type, $entity, \Throwable $issue): void
{
    // generate the object here so we can re-use it in the fallback
    $log = (new OneRosterLog())
        ->setJob($job)
        ->setPhase($phase)
        ->setEvent($event)
        ->setType($type)
        ->setExternalId(($entity instanceof AbstractOneRosterEntity) ? $entity->getSourcedId() : $entity)
        ->setMessage($issue->getMessage() ?: null)
        ->setError(Errors::jsonSerialize($issue));

    try {
        // try to save, if the em is closed this will fail...
        $this->em->save($log);

        return;
    } catch (\Exception $e) {
        // likely the entity manager was closed, fall through to the raw sql insert...
    }

    // this is more complex and harder to maintain, but helps to track really pesky errors
    try {
        $metadata = $this->em->getClassMetadata(OneRosterLog::class);
        $now = DateTimeUtils::now()->format('Y-m-d H:i:s');

        // NOTE(review): 'tenant' and 'job' look like associations; confirm getColumnName() resolves their join columns
        // only null values are left out (the column default applies); legitimate falsy
        // values such as 0 or '' are kept, matching what the ORM save would have written
        $params = array_filter(
            [
                $metadata->getColumnName('tenant') => $job->getTenant()->getId(),
                // TODO: do we use v1 or v4 uuids?
                $metadata->getColumnName('uid') => Uuid::uuid1()->toString(),
                $metadata->getColumnName('createdAt') => $now,
                $metadata->getColumnName('touchedAt') => $now,
                $metadata->getColumnName('job') => $log->getJob()->getId(),
                $metadata->getColumnName('phase') => $log->getPhase(),
                $metadata->getColumnName('event') => $log->getEvent(),
                $metadata->getColumnName('type') => $log->getType(),
                $metadata->getColumnName('externalId') => $log->getExternalId(),
                $metadata->getColumnName('message') => $log->getMessage(),
                // an encoding failure throws \JsonException, which the catch below handles
                $metadata->getColumnName('error') => json_encode($log->getError() ?: [], JSON_THROW_ON_ERROR),
            ],
            static fn ($value): bool => $value !== null
        );

        $this->em->getConnection()->executeStatement(
            sprintf(
                'INSERT INTO `%s` (`%s`) VALUES (%s)',
                $metadata->getTableName(),
                implode('`, `', array_keys($params)),
                implode(', ', array_pad([], count($params), '?'))
            ),
            // positional placeholders take a plain 0-indexed list of values
            array_values($params)
        );
    } catch (\Exception $ee) {
        // noop
        // at this point we'll just let it be; fail silently so that things will continue on...
    }
}
/**
 * Deletes a sync and detaches every OneRosterable entity of its tenant from OneRoster.
 *
 * Everything runs in one transaction. For each concrete mapped entity class that
 * implements OneRosterableInterface, the onerosterId is reset to null and
 * onerosterArchived to false for all rows belonging to the sync's tenant.
 *
 * The discardable filter is switched off for the duration of the clean-up and put back
 * into its previous state afterwards, even when the clean-up throws.
 *
 * @param OneRosterSync $oneRosterSync
 * @return void
 */
public function delete(OneRosterSync $oneRosterSync): void
{
    $this->em->wrapInTransaction(
        function (EntityManager $em) use ($oneRosterSync) {
            // remember the previous state so a filter that was off is not switched on by us
            $wasEnabled = $em->getFilters()->isEnabled(DiscardableFilter::FILTER);
            if ($wasEnabled) {
                $em->getFilters()->disable(DiscardableFilter::FILTER);
            }

            try {
                $tenant = $oneRosterSync->getTenant();

                // delete oneRosterSync entity
                $em->delete($oneRosterSync);

                // clean up all OneRosterable entities
                foreach ($em->getMetadataFactory()->getAllMetadata() as $meta) {
                    $entityClass = $meta->getName();
                    $reflect = new \ReflectionClass($entityClass);

                    // only concrete classes that opted into the interface carry the oneroster fields
                    if ($reflect->isAbstract() || ! $reflect->implementsInterface(OneRosterableInterface::class)) {
                        continue;
                    }

                    $em->createQueryBuilder()
                        ->update($entityClass, 'e')
                        ->set('e.onerosterId', ':onerosterId')
                        ->set('e.onerosterArchived', ':onerosterArchived')
                        ->andWhere('e.tenant = :tenant')
                        ->setParameter('onerosterId', null)
                        ->setParameter('onerosterArchived', false)
                        ->setParameter('tenant', $tenant)
                        ->getQuery()
                        ->execute();
                }
            } finally {
                // restore the filter even when something above failed
                if ($wasEnabled) {
                    $em->getFilters()->enable(DiscardableFilter::FILTER);
                }
            }
        }
    );
}
}