Các thư viện hỗ trợ Reactive (Postgres, Mongo, Elasticseach, Redis, v.v…)

Các thư viện hỗ trợ Reactive (Postgres, Mongo, Elasticseach, Redis, v.v…)

Để một ứng dụng reactive thể hiện được tính chất non-blocking thì yêu cầu các dịch vụ bên trong ứng dụng như: tương tác với cơ sở dữ liệu, gọi API, dịch vụ lưu trữ file v.v.. cũng cần có yêu cầu về bất đồng bộ.

Đây là bài tiếp theo trong chuối bài về Reactive, các bạn đọc thêm về 2 bài trước đó để hiểu thêm:

Ban đầu các thư viện dịch vụ trên chưa hỗ trợ reactive do vậy các adaptor được viết ra giúp app tương tác với các dịch vụ đó dưới dạng bất đồng bộ, nhưng với những phiên bản Spring Boot mới nhất, các thư viện dịch vụ trên đã hỗ trợ kết nối bất đồng bộ một cách native

Trong bài này mình sẽ hướng dẫn cấu hình kết nối đến các dịch vụ sau qua reactive

  • Postgres database
  • MongoDB
  • Elasticsearch
  • Redis
  • Gọi HTTP request bằng reactive
  • File upload (S3)

Postgres database

Trong bài trước ta đã biết cách cầu hình kết nối app với database Postgres thông qua cấu hình kế nối trong file YAML, đấy là trong trường hợp có 1 database, khi app có kết nối đến nhiều hơn 1 database ta không thể dùng cách cấu hình này được, mà cần tạo file cấu hình database.

Giả sử ta triển khai app theo pattern CQRS, khi này một đối tượng Product sẽ được lưu trữ và truy xuất dữ liệu tại hai database khác nhau, một cho đọc (query) và một cho ghi (command)

Cấu hình thư viện

implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
implementation 'org.postgresql:r2dbc-postgresql'

Cấu hình thông tin kết nối thông qua file YAML

spring:
  r2dbc:
    product:
      driver: postgresql
      host: ${POSTGRESQL_COMMAND_HOST}
      port: ${POSTGRESQL_COMMAND_PORT}
      database: ${APP_PRODUCT_DATABASE}
      username: ${POSTGRESQL_COMMAND_USERNAME}
      password: ${POSTGRESQL_COMMAND_PASSWORD}
    query:
      driver: postgresql
      host: ${POSTGRESQL_QUERY_HOST}
      port: ${POSTGRESQL_QUERY_PORT}
      database: ${APP_PRODUCT_QUERY_DATABASE}
      username: ${POSTGRESQL_QUERY_USERNAME}
      password: ${POSTGRESQL_QUERY_PASSWORD}

Tạo các file cấu hình đọc cấu hình kết nối này

@Configuration
@EnableR2dbcRepositories(
    basePackages = "com.etask.product.domain.product.infrastructure.repository.product", // #1
    entityOperationsRef = "productR2dbcEntityTemplate" // #2
)
@Slf4j
public class ProductDatabaseConfig {

    @Value("${spring.r2dbc.product.host}")
    private String host;

    @Value("${spring.r2dbc.product.port}")
    private int port;

    @Value("${spring.r2dbc.product.database}")
    private String database;

    @Value("${spring.r2dbc.product.username}")
    private String username;

    @Value("${spring.r2dbc.product.password}")
    private String password;

    @Primary
    @Bean
    public ConnectionFactory productConnectionFactory() { // #3

        ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
                .option(ConnectionFactoryOptions.DRIVER, "postgresql")
                .option(ConnectionFactoryOptions.HOST, host)
                .option(ConnectionFactoryOptions.PORT, port)
                .option(ConnectionFactoryOptions.DATABASE, database)
                .option(ConnectionFactoryOptions.USER, username)
                .option(ConnectionFactoryOptions.PASSWORD, password)
                .option(ConnectionFactoryOptions.SSL, true)
                .option(Option.valueOf("sslMode"), "prefer") // Use 'require' instead of 'prefer'
                .option(Option.valueOf("connectTimeout"), "5000") // 5 seconds timeout
                .build();

        return ConnectionFactories.get(options);
    }

    @Primary
    @Bean
    public R2dbcEntityTemplate productR2dbcEntityTemplate( // #4
            @Qualifier("productConnectionFactory") ConnectionFactory connectionFactory) {
        return new R2dbcEntityTemplate(connectionFactory);
    }
}

