'Extract WebClient GET response values within a Spring Cloud Gateway filter

My end goal with this is to implement a way to make composite API calls within the body of a gateway route filter. I have a very basic demo application running on port 9000 and exposing a few endpoints. Here is the REST controller:

@RestController
@RequestMapping("/composite")
public class CompositeCallController {

    @GetMapping("/test/one")
    public Map<String, Object> first() {
        Map<String, Object> output = new HashMap<>();
        output.put("response-1-1", "FIRST 1");
        output.put("response-1-2", "FIRST 2");
        output.put("response-1-3", "FIRST 3");
        return output;
    }

    @GetMapping("/test/two")
    public Map<String, Object> second() {
        Map<String, Object> output = new HashMap<>();
        output.put("response-2-1", "SECOND 1");
        output.put("response-2-2", "SECOND 2");
        output.put("response-2-3", "SECOND 3");
        return output;
    }

    @GetMapping
    public Map<String, Object> init() {
        return new HashMap<>();
    }
}

Both controllers return just a plain Map with a few entries inside. I have a Spring Cloud Gateway application running on a separate port, and I have configured via YML a route that leads to the localhost:9000/composite endpoint, which returns a blank map. Then I have a ModifyResponseBodyGatewayFilterFactory filter that kicks in and creates two brand new requests towards the two other endpoints in my demo application.

I want to aggregate those two responses into one by transferring them into a new map that I return to the filter chain. Here's how my filter looks:

    public GatewayFilter apply(final Config config) {
        final ModifyResponseBodyGatewayFilterFactory.Config modifyResponseBodyFilterFactoryConfig = new ModifyResponseBodyGatewayFilterFactory.Config();
        modifyResponseBodyFilterFactoryConfig.setRewriteFunction(Map.class, Map.class, (exchange, body) -> {
            WebClient client = WebClient.create();

            Mono<Map<String, Object>> firstCallMono = client.get()
                    .uri(FIRST_SERVICE_URL)
                    .retrieve()
                    .bodyToMono(json);

            Mono<Map<String, Object>> secondCallMono = client.get()
                    .uri(SECOND_SERVICE_URL)
                    .retrieve()
                    .bodyToMono(json);

            Map<String, Object> output = new HashMap<>();
            Mono.zip(firstCallMono, secondCallMono)
                    .log()
                    .subscribe(v -> {
                        System.out.println("FIRST VALUE = " + v.getT1());
                        System.out.println("SECOND VALUE = " + v.getT2());
                        output.put("1", v.getT1());
                        output.put("2", v.getT2());
                    });

            System.out.println("OUTPUT VALUE 1 = " + output.get("1"));
            System.out.println("OUTPUT VALUE 2 = " + output.get("2"));

            return Mono.just(output);
        });
        return modifyResponseBodyFilterFactory.apply(modifyResponseBodyFilterFactoryConfig);
    }

The json type is defined as private final ParameterizedTypeReference<Map<String, Object>> json = new ParameterizedTypeReference<>() {};

The URIs are as follows:

public static final String FIRST_SERVICE_URL = "http://localhost:9000/composite/test/one";
public static final String SECOND_SERVICE_URL = "http://localhost:9000/composite/test/two";

And here's my gateway config for reference:

logging:
  level:
    reactor:
      netty: INFO
    org:
      springframework:
        cloud:
          gateway: TRACE

