Java的CQRS和事件溯源ES入门:如何从CRUD切换到CQRS/ES - Baeldung


在本教程中,我们将探索命令查询责任隔离(CQRS)和事件源设计模式的基本概念。
虽然通常被称为互补模式,但我们将尝试分别理解它们,并最终了解它们如何相互补充。这些模式通常在企业应用程序中一起使用。在这方面,他们还受益于其他几种企业架构模式。我们将讨论其中的一些内容。有多种工具和框架可帮助采用这些模式,但是我们将使用Java创建一个简单的应用程序以了解基础知识。

事件溯源ES
ES为我们提供了一种新的方式来将应用程序状态保持为事件的有序序列。我们可以有选择地查询这些事件并在任何时间点重建应用程序的状态。当然,要使其工作,我们需要将对应用程序状态的所有更改重新映射为事件:

这些事件是已经发生并且不能更改的事实,换句话说,它们必须是不变的。重新创建应用程序状态只是重播所有事件。
请注意,这还提供了选择性重播事件,反向重播某些事件等的可能性。因此,我们可以将应用程序状态本身视为次要公民,而事件日志则是我们的主要事实来源。

CQRS
CQRS是关于将应用程序体系结构的命令和查询分开。CQRS基于Bertrand Meyer提出的命令查询分离(CQS)原理。CQS建议将对域对象的操作分为两个不同的类别:查询和命令:

查询返回结果,并且不更改系统的可观察状态。命令会更改系统的状态,但不一定会返回值。
我们通过完全分离域模型的Command和Query端来实现这一点。当然,我们可以采取进一步的措施,通过引入一种使数据存储保持同步的机制来拆分数据存储区的写和读侧。

一个简单的应用
我们将从描述一个简单的Java应用程序开始,该应用程序可以构建域模型。
该应用程序将在域模型上提供CRUD操作,并且还将具有域对象的持久性。CRUD代表创建,读取,更新和删除,这是我们可以对域对象执行的基本操作。
在后面的部分中,我们将使用相同的应用程序来介绍事件源和CQRS。
在此过程中,我们将在示例中利用域驱动设计(DDD)中的一些概念
DDD解决了依赖复杂领域特定知识的软件的分析和设计。它基于这样的思想,即软件系统必须基于完善的域模型。EDD Evans首先将DDD规定为模式目录。我们将使用其中一些模式来构建示例。

创建用户配置文件并对其进行管理是许多应用程序中的典型要求。我们将定义一个简单的域模型来捕获用户配置文件以及持久性:


如我们所见,我们的域模型已规范化并公开了几个CRUD操作。这些操作仅用于演示,根据需要可以简单或复杂。而且,这里的持久性存储库可以在内存中,也可以使用数据库。

简单CRUD应用
首先,我们必须创建代表域模型的Java类。这是一个非常简单的领域模型,甚至可能不需要复杂的事件源和CQRS等设计模式。但是,我们将保持简单,着重于了解基础知识:

public class User {
private String userid;
    private String firstName;
    private String lastName;
    private Set<Contact> contacts;
    private Set<Address> addresses;
    // getters and setters
}
 
public class Contact {
    private String type;
    private String detail;
   
// getters and setters
}
 
public class Address {
    private String city;
    private String state;
    private String postcode;
   
// getters and setters
}

同样,我们将为应用程序状态的持久性定义一个简单的内存存储库。当然,这并没有增加任何价值,但足以满足我们稍后的演示要求:

public class UserRepository {
    private Map<String, User> store = new HashMap<>();
}

现在,我们将定义一个服务以在我们的域模型上公开典型的CRUD操作:

public class UserService {
    private UserRepository repository;
    public UserService(UserRepository repository) {
        this.repository = repository;
    }
 
    public void createUser(String userId, String firstName, String lastName) {
        User user = new User(userId, firstName, lastName);
        repository.addUser(userId, user);
    }
 
    public void updateUser(String userId, Set<Contact> contacts, Set<Address> addresses) {
        User user = repository.getUser(userId);
        user.setContacts(contacts);
        user.setAddresses(addresses);
        repository.addUser(userId, user);
    }
 