#1 chỉ định package cho repository

#2 tên của method cấu hình khi muốn sử dụng thông qua R2dbcEntityTemplate, kết nối đến DB qua template là một cách khác bên cạnh việc sử dụng repository. 

#3 cấu hình kết nối đến database, ConnectionFactory là một Bean được sử dụng bở Repository 

#4 cấu hình cho việc sử dụng R2dbcEntityTemplate, ở đây ta thấy có dùng annonation @Qualifier(“productConnectionFactory”), tức là nó sẽ sử dụng cấu hình của Bean productConnectionFactory thay cho cấu hình mặc định.

Các file repository

Với cấu hình này ta sẽ phải tạo ra 2 repository tương ứng cho Command và Query, và trong file cấu hình ta thấy có chỉ định package cho repo, do vậy ta phải tách hai repo ở 2 package khác nhau

├── repository
│   ├── product
│   │   └── ProductRepository.java
│   └── query
│       └── ProductQueryRepository.java

MongoDB

Giả sử app của ta cần kết nối đến CSDL MongoDB để lưu trữ dữ liệu về Auditting, kết nối đến MongoDB có yêu cầu sử dụng SSL và xác thực qua Certificate file để đảm an toàn.

Các CDSL thường đưa ra một CA file .crt để cho client (app) có thể xác thực được kết nối đến database, trong JVM trong Java sẽ không thể xác thực bằng file .crt do vậy ta cần convert .crt sang dạng mà JVM hỗ trợ là truststore file. Mọi người tìm hiểu thêm để biết cách convert.

File truststore.jks có thể đặt ở thư mục resource của app trong quá trình phát triển, file này nên được quản lý bở các service chuyên biệt.

Cấu hình thư viện

implementation 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'

Cấu hình thông tin kết nối trong YAMl file

Mật khẩu của truststore file được thiết lập khi chuyển .crt sang .jks

spring.data.mongodb.uri: mongodb://${MONGO_CLUSTER_USER}:${MONGO_CLUSTER_PASSWORD}@{MONGO_CLUSTER_ENPOINTS}/etaskdb?replicaSet=rs0&authSource=etaskdb&tls=true
mongodb.ssl.truststore-password: ${MONGO_CLUSTER_SSL_PASSWORD}

File cấu hình trong App

Trong file cấu hình này ta để ý đến

@Configuration
public class MongoSslConfig {

    @Value("${spring.data.mongodb.uri}")
    private String mongoUri;

    @Value("${mongodb.ssl.truststore-password}")
    private String sslPass;

    @Bean
    public MongoClient mongoClient() throws Exception {

        // Load truststore from resources
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (InputStream is = getClass().getClassLoader().getResourceAsStream("truststore.jks")) { // #1
            if (is == null) {
                throw new RuntimeException("truststore.jks not found in classpath");
            }
            trustStore.load(is, sslPass.toCharArray());
        }

        // Create SSLContext with truststore
        SSLContext sslContext = SSLContexts.custom()  // #2
                .loadTrustMaterial(trustStore, null)
                .build();

        // Configure MongoClient to use custom SSLContext
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyConnectionString(new ConnectionString(mongoUri))
                .applyToSslSettings(builder -> builder.enabled(true).context(sslContext)) // #3
                .build();

        return MongoClients.create(settings);
    }
}

Trong file cấu hình này ta để ý một số điểm sau

#1 load file truststore.jks, như trong đoạn code trên là đang được load từ resource folder

#2 tạo SSL context #3 đưa SSL context vào cấu hình kết nối với MongoDB

Ta có thể sử dụng Mongo qua MongoRepository hoặc thông qua template ReactiveMongoTemplate

Elasticsearch

ElasticSearch được sử dụng hiệu quả khi xây dựng hàm tìm kiếm bở việc nó hỗ trợ nhiều loại toán tử tìm kiếm, trọng số kết quả tìm kiếm, và trong thời gian gần đây với phiên bản 8.x nó đã hỗ trợ việc lưu trữ dạng Vector do vậy rất hiệu quả sử dụng embedding dữ liệu để hỗ trợ các AI model.

Elasticsearch hiện cũng đã hỗ trợ kết nối kiểu reactive, giả sử ta cần kết nối với Elasticsearch thông qua SSL và có xác thực bở Certificate, và sử dụng API key.

Cấu hình thư viện

implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch'
implementation 'org.springframework.data:spring-data-elasticsearch'

Cấu hình YAML file

