[Hands-on] Saga Pattern

[Hands-on] Saga Pattern

Ta có bài toán đặt hàng, khi Order đơn hàng, các giao dịch của các dịch vụ Payment, Inventory, Delivery cần đảm bảo hoàn thành ở toàn bộ dịch vụ, nếu ít nhất một dịch vụ có lỗi, các giao dịch đã tiến hành cần phải thực hiện “bù” (compensating/rollback) giao dịch.

Khi triển khai ứng dụng trên theo mô hình Microservice, thách thức đặt ra là quản lý giao dịch (transactions) trong hệ thống phân tán và phải đảm bảo được tính chất ACID. Có một số giải pháp được đưa ra để xử lý vấn đề này như 2PC, nhưng cũng gặp phải những vấn đề về quản lý transaction cho những CSDL noSQL.

SAGA pattern ra đời với ý tưởng mỗi local transaction của dịch vụ được thực hiện sẽ phát ra một tin nhắn/sự kiện (message/event) để kích hoạt giao dịch tiếp theo trong cụm saga.

Cụm saga là nhóm các app được triển khai theo mô hình saga.

Nếu bạn chưa làm việc quen với Kafka Streams, bạn tham khảo trước bài này https://koder.vn/2025/07/23/kafka-streams-trong-spring-boot/

Đây là mã nguồn các bạn có thể tham khảo https://github.com/DogooTech/pub-tuts-repos/tree/master/order-service-saga

Các khái niệm như ACID hay 2PC ai chưa làm qua có thể tìm hiểu thêm tại các nguồn khác.

Có 2 mô hình để triển khai Saga

  • Choreography (vũ đạo) Saga: các local transaction được triển khai tuần tự Order -> Payment -> Inventory -> Delivery, bất kỳ bước nào có lỗi sẽ thực hiện “bù” transaction các bước trước đó.
  • Orchestration (giao hưởng) Saga: các local transaction sẽ gửi message/event đến trung tâm điều phối (gọi là Orchestration), trung tâm này sẽ quyết định hành động tiếp theo.

Trong blog này sẽ hướng dẫn triển khai Saga theo mô hình Orchestration trên mô hình Microservice bằng Spring Boot thông qua các thư viện hỗ trợ như Java Function, Kafka Streams, …

Trong Java có rất nhiều framework hỗ trợ để triển khai Saga như Camunda, Tram, v.v… Nếu ta hiểu được nguyên lý vận hành và sử dụng các thư viện của Java để implement được Saga thì cũng có nhiều lợi ích.

Các kiến thức nền tảng để thực hành implement Saga Pattern sử dụng Spring Boot với Kafka Streams

  • Functional Programming in Java (có nhiều nguồn trên mạng)
  • Kafka Streams in Spring Boot (xem bài này)

Luồng đi dữ liệu

Cấu hình thư viện

Dùng Reactive Web cho RESTful API, ngoài ra ta khai báo các thư viện cho Kafka Streams.

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>

Cấu hình các function

spring.cloud.function.definition: orderEventSupplier;stockOrders;deliveryOrders;productOrders;orderEventSaga

spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: orders.buy

spring.cloud.stream.bindings.stockOrders-in-0.destination: orders.buy
spring.cloud.stream.bindings.stockOrders-out-0.destination: orders.saga.stock

spring.cloud.stream.bindings.deliveryOrders-in-0.destination: orders.buy
spring.cloud.stream.bindings.deliveryOrders-out-0.destination: orders.saga.delivery

spring.cloud.stream.bindings.productOrders-in-0.destination: orders.buy
spring.cloud.stream.bindings.productOrders-out-0.destination: orders.saga.product

spring.cloud.stream.bindings.orderEventSaga-in-0.destination: orders.saga.stock
spring.cloud.stream.bindings.orderEventSaga-in-1.destination: orders.saga.delivery
spring.cloud.stream.bindings.orderEventSaga-in-2.destination: orders.saga.product
spring.cloud.stream.bindings.orderEventSaga-out-0.destination: orders.buy
spring.cloud.stream.kafka.streams.binder.functions.orderEventSaga.applicationId: orderEventSaga

