Skip to content

Commit

Permalink
Merge pull request #14 from neos/feature/12-deterministic-charset-and…
Browse files Browse the repository at this point in the history
…-collation

FEATURE: Deterministic charset and collation
  • Loading branch information
bwaidelich authored Jan 19, 2024
2 parents c984151 + 447864b commit a65fdbc
Showing 1 changed file with 64 additions and 34 deletions.
98 changes: 64 additions & 34 deletions src/DoctrineEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@
use Doctrine\DBAL\Exception\DeadlockException;
use Doctrine\DBAL\Exception\LockWaitTimeoutException;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\SqlitePlatform;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Column;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\SchemaConfig;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Helper\BatchEventStream;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\CausationId;
use Neos\EventStore\Model\Event\CorrelationId;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventType;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\Event\StreamName;
use Neos\EventStore\Model\Event\Version;
Expand Down Expand Up @@ -161,11 +169,11 @@ private function determineRequiredSqlStatements(): array
$platform = $this->connection->getDatabasePlatform();
assert($platform !== null);
if (!$schemaManager->tablesExist($this->eventTableName)) {
return $platform->getCreateTableSQL($this->createEventStoreSchema()->getTable($this->eventTableName));
return $platform->getCreateTableSQL($this->createEventStoreSchema($schemaManager)->getTable($this->eventTableName));
}
$tableSchema = $schemaManager->listTableDetails($this->eventTableName);
$fromSchema = new Schema([$tableSchema], [], $schemaManager->createSchemaConfig());
$schemaDiff = (new Comparator())->compare($fromSchema, $this->createEventStoreSchema());
$schemaDiff = (new Comparator())->compare($fromSchema, $this->createEventStoreSchema($schemaManager));
return $schemaDiff->toSaveSql($platform);
}

Expand All @@ -176,44 +184,66 @@ private function determineRequiredSqlStatements(): array
*
* @return Schema
*/
private function createEventStoreSchema(): Schema
private function createEventStoreSchema(AbstractSchemaManager $schemaManager): Schema
{
$schemaConfiguration = new SchemaConfig();
$connectionParameters = $this->connection->getParams();
if (isset($connectionParameters['defaultTableOptions'])) {
assert(is_array($connectionParameters['defaultTableOptions']));
$schemaConfiguration->setDefaultTableOptions($connectionParameters['defaultTableOptions']);
}
$schema = new Schema([], [], $schemaConfiguration);
$table = $schema->createTable($this->eventTableName);
$isSQLite = $schemaManager->getDatabasePlatform() instanceof SqlitePlatform;
$table = new Table($this->eventTableName, [
// The monotonic sequence number
(new Column('sequencenumber', Type::getType($isSQLite ? Types::INTEGER : Types::BIGINT)))
->setUnsigned(true)
->setAutoincrement(true),

// The stream name, usually in the format "<BoundedContext>:<StreamName>"
(new Column('stream', Type::getType(Types::STRING)))
->setLength(StreamName::MAX_LENGTH)
->setCustomSchemaOptions($isSQLite ? [] : ['charset' => 'ascii']),

// Version of the event in the respective stream
(new Column('version', Type::getType($isSQLite ? Types::INTEGER : Types::BIGINT)))
->setUnsigned(true),

// The event type, often in the format "<BoundedContext>:<EventType>"
(new Column('type', Type::getType(Types::STRING)))
->setLength(EventType::MAX_LENGTH)
->setCustomSchemaOptions($isSQLite ? [] : ['charset' => 'ascii']),

// The event payload, usually stored as JSON
(new Column('payload', Type::getType(Types::TEXT))),

// The monotonic sequence number
$table->addColumn('sequencenumber', Types::INTEGER, ['autoincrement' => true]);
// The stream name, usually in the format "<BoundedContext>:<StreamName>"
$table->addColumn('stream', Types::STRING, ['length' => 255]);
// Version of the event in the respective stream
$table->addColumn('version', Types::BIGINT, ['unsigned' => true]);
// The event type in the format "<BoundedContext>:<EventType>"
$table->addColumn('type', Types::STRING, ['length' => 255]);
// The event payload as JSON
$table->addColumn('payload', Types::TEXT);
// The event metadata as JSON
$table->addColumn('metadata', Types::TEXT, ['notnull' => false]);
// The unique event id, usually a UUID
$table->addColumn('id', Types::STRING, ['length' => 255]);
// An optional correlation id, usually a UUID
$table->addColumn('correlationid', Types::STRING, ['length' => 255, 'notnull' => false]);
// An optional causation id, usually a UUID
$table->addColumn('causationid', Types::STRING, ['length' => 255, 'notnull' => false]);
// Timestamp of the the event publishing
$table->addColumn('recordedat', Types::DATETIME_IMMUTABLE);
// The event metadata stored as JSON
(new Column('metadata', Type::getType(Types::JSON)))
->setNotnull(false),

// The unique event id, stored as UUID
(new Column('id', Type::getType(Types::STRING)))
->setFixed(true)
->setLength(EventId::MAX_LENGTH)
->setCustomSchemaOptions($isSQLite ? [] : ['charset' => 'ascii']),

// An optional causation id, usually a UUID
(new Column('causationid', Type::getType(Types::STRING)))
->setNotnull(false)
->setLength(CausationId::MAX_LENGTH)
->setCustomSchemaOptions($isSQLite ? [] : ['charset' => 'ascii']),

// An optional correlation id, usually a UUID
(new Column('correlationid', Type::getType(Types::STRING)))
->setNotnull(false)
->setLength(CorrelationId::MAX_LENGTH)
->setCustomSchemaOptions($isSQLite ? [] : ['charset' => 'ascii']),

// Timestamp of the event publishing
(new Column('recordedat', Type::getType(Types::DATETIME_IMMUTABLE))),
]);

$table->setPrimaryKey(['sequencenumber']);
$table->addUniqueIndex(['id']);
$table->addUniqueIndex(['stream', 'version']);
$table->addIndex(['correlationid']);

return $schema;
$schemaConfiguration = $schemaManager->createSchemaConfig();
$schemaConfiguration->setDefaultTableOptions(['charset' => 'utf8mb4']);
return new Schema([$table], [], $schemaConfiguration);
}

/**
Expand Down Expand Up @@ -254,7 +284,7 @@ private function commitEvent(StreamName $streamName, Event $event, Version $vers
'recordedat' => $this->clock->now(),
],
[
'version' => Types::INTEGER,
'version' => Types::BIGINT,
'recordedat' => Types::DATETIME_IMMUTABLE,
]
);
Expand Down

0 comments on commit a65fdbc

Please sign in to comment.