Spring-cloud-gateway: Reading request body in filter produces an IllegalStateException

Created on 16 Dec 2017  路  36Comments  路  Source: spring-cloud/spring-cloud-gateway

Using master branch:

@Override
  public GatewayFilter apply(Tuple args) {
   // @formatter:off
   return (exchange, chain) -> 
    exchange.getFormData()
      .flatMap(formData -> {
        String username = formData.getFirst("username");
        String password = formData.getFirst("password");

       //...

        ServerHttpRequest modifiedRequest = exchange.getRequest().mutate()
          .uri(newUri)
          .build();

        return chain.filter(
          exchange
            .mutate()
            .request(modifiedRequest)
        .build()
       );
   });
   // @formatter:on
}

First call emmits IllegalStateException

017-12-16 21:39:37.809  INFO 18409 --- [erListUpdater-0] c.netflix.config.ChainedDynamicProperty  : Flipping property: admin.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2017-12-16 21:39:39.222  INFO 18409 --- [ctor-http-nio-4] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@494e7642: startup date [Sat Dec 16 21:39:39 MSK 2017]; parent: org.springframework.boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext@4fe89c24
2017-12-16 21:39:39.288  INFO 18409 --- [ctor-http-nio-4] f.a.AutowiredAnnotationBeanPostProcessor : JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
2017-12-16 21:39:39.388  INFO 18409 --- [ctor-http-nio-4] c.netflix.config.ChainedDynamicProperty  : Flipping property: uaa.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2017-12-16 21:39:39.405  INFO 18409 --- [ctor-http-nio-4] c.n.u.concurrent.ShutdownEnabledTimer    : Shutdown hook installed for: NFLoadBalancer-PingTimer-uaa
2017-12-16 21:39:39.406  INFO 18409 --- [ctor-http-nio-4] c.netflix.loadbalancer.BaseLoadBalancer  : Client: uaa instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=uaa,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null
2017-12-16 21:39:39.408  INFO 18409 --- [ctor-http-nio-4] c.n.l.DynamicServerListLoadBalancer      : Using serverListUpdater PollingServerListUpdater
2017-12-16 21:39:39.411  INFO 18409 --- [ctor-http-nio-4] c.netflix.config.ChainedDynamicProperty  : Flipping property: uaa.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2017-12-16 21:39:39.412  INFO 18409 --- [ctor-http-nio-4] c.n.l.DynamicServerListLoadBalancer      : DynamicServerListLoadBalancer for client uaa initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=uaa,current list of Servers=[192.168.1.10:25050],Load balancer stats=Zone stats: {defaultzone=[Zone:defaultzone;  Instance count:1;   Active connections count: 0;    Circuit breaker tripped count: 0;   Active connections per server: 0.0;]
},Server stats: [[Server:192.168.1.10:25050;    Zone:defaultZone;   Total Requests:0;   Successive connection failure:0;    Total blackout seconds:0;   Last connection made:Thu Jan 01 03:00:00 MSK 1970;  First connection made: Thu Jan 01 03:00:00 MSK 1970;    Active Connections:0;   total failure count in last (1000) msecs:0; average resp time:0.0;  90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;  max resp time:0.0;  stddev resp time:0.0]
]}ServerList:org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList@67c89f69
2017-12-16 21:39:39.540 ERROR 18409 --- [ctor-http-nio-3] .a.w.r.e.DefaultErrorWebExceptionHandler : Failed to handle request [POST http://ant.den:25080/uaa/oauth/token?grant_type=password]

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
    at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:276) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
    at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:127) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]

2017-12-16 21:39:40.409  INFO 18409 --- [erListUpdater-0] c.netflix.config.ChainedDynamicProperty  : Flipping property: uaa.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647

Second call produces io.netty.handler.codec.EncoderException:

2017-12-16 21:45:12.062 ERROR 18409 --- [ctor-http-nio-3] .a.w.r.e.DefaultErrorWebExceptionHandler : Failed to handle request [POST http://ant.den:25080/uaa/oauth/token?grant_type=password]

io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106) ~[netty-codec-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at reactor.ipc.netty.channel.ChannelOperationsHandler.doWrite(ChannelOperationsHandler.java:283) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
    at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:463) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
    at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:183) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at reactor.ipc.netty.http.HttpOperations.lambda$sendHeaders$0(HttpOperations.java:130) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
    at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:106) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3008) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3008) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE]
    at reactor.ipc.netty.NettyOutbound.subscribe(NettyOutbound.java:356) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
    at reactor.core.publisher.MonoSource.subscribe(MonoSource.java:51) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE]
    at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:383) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
    at reactor.ipc.netty.http.client.HttpClientOperations.onHandlerStart(HttpClientOperations.java:479) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[netty-common-4.1.17.Final.jar:4.1.17.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultHttpRequest
    at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:86) ~[netty-codec-http-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.handler.codec.http.HttpClientCodec$Encoder.encode(HttpClientCodec.java:167) ~[netty-codec-http-4.1.17.Final.jar:4.1.17.Final]
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88) ~[netty-codec-4.1.17.Final.jar:4.1.17.Final]
    ... 30 common frames omitted

When using M4, the same code works perfect.

bug

Most helpful comment

@spencergibb @rstoyanchev
I wrote two filters to change request and response body with some support classes. Now form-url-encoded body can be changed successfully to any object in json to be passed to server behind gateway, and visa-versa. You can see them at branch rewrite-body here. Some tests has been written too.

But in list here I don't see anything about request/response body changing. Before I'd continue to spend time, possible, on unneeded functionality, or in a wrong way, please, let me know, do you need this? Should I continue this work being sure it may be usefull for community, or I should stop and use only what I really need at this point in my own project?

It is not ready yet, some codecs/readers/writers are not implemented, but the main concept should be clear:

  • Rewrite request body example usage:
@Bean
public RouteLocator rewriteRequestRouteLocator(RouteLocatorBuilder builder) {

    return builder.routes()

        .route("RequestBodyRewrite_StringToString_Route", r ->
            r.path("/rewrite-request-body/post-string-string")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(String.class, String.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        System.out.println("originalBody:" + originalBody);
                        return "bar";
                    })
                    .withResultMediaType(MediaType.TEXT_PLAIN)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

        .route("RequestBodyRewrite_StringToPerson_Route", r ->
            r.path("/rewrite-request-body/post-string-person")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(String.class, Person.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        System.out.println("originalBody:" + originalBody);
                        return new Person(originalBody, "Smith");
                    })
                    .withResultMediaType(MediaType.APPLICATION_JSON)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

        .route("RequestBodyRewrite_formDataToJson_Route", r ->
            r.path("/rewrite-request-body/post-formdata-to-json")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(MultiValueMap.class, Login.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        Map<String, String> map = originalBody.toSingleValueMap();
                        System.out.println("originalBody - mapped to <String,String>:" + map.toString());

                        String user = map.get("user");
                        String password = map.get("password");

                        return new Login(user, password);
                    })
                    .withResultMediaType(MediaType.APPLICATION_JSON)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

        .route("RequestBodyRewrite_jsonToJson_Route", r ->
            r.path("/rewrite-request-body/post-json-to-json")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(Login.class, Person.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        System.out.println("originalBody: " + originalBody.toString());

                        return new Person(originalBody.getUser(), "Smith");
                    })
                    .withResultMediaType(MediaType.APPLICATION_JSON)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

        .route("RequestBodyRewrite_jsonToFormData_Route", r ->
            r.path("/rewrite-request-body/post-json-to-formdata")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(Person.class, MultiValueMap.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
                        map.add("first", "John");
                        map.add("last", "Doe");
                        return map;
                    })
                    .withResultMediaType(MediaType.APPLICATION_FORM_URLENCODED)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

        .route("RequestBodyRewrite_formDataToFormData_Route", r ->
            r.path("/rewrite-request-body/post-formdata-to-formdata")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(MultiValueMap.class, MultiValueMap.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
                        map.add("firstName", "John");
                        map.add("lastName", "Doe");
                        return map;
                    })
                    .withResultMediaType(MediaType.APPLICATION_FORM_URLENCODED)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

    .build();
}
  • Rewrite response body example usage:
@Bean
public RouteLocator rewriteResponseRouteLocator(RouteLocatorBuilder builder) {

    RewriteResponseBody<ServerHttpResponse, Map<String, Object>, Person, MediaType, Person> personToPersonRewriteFn = (response, attrs, originalBody, mediaType) -> {

        return new Person(originalBody.getFirstName(), "Smith");
    };

    return builder.routes()

        .route("ResponseBodyRewrite_PersonToPerson_Route", r ->
            r.path("/body-response/person")
            .filters(f ->
                f.rewriteResponseBody(rw ->
                    rw.classesOfBody(Person.class, Person.class)
                    .withResultMediaType(MediaType.APPLICATION_JSON)
                    .rewrite(personToPersonRewriteFn)
                    .build()
                )
            )
            .uri("http://localhost:"+this.port)
        )

        .route("ResponseBodyRewrite_StringToPerson_Route", r ->
            r.path("/body-response/string2person")
            .filters(f ->
                f.rewriteResponseBody(rw ->
                    rw.classesOfBody(String.class, Person.class)
                    .withResultMediaType(MediaType.APPLICATION_JSON)
                    .rewrite((response, attrs, originalBody, mediaType) -> {
                        String[] nameParts = originalBody.split("\\s+");

                        return new Person(nameParts[0], nameParts[1]);
                    })
                    .build()
                )
            )
            .uri("http://localhost:"+this.port)
        )

    .build();
}

Please let me know what is your decision.

PS: see possible usecase (not only my own). The main think is integration between ready services, and possibility to create custom security schemas with thin client.

All 36 comments

This might be due to changes in spring-framework. @rstoyanchev any ideas or thoughts?

Not sure myself. It sounds like something is trying to read the request body a second time (besides the call to getFormData()) but not sure what. What Spring Framework version does M4 correspond to?

I'm sorry, I was wrong: when the reading of form data worked, the Spring Cloud version I've used then was not M4 but even M3, so the Spring Gateway version was M3:

springBootVersion = '2.0.0.M6'
springCloudVersion = 'Finchley.M3'

The current set is:

springBootVersion = '2.0.0.M7'
springCloudVersion = 'Finchley.M5'
springGatewayVersion = '2.0.0.BUILD-SNAPSHOT' // local build

The exception occurs near the line where proxy sends a request.
The attempt to read request body directly using the code similar to the used in DefaultServerWebExchange results in the same issue.

When I commented out my custom filter in customRouteLocator and tried to insert result data directly without call exchange.getFormData() (but using addRequestParameter()), current version of NettyRoutingFilter works properly.

I even tried to take the old version of NettyRoutingFilter with special workaround concerning forms data, and replaced the new code of NettyRoutingFilter with old one, but it results in connection hangs on tightly.

Indeed, we are reading request body twice in this case: at first in my custom filter (because I have to read request body), and at second time because we need to send request using HttpClient to the endpoint.

Ah, I don't have a great story for manipulating the request body yet. Mixing milestones and snapshots is also not a good idea.

We all learn all our lives, it's OK. Many things have been done, Spring Gateway looks very promising, thank you!
小oncerning manipulating the request/response body, having such functionality on the board would be very attractive. This possibility people actively use in Spring Cloud Zuul, e.g. for manipulating the OAuth2 tokens when UAA decoupled from gateway to separate unit. Being accustomed to such possibilities, your customers (me too) search this functionality in Spring Cloud Gateway too.
At the moment I have some success with response body manipulation and even I'd pull the code, but I have no tests covered such functionality - in my case, I do not have great story with netty/reactor-netty yet. But this is not a problem, the problem is only in time.

reactor.ipc.netty.channel.FluxReceive implements Subscription, which "It can only be used once by a single Subscriber."