spring:
  codec:
    max-in-memory-size: 20MB
  cloud:
    gateway:
      httpclient:
        wiretap: true
      httpserver:
        wiretap: true
      routes:
        - id: composite-call-test
          uri: http://localhost:9000
          predicates:
            - Path=/composite/**
          filters:
            - CompositeApiCallFilter

To merge the Monos, I use Mono.zip() as it seems to serve just that goal. I've purposefully put two System.out.println()s within the zip() body to make sure the responses from the above two WebClient requests are actually correct, and it definitely seems so:

FIRST VALUE = {response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1}
SECOND VALUE = {response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}

However, I've also put two console prints after the zip() to check if something is populated in the map, and it's completely empty for some reason:

OUTPUT VALUE 1 = null
OUTPUT VALUE 2 = null

Here's the full console output from the request for reference:

2022-05-13 14:53:22.087  INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1                       : onSubscribe([Fuseable] MonoZip.ZipCoordinator)
2022-05-13 14:53:22.090  INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1                       : request(unbounded)
OUTPUT VALUE 1 = null
OUTPUT VALUE 2 = null
2022-05-13 14:53:22.139  INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1                       : onNext([{response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1},{response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}])
FIRST VALUE = {response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1}
SECOND VALUE = {response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}
2022-05-13 14:53:22.140  INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1                       : onComplete()

I tried a bunch of other ways of doing the above, for example merging the two Monos into a Flux by using firstCallMono.mergeWith(secondCallMono) and then subscribing to the resulting Flux object and populating the map, but the result is identical.

I also tried putting the two Monos into a Pair object and extracting the values like so:

Pair<Mono<Map<String, Object>>, Mono<Map<String, Object>>> pair = new Pair(firstCall, secondCallDTOMono);
pair.getValue0().log().subscribe(v -> output.put("1", v));
pair.getValue1().log().subscribe(v -> output.put("2", v));

But again, the output map is empty at the end, and I don't understand why. It seems like whatever comes back from the WebClient .get() call is of type MonoFlapMap.FlatMapMain and I suspect the issue comes from unpacking the values from this type into my regular HashMap, but I don't know how to resolve that issue. I tried using .map() and .flatMap() but neither worked.

Could someone please let me know how to extract those values?



Solution 1:[1]

Thanks to the advice of Toerktumlare, I was able to make it work properly. Here's the entire filter for reference:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

@Component
public class CompositeApiCallFilter extends AbstractGatewayFilterFactory<CompositeApiCallFilter.Config> {
    public static final String COMPOSITE_TEST_URL = "http://localhost:9000/composite/test/";
    private final ModifyResponseBodyGatewayFilterFactory modifyResponseBodyFilterFactory;
    private final ParameterizedTypeReference<Map<String, Object>> json = new ParameterizedTypeReference<>() {
    };

    @Autowired
    public CompositeApiCallFilter(ModifyResponseBodyGatewayFilterFactory factory) {
        super(Config.class);
        this.modifyResponseBodyFilterFactory = factory;
    }

    @Override
    public GatewayFilter apply(final Config config) {
        final ModifyResponseBodyGatewayFilterFactory.Config modifyResponseBodyFilterFactoryConfig = new ModifyResponseBodyGatewayFilterFactory.Config();
        modifyResponseBodyFilterFactoryConfig.setRewriteFunction(Map.class, Map.class, (exchange, body) -> {
            WebClient client = WebClient.create();

            Mono<Map<String, Object>> firstCallMono = client.get()
                    .uri(COMPOSITE_TEST_URL + "one")
                    .retrieve()
                    .bodyToMono(json);

            Mono<Map<String, Object>> secondCallMono = client.get()
                    .uri(COMPOSITE_TEST_URL + "two")
                    .retrieve()
                    .bodyToMono(json);

            Map<String, Object> output = new HashMap<>();

            return Mono.zip(firstCallMono, secondCallMono)
                    .flatMap(v -> {
                        output.put("1", v.getT1());
                        output.put("2", v.getT2());
                        return Mono.just(output);
                    });
        });

        return modifyResponseBodyFilterFactory.apply(modifyResponseBodyFilterFactoryConfig);
    }

    public static class Config {
    }
}

And the respective output in Postman:

{
    "1": {
        "response-1-2": "FIRST 2",
        "response-1-3": "FIRST 3",
        "response-1-1": "FIRST 1"
    },
    "2": {
        "response-2-3": "SECOND 3",
        "response-2-1": "SECOND 1",
        "response-2-2": "SECOND 2"
    }
}

Seems like subscribing wasn't necessary at all, just zipping the monos and extracting their values with flatMap worked well.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Hristo Naydenov