Trong phần này ta cấu hình các function

  • orderEventSupplier nhận dữ liệu và đẩy vào topic, đây là một Supplier function
  • stockOrdersdeliveryOrdersproductOrders nhận dữ liệu từ topic orders.buy xử lý nghiệp vụ và trả ra các topic tương ứng như trong cấu hình, đây là một Function function.
  • orderEventSaga nhận dữ liệu từ 3 topic như cấu hình in-0..2 xử lý và trả ra topic ban đầu orders.buy, đây cũng là một Function function

Tại orderEventSaga trả về topic ban đẩu như vậy nó có bị lặp vô tận không? Sẽ không bị lặp khi ở mỗi stockOrdersdeliveryOrdersproductOrders ta có kiểm tra điều kiện trạng thái xem có gửi đi tiếp hay không

Các lớp chính

OrderController là RESTful API nhận vào đối tượng Order, sau đó gọi OrderService để xử lý.

public class OrderController {

    private final OrderService orderService;

    @PostMapping("/order")
    public Mono<Order> createOrder(@RequestBody Order order) {
        return orderService.createOrder(order);
    }
}

OrderService chuyển Order thành OrderEvent sau đó add (#1) vào một LinkedBlockingQueue của OrderSupplierService

public class OrderService {
    private final OrderSupplierService supplierService;

    public Mono<Order> createOrder(Order order) {
        Order preparedOrder = prepareOrder(order);
        OrderEvent event = createOrderEvent(preparedOrder);
        supplierService.addOrder(event); // #1
        return Mono.just(preparedOrder);
    }

    private OrderEvent createOrderEvent(Order order) {
        return new OrderEvent(order.uuid(), OrderEventStatus.CREATED, order, SourceEnum.ORDER_SERVICE);
    }

    private Order prepareOrder(Order order) {
        Order result = order;
        if (result.uuid() == null) {
            result = result.withUuid(java.util.UUID.randomUUID().toString());
        }
        if (result.status() == null) {
            result = result.withStatus("NEW");
        }
        return result;
    }
}

OrderSupplierService tại đây ta sử dụng LinkedBlockingQueue để lưu trữ orders, đây là một thread-safe để đảm bảo cho đa luồng và Spring Message sẽ lấy dữ liệu từ đây. Các message sẽ được xử lý đảm bảo FIFO.

Đây là nguồn khởi phát dữ liệu từ RESTful vào các topic trong Kafka. peek() lấy ra dữ liệu để kiểm tra , poll() lấy ra và remove khỏi queue.

public class OrderSupplierService {

    private final BlockingQueue<OrderEvent> orders = new LinkedBlockingQueue<>();

    public void addOrder(OrderEvent order) {
        orders.add(order);
    }

    @Bean
    public Supplier<Message<OrderEvent>> orderEventSupplier() {
        return () -> {
            OrderEvent event = orders.peek();
            if (event != null) {
                OrderEvent order = orders.poll();
                Message<OrderEvent> message = MessageBuilder
                        .withPayload(event)
                        .setHeader(KafkaHeaders.KEY, order.uuid())
                        .build();
                log.info("Order: {}", message.getPayload());
                return message;
            }
            return null;
        };
    }
}

Sau khi dữ liệu đi vào topic orders.buy, có 3 function đăng ký stream dữ liệu từ nguồn này stockOrdersdeliveryOrdersproductOrders, ta xem nó xử lý những gì.

Ta thấy productOrders nhận đầu vào là kiểu dữ liệu KStream<String, OrderEvent>, và trả ra cũng là một KStream<String, OrderEvent>.

Ở đây logic xử lý đơn giản, chỉ chuyển trạng thái của OrderEvent nếu CREATED thành ACCEPT, sau đó nó sẽ gửi đến output topic như cấu hình là orders.saga.product. Hàm shouldSendToOutputTopic kiểm tra xem có gửi tiếp đi không khi trạng thái không phải là ACCEPT hoặc REJECT, ta cần làm vậy để tránh vòng lặp vô tận, khi tại Orchestration nó lại tiếp tục gửi vào topic ban đầu order.buy, tại Orchestration sẽ join và biến đổi các trạng thái của message thành ACCEPTED và REJECTED.

Trong dự án thực thế thì phần này sẽ gọi đến các hàm để lưu trữ dữ liệu, và có thể áp dụng một số pattern như Event Sourcing ở đây để thuận tiện cho việc thực hiện ‘bù’ transction, nếu có lỗi ở bất kỳ giao dịch khác.

Các ConsumerService khác được implement tương tự.

public class ProductConsumerService {

    @Bean
    public Function<KStream<String, OrderEvent>, KStream<String, OrderEvent>> productOrders() {

        return this::processProductOrders;
    }

    private KStream<String, OrderEvent> processProductOrders(KStream<String, OrderEvent> inputStream) {
        return inputStream
                .mapValues(this::processProductEvent)
                .filter((key, value) -> shouldSendToOutputTopic(value));
    }

    private OrderEvent processProductEvent(OrderEvent event) {
        log.info("Processing product order: {}", event);

        event = event.withSource(SourceEnum.PRODUCT_SERVICE);

        if (event.eventStatus().equals(OrderEventStatus.CREATED)) {
            event = event.withEventType(OrderEventStatus.ACCEPT);
        }

        return event;
    }

    private boolean shouldSendToOutputTopic(OrderEvent event) {
        // Add your conditions here to control message sending
        boolean shouldSend = event.order() != null &&
                (event.eventStatus().equals(OrderEventStatus.ACCEPT)
                        || event.eventStatus().equals(OrderEventStatus.REJECT));

        if (!shouldSend) {
            log.info("Skipping product order: {}", event);
        }

        return shouldSend;
    }
}

OrderSagaService đây được gọi là ORCHESTRATION, đầu vào của nó là đầu ra của các dịch vụ khác (stockOrdersdeliveryOrdersproductOrders), đây là một nested function, với 3 tham số đầu vào và 1 tham số đầu ra đều có kiểu dữ liệu là KStream<String, OrderEvent>.

nó sẽ join stocks + deliveries + payments nếu toàn bộ trạng thái là ACCEPT thì sẽ trả ra message có trạng thái là ACCEPTED, nếu không trả về REJECTED, gửi vào topic order.buy, khi có một message với trạng thái là REJECTED gửi vào topic order.buy, các dịch vụ khác khi thấy trạng thái REJECTED sẽ thực hiện “bù” giao dịch, nếu là ‘ACCEPTED’ sẽ kết thúc message.

public class OrderSagaService {

    private final OrderJoinService orderJoinService;

    @Bean
    public Function<KStream<String, OrderEvent>,
                Function<KStream<String, OrderEvent>,
                        Function<KStream<String, OrderEvent>, KStream<String, OrderEvent>>>> orderEventSaga() {

        JsonSerde<OrderEvent> orderSerde = new JsonSerde<>(OrderEvent.class);

        return stocks ->
                deliveries ->
                        payments -> {

            log.info("Starting order saga processing");
            JoinWindows joinWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10));

            log.debug("Joining stock and delivery streams");
            KStream<String, OrderEvent> stockAndDeliveryJoin =
                    stocks.join(
                            deliveries,
                            orderJoinService::firstJoin,
                            joinWindow,
                            StreamJoined.with(Serdes.String(), orderSerde, orderSerde));

            log.debug("Joining result with payment stream");
            KStream<String, OrderEvent> result =
                    stockAndDeliveryJoin.join(
                            payments,
                            orderJoinService::nextJoin,
                            joinWindow,
                            StreamJoined.with(Serdes.String(), orderSerde, orderSerde));

            log.info("Order saga processing complete");
            return result;
        };
    }
}

Test app

Mở rộng bài toán

Từ khung trương trình đã có bạn có thể implement khi thêm lớp lưu dữ liệu vào database, xử lý bù (compensating) khi có một giao dịch lỗi.

Đây là mô hình được implement trong dự án thực tế, bạn có thể tham khảo

Tổng kết

  • SAGA là một pattern để xử lý transaction trong môi trường phân tán, thường các dự án sử dụng Microservice
  • Sử dụng các thư viện như Spring Cloud Function, Kafka Streams, ta có thể implement được SAGA Pattern cho ứng dụng của mình
  • Các hệ thống sử dụng Kafka Streams nếu kết hợp với Reactive Programming sẽ mang lại tốc độ tối ưu cho người dùng
Written by :

user

Java Developer, System Architect, Learner and becoming Youtuber

View All Posts

Để lại một bình luận

Email của bạn sẽ không được hiển thị công khai. Các trường bắt buộc được đánh dấu *