In this series of posts I discuss how we do CQRS and event sourcing. One of our main goals was to reduce the overall complexity of our solution, which ultimately led us to get rid of our database. Please see my previous posts for further details (part 1, part 2, part 3, part 4 and part 5).
In this post we will discuss how events generated by our aggregates are serialized and then stored in the event store. Remember, we do not use any database to store data and thus have to provide our own persistence mechanism.
Note: the code snippets presented in this post represent a simplified version of the code found in Lokad.CQRS. This code is used to show the core concepts.
Serializing and deserializing an event
Since we are not going to use a database to store our data we are now on our own. Let’s first choose a serialization format that suits our needs. What do we need?
- Serialization and deserialization should be fast
- The serialized data should be as compact as possible
- The serialization process should be tolerant of changes in the events, e.g. allow us to rename properties of the event or add new properties to the event
Let’s define an interface that provides what we need
We have a method SerializeEvent which accepts an event and returns the serialized event as an array of bytes. Of course we then need the counterpart, which does the opposite. The method DeserializeEvent accepts an array of bytes and returns the deserialized event.
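A minimal sketch of such an interface could look like the following. The name IEventStreamer is the one used later in this post; IIdentity and IEvent<TIdentity> are reduced to stand-ins here, and their members are assumptions made for illustration only.

```csharp
// Stand-in for the identity abstraction; the real interface may have other members (assumption).
public interface IIdentity
{
    string GetId();
}

// Stand-in for the event abstraction: each event carries the identity
// of the aggregate it belongs to (assumption).
public interface IEvent<out TIdentity> where TIdentity : IIdentity
{
    TIdentity Id { get; }
}

public interface IEventStreamer
{
    // Turns an event into its serialized form.
    byte[] SerializeEvent(IEvent<IIdentity> e);

    // Rebuilds the event from bytes previously produced by SerializeEvent.
    IEvent<IIdentity> DeserializeEvent(byte[] buffer);
}
```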
Now please note that the SerializeEvent method accepts any event that implements IEvent<IIdentity>. Serializing the event is no problem, but to be able to deserialize it we need to know the concrete type of the event we previously serialized. Thus we need to somehow serialize the type, or rather the contract name of the event, together with the content of the event. This fact slightly complicates the whole process. But as you will see, there is still no rocket science involved. Each step is simple.
Let’s first define a helper class Formatter which contains the contract name of an event, a delegate to serialize this event and another delegate to deserialize it
As you can see, the serializer delegate takes an object and serializes it into a stream. The deserializer delegate takes a stream (containing the serialized event) and deserializes its content and returns it as object.
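As a rough sketch, such a helper class might look like this. The member names are my assumptions; only the shape (a contract name plus two delegates) follows from the description above.

```csharp
using System;
using System.IO;

public sealed class Formatter
{
    // Name under which the event type is known in the serialized data.
    public readonly string ContractName;

    // Writes the given object into the given stream.
    public readonly Action<object, Stream> SerializerDelegate;

    // Reads one object from the given stream.
    public readonly Func<Stream, object> DeserializerDelegate;

    public Formatter(
        string contractName,
        Action<object, Stream> serializerDelegate,
        Func<Stream, object> deserializerDelegate)
    {
        ContractName = contractName;
        SerializerDelegate = serializerDelegate;
        DeserializerDelegate = deserializerDelegate;
    }
}
```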
We want to create an instance of Formatter for each event that we have in our system. To get all events we can use code similar to this
The result will be our known event types. Note that line 18 will be evident in a minute.
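One way to collect the event types is plain reflection over the assembly that contains them. The sketch below is not the original listing (so the line number mentioned above does not refer to it); TaskId and the empty NewTaskScheduled class are hypothetical placeholders, and the marker interfaces are stripped down to keep the example self-contained.

```csharp
using System;
using System.Linq;
using System.Reflection;

public interface IIdentity { }
public interface IEvent<out TIdentity> where TIdentity : IIdentity { }

// Hypothetical sample identity and event, only here so the scan finds something.
public sealed class TaskId : IIdentity { }
public sealed class NewTaskScheduled : IEvent<TaskId> { }

public static class KnownEvents
{
    // Returns every concrete class in the assembly that implements IEvent<IIdentity>.
    // Because IEvent<T> is covariant, IEvent<TaskId> counts as IEvent<IIdentity>
    // as long as the identity type is a class.
    public static Type[] LoadFrom(Assembly assembly)
    {
        return assembly
            .GetTypes()
            .Where(t => typeof(IEvent<IIdentity>).IsAssignableFrom(t))
            .Where(t => t.IsClass && !t.IsAbstract)
            .ToArray();
    }
}
```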
The Formatter class introduced above is hosted by the EventSerializer class, which is responsible for the actual event serialization/deserialization. We inject the known event types into this class via its constructor. The EventSerializer takes these known types and creates two dictionaries out of them
- one that gives a formatter instance provided the event type and
- the other gives the event type provided its (contract-) name
We use the RuntimeTypeModel class of the protobuf-net library to get a formatter (the instance that serializes/deserializes the event to an array of bytes). We also use an extension method GetContractName to get the contract name of the event type. It is defined as follows
In the above method we take the namespace of the event from the [DataContract] attribute with which we have to decorate each event in order to make it serializable using the protocol buffer format (see our NewTaskScheduled event).
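Such an extension method could be sketched as follows. Combining namespace and name with a slash is an assumption on my part, as is the fallback to the CLR type name when the attribute does not specify a name; the NewTaskScheduled class shown here is a hypothetical, cut-down version of the real event.

```csharp
using System;
using System.Linq;
using System.Runtime.Serialization;

public static class ContractNameExtensions
{
    public static string GetContractName(this Type type)
    {
        var contract = type
            .GetCustomAttributes(typeof(DataContractAttribute), false)
            .Cast<DataContractAttribute>()
            .FirstOrDefault();

        if (contract == null)
            throw new InvalidOperationException(
                type.Name + " is not decorated with [DataContract].");

        // Fall back to the CLR type name if the attribute does not override it.
        var name = string.IsNullOrEmpty(contract.Name) ? type.Name : contract.Name;

        return string.IsNullOrEmpty(contract.Namespace)
            ? name
            : contract.Namespace + "/" + name;
    }
}

// Hypothetical, cut-down event used to try the method out.
[DataContract(Namespace = "Sample")]
public sealed class NewTaskScheduled
{
    [DataMember(Order = 1)]
    public string Title { get; set; }
}
```

With these definitions, typeof(NewTaskScheduled).GetContractName() yields "Sample/NewTaskScheduled".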
With all this preparation the actual serialization of the event is quite easy
The method Serialize shown above takes an event instance and its type and serializes it into the given destination stream.
The deserialization is a two step process. First we have the contract name of the event and want to get the corresponding (event-) type
Having this type we can deserialize the event
The above method gets the stream from which it reads the serialized content of the event as well as the event type. The method returns the deserialized event (as object).
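Putting the pieces of this section together, an EventSerializer might be sketched like this. To keep the example free of external dependencies, the formatter for each type is created by a delegate passed into the constructor; that hook (createFormatter) is an assumption of this sketch, whereas the post obtains the formatters from protobuf-net's RuntimeTypeModel. The method name TryGetContentTypeByName is likewise hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public sealed class Formatter
{
    public readonly string ContractName;
    public readonly Action<object, Stream> SerializerDelegate;
    public readonly Func<Stream, object> DeserializerDelegate;

    public Formatter(string name, Action<object, Stream> serializer, Func<Stream, object> deserializer)
    {
        ContractName = name;
        SerializerDelegate = serializer;
        DeserializerDelegate = deserializer;
    }
}

public sealed class EventSerializer
{
    // event type -> formatter
    readonly Dictionary<Type, Formatter> _formattersByType = new Dictionary<Type, Formatter>();
    // contract name -> event type
    readonly Dictionary<string, Type> _typesByContractName = new Dictionary<string, Type>();

    // createFormatter is a hypothetical hook that keeps this sketch
    // independent of a concrete serialization library.
    public EventSerializer(IEnumerable<Type> knownEventTypes, Func<Type, Formatter> createFormatter)
    {
        foreach (var type in knownEventTypes)
        {
            var formatter = createFormatter(type);
            _formattersByType.Add(type, formatter);
            _typesByContractName.Add(formatter.ContractName, type);
        }
    }

    public void Serialize(object instance, Type type, Stream destination)
    {
        Formatter formatter;
        if (!_formattersByType.TryGetValue(type, out formatter))
            throw new InvalidOperationException("Unknown event type: " + type);
        formatter.SerializerDelegate(instance, destination);
    }

    // Step 1 of deserialization: contract name -> event type.
    public bool TryGetContentTypeByName(string contractName, out Type contentType)
    {
        return _typesByContractName.TryGetValue(contractName, out contentType);
    }

    // Step 2 of deserialization: stream + event type -> event instance.
    public object Deserialize(Stream source, Type type)
    {
        Formatter formatter;
        if (!_formattersByType.TryGetValue(type, out formatter))
            throw new InvalidOperationException("Unknown event type: " + type);
        return formatter.DeserializerDelegate(source);
    }
}
```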
That was not so bad, was it? No magic or rocket science needed so far.
Ok, then we can now discuss the implementation of the IEventStreamer interface that I introduced at the beginning of this section. This class that we will now discuss not only writes the (serialized) content of the event to a stream but also some message contract information (or message header; where an event is a message).
First of all the EventStreamer class uses our EventSerializer
Let’s now show the SerializeEvent method and then discuss the various parts of it.
The method consists of 3 parts
- line 22-27: we use the event serializer class discussed above to serialize the event (=content)
- line 29-36: we serialize a message contract which contains the event type name (=contract name), the length of the content as well as the content position (=messageContractBuffer)
- line 38-45: we open a stream and first write the (serialized) message header contract into it (line 41). Then we append the messageContractBuffer to the stream and finally we append the content to the stream. Last we return the content of the stream (line 44)
The DeserializeEvent method has to do the exact opposite of the above method. Let’s have a look at it
On line 50 we create a memory stream around the buffer containing the serialized data. Then we have again our 3 steps
- line 52-53: read and deserialize the message header contract. From it we get the length of the serialized message contract (the message header), which is deserialized in step 2
- line 55-58: read and deserialize the message contract. From the previous step we know exactly how many bytes we have to read (header.HeaderBytes)
- line 60-65: read and deserialize the event. From step two we know the length of the content and thus how many bytes we have to read (contract.ContentSize).
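The three-part layout used by SerializeEvent and DeserializeEvent can be tried out in isolation with the sketch below. It is a simplification under stated assumptions: the message contract is written with BinaryWriter instead of protocol buffers, the content position is left out, the content is an opaque byte array, and the class and method names are hypothetical. The line numbers in the lists above refer to the original listings, not to this sketch.

```csharp
using System;
using System.IO;
using System.Text;

public static class EventFraming
{
    // Layout: [8 bytes: length of contract][contract: name + content size][content]
    public static byte[] Write(string contractName, byte[] content)
    {
        // Part 2 is built first because part 1 needs its length.
        byte[] contractBuffer;
        using (var contractStream = new MemoryStream())
        using (var contractWriter = new BinaryWriter(contractStream, Encoding.UTF8))
        {
            contractWriter.Write(contractName);          // length-prefixed string
            contractWriter.Write((long)content.Length);  // content size
            contractWriter.Flush();
            contractBuffer = contractStream.ToArray();
        }

        using (var stream = new MemoryStream())
        using (var writer = new BinaryWriter(stream))
        {
            writer.Write((long)contractBuffer.Length);   // part 1: fixed-size header
            writer.Write(contractBuffer);                // part 2: message contract
            writer.Write(content);                       // part 3: serialized event
            writer.Flush();
            return stream.ToArray();
        }
    }

    public static byte[] Read(byte[] buffer, out string contractName)
    {
        using (var stream = new MemoryStream(buffer))
        using (var reader = new BinaryReader(stream, Encoding.UTF8))
        {
            var contractBytes = reader.ReadInt64();      // step 1
            var contractStart = stream.Position;

            contractName = reader.ReadString();          // step 2
            var contentSize = reader.ReadInt64();
            if (stream.Position - contractStart != contractBytes)
                throw new InvalidDataException("Contract length does not match header.");

            var content = reader.ReadBytes(checked((int)contentSize)); // step 3
            if (content.Length != contentSize)
                throw new InvalidDataException("Content is truncated.");
            return content;
        }
    }
}
```

In the real implementation the content returned in step 3 would be handed to the event serializer together with the type resolved from the contract name.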
To be complete I also have to show the MessageContract and the MessageHeaderContract classes. The MessageContract class contains information about the message (i.e. the event).
We specifically need the contract name and the size of the event when it is serialized (the content length). Since the message contract is serialized by using protocol buffer we have decorated it with [DataMember] attributes.
The MessageHeaderContract contains information about the MessageContract, namely the length of the message contract when it is serialized. It also contains logic to write itself to and read itself from a stream. We do not need protocol buffers here since it is trivial and of fixed length (just a long, which is 8 bytes long).
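The two classes might be sketched like this. ContentSize and HeaderBytes are the property names mentioned above; ContentPosition, the Order values, and the method names WriteHeader and ReadHeader are assumptions of this sketch.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;

[DataContract]
public sealed class MessageContract
{
    [DataMember(Order = 1)] public string ContractName { get; private set; }
    [DataMember(Order = 2)] public long ContentSize { get; private set; }
    [DataMember(Order = 3)] public long ContentPosition { get; private set; }

    // Parameterless constructor for the deserializer.
    MessageContract() { }

    public MessageContract(string contractName, long contentSize, long contentPosition)
    {
        ContractName = contractName;
        ContentSize = contentSize;
        ContentPosition = contentPosition;
    }
}

public sealed class MessageHeaderContract
{
    // A long always occupies 8 bytes.
    public const int FixedSize = 8;

    // Length of the serialized MessageContract that follows this header.
    public readonly long HeaderBytes;

    public MessageHeaderContract(long headerBytes)
    {
        HeaderBytes = headerBytes;
    }

    public void WriteHeader(Stream stream)
    {
        var bytes = BitConverter.GetBytes(HeaderBytes);
        stream.Write(bytes, 0, FixedSize);
    }

    public static MessageHeaderContract ReadHeader(Stream stream)
    {
        var buffer = new byte[FixedSize];
        var offset = 0;
        // Stream.Read may return fewer bytes than requested, so loop until done.
        while (offset < FixedSize)
        {
            var read = stream.Read(buffer, offset, FixedSize - offset);
            if (read == 0) throw new EndOfStreamException("Header is truncated.");
            offset += read;
        }
        return new MessageHeaderContract(BitConverter.ToInt64(buffer, 0));
    }
}
```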
With this we have the basis to be able to store events in the event store and subsequently read them back from the event store when needed. Let’s now look at the event store in detail.
Saving events to the event store
We want to create a file per aggregate instance which contains all events generated by this particular aggregate. Any new event is serialized into an array of bytes (as discussed in the previous section) and then appended to this file. With each event we also store
- the length of the serialized event (the data length),
- the version of the event (starting from 1 for the first event in the life cycle of an aggregate) and
- the hash code of the serialized event to recognize whether the data is corrupt or has been tampered with.
Let’s start and create a class that allows us to append an array of bytes (the serialized event) to the file which contains all events of an aggregate. This class has an Append method which accepts as parameter the said array of bytes.
Note that for write operations the file is opened in a mode that allows a single writer but many concurrent readers (line 16).
We use a helper class TapeStreamSerializer to do the actual write operation. Note that in this first draft we always write version = 1 to the file, no matter how many events we already have stored before.
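A first draft of such a class could look roughly as follows. To keep the sketch self-contained, the record writer is passed in as a delegate rather than calling TapeStreamSerializer directly; that delegate and the constructor shape are assumptions. The line number mentioned above refers to the original listing.

```csharp
using System;
using System.IO;

public sealed class FileTapeStream
{
    readonly string _path;
    // Hypothetical hook: (target stream, serialized event, version).
    readonly Action<Stream, byte[], long> _writeRecord;

    public FileTapeStream(string path, Action<Stream, byte[], long> writeRecord)
    {
        _path = path;
        _writeRecord = writeRecord;
    }

    public void Append(byte[] buffer)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (buffer.Length == 0) throw new ArgumentException("Buffer must not be empty.", "buffer");

        // FileMode.Append creates the file if needed and positions at its end.
        // FileShare.Read lets other handles read the file while this one writes,
        // but prevents a second writer from opening it.
        using (var file = new FileStream(_path, FileMode.Append, FileAccess.Write, FileShare.Read))
        {
            // First draft: the version is always 1.
            _writeRecord(file, buffer, 1);
        }
    }
}
```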
Let’s now look into the TapeStreamSerializer class. The WriteRecord method uses a binary serializer to write the record into a memory stream. We also use the SHA1Managed class of the .NET framework to calculate the hash code of the serialized event.
On line 22 to 24 we create a header containing the length of the serialized event (the data array) and write it into the memory stream. Then on line 26 we write the actual data array into the memory stream and on lines 27 to 31 we add a footer section to the stream. The footer contains once again the length of the data array, the version of the event and the hash code computed from the data array.
Once we have everything written to the memory stream we append this data to the file (line 34) and we are done.
The above method uses two simple helper methods to write a 64bit integer (lines 23, 28 and 29) and a hash code (line 30) into the memory stream.
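The record layout described above can be sketched as follows. This is not the original listing, so the line numbers in the text do not apply to it. The signature bytes, the raw 8-byte encoding of the integers, and the helper names are assumptions; the sketch also uses SHA1.Create() in place of SHA1Managed, which computes the same 20-byte hash.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;
using System.Text;

public static class TapeStreamSerializer
{
    // Hypothetical marker bytes for the start and end of a record.
    public static readonly byte[] HeaderStart = Encoding.ASCII.GetBytes("HDR>");
    public static readonly byte[] FooterEnd = Encoding.ASCII.GetBytes("<END");

    public static void WriteRecord(Stream stream, byte[] data, long version)
    {
        using (var memory = new MemoryStream())
        {
            // Header: signature plus length of the data.
            memory.Write(HeaderStart, 0, HeaderStart.Length);
            WriteInt64(memory, data.Length);

            // Data: the serialized event.
            memory.Write(data, 0, data.Length);

            // Footer: length again, version, hash of the data, end signature.
            WriteInt64(memory, data.Length);
            WriteInt64(memory, version);
            WriteHash(memory, data);
            memory.Write(FooterEnd, 0, FooterEnd.Length);

            // Append the complete record to the target stream in one go.
            var record = memory.ToArray();
            stream.Write(record, 0, record.Length);
        }
    }

    static void WriteInt64(Stream stream, long value)
    {
        var bytes = BitConverter.GetBytes(value); // always 8 bytes
        stream.Write(bytes, 0, bytes.Length);
    }

    static void WriteHash(Stream stream, byte[] data)
    {
        using (var sha1 = SHA1.Create())
        {
            var hash = sha1.ComputeHash(data); // SHA-1 yields 20 bytes
            stream.Write(hash, 0, hash.Length);
        }
    }
}
```

Building the record in memory first means the file only ever sees one write per record.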
Reading events from the event store
We now want a way to read all existing events of a given aggregate from the event store. For this purpose we implement the ReadRecords method in our FileTapeStream class.
On line 42 we make sure we arrived at the end of the file and no more records can be retrieved. On line 45 we again use the helper class TapeStreamSerializer to do the actual reading of a single event record from the file.
The above method returns an array of TapeRecord items. A TapeRecord item contains the serialized event as well as its version
Let’s now look into the ReadRecord method of the helper class. We basically have to reverse the write operation we described earlier. First we try to locate/read and validate the header information (line 72-74). The header has a fixed length, thus we know exactly how many bytes to read. Then we read the data (lines 76 and 77). We know how many bytes we need to read since the data length was stored in the header. Finally we read and verify the footer, which is also of fixed length (line 79-92). Specifically we make sure that the stored hash code matches the hash code freshly computed from the data array (line 83-91).
We use the following helper method to read a 64bit integer
and this one to read the hash code
and finally this one to read and verify a specific signature like e.g. ‘header start’ or ‘footer end’.
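For illustration, reading a single record and the three helpers could be sketched as below. The layout (signature, length, data, length, version, 20-byte SHA-1 hash, signature), the marker bytes and the class name TapeStreamReader are assumptions of this sketch; in the post this logic lives in TapeStreamSerializer, and the line numbers in the text refer to the original listing.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

public sealed class TapeRecord
{
    public readonly long Version;
    public readonly byte[] Data;

    public TapeRecord(long version, byte[] data)
    {
        Version = version;
        Data = data;
    }
}

public static class TapeStreamReader
{
    // Hypothetical marker bytes; they must match what the writer used.
    static readonly byte[] HeaderStart = Encoding.ASCII.GetBytes("HDR>");
    static readonly byte[] FooterEnd = Encoding.ASCII.GetBytes("<END");

    public static TapeRecord ReadRecord(Stream stream)
    {
        // Header: fixed length, tells us how much data follows.
        ReadAndVerifySignature(stream, HeaderStart, "header start");
        var dataLength = ReadInt64(stream);
        if (dataLength < 0 || dataLength > int.MaxValue)
            throw new InvalidDataException("Invalid data length in header.");

        // Data: the serialized event.
        var data = ReadExactly(stream, (int)dataLength);

        // Footer: fixed length, repeats the length and adds version and hash.
        if (ReadInt64(stream) != dataLength)
            throw new InvalidDataException("Footer length does not match header length.");
        var version = ReadInt64(stream);
        var storedHash = ReadHash(stream);
        using (var sha1 = SHA1.Create())
        {
            if (!sha1.ComputeHash(data).SequenceEqual(storedHash))
                throw new InvalidDataException("Hash mismatch: the record is corrupt.");
        }
        ReadAndVerifySignature(stream, FooterEnd, "footer end");

        return new TapeRecord(version, data);
    }

    static long ReadInt64(Stream stream)
    {
        return BitConverter.ToInt64(ReadExactly(stream, 8), 0);
    }

    static byte[] ReadHash(Stream stream)
    {
        return ReadExactly(stream, 20); // SHA-1 hashes are 20 bytes long
    }

    static void ReadAndVerifySignature(Stream stream, byte[] signature, string name)
    {
        if (!ReadExactly(stream, signature.Length).SequenceEqual(signature))
            throw new InvalidDataException("Signature '" + name + "' not found.");
    }

    static byte[] ReadExactly(Stream stream, int count)
    {
        var buffer = new byte[count];
        var offset = 0;
        while (offset < count)
        {
            var read = stream.Read(buffer, offset, count - offset);
            if (read == 0) throw new EndOfStreamException();
            offset += read;
        }
        return buffer;
    }
}
```

A ReadRecords method can then call ReadRecord in a loop until the stream position reaches the end of the file.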
In this post we discussed in detail how events generated by aggregates are serialized and then appended to the event store. I also showed how those serialized events can be read back from the event store.
In my next post I will discuss how we can integrate this code into our sample application. Stay tuned.