When in custom filter I attempt to read body, I provoke the situation when two subscribers (mine plus in NettyRoutingFilter one) exist.

I wrote to Stephane, may be he can help us...

I don't think the answer has much or anything to do with Reactor Netty. In plain terms you can only read the request body once. This is no different from the Servlet API for example which provides an InputStream to read from once. Flux has the cache operator so it's easy to add buffering when needed.

For the case of Spring Cloud Gateway, it makes sense that it writes the request body, as a proxy should. The question then is how can an application "peek" into that request body content? It's worth to ask is this going to happen primarily for form data, or more generally for any content type?

Could be anything really. In zuul there is a thread local map, where a request entity can be placed, if it exists, that is used rather than the incoming one. We may need to do something similar.

-- I don't think the answer has much or ...
I tried using cache, but with no positive results. May be I did smth wrong, I'm new to Flux, you know. I'm going to reread docs and weigh roughly how to do it right way. Thank you, Rossen!

-- I would imagine in the case of ...
Spencer said all before me I'd say, and even more. :)
Simplest what comes is to add request as an exchange attribute but I'm not sure how to clean up (possible it may be needed) them... What do you think?

There's two cases: 1) just reading and 2) reading and modifying.

@re6exp it will require changes in the gateway.

I meant from Reactor Netty perspective, whether you want to buffer the request body is a higher level concern. The cache operator didn't work for you because the ServerWebExchange parses the request body content internally when getFormData is used.

In any case the use case is more clear to me now. It sounds like the application should have a way to provide the request body to forward with. Perhaps the WebClient can provide some inspiration. It takes the body in a variety of ways including MultiValueMap.

@rstoyanchev Yes, I saw that code. I get it! Thanks!

@spencergibb The second, sure. But modifying should be an optional, of cause.

I have some thoughts on this matter, but I need some time to investigate the code (gateway and all core flux exchange/decorators/etc stuff).

Anyway, the first thought of mine was to use exchange attributes to save original request data, but now I'm leaning toward tought to implement ServerWebExchange, ServerHttpRequest and ServerHttpResponse to provide additional fields and functionality with the same interfaces to minimize costs.

Any thoughts?

I'm not sure implementing ServerWebExchange, ServerHttpRequest and ServerHttpResponse is the right thing to do.

I keep in mind the filter's code that uses standard interfaces. Mutation you mean, etc?

So, exchange's attributes way? Or, may be, we need some special containers, we can keep them in attributes, but they had let us to do things easily?

I have same exception is a different scenario though. My use case is at SO:
https://stackoverflow.com/questions/48046238/spring-webflux-only-one-connection-receive-subscriber-allowed

Your use case is completely unrelated to Spring Cloud Gateway.

@spencergibb @rstoyanchev
FYI
I'm working on body rewritting of request/response using request/response decorators.

Now I can successfully rewrite body using Encoders/Decoders, but I'm going to change this way in favor of using Readers/Writers.

I achieved some success in such approach in my tests, but I have to realise a layers adjoining to work of NettyFoatingFilter and NettyResponseFilter. As an example I look in AbstractMessageWriterResultHandler and AbstractMessageReaderArgumentResolver. For this purpose I'm going to write some "inverted" readers/writers thas reads/writes response/request and support/configuration classes respectively.

It seems that the objective may be achived comprehensively.

Are you interested in this work?

@re6exp can you please share sample code to read request body?

@pravinkumarb84
In your implementation of GatewayFilterFactory, in apply method, you need to wrap request with ServerHttpRequestDecorator.

The code is similar to WeirdBob's example of changing the response body.

But you need to use another decorator (ServerHttpRequestDecorator instead of ServerHttpResponseDecorator), you have to overload getBody method and also you have to set the filter priority, which should be less then NettyRouterFilter's one (see Gateway sources).

And, in your router builder you should use .filter(<your filter>, PRIORITY) version to put your filter before netty client. Or you can implement Ordered interface in your filter factory realization, it should work too.

@re6exp thanks for details. Acutally, i am trying to read the request body and convert it to a string to be used as part of AWS signature header.