    public Set<Contact> getContactByType(String userId, String contactType) {
        User user = repository.getUser(userId);
        Set<Contact> contacts = user.getContacts();
        return contacts.stream()
          .filter(c -> c.getType().equals(contactType))
          .collect(Collectors.toSet());
    }
 
    public Set<Address> getAddressByRegion(String userId, String state) {
        User user = repository.getUser(userId);
        Set<Address> addresses = user.getAddresses();
        return addresses.stream()
          .filter(a -> a.getState().equals(state))
          .collect(Collectors.toSet());
    }
}

这几乎是我们设置简单CRUD应用程序所要做的。这远非生产就绪型代码,但它揭示了一些我们在本教程后面将要讨论的重要点。

CRUD应用程序中的问题
在继续进行与事件源和CQRS的讨论之前,值得讨论当前解决方案中的问题。毕竟,我们将通过应用这些模式来解决相同的问题!
在我们可能在这里注意到的许多问题中,我们只想关注其中两个:

  • 域模型:读写操作是在同一域模型上进行的。尽管对于像这样的简单域模型来说这不是问题,但随着域模型变得复杂,它可能会恶化。我们可能需要优化我们的域模型和基础存储,以适合读写操作的各个需求。
  • 持久性:我们对域对象的持久性仅存储域模型的最新状态。尽管这对于大多数情况已经足够了,但它使某些任务具有挑战性。例如,如果我们必须对域对象如何更改状态进行历史审核,则这里是不可能的。我们必须用一些审核日志来补充我们的解决方案,以实现此目的。

CQRS重构
我们将通过在应用程序中引入CQRS模式来解决上一节中讨论的第一个问题。作为其中的一部分,我们将域模型及其持久性分开以处理写入和读取操作。让我们看看CQRS模式如何重组我们的应用程序:

此图说明了我们打算如何彻底分离应用程序体系结构以进行写入和读取。但是,我们在这里引入了很多新组件,我们必须更好地理解它们。请注意,这些与CQRS并不严格相关,但是CQRS可以从中受益匪浅。

Aggregate聚合/Aggregator聚合器
聚合是域驱动设计(DDD)中描述的一种模式,该模式通过将实体绑定到聚合根逻辑上对不同的实体进行分组。聚合模式提供了实体之间的事务一致性。
CQRS自然受益于聚合模式,该模式将写域模型组合在一起,提供了交易事务保证。聚合通常保持高速缓存状态以提高性能,但是如果没有它,则可以完美地工作。

Projection投影/Projector投射器
投影是另一个大大有利于CQRS的重要模式。投影本质上是指以不同的形状和结构表示领域对象。
这些原始数据投影是只读的,并且经过高度优化,以提供增强的读取体验。我们可能会再次决定缓存预测以提高性能,但这不是必须的。

CQRS写入端
让我们首先实现应用程序的写入端。
我们将从定义所需的命令开始。甲命令是一个意图突变域模型的状态。它是否成功取决于我们配置的业务规则。
让我们看看我们的命令:

public class CreateUserCommand {
    private String userId;
    private String firstName;
    private String lastName;
}
 
public class UpdateUserCommand {
    private String userId;
    private Set<Address> addresses;
    private Set<Contact> contacts;
}

这些是非常简单的类,用于保存我们打算改变的数据。
接下来,我们定义一个聚合器,负责接收和处理命令。聚合可以接受或拒绝命令:

public class UserAggregate {
    private UserWriteRepository writeRepository;
    public UserAggregate(UserWriteRepository repository) {
        this.writeRepository = repository;
    }
 
    public User handleCreateUserCommand(CreateUserCommand command) {
        User user = new User(command.getUserId(), command.getFirstName(), command.getLastName());
        writeRepository.addUser(user.getUserid(), user);
        return user;
    }
 
    public User handleUpdateUserCommand(UpdateUserCommand command) {
        User user = writeRepository.getUser(command.getUserId());
        user.setAddresses(command.getAddresses());
        user.setContacts(command.getContacts());
        writeRepository.addUser(user.getUserid(), user);
        return user;
    }
}

聚合使用存储库来检索当前状态并保留对它的任何更改。而且,它可以在本地存储当前状态,以避免在处理每个命令时往返存储库的开销。
最后,我们需要一个存储库来保存域模型的状态。这通常是数据库或其他持久性存储,但是在这里我们将简单地将它们替换为内存中的数据结构:

