Kafka Streams trong Spring Boot

Kafka Streams là kỹ thuật được sử dụng rộng rãi trong các dịch vụ nội dung số (Netflix), dịch vụ xử lý message với giao dịch lớn (Titok – Trong sách trắng công nghệ của BiteDance có đề cập đến vấn đề này). Trong các ứng dụng Enterprise cho doanh nghiệp nó cũng được ứng dụng để xứ lý các bài toán liên quan đến SAGA, hay những bài toán liên quan đến việc tìm người bán và người mua thích hợp trong một khoảng thời gian nhất định của giao dịch chứng khoán, v.v…
Hiểu và vận hành được Kafka Streams có thể giúp chúng ta xử lý được nhiều bài toàn trong thực tế một cách dễ dàng. Trong bài viết này mình sẽ hướng dẫn mọi người cách triển khai Kafka Streams trên Spring Boot, mã nguồn tham khảo các bạn có thể tìm tại đây.
Một số lưu ý trước khi bắt đầu:
- Khi làm việc với Kafka thì mọi người nắm được các khái niệm như: topic, group, partition, retention time, dead letter topic, retries …
- Sử dụng Kafka Streams là một phương pháp khác bên cạnh việc sử dụng Produce/Consumer(listenner) gửi và nhận dữ liệu đến một topic được chỉ định.
- Nắm một số kiến thức cơ bản về
Functional Programming
- Supplier: hàm không có giá trị đầu vào, chỉ có giá trị đầu ra
- Functional: hàm có giá trị đầu vào và giá trị đầu ra
- Consumer: hàm có giá trị đầu vào và không có giá trị đầu ra
Một Streams
có thể hiểu là một luồng/dòng/đường ống (pipeline) chuyền tải dữ liệu cùng loại liên tục, thời gian thực. Mỗi đơn vị dữ liệu trong Stream được gọi là Message
. Kafka Streams sử dụng interface KStream<K, V>
để thể hiện cho dòng dữ liệu Streams, khi đó K là ID của mỗi message, V là đối tượng thể hiện dữ liệu.
Dữ liệu được truyền tải theo dòng (stream) do vậy hàm xử lý dữ liệu có thể:
- lọc (filtering) lấy các message thỏa mãn yêu cầu nhất định
- kết hợp (joining) các message có cùng ID sẽ được join với nhau, có thể xử lý nghiệp vụ để cập nhật V dữ liệu của message
- tổng hợp (aggregating) mỗi khi message đi qua ta có thể tổng hợp dữ liệu đó vào KTable
- windowning, khi thực hiện Joinning đôi khi ta cần chờ một khoảng thời gian nào đó để message khác đến khi đó ta có thể tạo một window để đợi message.
Kafka Streams trong Spring Boot kết hợp sử dụng Spring Function Cloud và Spring Cloud Stream để khai báo các function.
Supplier
Trong application.yaml
ta có khai báo sau
spring.cloud.function.definition: orderEventSupplier #1
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: orders.buy #2
#1
là tên của function, đây chính làm tên hàm trong Java sẽ implement logic. Đây là một Supplier do vậy không có đầu vào, đầu ra là một Message, Kafka Streams quy định đầu ra của một Supplier hoặc Function sẽ gửi đến một topic, #2
định nghĩa một topic sẽ được gửi tới.
Ta xem định nghĩa một function Supplier
của Kafka Streams
@Bean
public Supplier<Message<OrderEvent>> orderEventSupplier() {
return () -> {
OrderEvent event = orders.peek();
if (event != null) {
OrderEvent order = orders.poll();
Message<OrderEvent> message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.KEY, order.uuid())
.build();
log.info("Order: {}", message.getPayload());
return message;
}
return null;
};
}
Hàm này thường dùng để lắng nghe dữ liệu từ từ một LinkedBlockingQueue<> sau đó dùng hàm .poll()
để lấy dữ liệu sau đó đẩy vào một topic để xử lý.
Ta hay dùng hàm này làm nguồn dữ liệu cho Kafka Streams, các yêu cầu tiếp nhận từ RESTful APIs sẽ được dẩy vào LinkedBlockingQueue<> sau đó dùng hàm .poll()
để lấy dữ liệu, xử lý dữ liệu và ‘trả về’ khi trả về tức là gửi vào một topic khác của Kafka.
Function
Tiếp theo ta hãy xem một khai báo cho một function Function
spring.cloud.function.definition: orderEventSaga #3
spring.cloud.stream.bindings.orderEventSaga-in-0.destination: orders.saga.stock #4
spring.cloud.stream.bindings.orderEventSaga-in-1.destination: orders.saga.delivery #5
spring.cloud.stream.bindings.orderEventSaga-in-2.destination: orders.saga.product #6
spring.cloud.stream.bindings.orderEventSaga-out-0.destination: orders.buy #7
Tên của function là orderEventSaga
, định nghĩa các destination ta thấy có 3 in
và 1 out
, ta thấy nó nhận đầu vào 3 topic #4, #5, #6
và trả ra tại topic #7
Trong đoạn code implement ta thấy, nó sẽ Join 3 message đến từ 3 topic ở trên theo một logic được định nghĩa, sau đó message cuối được trả về và sẽ được gửi đến topic orders.buy
@Bean
public Function<KStream<String, OrderEvent>,
Function<KStream<String, OrderEvent>,
Function<KStream<String, OrderEvent>, KStream<String, OrderEvent>>>> orderEventSaga() {
JsonSerde<OrderEvent> orderSerde = new JsonSerde<>(OrderEvent.class);
return stocks ->
deliveries ->
payments -> {
log.info("Starting order saga processing");
JoinWindows joinWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10));
log.debug("Joining stock and delivery streams");
KStream<String, OrderEvent> stockAndDeliveryJoin =
stocks.join(
deliveries,
orderJoinService::firstJoin,
joinWindow,
StreamJoined.with(Serdes.String(), orderSerde, orderSerde));
log.debug("Joining result with payment stream");
KStream<String, OrderEvent> result =
stockAndDeliveryJoin.join(
payments,
orderJoinService::nextJoin,
joinWindow,
StreamJoined.with(Serdes.String(), orderSerde, orderSerde));
log.info("Order saga processing complete");
return result;
};
}
Consumer
Như định nghĩa của nó là không có giá trị trả về, message sau khi được xử lý sẽ kết thúc tại consumer, ta hãy xem một đoạn code minh họa consumer. Tại đây ta thấy message xử lý bằng việc cập nhật lại trạng thái trong CSDL, và kết thúc Streams.
@Bean
public Consumer<KStream<String, OrderEvent>> acceptedOrder() {
return this::processAcceptedOrders;
}
private void processAcceptedOrders(KStream<String, OrderEvent> inputStream) {
// Process the events and store the resulting stream
inputStream.peek((key, orderEvent) -> {
log.info("Processing accepted order: key={}, orderId={}, status={}",
key, orderEvent.uuid(), orderEvent.eventStatus());
orderHandler.updateOrderStatus(orderEvent)
.subscribe(
// onNext - won't be called for Mono<Void> but required for parameter positioning
null,
// onError handler
error -> log.error("Failed to update order status for orderId: {}", orderEvent.uuid(), error),
// onComplete handler
() -> log.info("Successfully updated order status for orderId: {}", orderEvent.uuid())
);
});
}
Tổng kết
Cần lưu ý đến vấn đề lỗi khi xử lý dữ liệu trong Kafka Streams khi xử lý các bài toán thực tế.
Bên cạnh việc sử dụng Kafka với Producer/Consumer ta có thể cân nhắc lựa chọn sử dụng Kafka Streams khi xử lý dữ liệu yêu cầu Joining hoặc aggregating.
Rất nhiều bài toán khác có thể sử dụng được Kafka Streams, ví dụ khi áp dụng CQRS pattern ta muốn đồng bộ dữ liệu giữa query và command, ta có thể stream dữ liệu từ command sang query.
Related Posts