Sorry, I have nothing to add, I've never used AWS.

@re6exp i want to read the request body and convert it to a string. I am getting "only one subscriber allowed" when i try to read the request body.

@re6exp it's not very clear what your approach is to rewriting which makes it hard to understand what you mean (e.g. by "inverted" readers/writers). I assume you're deserializing to Objects and then serializing again to DataBuffer's but I'm not sure. Also related to that, it would certainly help to have some concrete use case(s) in mind to provide context.

First of all, I'm sorry for my English!

it's not very clear what your approach is to rewriting which makes it hard to understand what you mean, for example with "inverted" readers/writers.

Readers and writers implement HttpMessageReader and HttpMessageWriter interfaces respectively. But they can be used only when they can be, e.g. we cannot write to request and read the response. Because of that I called them "inverted". May be it is not correct, but it does not so much matter. I'm ready to any suggestion of how to call them, but now I call them so because I have to call them somehow... And, I'm not sure they can be useful somewhere else, but as I understand, only under Spring Cloud Gateway, because they are too specific.

You are absolutely correct:

I assume you're deserializing to Objects and then serializing again to DataBuffer's

But this process can be more easy.
At the moment when we get some portion of data in points before NettyRouterFilter and after NettyResponseFilter, it had be very helpful to be able to set appropriate MediaType forcibly when we write request like we can do with EncoderHttpMessageWriter in case of response in Spring Flux, or to read response before we change its body like we can do with request with help of DecoderHttpMessageReader. And, having a collection of such "inverted" readers/writers and handler which can choose appropriate reader/writer by specifying MediaType, let us minimize uplevel code and code of end user, so he can choose easily the final type of final body and readers/writers can be chosen by simply looking through the collection of EncoderHttpMessageWriter/DecoderHttpMessageReader-like classes like in original Spring Flux. The reader before changing the response body and the writer for changing request body user can specify by result class type and by choosing MediaType. A Builder will hide the details of initialization, using readers/writers, both original Spring readers/writers (read request before NettyRouterFilter, write response after NettyResponseFilter) and "reverted" (write changed request, read response before change).

I'm going to create classes like DecoderHttpMessageReader and EncoderHttpMessageWriter to be able to set MediaType and body class type, and they can be used by some handlers like AbstractMessageReaderArgumentResolver or AbstractMessageWriterResultHandler, which can choose needed "reverted" reader/writer by user specified MediaType. The set of such decoders and encoders may be configured like normal are in Spring Flux. One of the decoders has been written today and works perfectly, but I haven't write a handler as yet and no configuration has been written as well, but this is not a problem.

