CQRS (Command Query Responsibility Segregation)

CQRS (Command Query Responsibility Segregation)

Ý tường của CQRS là tách độc lập cho việc xử lý dữ liệu đọc và ghi trên luồng và database riêng biệt với mục đích tăng hiệu năng của hệ thống, có một số ý kiến cho rằng có thể dùng replica database cho ‘query database’, nhưng ta cần lưu lý là replica database sẽ chia sẻ tài nguyên như transaction với primary database, do vậy nó sẽ không đúng với nguyên tắc của CQRS. Mọi người có thể tham khảo bài viết này

Trong các bài viết về pattern mình lưu ý là các bài viết đặt trong ngữ cảnh áp dụng cho các RESTful API được implement trên Spring Boot. Mã nguồn trong các bài viết về pattern với mục đích show demo, việc giải thích chi tiết code sẽ được giới thiệu tại các bài viết trong mục 6. CRUD ổn định/hiệu năng cao

  • command đại diện cho các REST api POST/PUT/PATCH/DELETE
  • query đại diện cho REST api GET

Khi triển khai trên các database độc lập như hình vẽ trên ta thấy sơ đồ tuần tự của chương trình là

[1] client gửi command request vào app

[2] command hanlder sẽ lưu trữ dữ liệu vào command database

[3] nếu lưu trữ thành công sẽ gửi đến một service bus

Service bus đại diện cho một luồng stream dữ liệu, các service khác có thể produce và subscribe vào bus này, implement cho service bus có thể dùng Kafka/rabitmq, v.v..

Command có nên tạo connection đến query database và lưu trực tiếp dữ liệu vào đó không? Theo mình là không nên làm như vậy, nó sẽ vi phạm tính độc lập của Command và Query, hơn nữa việc kết nối và ghi dữ liệu vào query database sẽ tốn thời gian hơn việc gửi một message vào service bus.

Đây là đoạn Java code implement cho việc lưu trữ và gửi đến Service bus

@Override
public Mono<ProductResponse> addProduct(String idempotencyKey, ProductCommand command) {

    return Mono.fromCallable(() -> concurrencyLimiter.tryAcquire(0, TimeUnit.MILLISECONDS))
            .subscribeOn(Schedulers.boundedElastic())
            .flatMap(acquired -> {
                if (!acquired) {
                    return Mono.error(new TooManyRequestsException("Too many concurrent product creation " +
                            "operations, please try again later"));
                }

                log.info("Acquired concurrency permit for product creation, available permits: {}",
                        concurrencyLimiter.availablePermits());

                Mono<ProductResponse> saveProduct = Mono.fromCallable(() ->
                                productMapper.toEntity(command))
                        .flatMap(productRepository::save)
                        .as(mono -> databaseRetryUtil
                                .applyRetry(mono, "save product", log)) // #1
                        .map(productMapper::toResponse)
                        .doOnSuccess(response -> {
                            log.info("Product added successfully: {}", response);
                            publishAddEvent(idempotencyKey, response, ProductEventType.PRODUCT_ADDED); // #2
                        });

                Mono<ProductResponse> result;

                if (cachedProductEnabled && idempotencyKey != null) {
                    String cachedProductUriWithIdempotency = cachedProductUri + "idempotency/" + idempotencyKey;
                    log.info("Attempting to fetch product from cache using idempotency key: {}", idempotencyKey);

                    // Attempt to fetch from cache using idempotency key
                    result = fetchFromCache(cachedProductUriWithIdempotency, idempotencyKey)
                            .timeout(Duration.ofSeconds(2)) // Set 2-second timeout
                            .onErrorResume(TimeoutException.class, ex -> {
                                log.warn("Cache request timed out for key: {}", idempotencyKey);
                                return saveProduct;
                            })
                            .onErrorResume(Exception.class, ex -> {
                                log.info("Idempotency key not found in cache (404), proceeding to add product");
                                return saveProduct;
                            })
                            .doOnSuccess(response -> log.info("Product fetched from cache using idempotency key: {}", idempotencyKey));
                } else {
                    log.info("Idempotency key not provided or caching is disabled, " +
                            "proceeding to add product without cache");
                    result = saveProduct;
                }

                return result.doFinally(signal -> {
                    concurrencyLimiter.release();
                    log.info("Released concurrency permit, signal: {}, available permits: {}",
                            signal, concurrencyLimiter.availablePermits());
                });
            });
}

