vendor/pimcore/data-hub-simple-rest/src/Queue/QueueService.php line 39

Open in your IDE?
  1. <?php
  2. /**
  3.  * Pimcore
  4.  *
  5.  * This source file is available under following license:
  6.  * - Pimcore Commercial License (PCL)
  7.  *
  8.  *  @copyright  Copyright (c) Pimcore GmbH (http://www.pimcore.org)
  9.  *  @license    http://www.pimcore.org/license     PCL
  10.  */
  11. namespace Pimcore\Bundle\DataHubSimpleRestBundle\Queue;
  12. use Carbon\Carbon;
  13. use Doctrine\DBAL\Exception\TableNotFoundException;
  14. use Pimcore\Db;
  15. class QueueService
  16. {
  17.     const QUEUE_TABLE_NAME 'bundle_data_hub_rest_index_queue';
  18.     /**
  19.      * @return Db\Connection|Db\ConnectionInterface
  20.      */
  21.     protected function getDb()
  22.     {
  23.         return Db::get();
  24.     }
  25.     protected function getCurrentQueueTableOperationTime(): int
  26.     {
  27.         /** @var Carbon $carbonNow */
  28.         $carbonNow Carbon::now();
  29.         return (int)($carbonNow->getTimestamp() . str_pad((string)$carbonNow->milli3'0'));
  30.     }
  31.     public function addItemToQueue($elementIdstring $entityTypestring $configName null)
  32.     {
  33.         try {
  34.             $this->getDb()->executeQuery(sprintf(
  35.                 'INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE timestamp = VALUES(timestamp)',
  36.                 self::QUEUE_TABLE_NAME,
  37.                 implode(',', ['elementId''timestamp''configName''entityType']),
  38.                 implode(',', [
  39.                     $elementId,
  40.                     $this->getCurrentQueueTableOperationTime(),
  41.                     $this->getDb()->quote($configName),
  42.                     $this->getDb()->quote($entityType)
  43.                 ])
  44.             ));
  45.         } catch (TableNotFoundException $exception) {
  46.             $this->createQueueTableIfNotExisting(function () use ($elementId$entityType$configName) {
  47.                 $this->addItemToQueue($elementId$entityType$configName);
  48.             });
  49.         }
  50.     }
  51.     protected function createQueueTableIfNotExisting(\Closure $callable null)
  52.     {
  53.         $this->getDb()->executeQuery(sprintf('CREATE TABLE IF NOT EXISTS %s (
  54.             elementId bigint NOT NULL,
  55.             timestamp bigint NULL,
  56.             configName varchar(50) NOT NULL,
  57.             entityType varchar(50) NOT NULL,
  58.             PRIMARY KEY (elementId, configName, entityType),
  59.             KEY `bundle_index_queue_configName_index` (`configName`))
  60.         'self::QUEUE_TABLE_NAME));
  61.         if ($callable) {
  62.             return $callable();
  63.         }
  64.     }
  65.     public function getAllQueueEntries($limit 100000): array
  66.     {
  67.         try {
  68.             $results $this->getDb()->fetchAll(
  69.                 sprintf('SELECT * FROM %s'self::QUEUE_TABLE_NAME)
  70.             );
  71.             return $results ?? [];
  72.         } catch (TableNotFoundException $exception) {
  73.             return $this->createQueueTableIfNotExisting(function () use ($limit) {
  74.                 return $this->getAllQueueEntries($limit);
  75.             });
  76.         }
  77.     }
  78.     /**
  79.      * @return int
  80.      */
  81.     public function getQueueItemCount(): int
  82.     {
  83.         try {
  84.             return $this->getDb()->fetchOne(
  85.                 sprintf('SELECT count(*) as count FROM %s'self::QUEUE_TABLE_NAME)
  86.             ) ?? 0;
  87.         } catch (TableNotFoundException $exception) {
  88.             return $this->createQueueTableIfNotExisting(function () {
  89.                 return $this->getQueueItemCount();
  90.             });
  91.         }
  92.     }
  93.     public function markQueueEntryAsProcessed($elementId$entityType$configName)
  94.     {
  95.         try {
  96.             $this->getDb()->executeQuery(
  97.                 sprintf('DELETE FROM %s WHERE elementId = ? AND entityType = ? AND configName = ?'self::QUEUE_TABLE_NAME),
  98.                 [$elementId$entityType$configName]
  99.             );
  100.         } catch (TableNotFoundException $exception) {
  101.             $this->createQueueTableIfNotExisting();
  102.         }
  103.     }
  104. }