<?php
/**
* Pimcore
*
* This source file is available under following license:
* - Pimcore Commercial License (PCL)
*
* @copyright Copyright (c) Pimcore GmbH (http://www.pimcore.org)
* @license http://www.pimcore.org/license PCL
*/
namespace Pimcore\Bundle\DataHubSimpleRestBundle\Queue;
use Carbon\Carbon;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Pimcore\Db;
/**
 * Thin data-access layer around the queue table named by QUEUE_TABLE_NAME.
 *
 * Each row is identified by (elementId, configName, entityType) and carries a
 * millisecond timestamp of the last enqueue operation. Every public method
 * creates the table on demand when the database reports it as missing.
 */
class QueueService
{
    public const QUEUE_TABLE_NAME = 'bundle_data_hub_rest_index_queue';

    /**
     * Returns the database connection used for all queue operations.
     *
     * @return Db\Connection|Db\ConnectionInterface
     */
    protected function getDb()
    {
        return Db::get();
    }

    /**
     * Returns the current time as a millisecond-precision Unix timestamp.
     *
     * @return int
     */
    protected function getCurrentQueueTableOperationTime(): int
    {
        /** @var Carbon $carbonNow */
        $carbonNow = Carbon::now();

        return $this->buildOperationTime((int)$carbonNow->getTimestamp(), (int)$carbonNow->milli);
    }

    /**
     * Combines whole seconds and a millisecond fraction into one integer.
     *
     * Done arithmetically rather than by string concatenation so that a
     * fraction below 100 ms keeps its leading zeros (5 ms => ...005, not ...500).
     *
     * @param int $seconds Unix timestamp in seconds
     * @param int $milliseconds millisecond part, 0-999
     *
     * @return int
     */
    protected function buildOperationTime(int $seconds, int $milliseconds): int
    {
        return $seconds * 1000 + $milliseconds;
    }

    /**
     * Inserts an element into the queue, or refreshes its timestamp when the
     * same (elementId, configName, entityType) row already exists.
     *
     * @param int|string $elementId
     * @param string $entityType
     * @param string|null $configName null is stored as an empty string because the column is NOT NULL
     */
    public function addItemToQueue($elementId, string $entityType, ?string $configName = null)
    {
        try {
            // All values are bound as parameters; nothing caller-supplied is
            // interpolated into the SQL text.
            $this->getDb()->executeQuery(
                sprintf(
                    'INSERT INTO %s (elementId, timestamp, configName, entityType) VALUES (?, ?, ?, ?)'
                    . ' ON DUPLICATE KEY UPDATE timestamp = VALUES(timestamp)',
                    self::QUEUE_TABLE_NAME
                ),
                [
                    $elementId,
                    $this->getCurrentQueueTableOperationTime(),
                    (string)$configName,
                    $entityType,
                ]
            );
        } catch (TableNotFoundException $exception) {
            // First use: create the table, then retry the insert once it exists.
            $this->createQueueTableIfNotExisting(function () use ($elementId, $entityType, $configName) {
                $this->addItemToQueue($elementId, $entityType, $configName);
            });
        }
    }

    /**
     * Creates the queue table when it does not exist yet and optionally runs a
     * follow-up callback.
     *
     * @param \Closure|null $callable invoked after the CREATE TABLE statement
     *
     * @return mixed the callback's return value, or null when no callback was given
     */
    protected function createQueueTableIfNotExisting(?\Closure $callable = null)
    {
        $this->getDb()->executeQuery(sprintf('CREATE TABLE IF NOT EXISTS %s (
            elementId bigint NOT NULL,
            timestamp bigint NULL,
            configName varchar(50) NOT NULL,
            entityType varchar(50) NOT NULL,
            PRIMARY KEY (elementId, configName, entityType),
            KEY `bundle_index_queue_configName_index` (`configName`))
        ', self::QUEUE_TABLE_NAME));

        if ($callable) {
            return $callable();
        }

        return null;
    }

    /**
     * Returns queue rows, at most $limit of them.
     *
     * @param int $limit maximum number of rows; a non-positive value disables the limit
     *
     * @return array
     */
    public function getAllQueueEntries($limit = 100000): array
    {
        $sql = sprintf('SELECT * FROM %s', self::QUEUE_TABLE_NAME);

        // The limit is cast to int before it reaches the SQL text.
        $maxRows = (int)$limit;
        if ($maxRows > 0) {
            $sql .= sprintf(' LIMIT %d', $maxRows);
        }

        try {
            $results = $this->getDb()->fetchAll($sql);

            return $results ?? [];
        } catch (TableNotFoundException $exception) {
            return $this->createQueueTableIfNotExisting(function () use ($limit) {
                return $this->getAllQueueEntries($limit);
            });
        }
    }

    /**
     * Returns the number of rows currently in the queue table.
     *
     * @return int
     */
    public function getQueueItemCount(): int
    {
        try {
            // fetchOne() yields a numeric string, or false when there is no
            // row; the cast maps both onto the declared int return type.
            return (int)$this->getDb()->fetchOne(
                sprintf('SELECT count(*) as count FROM %s', self::QUEUE_TABLE_NAME)
            );
        } catch (TableNotFoundException $exception) {
            return $this->createQueueTableIfNotExisting(function () {
                return $this->getQueueItemCount();
            });
        }
    }

    /**
     * Removes the queue row identified by the given key triple.
     *
     * @param int|string $elementId
     * @param string $entityType
     * @param string $configName
     */
    public function markQueueEntryAsProcessed($elementId, $entityType, $configName)
    {
        try {
            $this->getDb()->executeQuery(
                sprintf(
                    'DELETE FROM %s WHERE elementId = ? AND entityType = ? AND configName = ?',
                    self::QUEUE_TABLE_NAME
                ),
                [$elementId, $entityType, $configName]
            );
        } catch (TableNotFoundException $exception) {
            // Nothing to delete from a missing table; just make sure it exists afterwards.
            $this->createQueueTableIfNotExisting();
        }
    }
}