Wrapping the async processor around http:outbound-endpoint prevents the receiver thread from blocking on the outgoing HTTP call. But this kind of asynchronous behaviour causes the service's reply to be ignored: certainly not what we want for the common case. Moreover, the async processor borrows a thread from some thread pool to carry out the blocking HTTP call, so the borrowed thread cannot do any useful work while it is blocked.
The aforementioned problems can generally be solved by replacing the blocking I/O library with a non-blocking version and by using Asynchronous Request Handling (a.k.a. continuations). Async request handling is a threading model where a thread serving a client request can be suspended and returned to its thread pool, free to serve other client requests. Typically the thread would be suspended after sending out a request to a remote service or kicking off a long-running computation. Although the suspended thread has forgotten about the client, the server has not. It knows the client is still waiting for a reply. For this reason, another thread can pick up where the suspended thread left off and deliver the reply back to the client. Normally this would happen in the context of a callback.
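To make the threading model concrete, here is a minimal, Mule-free sketch in plain Java. Everything in it is an assumption for illustration: `callRemoteService` is a hypothetical stand-in for a non-blocking remote call (a timer completes the reply later), and `handle` plays the role of the request thread that registers a callback and returns straight away.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ContinuationSketch {

    // Hypothetical non-blocking remote call: the reply is completed later by a timer thread.
    static CompletableFuture<String> callRemoteService(ScheduledExecutorService ioThread, String request) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        ioThread.schedule(() -> { reply.complete("reply to " + request); }, 100, TimeUnit.MILLISECONDS);
        return reply;
    }

    // The request thread sends the call, registers a callback and returns at once.
    // The callback later finishes the client's reply on the thread that completed the remote call.
    static CompletableFuture<String> handle(ExecutorService requestPool,
                                            ScheduledExecutorService ioThread,
                                            String request) {
        CompletableFuture<String> clientReply = new CompletableFuture<>();
        requestPool.execute(() ->
            callRemoteService(ioThread, request)
                .thenAccept(body -> clientReply.complete(body.toUpperCase())));
        return clientReply;
    }

    static String demo() throws Exception {
        // A single request thread is enough: it is never parked waiting for a reply.
        ExecutorService requestPool = Executors.newSingleThreadExecutor();
        ScheduledExecutorService ioThread = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> a = handle(requestPool, ioThread, "a");
            CompletableFuture<String> b = handle(requestPool, ioThread, "b");
            return a.get(5, TimeUnit.SECONDS) + "," + b.get(5, TimeUnit.SECONDS);
        } finally {
            requestPool.shutdown();
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

Both requests are accepted by the same single request thread before either reply has arrived, which is the whole point of the model.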
Awesome! Let's implement this in every place where blocking I/O is present. Not so fast. First, a library supporting a non-blocking alternative to what you already have in your solution must be available. Second, to my knowledge, the only Mule transport that provides async request handling is Jetty. So for this to work, the Jetty inbound-endpoint processor must be used as the message source:
Furthermore, as shown above, async request handling must be turned on by setting useContinuations to true on the Jetty connector.
Calling an HTTP service is a fine example where we can put async request handling to good use. The initial step is to find an HTTP client library implementing a non-blocking API [1]. I'll opt for Apache HttpAsyncClient.
The next step is to develop a message processor that (1) uses HttpAsyncClient to call a service, (2) registers a callback to resume processing of the client request on receiving the HTTP service reply, and (3) immediately returns the thread to its thread pool once the HTTP request has been sent asynchronously. Such a processor will require special abilities, so I'll extend my processor from AbstractInterceptingMessageProcessor:
By inheriting from AbstractInterceptingMessageProcessor, I can invoke the next processor in the flow from my callback. Speaking of callbacks, here is the snippet concerning the HTTP client:
Lines 10-13 initialise the HTTP client and set the server address to wherever we're going to send the request. Line 15 sends the request asynchronously and registers the callback that will handle the reply. Other than the usual stuff of reading from the response stream (lines 19-22), observe that on line 23 the subsequent flow processor is invoked on a different thread. Line 24 tells Jetty that the flow's output message is to be used as the reply for the end-user.
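For readers who want to try the send-and-register-a-callback shape without Mule on the classpath, the following is a rough sketch that swaps Apache HttpAsyncClient for the JDK's own `java.net.http.HttpClient` (Java 11+). It is not the snippet discussed above. The local test server, the `/hello` path and the `callService` helper are assumptions made purely so the example is self-contained.

```java
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class NonBlockingCallSketch {

    // Sends the request without blocking the caller; the lambda is the callback
    // that runs once the reply has arrived, on a thread owned by the client.
    static CompletableFuture<String> callService(HttpClient client, URI uri) {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                     .thenApply(response -> "processed: " + response.body());
    }

    static String demo() throws Exception {
        // Throwaway local server on a free port, standing in for the remote HTTP service.
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/hello", exchange -> {
            byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            exchange.getResponseBody().write(body);
            exchange.close();
        });
        server.start();
        try {
            HttpClient client = HttpClient.newHttpClient();
            URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/hello");
            CompletableFuture<String> pending = callService(client, uri);
            // The demo waits here only so it can show the result; a real flow would not.
            return pending.get(5, TimeUnit.SECONDS);
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

In the Mule processor, the body of the callback is where the next flow processor would be invoked and the result handed back to Jetty.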
One item in the list is left: freeing the thread after asynchronously invoking the HTTP client's execute(...) method. Returning null from the process(...) method will do the job (line 40):
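The "return null now, continue in the callback" idea can be sketched without any Mule classes. The `Processor` interface and `AsyncCallProcessor` below are hypothetical simplifications, not Mule's MessageProcessor or AbstractInterceptingMessageProcessor, and a timer stands in for the HTTP reply arriving.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ReturnNullSketch {

    // Hypothetical, simplified processor contract (an assumption, not Mule's API).
    interface Processor {
        String process(String message);
    }

    // Starts a simulated non-blocking call, then returns null so the calling thread is free.
    static class AsyncCallProcessor implements Processor {
        private final ScheduledExecutorService callbackThread;
        private final Processor next;
        final CompletableFuture<String> clientReply = new CompletableFuture<>();

        AsyncCallProcessor(ScheduledExecutorService callbackThread, Processor next) {
            this.callbackThread = callbackThread;
            this.next = next;
        }

        @Override
        public String process(String message) {
            // The callback invokes the next processor on a different thread
            // and publishes its output as the reply for the client.
            callbackThread.schedule(
                () -> { clientReply.complete(next.process("service reply for " + message)); },
                50, TimeUnit.MILLISECONDS);
            return null; // nothing continues on the calling thread
        }
    }

    static String demo() throws Exception {
        ScheduledExecutorService callbackThread = Executors.newSingleThreadScheduledExecutor();
        try {
            AsyncCallProcessor processor = new AsyncCallProcessor(callbackThread, m -> "[" + m + "]");
            String immediate = processor.process("ping"); // returns straight away with null
            return immediate + "|" + processor.clientReply.get(5, TimeUnit.SECONDS);
        } finally {
            callbackThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

The caller sees null immediately, while the downstream processor's output only becomes available once the callback has fired.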
Finally, we can hook up everything together:
The complete example is found on GitHub.
Hopefully async request handling will someday be part of Mule's default behaviour. Imagine how useful it would be to call almost any service (e.g., HTTP, JMS, VM) synchronously, knowing full well that behind the scenes Mule is taking care of making every remote call non-blocking.
1: A client library implementation should be based on the Reactor pattern; otherwise we would be going back to the original problem of many blocking threads.
Cool one. Just loved it
Thank you! I was struggling to get NIO to work for a long polling Mule solution, and your article was spot on! I tested with a single-thread thread pool and it worked like a charm.