public class UserWriteRepository {
    private Map<String, User> store = new HashMap<>();
    // accessors and mutators
}

至此,我们的应用程序的写入方面结束了。

CQRS读取端
现在让我们切换到应用程序的读取端。我们将从定义域模型的读取端开始:

public class UserAddress {
    private Map<String, Set<Address>> addressByRegion = new HashMap<>();
}
 
public class UserContact {
    private Map<String, Set<Contact>> contactByType = new HashMap<>();
}

接下来,我们将定义读取存储库。同样,我们将只使用内存中的数据结构,即使这将在实际应用程序中提供更持久的数据存储:

public class UserReadRepository {
    private Map<String, UserAddress> userAddress = new HashMap<>();
    private Map<String, UserContact> userContact = new HashMap<>();
    // accessors and mutators
}

现在,我们将定义我们必须支持的必需查询。查询是为了获取数据的意图-它不一定会导致数据生成:

public class ContactByTypeQuery {
    private String userId;
    private String contactType;
}
 
public class AddressByRegionQuery {
    private String userId;
    private String state;
}

同样,这些是保存数据以定义查询的简单Java类。
我们现在需要的是可以处理以下查询的投影:
public class UserProjection {
    private UserReadRepository readRepository;
    public UserProjection(UserReadRepository readRepository) {
        this.readRepository = readRepository;
    }
 
    public Set<Contact> handle(ContactByTypeQuery query) {
        UserContact userContact = readRepository.getUserContact(query.getUserId());
        return userContact.getContactByType()
          .get(query.getContactType());
    }
 
    public Set<Address> handle(AddressByRegionQuery query) {
        UserAddress userAddress = readRepository.getUserAddress(query.getUserId());
        return userAddress.getAddressByRegion()
          .get(query.getState());
    }
}

这里的投影使用我们之前定义的读取存储库来解决我们所拥有的查询。这几乎也总结了我们应用程序的读取方面。

CQRS同步读写数据
这个难题的一个难题仍然没有解决:没有什么可以使我们的读写存储库同步。这是我们需要投影器的地方。一个投影机投射写域模型到读取域模型的逻辑。有很多更复杂的方法可以解决此问题,但我们将使其保持相对简单:

public class UserProjector {
    UserReadRepository readRepository = new UserReadRepository();
    public UserProjector(UserReadRepository readRepository) {
        this.readRepository = readRepository;
    }
 
    public void project(User user) {
        UserContact userContact = Optional.ofNullable(
          readRepository.getUserContact(user.getUserid()))
            .orElse(new UserContact());
        Map<String, Set<Contact>> contactByType = new HashMap<>();
        for (Contact contact : user.getContacts()) {
            Set<Contact> contacts = Optional.ofNullable(
              contactByType.get(contact.getType()))
                .orElse(new HashSet<>());
            contacts.add(contact);
            contactByType.put(contact.getType(), contacts);
        }
        userContact.setContactByType(contactByType);
        readRepository.addUserContact(user.getUserid(), userContact);
 
        UserAddress userAddress = Optional.ofNullable(
          readRepository.getUserAddress(user.getUserid()))
            .orElse(new UserAddress());
        Map<String, Set<Address>> addressByRegion = new HashMap<>();
        for (Address address : user.getAddresses()) {
            Set<Address> addresses = Optional.ofNullable(
              addressByRegion.get(address.getState()))
                .orElse(new HashSet<>());
            addresses.add(address);
            addressByRegion.put(address.getState(), addresses);
        }
        userAddress.setAddressByRegion(addressByRegion);
        readRepository.addUserAddress(user.getUserid(), userAddress);
    }
}

