Paper: CQRS Documents - by Greg Young
Events
Column | Type | Comment |
AggregateId | guid | indexed, fk aggregates |
Data | blob | |
Version | int | unique within aggregate |
(UserId) | guid | |
(Timestamp) | ||
(CorrelationId) | guid | all events point to origin command |
(SequenceNumber) | long | auto-incrementingeventstore as queue |
Aggregates
Column | Type | Comment |
AggregateId | guid | indexed, fk aggregates |
Type | varchar | fully qualified name |
Version | int | denormalized from Events |
Read event
Select * from events where AggregateId =? Order by version
Write event
Begin
version = SELECT version from aggregates where AggregateId = ''
if version is null
Insert into aggregates
version = 0
end
if expectedversion != version
raise concurrency problem
foreach event
insert event with incremented version number
update aggregate with last version number
End Transaction
Interface
public interface IEventStore {
void SaveChanges(Guid AggregateId, int OriginatingVersion, IEnumerable<Event> events);
IEnumerable<Event> GetEventsFor(Guid AggregateId);
}
Snapshots
Column | Type | Comment |
AggregateId | guid | fk aggregates |
SerializedData | blob | |
Version | int | version of the aggregate |
select aggregateId
from snapshots s join aggregates a on a.aggregateId = a.aggregateId
where a.version - s.version > ?
Eventstorage as queue
Column | Type | Comment |
AggregateId | guid | |
Data | blob | |
SequenceNumber | long | auto-incrementing |
Version | int | version of the aggregate |
Why: An issue that exists with many systems publishing events is that they require a two- phase commit between whatever storage they are using (Relational or otherwise) and the publishing of their events to the queue. “a catastrophe could occur during the small period of time between when the write to the data storage commits and when the write to the queue commits”
Solution: Eventstore as queue Because the values are unique and incrementing a secondary process can chase the Events table, publishing the events off to ther queue. The chasing process would simply have to store the value of the sequence number of the last event it had processed, it could even update this value with a two-phase commit bringing the update and the publish to the queue into the same transaction.
Benefits:
- lowers the latency of completing the initial operation
- limit the number of disk writes in the processing of the initial request to on
Disadvantage:
- raises the time until the message is actually published slightly