spring:
  elasticsearch:
    api-key: ${ELASTIC_API_KEY}
    custom-uris: ${ELASTIC_API_ENDPOINT:elasticsearch:9200}
    ssl-pass: ${ELASTIC_SSL_PASS}
    connection-timeout: 5s
    socket-timeout: 3s

Cấu hình thông trong Java

@Configuration
public class ElasticsearchProductConfig {

    @Value("${spring.elasticsearch.custom-uris}")
    private String elasticsearchUrl;

    @Value("${spring.elasticsearch.api-key}")
    private String secretKey;

    @Value("${spring.elasticsearch.ssl-pass}")
    private String sslPass;

    @Bean
    public ClientConfiguration clientConfiguration() throws Exception {
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (InputStream is = getClass().getClassLoader().getResourceAsStream("truststore.jks")) {
            if (is == null) {
                throw new FileNotFoundException("truststore.jks not found in classpath");
            }
            trustStore.load(is, sslPass.toCharArray());
        }

        SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(trustStore, null)
                .build();

        return ClientConfiguration.builder()
                .connectedTo(elasticsearchUrl)
                .usingSsl(true)
                .withConnectTimeout(Duration.ofSeconds(5))
                .withSocketTimeout(Duration.ofSeconds(3))
                .withDefaultHeaders(getDefaultHeaders())
                .withClientConfigurer(ElasticsearchClients.ElasticsearchHttpClientConfigurationCallback.from(
                        httpAsyncClientBuilder -> httpAsyncClientBuilder
                                .setSSLContext(sslContext)
                                .setMaxConnTotal(100)
                                .setMaxConnPerRoute(100)
                                .setKeepAliveStrategy((response, context) -> Duration.ofMinutes(2).toMillis())
                ))
                .build();
    }


    private HttpHeaders getDefaultHeaders() {
        HttpHeaders headers = new HttpHeaders();
        headers.add("Content-Type", MediaType.APPLICATION_JSON_VALUE);
        headers.add("Accept", MediaType.APPLICATION_JSON_VALUE);
        headers.add("Authorization", "ApiKey " + secretKey); // #1 
        return headers;
    }

    @Bean
    public ReactiveElasticsearchClient reactiveElasticsearchClient() throws Exception { // #2 
        return ElasticsearchClients.createReactive(clientConfiguration());
    }

    @Bean
    public ReactiveElasticsearchTemplate reactiveElasticsearchTemplate( // #3
            ReactiveElasticsearchClient client) {
        
        return new ReactiveElasticsearchTemplate(client,
                new MappingElasticsearchConverter(
                        new SimpleElasticsearchMappingContext()
                ));
    }
}

Trong cấu hình Elasticsearch sử dụng truststore tương tự như Mongo khi ta tạo một SSL context, ta để ý đến một số cấu hình sau

#1 kết nối qua API key đặt trong header, nếu cấu hình qua Username/Password ta thay bằng Basic Auth và truyển username/password #2 ReactiveElasticsearchClient tạo một Bean cho client, cấu hình này được sử dụng khi dùng ReactiveElasticsearchRepository #3 ReactiveElasticsearchTemplate cấu hình sử dụng qua template

Ví dụ về sử dụng qua template

	private final ReactiveElasticsearchTemplate elasticsearchTemplate;

	public Mono<Page<ProductResponse>> searchProducts(String name, Double minPrice, Double maxPrice, Pageable pageable) {
        Query multiMatch = MultiMatchQuery.of(m -> m
                .fields("name", "description")
                .query(name)
                .analyzer("standard")
        )._toQuery();

        Query priceRange = RangeQuery.of(
                r -> r.number(nv ->
                        nv.field("price")
                                .gte(minPrice)
                                .lte(maxPrice))
        )._toQuery();

        Query boolQuery = BoolQuery.of(b -> b
                .must(multiMatch)
                .filter(priceRange)
        )._toQuery();

        NativeQuery searchQuery = NativeQuery.builder()
                .withQuery(boolQuery)
                .withPageable(pageable)
                .build();

        return elasticsearchTemplate.search(searchQuery, ProductIndex.class)
                .collectList()
                .as(flux -> databaseRetryUtil.applyRetry(flux, "search products in elasticsearch", log))
                .map(hits -> {
                    List<ProductResponse> products = hits.stream()
                            .map(SearchHit::getContent)
                            .map(productMapper::mapToResponse)
                            .toList();
                    long total = hits.size();
                    return new PageImpl<>(products, pageable, total);
                });
    }