这是执行此操作的一种非常粗略的方法,但可以使我们对CQRS正常运行所需的功能有足够的了解。而且,没有必要将读写存储库放在不同的物理存储中。分布式系统有其自身的问题!
请注意,将写入域的当前状态投影到不同的读取域模型并不方便。我们在这里采取的示例非常简单,因此我们看不到问题所在。
但是,随着读写模型变得越来越复杂,投影将变得越来越困难。我们可以通过基于事件的投影来解决此问题,而不是通过事件搜索来解决基于状态的投影。我们将在本教程的后面部分中介绍如何实现此目的。
我们讨论了CQRS模式,并学习了如何在典型应用程序中引入它。我们一直试图解决与域模型在处理读取和写入时的刚性相关的问题。
现在让我们讨论CQRS带给应用程序体系结构的其他一些好处:
  • CQRS为我们提供了一种方便的方式来选择适用于写入和读取操作的单独域模型;我们不必创建支持两者的复杂域模型
  • 它可以帮助我们选择适合于处理读写操作复杂性的存储库,例如写入的高吞吐量和读取的低延迟
  • 它通过提供关注点分离和更简单的域模型,自然地补充了分布式体系结构中基于事件的编程模型

但是,这不是免费的。从这个简单的示例可以明显看出,CQRS为体系结构增加了相当大的复杂性。在许多情况下,它可能不合适或不值得付出痛苦:
  • 只有复杂的领域模型才能从此模式增加的复杂性中受益;一个简单的域模型可以在没有所有这些的情况下进行管理
  • 自然地在某种程度上导致代码重复,这与它带来的收益相比是可以接受的。但是,建议个人判断
  • 分开的存储库会导致一致性问题,并且很难始终保持写入和读取存储库的完美同步。我们经常必须为最终的一致性做好准备

事件溯源
接下来,我们将解决在简单应用程序中讨论的第二个问题。回想一下,它与我们的持久性存储库有关。
我们将介绍事件源来解决此问题。事件源极大地改变了我们对应用程序状态存储的看法。

在这里,我们已经构建了存储库,以存储域事件的有序列表。域对象的每次更改都被视为事件。事件的粗略程度应该是域设计的问题。这里要考虑的重要事项是事件具有时间顺序并且是不可变的。
事件驱动的应用程序中的基本对象是事件,事件源无异。正如我们之前所看到的,事件表示在特定时间点域模型状态的特定变化。因此,我们将从为简单应用程序定义基本事件开始:

public abstract class Event {
    public final UUID id = UUID.randomUUID();
    public final Date created = new Date();
}

这只是确保我们在应用程序中生成的每个事件都具有唯一的标识和创建的时间戳。这些是进一步处理它们所必需的。
当然,可能还有其他一些我们可能感兴趣的属性,例如用于建立事件来源的属性。
接下来,让我们创建一些从该基本事件继承的特定于域的事件:

public class UserCreatedEvent extends Event {
    private String userId;
    private String firstName;
    private String lastName;
}
 
public class UserContactAddedEvent extends Event {
    private String contactType;
    private String contactDetails;
}
 
public class UserContactRemovedEvent extends Event {
    private String contactType;
    private String contactDetails;
}
 
public class UserAddressAddedEvent extends Event {
    private String city;
    private String state;
    private String postCode;
}
 
public class UserAddressRemovedEvent extends Event {
    private String city;
    private String state;
    private String postCode;
}

这些是Java中包含域事件详细信息的简单POJO。但是,这里要注意的重要事项是事件的粒度。
我们可以为用户更新创建一个事件,但是相反,我们决定创建单独的事件来添加和删除地址及联系方式。选择被映射到使域模型更有效的方式。
现在,自然地,我们需要一个存储库来保存我们的域事件:
public class EventStore {
    private Map<String, List<Event>> store = new HashMap<>();
}

这是一个简单的内存数据结构,用于保存我们的域事件。实际上,有几种专门创建的用于处理事件数据的解决方案,例如Apache Druid。有许多能够处理事件源的通用分布式数据存储,包括KafkaCassandra

生产和消费事件
因此,现在我们处理所有CRUD操作的服务将发生变化。现在,它将更新域事件,而不是更新移动域的状态。它还将使用相同的域事件来响应查询。

public class UserService {
    private EventStore repository;
    public UserService(EventStore repository) {
        this.repository = repository;
    }
 
    public void createUser(String userId, String firstName, String lastName) {
        repository.addEvent(userId, new UserCreatedEvent(userId, firstName, lastName));
    }
 