To illustrate described above and possible usecases, I enclose [simple schema] (https://gist.github.com/re6exp/0abbc184fed99031da6efbb40307cb73) illustrating the described above and short notes to some points.

@pravinkumarb84
Did you try my recept out?

@re6exp i tried your suggestions but getting exception as "Only one connection receive subscriber allowed". NettyRoutingFilter is already having the order of lowest precedence and i will not be able to make my filter further lower in order. Also, i require to read the request body and get a part of it as one of the header values before the request is processed by Netty Routing filter. Please advise on the same.

@pravinkumarb84
When I wrote you have to set the filter priority, which should be less then NettyRouterFilter's one (see Gateway sources) I meant HIGHEST_PRECEDENCE which is equal to Integer.MIN_VALUE.
So your filter will be before NettyRoutingFilter in filter chain.

public interface Ordered {
...
    int HIGHEST_PRECEDENCE = Integer.MIN_VALUE;
...
    int LOWEST_PRECEDENCE = Integer.MAX_VALUE;
...
}

See logs where your filter actually placed in filter chain after your application has been on.

You can read request body in one filter and write what you need in exchange attributes to pass to the second filter which will set your header. Note the orders of them.

The using of attributes - see NettyRoutingFilter. And AddRequestHeaderGatewayFilterFactory how to set headers in second filter.

@spencergibb @rstoyanchev
I wrote two filters to change request and response body with some support classes. Now form-url-encoded body can be changed successfully to any object in json to be passed to server behind gateway, and visa-versa. You can see them at branch rewrite-body here. Some tests has been written too.

But in list here I don't see anything about request/response body changing. Before I'd continue to spend time, possible, on unneeded functionality, or in a wrong way, please, let me know, do you need this? Should I continue this work being sure it may be usefull for community, or I should stop and use only what I really need at this point in my own project?

It is not ready yet, some codecs/readers/writers are not implemented, but the main concept should be clear:

  • Rewrite request body example usage:
@Bean
public RouteLocator rewriteRequestRouteLocator(RouteLocatorBuilder builder) {

    return builder.routes()

        .route("RequestBodyRewrite_StringToString_Route", r ->
            r.path("/rewrite-request-body/post-string-string")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(String.class, String.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        System.out.println("originalBody:" + originalBody);
                        return "bar";
                    })
                    .withResultMediaType(MediaType.TEXT_PLAIN)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

        .route("RequestBodyRewrite_StringToPerson_Route", r ->
            r.path("/rewrite-request-body/post-string-person")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(String.class, Person.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        System.out.println("originalBody:" + originalBody);
                        return new Person(originalBody, "Smith");
                    })
                    .withResultMediaType(MediaType.APPLICATION_JSON)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

        .route("RequestBodyRewrite_formDataToJson_Route", r ->
            r.path("/rewrite-request-body/post-formdata-to-json")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(MultiValueMap.class, Login.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        Map<String, String> map = originalBody.toSingleValueMap();
                        System.out.println("originalBody - mapped to <String,String>:" + map.toString());

                        String user = map.get("user");
                        String password = map.get("password");

                        return new Login(user, password);
                    })
                    .withResultMediaType(MediaType.APPLICATION_JSON)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

        .route("RequestBodyRewrite_jsonToJson_Route", r ->
            r.path("/rewrite-request-body/post-json-to-json")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(Login.class, Person.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        System.out.println("originalBody: " + originalBody.toString());

                        return new Person(originalBody.getUser(), "Smith");
                    })
                    .withResultMediaType(MediaType.APPLICATION_JSON)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

        .route("RequestBodyRewrite_jsonToFormData_Route", r ->
            r.path("/rewrite-request-body/post-json-to-formdata")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(Person.class, MultiValueMap.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
                        map.add("first", "John");
                        map.add("last", "Doe");
                        return map;
                    })
                    .withResultMediaType(MediaType.APPLICATION_FORM_URLENCODED)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

        .route("RequestBodyRewrite_formDataToFormData_Route", r ->
            r.path("/rewrite-request-body/post-formdata-to-formdata")
            .filters(f ->
                f.rewriteRequestBody(rw ->
                    rw.classesOfBody(MultiValueMap.class, MultiValueMap.class)
                    .rewrite((request, attrs, originalBody, mediaType) -> {
                        MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
                        map.add("firstName", "John");
                        map.add("lastName", "Doe");
                        return map;
                    })
                    .withResultMediaType(MediaType.APPLICATION_FORM_URLENCODED)
                    .build()
                )
            ).uri("http://localhost:"+this.port)
        )

    .build();
}
  • Rewrite response body example usage:
@Bean
public RouteLocator rewriteResponseRouteLocator(RouteLocatorBuilder builder) {

    RewriteResponseBody<ServerHttpResponse, Map<String, Object>, Person, MediaType, Person> personToPersonRewriteFn = (response, attrs, originalBody, mediaType) -> {

        return new Person(originalBody.getFirstName(), "Smith");
    };

    return builder.routes()

        .route("ResponseBodyRewrite_PersonToPerson_Route", r ->
            r.path("/body-response/person")
            .filters(f ->
                f.rewriteResponseBody(rw ->
                    rw.classesOfBody(Person.class, Person.class)
                    .withResultMediaType(MediaType.APPLICATION_JSON)
                    .rewrite(personToPersonRewriteFn)
                    .build()
                )
            )
            .uri("http://localhost:"+this.port)
        )

        .route("ResponseBodyRewrite_StringToPerson_Route", r ->
            r.path("/body-response/string2person")
            .filters(f ->
                f.rewriteResponseBody(rw ->
                    rw.classesOfBody(String.class, Person.class)
                    .withResultMediaType(MediaType.APPLICATION_JSON)
                    .rewrite((response, attrs, originalBody, mediaType) -> {
                        String[] nameParts = originalBody.split("\\s+");

                        return new Person(nameParts[0], nameParts[1]);
                    })
                    .build()
                )
            )
            .uri("http://localhost:"+this.port)
        )

    .build();
}

Please let me know what is your decision.

PS: see possible usecase (not only my own). The main think is integration between ready services, and possibility to create custom security schemas with thin client.

@spencergibb
I add tests and some modifications to body-related code. See PR.

Readers and writers implement HttpMessageReader and HttpMessageWriter interfaces respectively. But they can be used only when they can be, e.g. we cannot write to request and read the response.

@re6exp this is not true technically. Those contracts are symmetric and have to be because they are used on both the client and the server side. This is why the reader works with ReactiveHttpInputMessage which is implemented by ClientHttpResponse and ServerHttpRequest and likewise the writer works with ReactiveHttpOutputMessage implemented by ClientHttpRequest and ServerHttpResponse.

For re-writing a request on the server side, you can read from ServerHttpRequest / ReactiveHttpInputMessage and write to ClientHttpRequest / ReactiveHttpOuputMessage, and that makes sense since the gateway is a client to whatever server it forwards to, but of course internally you'll have to actually decorate the ServerHttpRequest and ServerHttpResponse.

The higher level, client and server wrappers from the function package such as ClientRequest, ServerRequest, ClientResponse, and ServerResponse could be helpful here so you don't have explicitly deal with finding and invoking encoders and decoders.

That is what Spencer did in his realization, if I don't confuse anything.
His code is more clear and I use it as a base with some modification I attempt to share.

Closed via dacd641335911b272460a6d3bbceea52192a1d90

Hi @re6exp

I tried to read the request body following your guidelines to @pravinkumarb84 but I could not achieve that. Could you share a sample code that reads request body. It will be a great help to me.

Thank you.

@pravinkumarb84
In your implementation of GatewayFilterFactory, in apply method, you need to wrap request with ServerHttpRequestDecorator.

The code is similar to WeirdBob's example of changing the response body.

But you need to use another decorator (ServerHttpRequestDecorator instead of ServerHttpResponseDecorator), you have to overload getBody method and also you have to set the filter priority, which should be less then NettyRouterFilter's one (see Gateway sources).

And, in your router builder you should use .filter(<your filter>, PRIORITY) version to put your filter before netty client. Or you can implement Ordered interface in your filter factory realization, it should work too.

Can you please give an example of how you will user ServerHttpRequestDecorator to log the POST request Body ?

How can we log all incoming request with request body and also make another call to audit service passing the request body and api. The example you have provided is for response body. Can you please tell how would you do that for incoming request ?

@Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

        ServerHttpRequest originalRequest = exchange.getResponse();
                // This doesn't work
                String requestBody = originalRequest.getRequestBody().toString();
        ServerHttpRequestDecorator decoratedResponse = new ServerHttpRequestDecorator(originalRequest) {
                 // what needs to be implemented here ?
            }           
        };
                restTemplate.postForObject(auditUrl, requestBody, String.class);
                log.info("Incoming request is [{}] for path [{}]" requestBody, originalRequest.getPath() );
        return chain.filter(exchange.mutate().response(originalRequest).build()); // replace response with decorator
    }

I am using DiscoveryClient Route Definition Locator to build routes so cannot add ReadBodyPredicateFactory as it is JAVA DSL based config.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

bcelenk picture bcelenk  路  31Comments

tony-clarke-amdocs picture tony-clarke-amdocs  路  32Comments

Burt-L picture Burt-L  路  25Comments

re6exp picture re6exp  路  37Comments

sincang picture sincang  路  41Comments