Redis

Trong một số bài viết về Gateway chúng ta đã biết Redis có thể dùng để hỗ trợ cho cơ chế ratelimit. Redis ngoài ra còn có rất nhiều các ứng dụng khác nhau.

Giả sử ta có yêu cầu lưu sử dụng Redis cho lưu trữ cached của ứng dụng, kết nối đến Redis sử dụng username/password và xác thực kết nối qua certificate file (.crt) ta sẽ cấu hình kết nối như sau

Cấu hình thư viện

implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'

Yaml file

spring:
  data:
    redis:
      host: ${REDIS_HOST:redis}
      port: ${REDIS_PORT:6379}
      database: 0
      password: ${REDIS_PASSWORD}
      username: ${REDIS_USERNAME:default}
      ssl:
        enabled: ${REDIS_SSL_ENABLED:false}

Java file

Tại SslOption ta có thể load trực tiếp certificate file (.crt), hơi khác một chút với kết nối sử dụng truststore file như trong Mongo hoặc ElastisSearch.

public class RedisConfig {

    private final ResourceLoader resourceLoader;

    @Value("${spring.data.redis.host:localhost}")
    private String host;

    @Value("${spring.data.redis.port:6379}")
    private int port;

    @Value("${spring.data.redis.password:}")
    private String password;

    @Value("${spring.data.redis.username}")
    private String username;


    @Bean
    public LettuceConnectionFactory redisConnectionFactory() throws IOException {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(host);
        redisStandaloneConfiguration.setPort(port); // Or your SSL port, usually 6380 or custom
        redisStandaloneConfiguration.setPassword(password); // If your Redis requires authentication
        redisStandaloneConfiguration.setUsername(username);

        LettuceClientConfiguration.LettuceClientConfigurationBuilder lettuceClientConfigurationBuilder = LettuceClientConfiguration.builder();

        // Enable SSL
        lettuceClientConfigurationBuilder.useSsl();

         SslOptions sslOptions = SslOptions.builder()
                 .trustManager(resourceLoader.getResource("classpath:redis.crt").getFile())
                 .build();

         ClientOptions clientOptions = ClientOptions.builder()
                 .sslOptions(sslOptions)
                 .build();

         lettuceClientConfigurationBuilder.clientOptions(clientOptions);

        LettuceClientConfiguration lettuceClientConfiguration = lettuceClientConfigurationBuilder.build();

        return new LettuceConnectionFactory(redisStandaloneConfiguration, lettuceClientConfiguration);
    }
}

Lưu ý khi muốn sử dụng ReactiveRedisTemplate đẻ lưu một đối tượng POJO trong Java ta cần cấu hình thêm một serializer để template có thể hiều để chuyển Java Object sang Object trong Redis và ngược lại

public class RedisTemplateConfig {

    private final ObjectMapper objectMapper;

    @Bean
    public ReactiveRedisTemplate<String, ProductResponse> reactiveRedisTemplate(
            ReactiveRedisConnectionFactory factory) {

        Jackson2JsonRedisSerializer<ProductResponse> serializer =
                new Jackson2JsonRedisSerializer<>(ProductResponse.class);

        serializer.setObjectMapper(objectMapper);

        RedisSerializationContext.RedisSerializationContextBuilder<String, ProductResponse> builder =
                RedisSerializationContext.newSerializationContext(new StringRedisSerializer());

        RedisSerializationContext<String, ProductResponse> context =
                builder.value(serializer).build();

        return new ReactiveRedisTemplate<>(factory, context);
    }
}

Đây là một đoạn code ví dụ về sử dụng reativeTemplate, ta thấy có một số toán tử được sử dụng ở đây như setasdoOnSuccess và then

return reactiveRedisTemplate.opsForValue()
                .set(key, productResponse, CACHE_TTL)
                .as(mono -> applyRetry(mono, "save"))
                .doOnSuccess(aVoid -> log.info("Product saved to cache: {}", productResponse.uuid()))
                .then();

WebClient

Trong Spring MVC ta có thể dụng RestTemplate khi muốn kết nối đến các hệ thống khác qua HTTP Request, khi làm việc với Reactive trong WebFlux có một thư viện khác tương đương là WebClient, cách sử dụng như thế nào mình sẽ giới thiệu ở bên dưới. Có một điều đặc biệt là ngày nay trong một số dự án bằng Spring MVC (none reactive) nhưng các developer cũng sử dụng WebClient cho việc gọi HTTP Request thay cho RestTemplate truyền thống, vì một số ưu điểm của nó:

  • Khả năng gọi lại (retry)
  • Mapping response
  • Khả năng điều khiển lỗi