    public void updateUser(String userId, Set<Contact> contacts, Set<Address> addresses) {
        User user = UserUtility.recreateUserState(repository, userId);
        user.getContacts().stream()
          .filter(c -> !contacts.contains(c))
          .forEach(c -> repository.addEvent(
            userId, new UserContactRemovedEvent(c.getType(), c.getDetail())));
        contacts.stream()
          .filter(c -> !user.getContacts().contains(c))
          .forEach(c -> repository.addEvent(
            userId, new UserContactAddedEvent(c.getType(), c.getDetail())));
        user.getAddresses().stream()
          .filter(a -> !addresses.contains(a))
          .forEach(a -> repository.addEvent(
            userId, new UserAddressRemovedEvent(a.getCity(), a.getState(), a.getPostcode())));
        addresses.stream()
          .filter(a -> !user.getAddresses().contains(a))
          .forEach(a -> repository.addEvent(
            userId, new UserAddressAddedEvent(a.getCity(), a.getState(), a.getPostcode())));
    }
 
    public Set<Contact> getContactByType(String userId, String contactType) {
        User user = UserUtility.recreateUserState(repository, userId);
        return user.getContacts().stream()
          .filter(c -> c.getType().equals(contactType))
          .collect(Collectors.toSet());
    }
 
    public Set<Address> getAddressByRegion(String userId, String state) throws Exception {
        User user = UserUtility.recreateUserState(repository, userId);
        return user.getAddresses().stream()
          .filter(a -> a.getState().equals(state))
          .collect(Collectors.toSet());
    }
}

请注意,作为此处处理更新用户操作的一部分,我们将生成多个事件。另外,有趣的是要注意我们如何通过重播到目前为止生成的所有域事件来生成域模型的当前状态。
当然,在实际的应用程序中,这不是可行的策略,我们必须维护本地缓存以避免每次生成状态。事件存储库中还有其他策略(例如快照和聚合)可以加快此过程。
到此结束了我们在简单应用程序中引入事件溯源的工作。

事件溯源好处和缺点
现在,我们已经成功采用了使用事件源存储域对象的另一种方法。事件源是一种强大的模式,如果使用得当,它将为应用程序体系结构带来很多好处:

  • 由于不需要读取,更新和写入,因此使写入操作快得多;写只是将事件附加到日志
  • 消除了对象关系阻抗,从而消除了对复杂映射工具的需求;当然,我们仍然需要重新创建对象
  • 恰好提供作为副产品的审核日志,这是完全可靠的;我们可以准确调试域模型的状态如何变化
  • 它使支持时态查询和实现时间旅行成为可能(过去某个时间点的域状态)!
  • 很自然地适合设计微服务架构中的松耦合组件,这些组件通过交换消息进行异步通信

但是,像往常一样,即使事件源也不是万灵丹。它确实迫使我们采用截然不同的方式来存储数据。在某些情况下,这可能没有用:
  • 有一个相关的学习曲线,并且采用事件源需要思维方式的转变。首先是不直观的
  • 除非我们将状态保留在本地缓存中,否则它使处理典型查询变得相当困难,因为我们需要重新创建状态
  • 尽管它可以应用于任何领域模型,但它更适合事件驱动的体系结构中基于事件的模型

事件溯源的CQRS
既然我们已经了解了如何将事件源和CQRS分别引入到我们的简单应用程序中,是时候将它们组合在一起了。现在应该很直观,因为这些模式可以相互受益。但是,在本节中我们将使其更加明确。
首先让我们看看应用程序体系结构如何将它们组合在一起:

到目前为止,这应该不足为奇。我们已将存储库的写端替换为事件存储,而存储库的读端仍然相同。
请注意,这不是在应用程序体系结构中使用事件源和CQRS的唯一方法。我们可以非常有创意,可以将这些模式与其他模式一起使用,并提供几种架构选择。
这里重要的是确保我们使用它们来管理复杂性,而不是简单地进一步增加复杂性!
我们将从引入CQRS的应用程序开始,然后进行相关更改以使事件源变得更加重要。我们还将利用在引入事件源的应用程序中定义的相同事件和事件存储。
只有几处更改。我们将首先更改聚合以生成事件,而不是更新state:

public class UserAggregate {
    private EventStore writeRepository;
    public UserAggregate(EventStore repository) {
        this.writeRepository = repository;
    }
 
    public List<Event> handleCreateUserCommand(CreateUserCommand command) {
        UserCreatedEvent event = new UserCreatedEvent(command.getUserId(), 
          command.getFirstName(), command.getLastName());
        writeRepository.addEvent(command.getUserId(), event);
        return Arrays.asList(event);
    }
 
