Tuesday, 16 September 2014

The Trials of Smooks

Being a hard-to-please guy, I rarely show appreciation for a tool. I easily get frustrated when a tool fails to meet the challenges it's meant to solve. Smooks is one of the few tools I appreciate: an invaluable transformation framework in the integrator's arsenal. On one project, I threw all manner of challenges at Smooks [1] and, one after another, Smooks overcame them without sacrificing a key requirement: keeping memory overhead low during transformation. A shoutout to Tom Fennelly and his team for bringing us such a fantastic tool.

Trial I


The initial challenge I brought to Smooks was taking a tilde-delimited CSV file and mapping its records to POJOs:

You can see the file has an unorthodox header in addition to a footer. Using Smooks's built-in CSV reader, I concisely wrote the Smooks config that maps the records to POJOs:

Under the covers, the reader pulls data from a source (e.g., java.io.InputStream) and produces a stream of SAX events. The reader I'm using above expects the source data to be structured as CSV with 4 columns. Let's make things more concrete: reading from products.csv, the reader produces the following XML stream [2]:
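To make the reader's role concrete, here is a JDK-only sketch (not Smooks code) of a reader that pulls tilde-delimited lines from a java.io.Reader and pushes them to an org.xml.sax.ContentHandler as csv-set/csv-record events. The column names are hypothetical, and in this sketch lines with the wrong number of fields are simply not emitted. A JDK TransformerHandler serialises the events so you can see the XML that results.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.sax.SAXTransformerFactory;
import javax.xml.transform.sax.TransformerHandler;
import javax.xml.transform.stream.StreamResult;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.AttributesImpl;

class CsvToSaxSketch {

    private static final AttributesImpl NO_ATTRIBUTES = new AttributesImpl();

    // Pulls tilde-delimited lines from the source and pushes each matching line as SAX events.
    static void read(Reader source, String[] columns, ContentHandler handler)
            throws IOException, SAXException {
        BufferedReader in = new BufferedReader(source);
        handler.startDocument();
        handler.startElement("", "csv-set", "csv-set", NO_ATTRIBUTES);
        String line;
        while ((line = in.readLine()) != null) {
            String[] fields = line.split("~", -1); // -1 keeps trailing empty fields
            if (fields.length != columns.length) {
                continue; // e.g. header and footer: no events are emitted for them
            }
            handler.startElement("", "csv-record", "csv-record", NO_ATTRIBUTES);
            for (int i = 0; i < columns.length; i++) {
                handler.startElement("", columns[i], columns[i], NO_ATTRIBUTES);
                handler.characters(fields[i].toCharArray(), 0, fields[i].length());
                handler.endElement("", columns[i], columns[i]);
            }
            handler.endElement("", "csv-record", "csv-record");
        }
        handler.endElement("", "csv-set", "csv-set");
        handler.endDocument();
    }

    // Serialises the event stream so the XML the "reader" produces can be inspected.
    static String toXml(String csv, String... columns) throws Exception {
        SAXTransformerFactory factory = (SAXTransformerFactory) TransformerFactory.newInstance();
        TransformerHandler serializer = factory.newTransformerHandler();
        StringWriter out = new StringWriter();
        serializer.setResult(new StreamResult(out));
        read(new StringReader(csv), columns, serializer);
        return out.toString();
    }

    public static void main(String[] args) throws Exception {
        String csv = "HEADER~16092014\n1~Widget~Tools~9.99\n2~Gadget~Toys~4.50\nFOOTER~2\n";
        System.out.println(toXml(csv, "id", "name", "category", "price"));
    }
}
```

Because the reader only pushes events and never holds more than one line, memory use stays flat regardless of file size.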

Listening to this stream of SAX events is the visitor. A visitor listens for specific events in the stream and fires some kind of behaviour in response, typically a transformation. With the singleBinding config element, the CSV reader pre-configures a JavaBean visitor to listen for csv-record elements. On intercepting such an element, the JavaBean visitor instantiates an org.ossandme.Product object and binds its properties to the content of csv-record's child elements. You'll notice that I left Product's target properties unspecified in the config: the CSV reader assumes Product follows JavaBean conventions and that its properties are named after the defined CSV columns. Records that don't match the column definition are ignored, so I don't need to worry about the file's header and footer.
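The visitor half can be sketched the same way with plain JDK classes; this illustrates the idea and is not how Smooks's JavaBean visitor is implemented. Assuming hypothetical columns id, name, category and price, a SAX DefaultHandler instantiates a bean when it sees csv-record, binds each child element's text to the matching field, and hands the bean to a callback as soon as the record closes, so only one record is held in memory at a time.

```java
import java.io.StringReader;
import java.util.function.Consumer;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.helpers.DefaultHandler;

class ProductVisitorSketch {

    // Hypothetical stand-in for org.ossandme.Product; the real properties aren't shown in the post.
    static final class Product {
        String id;
        String name;
        String category;
        String price;
    }

    // Creates a Product per csv-record and binds each child element's text to the same-named field.
    static final class ProductVisitor extends DefaultHandler {
        private final Consumer<Product> sink;
        private final StringBuilder text = new StringBuilder();
        private Product current;

        ProductVisitor(Consumer<Product> sink) {
            this.sink = sink;
        }

        @Override
        public void startElement(String uri, String localName, String qName, Attributes attributes) {
            if (qName.equals("csv-record")) {
                current = new Product();
            }
            text.setLength(0);
        }

        @Override
        public void characters(char[] ch, int start, int length) {
            text.append(ch, start, length);
        }

        @Override
        public void endElement(String uri, String localName, String qName) {
            if (current == null) {
                return;
            }
            String value = text.toString();
            switch (qName) {
                case "id": current.id = value; break;
                case "name": current.name = value; break;
                case "category": current.category = value; break;
                case "price": current.price = value; break;
                case "csv-record":
                    sink.accept(current); // hand the bound bean over as soon as it's complete
                    current = null;
                    break;
                default:
                    break;
            }
        }
    }

    static void bind(String xml, Consumer<Product> sink) throws Exception {
        SAXParserFactory.newInstance().newSAXParser()
                .parse(new InputSource(new StringReader(xml)), new ProductVisitor(sink));
    }

    public static void main(String[] args) throws Exception {
        String xml = "<csv-set><csv-record><id>1</id><name>Widget</name>"
                + "<category>Tools</category><price>9.99</price></csv-record></csv-set>";
        bind(xml, p -> System.out.println(p.name + " costs " + p.price));
    }
}
```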

With the transformation configuration out of the way, I turned my attention to running the transformation on the CSV file from my Java code and processing the Product objects as Smooks instantiates and binds them:


Trial II


A more complex task I gave Smooks was to load file records with a variable number of columns into a database. As in the previous task, this file had a header as well as a footer:

You'll observe in the sample CSV file that a record could be one of three types, as denoted by its first column: TH, TB or TF. The CSV reader, as it transforms records and pushes them to the XML stream, can be customised to rename the csv-record element after the value of the record's first column:

As we'll see later, the above config permits Smooks to distinguish between the different record types. Given the sample file transactions.csv, the reader I've configured produces the following stream:

UNMATCHED elements represent the file's header and footer. A CSV record having TH in the first field will trigger the reader to create a TH element holding the other record fields. The same logic goes for TB and TF.
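Here is a rough plain-Java sketch of that variable-field behaviour (not the Smooks reader). It assumes tilde-delimited lines as in Trial I and hypothetical field layouts for TH, TB and TF: the first column selects both the element name and the field names, and in this sketch any line whose type is unknown or whose field count doesn't fit becomes UNMATCHED.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class VariableFieldSketch {

    // The element the reader would emit for one line: its name plus named field values.
    static final class Element {
        final String name;
        final Map<String, String> fields;

        Element(String name, Map<String, String> fields) {
            this.name = name;
            this.fields = fields;
        }
    }

    // Hypothetical layouts: the post doesn't list the real TH/TB/TF columns.
    static final Map<String, List<String>> LAYOUTS = Map.of(
            "TH", List.of("batchId", "date"),
            "TB", List.of("account", "amount", "currency"),
            "TF", List.of("recordCount"));

    static List<Element> read(List<String> lines) {
        List<Element> elements = new ArrayList<>();
        for (String line : lines) {
            String[] cols = line.split("~", -1);
            List<String> layout = LAYOUTS.get(cols[0]);
            // First column picks the record type; unknown types or wrong field counts are UNMATCHED.
            if (layout == null || cols.length - 1 != layout.size()) {
                elements.add(new Element("UNMATCHED", Map.of("value", line)));
                continue;
            }
            Map<String, String> fields = new LinkedHashMap<>();
            for (int i = 0; i < layout.size(); i++) {
                fields.put(layout.get(i), cols[i + 1]);
            }
            elements.add(new Element(cols[0], fields));
        }
        return elements;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("HDR~transactions~16092014", "TH~B1~16092014",
                "TB~1234~10.00~EUR", "TF~1", "FTR~end");
        for (Element e : read(lines)) {
            System.out.println(e.name + " " + e.fields);
        }
    }
}
```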

Database visitors load the records. However, since these visitors are limited to binding data from POJOs, I first must turn the XML mapped records from the stream into said POJOs. The CSV reader doesn't know how to bind variable field records to POJOs so I configure the mapping myself:

Given what we've learnt about Smooks, we can deduce what's happening here. The JavaBean visitor on lines 10 to 17 has a selector (i.e., createOnElement) for the TH element. A selector is a quasi-XPath expression applied to XML elements as they come through the stream. On seeing TH, the visitor will:
  1. Instantiate a HashMap.

  2. Iterate through the TH fragment. If an element inside the fragment matches the selector set in a data attribute, then (a) a map entry is created, (b) bound to the element content, and (c) put in the map.

  3. Add the map to the Smooks bean context under the name set in beanID. The map overwrites any previous map in the context with the same ID, which makes sense since we want to prevent objects from accumulating in memory.
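Steps 1 to 3 boil down to a few lines of plain Java. In this sketch a HashMap keyed by bean ID stands in for the Smooks bean context (the class and method names are made up): each fragment produces a fresh map holding only the selected elements, and storing it under the same ID replaces the previous one, which is what keeps memory flat.

```java
import java.util.HashMap;
import java.util.Map;

class BeanContextSketch {

    // Stand-in for the bean context: one live bean per bean ID.
    private final Map<String, Map<String, String>> beans = new HashMap<>();

    // Steps 1-3: new map per fragment, copy only the selected elements, store under the bean ID.
    void onFragment(String beanId, Map<String, String> fragment, String... selectedElements) {
        Map<String, String> bean = new HashMap<>();
        for (String element : selectedElements) {
            String content = fragment.get(element);
            if (content != null) {
                bean.put(element, content);
            }
        }
        // put() replaces the previous record's map, so records never pile up in memory.
        beans.put(beanId, bean);
    }

    Map<String, String> lookup(String beanId) {
        return beans.get(beanId);
    }

    int size() {
        return beans.size();
    }

    public static void main(String[] args) {
        BeanContextSketch context = new BeanContextSketch();
        context.onFragment("tb", Map.of("account", "1234", "amount", "10.00"), "account", "amount");
        context.onFragment("tb", Map.of("account", "5678", "amount", "2.50"), "account", "amount");
        System.out.println(context.size() + " bean(s): " + context.lookup("tb"));
    }
}
```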
The database visitors reference the maps in the bean context:

The insert statements are bound to the map entry values and are executed after the element that the executeOnElement selector points to has been processed. The next step is to configure a datasource for the database visitors (lines 47-49):
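To illustrate what binding an insert statement to map entry values involves, here is a sketch assuming ${beanId.key}-style placeholders (the exact Smooks syntax may differ) and a hypothetical transactions table. It rewrites each placeholder into a JDBC ? marker, collects the values from the bean context in order, and executes the result with a PreparedStatement.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class StatementBindingSketch {

    // Hypothetical placeholder syntax: ${beanId.key}, e.g. ${tb.amount}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)\\.(\\w+)\\}");

    // SQL with JDBC '?' markers plus the parameter values in marker order.
    static final class Bound {
        final String sql;
        final List<Object> params;

        Bound(String sql, List<Object> params) {
            this.sql = sql;
            this.params = params;
        }
    }

    // Resolves every placeholder against the bean context (beanId -> map of entries).
    static Bound bind(String template, Map<String, ? extends Map<String, ?>> beanContext) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder sql = new StringBuilder();
        List<Object> params = new ArrayList<>();
        while (matcher.find()) {
            Map<String, ?> bean = beanContext.get(matcher.group(1));
            if (bean == null) {
                throw new IllegalStateException("No bean in context: " + matcher.group(1));
            }
            params.add(bean.get(matcher.group(2)));
            matcher.appendReplacement(sql, "?");
        }
        matcher.appendTail(sql);
        return new Bound(sql.toString(), params);
    }

    // Runs the bound statement, e.g. once the element it's attached to has been processed.
    static int execute(Connection connection, Bound bound) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(bound.sql)) {
            for (int i = 0; i < bound.params.size(); i++) {
                statement.setObject(i + 1, bound.params.get(i)); // JDBC parameters are 1-based
            }
            return statement.executeUpdate();
        }
    }

    public static void main(String[] args) {
        Bound bound = bind("INSERT INTO transactions (account, amount) VALUES (${tb.account}, ${tb.amount})",
                Map.of("tb", Map.of("account", "1234", "amount", "10.00")));
        System.out.println(bound.sql + " " + bound.params);
    }
}
```

Using ? markers rather than splicing values into the SQL string keeps the statement safe from injection and lets the driver reuse the prepared plan.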

Last but not least, the Java code to kick off the data load:


Trial III


The next challenge for Smooks makes the previous ones look like child's play. The goal: transform an XML stream to a CSV file that is eventually uploaded to an FTP server. The input:

The desired output:

Since the CSV could be large, I required Smooks to write the transformed content to a PipedOutputStream. An FTP library would read from the PipedOutputStream's connected PipedInputStream and write the streamed content to a file. To this end, I wrote the class running the transformation as follows:
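The piped-stream arrangement itself can be sketched with the JDK alone. Here a background task stands in for the FTP library (it just drains the PipedInputStream into memory) and a loop stands in for the Smooks transformation; both stand-ins are hypothetical. The reader must run on a separate thread: PipedOutputStream blocks once the pipe's buffer is full, so writing and reading on the same thread can deadlock.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PipedTransformSketch {

    // Hypothetical stand-in for the FTP upload: drains the pipe until the writer closes it.
    static byte[] upload(InputStream in) throws IOException {
        ByteArrayOutputStream remoteFile = new ByteArrayOutputStream();
        in.transferTo(remoteFile);
        return remoteFile.toByteArray();
    }

    // Hypothetical stand-in for the transformation: writes one CSV line per record.
    static void transform(OutputStream out, int records) throws IOException {
        for (int i = 1; i <= records; i++) {
            out.write(("record-" + i + "\n").getBytes(StandardCharsets.UTF_8));
        }
    }

    static String run(int records) throws Exception {
        ExecutorService uploader = Executors.newSingleThreadExecutor();
        try {
            PipedInputStream in = new PipedInputStream(64 * 1024); // bounded buffer between threads
            Future<byte[]> uploaded;
            try (PipedOutputStream out = new PipedOutputStream(in)) {
                uploaded = uploader.submit(() -> upload(in)); // reader on its own thread
                transform(out, records);
            } // closing the writer signals end-of-stream to the reader
            return new String(uploaded.get(), StandardCharsets.UTF_8);
        } finally {
            uploader.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.print(run(3));
    }
}
```

At no point does the whole CSV need to sit in the pipe: the writer simply waits whenever the 64 KB buffer is full until the reader catches up.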

My focus then turned to the XML-to-CSV mapping configuration. After some deliberation, I reluctantly settled on the FreeMarker visitor for writing the CSV. As an alternative, I considered developing a visitor specialised for this type of transformation, but time constraints made this unfeasible. The FreeMarker visitor, like the database one, cannot read directly off the XML stream; instead, it reads from DOMs and POJOs. So I decided to use the DOM visitor to create DOMs from the record elements found in the input stream:
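The DOM-per-record idea, followed by a template step, can be approximated in plain Java. This sketch streams the input with StAX, builds a small DOM only for each record element, and renders it as one newline-terminated CSV line, a hypothetical stand-in for a FreeMarker template. The child element names and the comma-separated layout are assumptions, since the real input and template aren't reproduced in the post.

```java
import java.io.StringReader;
import java.util.function.Consumer;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

class RecordDomSketch {

    // Streams the input and builds a small DOM for each <record>, never for the whole document.
    static void forEachRecord(String xml, Consumer<Document> visitor) throws Exception {
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
        Document fragment = null; // DOM of the record currently being built, if any
        Node current = null;
        while (reader.hasNext()) {
            int event = reader.next();
            if (event == XMLStreamConstants.START_ELEMENT) {
                if (fragment == null && reader.getLocalName().equals("record")) {
                    fragment = builder.newDocument();
                    current = fragment;
                }
                if (fragment != null) {
                    Element element = fragment.createElement(reader.getLocalName());
                    for (int i = 0; i < reader.getAttributeCount(); i++) {
                        element.setAttribute(reader.getAttributeLocalName(i), reader.getAttributeValue(i));
                    }
                    current.appendChild(element);
                    current = element;
                }
            } else if (event == XMLStreamConstants.CHARACTERS && fragment != null) {
                current.appendChild(fragment.createTextNode(reader.getText()));
            } else if (event == XMLStreamConstants.END_ELEMENT && fragment != null) {
                current = current.getParentNode();
                if (current == fragment) { // </record> reached: the fragment is complete
                    visitor.accept(fragment);
                    fragment = null;       // drop it so it can be garbage collected
                    current = null;
                }
            }
        }
        reader.close();
    }

    // Hypothetical template stand-in: selected child values, comma-separated, newline-terminated.
    static String toCsvLine(Document fragment, String... fields) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                line.append(',');
            }
            Node node = fragment.getElementsByTagName(fields[i]).item(0);
            line.append(node == null ? "" : node.getTextContent());
        }
        return line.append('\n').toString();
    }

    static String transform(String xml, String... fields) throws Exception {
        StringBuilder csv = new StringBuilder();
        forEachRecord(xml, fragment -> csv.append(toCsvLine(fragment, fields)));
        return csv.toString();
    }

    public static void main(String[] args) throws Exception {
        String xml = "<accounts><record><number>1</number><holder>Ann</holder></record>"
                + "<record><number>2</number><holder>Bob</holder></record></accounts>";
        System.out.print(transform(xml, "number", "holder"));
    }
}
```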

I then configured the FreeMarker visitor to apply the CSV template whenever it sees a record element in the stream:

Below is a simplified version of the account.ftl I used in practice (note that the last line of the template must be a newline):

An additional complexity I had to consider was the CSV's header and footer. Apart from being structured differently from the rest of the records, the header had to contain the current date and the footer the total record count. For the header, I bound the current date from my Java code to Smooks's bean context (lines 27-30 and 38):

The date is then referenced from the Smooks config (lines 9-12):

Given the above config, FreeMarker writes the header to the output stream (i.e., the PipedOutputStream) at the start of the XML stream:

000000Card Extract   [current date]

<?TEMPLATE-SPLIT-PI?> is an embedded Smooks instruction that applies account.ftl to record elements after the header.

Adding the record count to the footer is just a matter of configuring the Calculator visitor to maintain a counter in the bean context and referencing that counter from the template:
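Stripped of Smooks, the header/footer logic amounts to this: write the header before any record using a date supplied by the caller, count records as they go by, and write the footer once the stream ends. In the sketch below the header text comes from the post, but the date pattern and the footer layout (999999 followed by a zero-padded count) are pure assumptions.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;

class HeaderFooterSketch {

    // Hypothetical date pattern; the post only shows "[current date]".
    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    static void write(Writer out, LocalDate currentDate, Iterable<String> recordLines) throws IOException {
        // Header: written once, before any record, from a date the calling Java code supplies.
        out.write("000000Card Extract   " + DATE.format(currentDate) + "\n");
        long recordCount = 0; // plays the role of the Calculator visitor's counter
        for (String line : recordLines) {
            out.write(line + "\n");
            recordCount++;
        }
        // Footer: only possible once the stream has ended and the counter is final.
        out.write(String.format("999999%08d", recordCount) + "\n");
    }

    public static void main(String[] args) throws IOException {
        StringWriter out = new StringWriter();
        write(out, LocalDate.now(), List.of("0001,Ann", "0002,Bob"));
        System.out.print(out);
    }
}
```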


Trial IV


The final challenge Smooks had to face was to read from a java.util.Iterator of maps and, as in the previous task, write the transformed output to a stream in CSV format. Unlike the InputStreams Smooks read from in the other tasks, there is no Smooks reader capable of producing a properly structured XML document from an iterator of maps, so I was left with writing my own reader:
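The event-generation part of such a reader can be sketched with the JDK's SAX interfaces. This is not Smooks's reader API: the maps/map element names are hypothetical, each map entry becomes a child element named after its key (so keys must be valid XML names), and a TransformerHandler serialises the events to show the document produced.

```java
import java.io.StringWriter;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.sax.SAXTransformerFactory;
import javax.xml.transform.sax.TransformerHandler;
import javax.xml.transform.stream.StreamResult;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.AttributesImpl;

class MapIteratorReaderSketch {

    private static final AttributesImpl NONE = new AttributesImpl();

    // Turns each map into a <map> element whose children are named after the map's keys.
    static void read(Iterator<? extends Map<String, ?>> rows, ContentHandler handler) throws SAXException {
        handler.startDocument();
        handler.startElement("", "maps", "maps", NONE);
        while (rows.hasNext()) { // one map at a time: the iterator is never materialised
            handler.startElement("", "map", "map", NONE);
            for (Map.Entry<String, ?> entry : rows.next().entrySet()) {
                String name = entry.getKey();
                String value = String.valueOf(entry.getValue());
                handler.startElement("", name, name, NONE);
                handler.characters(value.toCharArray(), 0, value.length());
                handler.endElement("", name, name);
            }
            handler.endElement("", "map", "map");
        }
        handler.endElement("", "maps", "maps");
        handler.endDocument();
    }

    // Serialises the events so the generated document can be inspected.
    static String toXml(Iterator<? extends Map<String, ?>> rows) throws Exception {
        SAXTransformerFactory factory = (SAXTransformerFactory) TransformerFactory.newInstance();
        TransformerHandler serializer = factory.newTransformerHandler();
        StringWriter out = new StringWriter();
        serializer.setResult(new StreamResult(out));
        read(rows, serializer);
        return out.toString();
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> row = new LinkedHashMap<>(); // keeps key order stable in the output
        row.put("account", "1234");
        row.put("amount", 10.5);
        System.out.println(toXml(List.of(row).iterator()));
    }
}
```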

The custom reader is hooked into Smooks as follows (line 5):

Finally, passing the iterator to Smooks for transformation is a matter of wrapping it in a JavaSource and handing that to filterSource(...) (line 27):


1: The Smooks version I used was 1.5.2.
2: You might be wondering how I know for certain the XML document shown is the one actually produced by Smooks. I know because of Smooks's HtmlReportGenerator class.

Wednesday, 19 February 2014

Safely Prevent Template Caching in AngularJS

AngularJS's $templateCache can be a pain in the ass. Sometimes we don't want templates to be cached. A quick Internet search for how to disable caching turns up the following workaround:

But as I learnt with the UI Bootstrap module, this may break AngularJS modules that use $templateCache. A solution is to tweak the above workaround so that only new cache entries are removed on route change, instead of indiscriminately removing all entries:

Tuesday, 11 February 2014

Dynamically Create BitCoin Wallets & Payment Pages on Coinbase in Ruby

Last weekend, as part of my New Year's resolution to dedicate some time to good causes, I participated in Hack4good: a global 48-hour hackathon aimed at bringing ideas for social good to life. In Malta, our team put forward a crowdfunding solution for charitable fundraisers with minimal transaction fees. To this end, we selected BitCoin as the donation currency and Coinbase to host fundraiser donations.

One requirement in our project was to have Coinbase automatically issue a BitCoin wallet to each fundraiser. To further complicate matters, we wanted to generate a Coinbase payment page that allows the donor to transfer his BitCoins to the fundraiser's wallet:


Coinbase's awesome API let us do both things with very little effort. Since we developed the solution in Ruby on Rails 4, I'll show you how we accomplished this using a forked version of Coinbase's Ruby API client [1]:

The create() controller action does numerous things so let's dissect it piece by piece. The action instantiates the Coinbase client with our API key: this key is created in Coinbase's account settings page. The client object's create_user(...) method is then invoked to make a wallet in addition to a Coinbase account for the fundraiser. The email address and password parameters are used by the end-user to access his fundraiser wallet on Coinbase. COINBASE_CLIENT_SECRET, linked to our API key, is passed as a parameter so that we can automatically grant ourselves merchant permissions on the created user account. These permissions are needed to dynamically generate the payment page on behalf of the user.

Making the call to Coinbase to generate the payment page requires that we follow the OAuth 2 protocol [2]. Fortunately, an OAuth 2 Ruby library exists. So we go ahead and use the library to instantiate an OAuth client, passing COINBASE_API_KEY and COINBASE_API_SECRET as parameters. Before we ask Coinbase to create a payment page on the user's behalf, an AccessToken object is constructed with the access token obtained from coinbase.create_user(...) and the OAuth client we have just instantiated. After this, we use the newly constructed oauth_token object to post a request to https://coinbase.com/api/v1/buttons. Note that JSON_CREATE_PAYMENT_PAGE's value is sent as the HTTP body.

All I need from the JSON response returned from the API call is the payment page code. This code lets Coinbase know which payment page to display. We persist this code along with the fundraiser details so that we can retrieve them later when we show the fundraiser to a potential donor:

Here is the view associated with the above action:

The view gets the page code from @fundraiser.coinbase_page_code and sets the necessary HTML attributes with this value. button.js is a script provided by Coinbase that styles the anchor element and opens the fundraising donation page tied to the page code when the anchor is clicked:


The final step is to add the OAuth 2 and Coinbase dependencies to the project Gemfile:


1: We forked Coinbase's Ruby client because create_user(...) didn't support client ID.
2: You need to register your application on Coinbase before you can gain rights to manage user accounts through OAuth. 

Tuesday, 7 January 2014

Scaling up Mule with Async Request Handling/Continuations

Non-blocking I/O servers such as Node.js are appealing because, compared to blocking I/O servers, they use fewer threads to perform the same tasks under the same load. Fewer threads mean more efficient use of resources (e.g., a smaller memory footprint) and better performance (e.g., fewer context switches between threads). Let's take a stab at getting non-blocking I/O behaviour in Mule. Consider the following Mule 3.4 application calling an HTTP service:

Wrapping the async processor around http:outbound-endpoint prevents the receiver thread from blocking on the outgoing HTTP call. But this kind of asynchronous behaviour causes the service's reply to be ignored: certainly not what we want for the common case. Moreover, the async processor borrows a thread from some thread pool to carry out the blocking HTTP call, preventing the borrowed thread from doing any useful work while being blocked.

The aforementioned problems can generally be solved by replacing the blocking I/O library with a non-blocking one and using Asynchronous Request Handling (a.k.a. continuations). Async request handling is a threading model where a thread serving a client request can be suspended and returned to its thread pool, where it is free to serve other client requests. Typically the thread would be suspended after sending out a request to a remote service or kicking off a long-running computation. Although the suspended thread has forgotten about the client, the server has not: it knows the client is still waiting for a reply. For this reason, another thread can pick up where the suspended thread left off and deliver the reply back to the client, normally in the context of a callback.
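The threading model can be illustrated in plain Java with CompletableFuture; this is not Mule or Jetty code. In the sketch, a scheduled executor plays the part of a hypothetical non-blocking HTTP client, the handler registers a callback and returns immediately, and the reply reaches the waiting client from another thread once it arrives.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ContinuationSketch {

    // Hypothetical non-blocking client: returns at once and completes the reply later on an I/O thread.
    static CompletableFuture<String> callServiceAsync(ScheduledExecutorService io, String request) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        io.schedule(() -> {
            reply.complete("reply to " + request);
        }, 100, TimeUnit.MILLISECONDS);
        return reply;
    }

    // Sends the request, registers the continuation, and returns without waiting for the reply.
    static void handle(ScheduledExecutorService io, String request, Consumer<String> replyToClient) {
        callServiceAsync(io, request)
                .thenApply(String::toUpperCase) // the "next processor", run on the I/O thread
                .thenAccept(replyToClient);     // resume: deliver the result to the waiting client
        // The request thread is free again as soon as this method returns.
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService io = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> client = new CompletableFuture<>(); // the client still waiting
        handle(io, "ping", client::complete);
        System.out.println("handler returned, reply pending: " + !client.isDone());
        System.out.println(client.get(5, TimeUnit.SECONDS));
        io.shutdown();
    }
}
```

A production handler would also need an error path (for example via exceptionally(...)) so that a failed call still produces a reply for the client.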

Awesome! Let's implement this in every place where blocking I/O is present. Not so fast. First, a library supporting a non-blocking alternative to what you already have in your solution must be available. Second, to my knowledge, the only Mule transport that provides async request handling is Jetty. So for this to work, the Jetty inbound-endpoint processor must be used as the message source:

Furthermore, as shown above, async request handling must be turned on by setting useContinuations to true on the Jetty connector.

Calling an HTTP service is a fine example where we can put async request handling to good use. The initial step is to find an HTTP client library implementing a non-blocking API [1]. I'll opt for Apache HttpAsyncClient.

The next step is to develop a message processor that (1) uses HttpAsyncClient to call a service, (2) registers a callback to resume processing of the client request on receiving the HTTP service reply, and (3) immediately returns the thread to its thread pool upon asynchronously sending the HTTP request. Such a processor requires special abilities, so I'll extend my processor from AbstractInterceptingMessageProcessor:

By inheriting from AbstractInterceptingMessageProcessor, I can invoke the next processor in the flow from my callback. Speaking of callbacks, here is the snippet concerning the HTTP client:

Lines 10-13 initialise the HTTP client and set the server address to wherever we're going to send the request. Line 15 asynchronously sends the request and registers the callback that will handle the reply. Other than the usual business of reading from the response stream (lines 19-22), observe that on line 23 the next processor in the flow is invoked on a different thread. Line 24 tells Jetty that the flow's output message is to be used as the reply for the end-user.

One item on the list remains: freeing the thread after asynchronously invoking the HTTP client's execute(...) method. Returning null from the process(...) method does the job (line 40):

Finally, we can hook up everything together:

The complete example is found on GitHub.

Hopefully async request handling will someday be part of Mule's default behaviour. Imagine how useful it would be to call almost any service (e.g., HTTP, JMS, VM) synchronously, knowing full well that behind the scenes Mule is making every remote call non-blocking.

1: A client library implementation should be based on the Reactor pattern; otherwise we would be back to the original problem of many blocked threads.

Monday, 23 December 2013

Log4j 2 Memory-Mapped File Appender

During the weekend I dug into Java NIO, specifically mapping files to memory to reduce I/O time. What's more, since I had a lot of free time on my hands, I developed a Log4j 2 memory-mapped file appender. On my machine, single-threaded performance tests show the MemoryMappedFile appender improving on the RandomAccessFile appender by a factor of 3.
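The core technique behind such an appender can be sketched with java.nio: map a region of the file with FileChannel.map, append bytes into the MappedByteBuffer (a plain memory copy, with the OS writing dirty pages back to disk), and map the next region when the current one is full. The class below is a hypothetical illustration, not the Log4j 2 appender, and it leaves out the locking and flush policy a real appender needs.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class MappedLogSketch implements AutoCloseable {

    private final FileChannel channel;
    private final int regionSize;
    private MappedByteBuffer region;
    private long regionStart; // file offset where the current mapped region begins

    MappedLogSketch(Path file, int regionSize) throws IOException {
        this.channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        this.regionSize = regionSize;
        this.regionStart = channel.size(); // append after any existing content
        this.region = channel.map(FileChannel.MapMode.READ_WRITE, regionStart, regionSize);
    }

    // Appends one event; events may straddle two regions.
    void append(String event) throws IOException {
        byte[] bytes = (event + "\n").getBytes(StandardCharsets.UTF_8);
        int offset = 0;
        while (offset < bytes.length) {
            if (!region.hasRemaining()) {
                remap();
            }
            int n = Math.min(region.remaining(), bytes.length - offset);
            region.put(bytes, offset, n);
            offset += n;
        }
    }

    // Maps the next region of the file, immediately after the bytes written so far.
    private void remap() throws IOException {
        regionStart += region.position();
        region = channel.map(FileChannel.MapMode.READ_WRITE, regionStart, regionSize);
    }

    @Override
    public void close() throws IOException {
        long end = regionStart + region.position();
        region.force();
        // Mapping grew the file to a whole region; trim the unused tail.
        // Note: on Windows, truncating a file with a live mapping may fail.
        channel.truncate(end);
        channel.close();
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("mapped", ".log");
        try (MappedLogSketch log = new MappedLogSketch(file, 4096)) {
            for (int i = 0; i < 1000; i++) {
                log.append("event " + i);
            }
        }
        System.out.println(Files.size(file) + " bytes written to " + file);
    }
}
```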

Wednesday, 11 December 2013

Apache Kafka for Event Sourcing

Event Sourcing is a pattern intended for "capturing all changes to an application state as a sequence of events". As explained by Fowler, the pattern is useful when you want the ability to completely rebuild the application state, perform temporal querying, or replay events. The LMAX platform is a famous example, where Event Sourcing is applied to keep all application state in memory, contributing to the system's surprisingly high throughput and low latency.

While investigating the architectural components of Samza, I came across a component that can be of great help when implementing Event Sourcing: Apache Kafka. Created by the folks at LinkedIn as a solution to their log processing requirements, Kafka is a broker with message replay built-in.

Kafka consumers receive messages from publish/subscribe channels known as topics. A topic is divided into user-defined partitions where a partition can serve messages only to a single consumer process. Balancing the message load between consumers is a matter of adding more partitions to the topic, assigning those partitions to other consumer instances, and finally, publishing messages to all topic partitions in a round robin fashion.




What fascinates me about Kafka is that at any point in time a consumer can rewind through the history of messages and re-consume messages from a particular offset. In the above diagram, Consumer B can consume the latest messages or replay messages, say, starting from offset 1.

At face value, we could be forgiven for thinking that a broker with built-in message replay would struggle to achieve high throughput for large message volumes. After all, Kafka retains consumed as well as unconsumed messages on disk: presumably costlier than simply keeping unconsumed messages in memory. However, a few clever design decisions, such as relying on the OS page cache and minimising random disk I/O, gave LinkedIn engineers impressive throughput results when comparing Kafka against both ActiveMQ and RabbitMQ.

With the basic concepts and performance considerations out of the way, let me illustrate my point about Kafka's suitability for Event Sourcing by giving a code example:

The above producer publishes a message a number of times to the topic ossandme on partition 0. In particular, it creates each message by instantiating the KeyedMessage class with the following parameters (line 19):
  • Name of the topic to which the message is published.
  • ID of the partition the message will sit on.
  • Message content, in this case, the time the message was published.
The following consumer pulls messages from the topic ossandme:

For each message received, the application outputs the message's offset on partition 0 in addition to its content (line 50). The first thing to observe is that I've programmed against Kafka's low-level SimpleConsumer API. Alternatively, I could have opted for the High Level Consumer API to reduce development time. I chose the former because with the latter I was unable to find a way to replay any set of messages I wanted. 

Event Sourcing comes into play when an exception occurs. On exception, the application rewinds back to the first message in the partition and re-attempts to process all of the partition's messages (line 24). I like the fact that, to get this type of behaviour, I didn't have to introduce a database but simply leveraged the broker's message replay capability. Without a database, I've one less moving part to think about in my architecture.
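The rewind-and-replay idea can be shown in a self-contained sketch where an in-memory list stands in for a Kafka partition (no Kafka API is used; the class and method names are hypothetical). Messages are fetched by offset in small batches; if the handler throws, the consumer resets its state, rewinds to the earliest offset and replays the partition, giving up after a fixed number of attempts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ReplaySketch {

    // In-memory stand-in for a single partition: an append-only log addressed by offset.
    static final class Partition {
        private final List<String> log = new ArrayList<>();

        long append(String message) {
            log.add(message);
            return log.size() - 1; // offset of the new message
        }

        long earliestOffset() {
            return 0;
        }

        List<String> fetch(long offset, int maxMessages) {
            int from = (int) Math.min(offset, log.size());
            int to = (int) Math.min(log.size(), from + (long) maxMessages);
            return new ArrayList<>(log.subList(from, to));
        }
    }

    // Processes the whole partition; on failure, resets state and replays from the earliest offset.
    static void consume(Partition partition, Consumer<String> handler, Runnable resetState, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            long offset = partition.earliestOffset();
            try {
                List<String> batch;
                while (!(batch = partition.fetch(offset, 2)).isEmpty()) {
                    for (String message : batch) {
                        handler.accept(message);
                        offset++;
                    }
                }
                return;
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                resetState.run(); // state is rebuilt purely from the replayed messages
            }
        }
    }

    public static void main(String[] args) {
        Partition partition = new Partition();
        for (int i = 0; i < 5; i++) {
            partition.append("event-" + i);
        }
        List<String> state = new ArrayList<>();
        boolean[] failedOnce = {false};
        consume(partition, message -> {
            if (message.equals("event-3") && !failedOnce[0]) {
                failedOnce[0] = true;
                throw new IllegalStateException("transient failure"); // triggers a replay
            }
            state.add(message);
        }, state::clear, 3);
        System.out.println(state);
    }
}
```

Because the log itself is the source of truth, recovery needs no separate store: clearing the derived state and replaying is enough.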

Kafka is a young project and I'm interested to see how it matures. I'm keen to hear people's experiences using Kafka and whether it proved to be the right solution for them. As the project matures, I suspect we'll hear more often about Kafka in our technical discussions along with the other, more established, open source brokers.

Tuesday, 27 August 2013

Bridging Mule and MSMQ with ZeroMQ

Hearing the words Mule and Microsoft's MSMQ in the same sentence sends a shiver down my spine. I remember when Mule guru John D'Emic and I spent a considerable amount of time and patience getting Mule and MSMQ to talk to each other through DCOM. The major factor contributing to this unpleasant experience was our ignorance of the numerous security measures Windows imposes to restrict DCOM access. The moral of this story: unless you have a veteran Windows administrator at your disposal, avoid the DCOM route.

So which choices do we have other than DCOM? JNI sounds promising but you are then sacrificing Mule's platform independence. Here's an idea: introduce a messaging bridge between Mule and MSMQ. The bridge can be implemented in any language that facilitates interaction with MSMQ. C# is an attractive option.

We still have to consider which middleware to use for exchanging messages between the bridge and Mule. There are many alternatives and among them is ZeroMQ. I think ZeroMQ is a good candidate for several reasons:
  • It supports asynchronous communication
  • You're not adding another component to your architecture
  • It's well documented in addition to having a low learning curve
  • A ZeroMQ transport [1] and binding are available for Mule and C# respectively
  • It will more than likely satisfy your message throughput requirements
In as little as 15 minutes I developed a simple bridge in C#:

The above code should be self-explanatory, but I've added comments for your convenience.

Here's the Mule 3 app dispatching messages to the bridge:

On receiving an HTTP request, Mule leverages the ZeroMQ transport to asynchronously send the request's payload to the bridge.

In all likelihood, the illustrated bridge code for Mule-MSMQ interoperability won't serve all your needs. I can think of a dozen features a developer would want, such as destination queues resolved at run-time, an agreed format for message content, etc. But hey, at least it's a start :-)

1: I've recently replaced the transport's ZeroMQ C++ library with a pure Java implementation of ZeroMQ.