Để sử dụng WebClient ta có thể sử dụng trực tiếp không cần cấu hình, nhưng khi chúng ta làm việc với các hệ thống Tracing (Micrometer), thì WebClient được yêu cầu thêm tạo cấu hình

@Configuration
public class HttpClientConfig {

    @Bean
    public WebClient webClient(WebClient.Builder builder) {
        return builder.build(); // Spring will inject a builder with tracing filters
    }
}

Đây là một ví dụ sử dụng WebClient, ta thấy có sử dụng cơ chế retry ở toán tử as mà ta đã tìm hiểu trong bài trước, thông báo khi gọi API thành công, và xử lý lỗi khi gọi thất bại onErrorResume

private final WebClient webClient;

private Mono<ProductResponse> fetchFromCache(String uri, String uuid) {
        return webClient.get().uri(uri + uuid)
                        .retrieve()
                        .bodyToMono(ProductResponse.class)
                        .onErrorResume(WebClientResponseException.class, ex -> {
                            if (ex.getStatusCode().is4xxClientError()) {
                                if (ex.getStatusCode().value() == 404) {
                                    log.info("Product not found in cache (404) for uuid: {}", uuid);
                                    return Mono.error(new ProductNotFoundInCachedException("Product not found in cache"));
                                }
                                // Handle other 4xx errors as needed
                                log.warn("Client error fetching from cache: {} for uuid: {}", ex.getStatusCode(), uuid);
                                return Mono.error(new CachedSystemException(ex.getMessage()));
                            }
                            return Mono.error(new CachedSystemException(ex.getMessage()));
                        })
                        .as(mono -> databaseRetryUtil.applyRetry(mono, "fetch product from cache", log))
                        .doOnSuccess(response -> log.info("Fetched product from cache: {}", response));
}

Lưu trữ file lên hệ thống S3 với Reactive

Trong các ứng dụng của chúng ta việc lưu trữ file cũng là nghiệp vụ thường xuyên được sử dụng, vậy lưu trữ theo cơ chế reative như thế nào, chúng ta hãy cùng tìm hiểu.

S3 (Simple Storage Service) được một số nhà cung cấp dịch vụ hỗ trợ như Amazon S3, Wasabi, chúng ta cũng có thể tự xây dựng cho mình hệ thống file lưu trữ theo kiểu S3 bằng cách sử dụng MinIO

Cấu hình thư viện

implementation 'software.amazon.awssdk:s3:2.31.59'
implementation 'software.amazon.awssdk:netty-nio-client:2.31.59'

Cấu hình S3AsyncClient

public S3AsyncClient s3AsyncClient() {
    return S3AsyncClient.builder()
            .endpointOverride(URI.create(endpoint))
            .region(Region.of(region))
            .credentialsProvider(StaticCredentialsProvider.create(
                    AwsBasicCredentials.create(accessKey, secretKey)))
            .build();
}

Cách sử dụng S3Client

private final S3AsyncClient s3Client;

s3Client.putObject(req -> req
                    .bucket(bucketName)
                    .key(s3Key)
                    .contentType(filePart.headers().getContentType() != null ?
                            filePart.headers().getContentType().toString() : "application/octet-stream")
                    .build(),
            AsyncRequestBody.fromBytes(bytes))

Tổng kết

Để ứng dụng reactive phát huy được khả năng non-blocking thì có yêu cầu là kết nối reactive qua các dịch vụ đều là reactive (end-to-end), hiều và nắm bắt được việc cấu hình để kết nối các dịch vụ là rất quan trọng. Các dịch vụ kết nối ở trên là rất cơ bản, trong khi phát triển các ứng dụng của mình có thể có những dịch vụ khác mọi người tìm hiểu thêm.

Một ứng dụng viết dưới dạng reactive, kết hợp với sử dụng các kỹ thuật như Virtual Thread và build app dưới dạng native sử dụng GraalVM cũng sẽ làm tăng đáng kể hiệu năng hệ thống trong một tài nguyên hữu hạn.

Written by :

user

Java Developer, System Architect, Learner and becoming Youtuber

View All Posts

Để lại một bình luận

Email của bạn sẽ không được hiển thị công khai. Các trường bắt buộc được đánh dấu *