    public List<Event> handleUpdateUserCommand(UpdateUserCommand command) {
        User user = UserUtility.recreateUserState(writeRepository, command.getUserId());
        List<Event> events = new ArrayList<>();
 
        List<Contact> contactsToRemove = user.getContacts().stream()
          .filter(c -> !command.getContacts().contains(c))
          .collect(Collectors.toList());
        for (Contact contact : contactsToRemove) {
            UserContactRemovedEvent contactRemovedEvent = new UserContactRemovedEvent(contact.getType(), 
              contact.getDetail());
            events.add(contactRemovedEvent);
            writeRepository.addEvent(command.getUserId(), contactRemovedEvent);
        }
        List<Contact> contactsToAdd = command.getContacts().stream()
          .filter(c -> !user.getContacts().contains(c))
          .collect(Collectors.toList());
        for (Contact contact : contactsToAdd) {
            UserContactAddedEvent contactAddedEvent = new UserContactAddedEvent(contact.getType(), 
              contact.getDetail());
            events.add(contactAddedEvent);
            writeRepository.addEvent(command.getUserId(), contactAddedEvent);
        }
 
        // similarly process addressesToRemove
       
// similarly process addressesToAdd
 
        return events;
    }
}

唯一需要进行的其他更改是在投影仪中,它现在需要处理事件而不是域对象状态:

public class UserProjector {
    UserReadRepository readRepository = new UserReadRepository();
    public UserProjector(UserReadRepository readRepository) {
        this.readRepository = readRepository;
    }
 
    public void project(String userId, List<Event> events) {
        for (Event event : events) {
            if (event instanceof UserAddressAddedEvent)
                apply(userId, (UserAddressAddedEvent) event);
            if (event instanceof UserAddressRemovedEvent)
                apply(userId, (UserAddressRemovedEvent) event);
            if (event instanceof UserContactAddedEvent)
                apply(userId, (UserContactAddedEvent) event);
            if (event instanceof UserContactRemovedEvent)
                apply(userId, (UserContactRemovedEvent) event);
        }
    }
 
    public void apply(String userId, UserAddressAddedEvent event) {
        Address address = new Address(
          event.getCity(), event.getState(), event.getPostCode());
        UserAddress userAddress = Optional.ofNullable(
          readRepository.getUserAddress(userId))
            .orElse(new UserAddress());
        Set<Address> addresses = Optional.ofNullable(userAddress.getAddressByRegion()
          .get(address.getState()))
          .orElse(new HashSet<>());
        addresses.add(address);
        userAddress.getAddressByRegion()
          .put(address.getState(), addresses);
        readRepository.addUserAddress(userId, userAddress);
    }
 
    public void apply(String userId, UserAddressRemovedEvent event) {
        Address address = new Address(
          event.getCity(), event.getState(), event.getPostCode());
        UserAddress userAddress = readRepository.getUserAddress(userId);
        if (userAddress != null) {
            Set<Address> addresses = userAddress.getAddressByRegion()
              .get(address.getState());
            if (addresses != null)
                addresses.remove(address);
            readRepository.addUserAddress(userId, userAddress);
        }
    }
 
    public void apply(String userId, UserContactAddedEvent event) {
        // Similarly handle UserContactAddedEvent event
    }
 
    public void apply(String userId, UserContactRemovedEvent event) {
       
// Similarly handle UserContactRemovedEvent event
    }
}

如果我们回想起在处理基于状态的投影时所讨论的问题,那么这可能是一种解决方案。
基于事件的投影相当方便且易于实现。我们要做的就是处理所有发生的领域事件,并将它们应用于所有读取的领域模型。通常,在基于事件的应用程序中,投影仪将收听其感兴趣的领域事件,而不依赖于直接调用它的人。
这是我们在简单的应用程序中将事件源和CQRS整合在一起的全部工作。

详细点击标题,可以在GitHub上找到本文的源代码。

banq评:该文基于贫血领域模型,将原属于充血领域模型的逻辑放到UserProjector中,这是其最大问题,不遵循DDD情况下的CQRS/ES无疑复杂化传统的微服务架构,不值得推荐。但是作为了解入门可以学习。