private void publishAddEvent(String idempotency, ProductResponse response, ProductEventType eventType) {
        var event = productMapper.toEvent(response, eventType);
        productEventSupplier.publishProduct(event.withIdempotencyKey(idempotency));
}

#1 lưu dữ liệu vào trong database.

#2 khi lưu thành công sẽ gửi đến Service Bus (Kafka).

[4] và [5] query handler sẽ subscribe vào service bus

Khi có một message mới (sẽ chứa đối tượng Product vừa được add vào Command query), thì query handler sẽ convert message này và lưu vào trong query database.

public class ProductQueryConsumer {

    private final AddProductQueryAction addProductQueryAction;
    private final UpdateProductQueryAction updateProductQueryAction;
    private final RemoveProductQueryAction removeProductQueryAction;
    private final DLQHandler dqlHandler;

    @Bean
    public Consumer<KStream<String, ProductEvent>> processProductQuery() {

        return this::processProductQueries;
    }

    public void processProductQueries(KStream<String, ProductEvent> inputStream) {

        inputStream.peek((key, productEvent) -> {
            log.info("Processing product query: key={}, productId={}, action={}",
                    key, productEvent.uuid(), productEvent.eventType());

            if (productEvent.eventType().equals(ProductEventType.PRODUCT_ADDED)) { // #1
                addProductQueryAction.handle(productEvent.payload()) // #2
                        .onErrorResume(e -> {
                            log.error("Failed to handle PRODUCT_ADDED event for key={}: {}", key, e.getMessage(), e);

                            return dqlHandler.sendToDlq(productEvent, ActionCode.ADD_PRODUCT_QUERY)
                                    .doOnError(dlqError ->
                                            log.error("Failed to send to DLQ for PRODUCT_UPDATED: {}",
                                            dlqError.getMessage(), dlqError))
                                    .then(Mono.empty());
                        })
                        .subscribe();
            }

            if (productEvent.eventType().equals(ProductEventType.PRODUCT_UPDATED)) {
                updateProductQueryAction.handle(productEvent.payload().uuid(), productEvent.payload())
                        .onErrorResume(e -> {
                            log.error("Failed to handle PRODUCT_UPDATED event for key={}: {}", key, e.getMessage(), e);

                            return dqlHandler.sendToDlq(productEvent, ActionCode.UPDATE_PRODUCT_QUERY)
                                    .doOnError(dlqError -> log.error("Failed to send to DLQ for PRODUCT_ADDED: {}", dlqError.getMessage(), dlqError))
                                    .then(Mono.empty());
                        })
                        .subscribe();
            }

            if (productEvent.eventType().equals(ProductEventType.PRODUCT_DELETED)) {
                removeProductQueryAction.handle(productEvent.payload().uuid())
                        .onErrorResume(e -> {
                            log.error("Failed to handle PRODUCT_DELETED event for key={}: {}", key, e.getMessage(), e);

                            return dqlHandler.sendToDlq(productEvent, ActionCode.DELETE_PRODUCT_QUERY)
                                    .doOnError(dlqError -> log.error("Failed to send to DLQ for PRODUCT_ADDED: {}", dlqError.getMessage(), dlqError))
                                    .then(Mono.empty());
                        })
                        .subscribe();
            }

        });
    }

}

#1 kiểm tra loại messgae.

#2 gọi hàm lưu vào query database

[6] và [7] truy vấn dữ liệu trên query database

Luồng service này tương tự như việc implement RESTful service thông thường khi nó lấy dữ liệu từ query database

Tổng kết

  • CQRS giúp tăng cường hiệu năng hệ thống khi tách biệt nghiệp vụ đọc và ghi.
  • Nên triển khai thành các command/query trên các database độc lập không nên dùng replica.
  • Dùng service bus (event-driven) để đồng bộ dữ liệu command và query
  • Trong các dự án thực tế, sẽ triển khai thêm một bước cached khi query dữ liệu tài query command
Written by :

user

Java Developer, System Architect, Learner and becoming Youtuber

View All Posts

Để lại một bình luận

Email của bạn sẽ không được hiển thị công khai. Các trường bắt buộc được đánh dấu *