Wednesday, 9 December 2015

Messaging for ETL Anti-Pattern

This is a trap I've observed numerous professionals in the software industry fall into. After all, quite a few people I talk to like to think of messaging as the golden hammer. The sales folks surely want us to believe that this is the case. So many organisations have dug themselves into a hole by using Messaging for ETL that I'm classifying this problem as an anti-pattern and giving it a brief overview.

Context

The business mandates end-of-day reports. The data required for these reports is locked up in CSV files hosted on an FTP server. Each file can range from hundreds of MBs to several GBs of records. Records need to be cleansed, massaged, enriched, and transformed from one format to another. Furthermore, some record sets need to be joined with others. At the final stage of the process, the target records have to be written to files and uploaded to a CRM.

Problem

The business decides to use Messaging for ETL. The rationale behind such a decision can vary. One argument might be that some messaging solutions are suited to ETL tasks because they come with a broad set of protocol adaptors and have sophisticated transformation capabilities. The messaging solution could be an ESB, even though that term appears to have fallen out of fashion with the marketing crowd nowadays.

Predictably, the development team models each record as a message. Messaging patterns are used to solve common recurring problems: for example, message queues to process records concurrently between competing consumers; message translators for cleansing, massaging, enriching, and transforming the data; and an aggregator to join records. Applying these patterns is easy enough if the messaging solution has them baked in.


Consequences

Loosely speaking, the primitives offered by messaging solutions are overly low-level and general for ETL operations. Taking the context above, reasoning about the application becomes hard when you have more than a handful of joins. Aggregators think in terms of correlation keys, while we tend to think in the higher-level terms of join columns. Similarly, message queueing with competing consumers is a low-level way of concurrently processing records. It's more useful for us to think in terms of partitioning the record stream to achieve concurrency, without having to worry about queues, consumers, and so on.

Conceptual dissonance is one aspect of the problem with Messaging for ETL. Another aspect is performance. Treating each record as a packet of data and processing the whole file in a single go leads to a high, sustained rate of message traffic. In my experience, this often causes a significant, if not drastic, drop in throughput simply because most messaging solutions can't reliably cope with this pattern of traffic. Lock contention is a key factor. To illustrate the point, consider the message ID. Several messaging solutions generate a UUID, representing the message ID, and add it to the message before publishing it. Generating a UUID involves obtaining a global lock. While the reader produces hundreds of thousands of messages as it churns through the CSV files, the aggregator is concurrently combining individual messages to produce new messages with new UUIDs. Given the stream of messages is constant and without respite, the result is a high rate of lock contention, with the reader and aggregator fighting each other for the lock needed to generate UUIDs.

Refactored solution

One way to untangle this anti-pattern is to migrate the data-intensive logic to another tool. A staging database may be a good initial candidate, where you can leverage SQL for the heavy lifting. Other candidates include tools built specifically for ETL. This doesn't mean you're stuck with having to purchase a proprietary ETL tool: open-source alternatives exist, such as Pentaho. If the data you're transforming is in the realm of "Big Data", where you need to distribute its processing across a cluster of nodes, distributed processing frameworks such as Apache Spark or Apache Hadoop should be considered.
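As a rough illustration of the difference in abstraction, here is a minimal sketch of such an end-of-day job expressed with Apache Spark's DataFrame API; the file locations, column names, and join column are assumptions:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class EndOfDayReport {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("end-of-day-report").getOrCreate();

        // Spark partitions the files and processes the partitions concurrently,
        // so there are no queues or competing consumers to manage.
        Dataset<Row> orders = spark.read().option("header", "true").csv("/staging/orders.csv");
        Dataset<Row> customers = spark.read().option("header", "true").csv("/staging/customers.csv");

        // Joins are expressed in terms of join columns rather than correlation keys.
        Dataset<Row> report = orders.join(customers, "customerId");

        report.write().option("header", "true").csv("/staging/report");
        spark.stop();
    }
}

The point isn't the particular framework: it's that joins, partitioning, and concurrency are first-class concepts of the tool rather than something you assemble out of queues, consumers, and aggregators.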

Monday, 9 November 2015

Implementing a Replicated Token Service with JSON Web Tokens

Last week I observed one of the 8 fallacies of distributed computing in action:
"Topology doesn't change"
A client of mine deployed the latest versions of his web services to a highly-available QA environment. Sanity tests gave initial confirmation that the system was behaving as expected. But then the QA team reported weird behaviour in the system's offline functionality. So I was called in to figure out the problem. The logs showed an application getting random HTTP 401s from the system's token service.

This token service is a Java web application that creates and verifies JSON Web Tokens (JWTs). A client receives an HTTP 200 OK from the service for a token that passes verification. Otherwise, it receives an HTTP 401 Unauthorized. On startup, the token service creates a public/private key pair (PPK) in-memory for signing and verifying these tokens. I knew the token service in QA was replicated and that requests to replicas were load-balanced in a round-robin fashion. This quickly led me to the realisation that the issue occurred when (1) a replica of the token service verified a token with its own public key while (2) that token had been created and signed by a different replica with its own private key. This issue wasn't caught in the developer's testing environment because services weren't replicated.

I'm going to describe a solution I implemented for this problem because, though it's simple to program, such a solution might not be obvious. All shown code is in Java or SQL but it should be relatively easy to adapt the code to the technologies of your choice. 

At an abstract level, the solution is to make each token service replica's public key and key ID visible to all other replicas. In addition, the signer embeds its key ID in the created token before signing it with its own private key. This allows the verifier to know which public key to use: when the token service accepts a request to verify a token, it extracts the key ID from the token and looks up the corresponding public key. Security-wise, this approach lets us keep each private key secret with respect to the other token service replicas.

Now let's delve into the details. Given that the token service replicas share a database, I re-use the database to share the public keys and key IDs between replicas. In a relational database context, the schema for holding such information might look like this:
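A possible shape for such a table; the column types and primary key are assumptions, while the table and column names follow the description below:

-- Each replica registers its public key here so its peers can verify tokens it has signed.
CREATE TABLE Configuration (
    nodeId  VARCHAR(36)  NOT NULL,
    name    VARCHAR(255) NOT NULL,
    value_  TEXT         NOT NULL,
    PRIMARY KEY (nodeId, name)
);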

  1. nodeId is a UUID representing the replica owning the table row. This enables me to delete the row owned by a replica when it gracefully shuts down, reducing, but not eliminating, the likelihood of orphan records.

  2. name identifies the type of configuration. Although in this solution I'm only storing the public key in the table, you might want to store other configurations.

  3. value_ is where I store the actual public key along with the key ID.

In the token service, I use jose.4.j 0.4.4, an open-source Java implementation of JWT, for generating and verifying tokens. Before I can go on to generate/verify a token, I first need to create a PPK and register the public key, including its key ID, so that it can be read by other replicas:
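A rough sketch of this startup routine, assuming jose.4.j's RsaJwkGenerator; ConfigurationDataMapper is the post's data mapper (its exact signature is assumed) and ServiceContext is a hypothetical stand-in for the service's context:

import java.util.UUID;

import org.jose4j.jwk.RsaJsonWebKey;
import org.jose4j.jwk.RsaJwkGenerator;

public class KeyRegistration {

    public static RsaJsonWebKey registerPublicKey(ServiceContext context) throws Exception {
        // The node ID doubles as the key ID so verifiers can find the right public key.
        String nodeId = UUID.randomUUID().toString();

        RsaJsonWebKey rsaJsonWebKey = RsaJwkGenerator.generateJwk(2048); // 2048-bit PPK
        rsaJsonWebKey.setKeyId(nodeId);

        // toJson() serialises the public key and key ID, but not the private key.
        ConfigurationDataMapper.insertConfiguration(nodeId, "jwk", rsaJsonWebKey.toJson());

        // Keep the PPK around so the private key can be used later for signing tokens.
        context.setAttribute("ppk", rsaJsonWebKey);
        return rsaJsonWebKey;
    }
}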

The above code is executed at startup and merits a brief explanation:

RsaJwkGenerator.generateJwk(2048) returns a 2048-bit PPK. The key ID for the PPK is set to the node ID, which is simply a UUID also created at startup.

ConfigurationDataMapper.insertConfiguration(...) registers the public key by adding a record to the database table Configuration. Its parameters map to the table columns nodeId, name, and value_, respectively.

rsaJsonWebKey.toJson() does the job of serialising the public key and key ID to JSON for us. Note the toJson() method does NOT include the private key in the returned JSON.

Finally, the PPK is saved in the service's context so that the private key can be read later for signing tokens.

As mentioned above, the token service creates signed tokens for clients. The code for this is implemented in the createToken(...) method:
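A sketch of createToken(...), closely following the JWT production example in jose.4.j's documentation; the claims themselves are placeholders:

import org.jose4j.jwk.RsaJsonWebKey;
import org.jose4j.jws.AlgorithmIdentifiers;
import org.jose4j.jws.JsonWebSignature;
import org.jose4j.jwt.JwtClaims;

public class TokenCreator {

    public static String createToken(RsaJsonWebKey rsaJsonWebKey) throws Exception {
        JwtClaims claims = new JwtClaims();
        claims.setSubject("some-user");                 // placeholder claim
        claims.setExpirationTimeMinutesInTheFuture(30); // placeholder expiry
        claims.setIssuedAtToNow();
        claims.setGeneratedJwtId();

        JsonWebSignature jws = new JsonWebSignature();
        jws.setPayload(claims.toJson());
        jws.setKey(rsaJsonWebKey.getPrivateKey());
        // The token's key ID header is set to the PPK's key ID, i.e., the node ID.
        jws.setKeyIdHeaderValue(rsaJsonWebKey.getKeyId());
        jws.setAlgorithmHeaderValue(AlgorithmIdentifiers.RSA_USING_SHA256);

        return jws.getCompactSerialization();
    }
}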

I blatantly copied the code from jose.4.j's excellent examples page, where you can find an explanation of what it does. However, I want to highlight that I'm passing the PPK I saved earlier in the service context to createToken(...). Additionally, observe that I'm setting the token's key ID header to the PPK's key ID, which is the node ID.

On receiving a request to verify a token, the service fetches all registered public keys and key IDs from the database before verifying the token [1]:
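A sketch of this verification entry point; selectConfigurations(...) is a hypothetical data mapper query returning the value_ column of every registered replica, and isValid(...) is sketched further down:

import java.util.ArrayList;
import java.util.List;

import org.jose4j.jwk.JsonWebKey;

public class TokenVerifier {

    public static boolean verifyToken(String token) throws Exception {
        List<JsonWebKey> publicKeys = new ArrayList<JsonWebKey>();

        // Re-construct each public key (and its key ID) from the JSON persisted to the database.
        for (String json : ConfigurationDataMapper.selectConfigurations("jwk")) {
            publicKeys.add(JsonWebKey.Factory.newJwk(json));
        }

        return isValid(token, publicKeys);
    }
}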

In the above method, the public keys are (1) re-constructed from the JSON persisted to the database and (2) added to a list. The list is passed to the isValid(...) method along with the token. isValid(...) returns true if a token passes verification, otherwise false:
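A matching sketch of isValid(...), written as another method of the hypothetical TokenVerifier class above:

    // Requires org.jose4j.jwt.consumer.{InvalidJwtException, JwtConsumer, JwtConsumerBuilder}
    // and org.jose4j.keys.resolvers.JwksVerificationKeyResolver.
    static boolean isValid(String token, List<JsonWebKey> publicKeys) {
        // Resolves the verification key by matching the token's kid header against the key IDs.
        JwksVerificationKeyResolver keyResolver = new JwksVerificationKeyResolver(publicKeys);

        JwtConsumer jwtConsumer = new JwtConsumerBuilder()
                .setVerificationKeyResolver(keyResolver)
                .build();
        try {
            jwtConsumer.processToClaims(token);
            return true;
        } catch (InvalidJwtException e) {
            return false;
        }
    }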

In isValid(...), I pass the list of public keys to the JwksVerificationKeyResolver class constructor to create an object that resolves the public key to use for verifying the token according to the key ID extracted from the received token. The rest of the code builds a JwtConsumer object to verify the token.

The last item to tackle is to have a token service replica that is shutting down gracefully delete its public key from the configuration table:
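A sketch of the clean-up, invoked from the service's shutdown hook; deleteConfiguration(...) is a hypothetical data mapper method issuing DELETE FROM Configuration WHERE nodeId = ?:

    public static void deregisterPublicKey(String nodeId) {
        // Remove the row owned by this replica so peers stop trusting its public key.
        ConfigurationDataMapper.deleteConfiguration(nodeId);
    }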

This is required because the replica's private key and node ID are kept in-memory and are therefore lost on shutdown. Of course, this isn't a foolproof way of eliminating orphan records. Furthermore, it's possible that a token signed by a replica is still in circulation after the replica has shut down, causing the token to fail verification. I'll leave these problems as exercises for the reader to solve.

1: createToken(...) definitely has room for improvement in terms of performance. 

Friday, 25 September 2015

Describing API Key Authentication in RAML

I've finally figured out how to say in RAML that API operations are protected by an API key query parameter:
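A minimal sketch of the idea in RAML 0.8, using a custom security scheme whose describedBy declares the query parameter; the scheme, parameter, and resource names are my own:

#%RAML 0.8
title: Example API
securitySchemes:
  - apiKey:
      description: Calls are authenticated with an API key passed as a query parameter.
      type: x-api-key
      describedBy:
        queryParameters:
          api_key:
            description: The caller's API key.
            type: string
            required: true
/orders:
  get:
    securedBy: [apiKey]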

Saturday, 19 September 2015

Retiring Kafka Web Console

I've been a busy bee the past few months. The lack of activity on my blog and GitHub is a testament to this. Given my current priorities, I've taken the decision to retire Kafka Web Console. Don't despair! Kafka Manager appears to be a more sophisticated alternative to what I've developed, and besides, it's maintained by Yahoo.

Monday, 10 August 2015

Dynamically Create Rules using Drools & Rule Templates

Rules are used for a variety of purposes in the systems we build. Most often these rules are hard-coded in our application logic. The trouble is that sometimes we want to give the end-user the ability to define his own rules. Imagine an order processing system. The supplier wants to be notified about any range of events as they occur throughout the system, but the notification rules are not known ahead of time. Such a rule could be for a late payment or a highly lucrative order event. In Java, the latter rule can be modelled as follows [1]:
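A sketch of such a model; the class and field names are assumptions:

public class Rule {

    private final String property; // e.g., "price"
    private final String operator; // e.g., ">"
    private final Object value;    // e.g., 5000.0

    public Rule(String property, String operator, Object value) {
        this.property = property;
        this.operator = operator;
        this.value = value;
    }

    public String getProperty() { return property; }
    public String getOperator() { return operator; }
    public Object getValue() { return value; }
}

The highly lucrative order rule would then be something like new Rule("price", ">", 5000.0).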

Supporting conjoined conditions in a rule requires us to tweak the previous example:
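For instance, the Rule could hold a list of conditions that are implicitly AND-ed together (again, the names are assumptions; Condition is kept in the same file for brevity):

import java.util.Arrays;
import java.util.List;

public class Rule {

    private final List<Condition> conditions;

    public Rule(Condition... conditions) {
        this.conditions = Arrays.asList(conditions);
    }

    public List<Condition> getConditions() {
        return conditions;
    }
}

class Condition {

    private final String property;
    private final String operator;
    private final Object value;

    Condition(String property, String operator, Object value) {
        this.property = property;
        this.operator = operator;
        this.value = value;
    }

    String getProperty() { return property; }
    String getOperator() { return operator; }
    Object getValue() { return value; }
}

Under this model, the rule for a highly lucrative Widgets Inc. order might be built as new Rule(new Condition("price", ">", 5000.0), new Condition("customer", "==", "Widgets Inc.")).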

I consider it a risky proposition to write your own primitive rules engine to evaluate rules like the above. I much prefer a solution leveraging Drools 6 in combination with Rule Templates. Rule Templates is an awesome Drools feature giving you the ability to define abstract rules at design-time. At run-time, a Drools compiler runs through the rule template and evaluates expressions to generate concrete rules. Given an event type class (e.g., OrderEvent) and a Rule object (e.g., highValueOrderWidgetsIncRule), we can conceive the following rule template:

A couple of things to observe:
  • Line 1: Declares that the DRL file is a rule template.
  • Line 3-4: rule and eventType are template parameters.
  • Line 8: alertDecision is a global variable to which we write the outcome should the rule evaluate to true.
  • Line 12: @{row.rowNumber} is an in-built expression that makes the rule ID unique. This is useful for situations when you don't know how many rules you're going to have ahead of time. Note that this doesn't apply to our example.
  • Line 14: @{eventType} and @{rule} are MVEL expressions that are substituted with the template parameters at run-time.
  • Line 16: Sets the property doAlert to true to signal the application that the notification rule was fired.
Generating a rule from the template is a matter of instantiating ObjectDataCompiler and passing as parameters:
  1. A map consisting of a Rule object (e.g., highValueOrderWidgetsIncRule) and the name of the event class the Rule object pertains to (e.g., org.ossandme.event.OrderEvent)

  2. The template.drl file
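A sketch of this step, assuming the template lives on the classpath as template.drl; the map keys match the template parameters:

import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.drools.template.ObjectDataCompiler;

public class RuleGenerator {

    public String generate(Rule rule, String eventType) {
        Map<String, Object> row = new HashMap<String, Object>();
        row.put("rule", rule);           // e.g., highValueOrderWidgetsIncRule
        row.put("eventType", eventType); // e.g., "org.ossandme.event.OrderEvent"

        Collection<Map<String, Object>> rows = Collections.singletonList(row);
        InputStream template = getClass().getResourceAsStream("/template.drl");

        // Runs the row through the template, substituting the @{...} expressions.
        return new ObjectDataCompiler().compile(rows, template);
    }
}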

Drools cannot evaluate a Rule object in its current POJO form. In order to evaluate it, we override the Rule class's toString() method to transform the POJO into a formal statement:
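With the conjoined-condition model sketched earlier, the override added to the Rule class might look like this:

    @Override
    public String toString() {
        StringBuilder statement = new StringBuilder();
        for (int i = 0; i < conditions.size(); i++) {
            if (i > 0) {
                statement.append(" && ");
            }
            Condition condition = conditions.get(i);
            Object value = condition.getValue();
            // Quote string values so the output is a valid constraint.
            String literal = value instanceof String ? "'" + value + "'" : String.valueOf(value);
            statement.append(condition.getProperty())
                     .append(" ")
                     .append(condition.getOperator())
                     .append(" ")
                     .append(literal);
        }
        return statement.toString();
    }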

Before running the data through the template, Drools calls toString() on the template parameters. Calling toString() on highValueOrderWidgetsIncRule returns the statement: price > 5000.0 && customer == 'Widgets Inc.'. Going even further, if we apply the template to the statement and event type OrderEvent, we would get the following generated rule:
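As an illustration, the generated rule might look something like this (the package, global, and rule names are assumptions):

// Assumed package, global, and rule name; the constraint is the output of toString() above.
package org.ossandme;

global org.ossandme.AlertDecision alertDecision;

rule "notification_0"
when
    org.ossandme.event.OrderEvent(price > 5000.0 && customer == 'Widgets Inc.')
then
    alertDecision.setDoAlert(true);
end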

The last step is to evaluate the rule:
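One way to do this with Drools 6 is through the KieHelper utility; the AlertDecision class and its doAlert property follow the description above:

import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;

public class RuleEvaluator {

    public boolean evaluate(String generatedDrl, Object event) {
        // Compile the generated DRL into a session on the fly.
        KieSession kieSession = new KieHelper()
                .addContent(generatedDrl, ResourceType.DRL)
                .build()
                .newKieSession();

        AlertDecision alertDecision = new AlertDecision(); // assumed POJO with a doAlert property
        kieSession.setGlobal("alertDecision", alertDecision);
        kieSession.insert(event); // e.g., an OrderEvent
        kieSession.fireAllRules();
        kieSession.dispose();

        return alertDecision.isDoAlert();
    }
}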

Finally, let's put this all together. Don't worry, a copy of the complete application can be found on GitHub:


1: I'm ignoring the fact that most likely the rule is retrieved from a data store.

Friday, 15 May 2015

A Primer to AS2

Check out my latest guest post about AS2 on ModusBox's blog.

Tuesday, 16 September 2014

The Trials of Smooks

The fact that I'm a hard-to-please guy explains why I rarely show appreciation for a tool. I easily get frustrated when a tool fails to meet the challenges it's meant to solve. Smooks is one of the few tools I appreciate. It's an invaluable transformation framework in the integrator's arsenal. On a project I was on, I threw all manner of challenges at Smooks [1], and one after another, Smooks overcame them without giving up a key requirement: maintaining a low memory overhead during transformation. A shoutout to Tom Fennelly and his team for bringing us such a fantastic tool.

Trial I


The initial challenge I brought to Smooks was to take a tilde-delimited CSV file and map its records to POJOs:

You can see the file has an unorthodox header in addition to a footer. Using Smooks's built-in CSV reader, I wrote a concise Smooks config to do the mapping to POJOs:

What's happening under the covers, and in general, is that the reader pulls data from a source (e.g., a java.io.InputStream) and produces a stream of SAX events. The reader I'm using above expects the source data to be structured as CSV and to consist of 4 columns. Let's make things more concrete. Reading from the products.csv file, the reader produces the following XML stream [2]:

Listening to the stream of SAX events is the visitor. A visitor listens for specific events from the stream to fire some kind of behaviour, typically transformation. With the singleBinding element in the csv-to-pojos.xml config, the CSV reader pre-configures a JavaBean visitor to listen for csv-record elements. On intercepting this element, the JavaBean visitor instantiates an org.ossandme.Product object and binds its properties to the content of csv-record's child elements. You'll notice that I left Product's target properties unspecified in the config. The CSV reader assumes Product follows JavaBean conventions and that its properties are named the same as the defined CSV columns. Records disobeying the column definition are ignored. Consequently, I do not need to worry about the file's header and footer.

With the transformation configuration out of the way, I turned my attention to running the transformation on the CSV file from my Java code and processing the Product objects as they are instantiated and bound by Smooks:
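A simplified sketch of running the transformation, assuming Smooks 1.5.2 and a beanId of product in the config; unlike the original code, it collects the result through a JavaResult instead of processing each Product as it is bound:

import java.io.FileInputStream;

import javax.xml.transform.stream.StreamSource;

import org.milyn.Smooks;
import org.milyn.container.ExecutionContext;
import org.milyn.payload.JavaResult;

import org.ossandme.Product;

public class CsvToPojos {

    public static void main(String[] args) throws Exception {
        Smooks smooks = new Smooks("csv-to-pojos.xml");
        ExecutionContext executionContext = smooks.createExecutionContext();
        JavaResult result = new JavaResult();

        smooks.filterSource(executionContext, new StreamSource(new FileInputStream("products.csv")), result);

        // With singleBinding, the bean context ends up holding the last bound Product.
        Product product = (Product) result.getBean("product");
        System.out.println(product);
    }
}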


Trial II


A more complex transformation task I gave to Smooks was to load file records, holding a variable number of columns, into a database. As in the previous task, this file had a header as well as a footer:

You'll observe in the sample CSV file that records can be one of three types, as denoted by the first column: TH, TB or TF. The CSV reader, as it transforms and pushes records to the XML stream, can be customised such that it renames the csv-record element according to the record's first column:

As we'll see later, the above config permits Smooks to distinguish between the different record types. Given the sample file transactions.csv, the reader I've configured produces the following stream:

UNMATCHED elements represent the file's header and footer. A CSV record having TH in the first field will trigger the reader to create a TH element holding the other record fields. The same logic goes for TB and TF.

Database visitors load the records. However, since these visitors are limited to binding data from POJOs, I must first turn the XML-mapped records from the stream into said POJOs. The CSV reader doesn't know how to bind variable-field records to POJOs, so I configure the mapping myself:

Given what we've learnt about Smooks, we can deduce what's happening here. The JavaBean visitor on lines 10 to 17 has a selector (i.e., createOnElement) for the element TH. A selector is a quasi-XPath expression applied to XML elements as they come through the stream. On viewing TH, the visitor will:
  1. Instantiate a HashMap.

  2. Iterate through the TH fragment. If an element inside the fragment matches the selector set in a data attribute, then (a) a map entry is created, (b) bound to the element content, and (c) put in the map.

  3. Add the map to the Smooks bean context, under the name set in beanId. The map overwrites any previous map in the context with the same ID. This makes sense since we want to prevent objects from accumulating in memory.
The database visitors reference the maps in the bean context:

The insert statements are bound to the map entry values and are executed after the element that the executeOnElement selector points to is processed. The next step is to configure a datasource for the database visitors (lines 47-49):

Last but not least, the Java code to kick off the data load:
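A sketch of that code, assuming Smooks 1.5.2; the config file name is an assumption:

import java.io.FileInputStream;

import javax.xml.transform.stream.StreamSource;

import org.milyn.Smooks;

public class TransactionLoader {

    public static void main(String[] args) throws Exception {
        // csv-to-db.xml: the reader, JavaBean, and database visitor config discussed above.
        Smooks smooks = new Smooks("csv-to-db.xml");

        // Stream transactions.csv through the configured visitors; records are inserted
        // into the database as they are read, keeping the memory overhead low.
        smooks.filterSource(new StreamSource(new FileInputStream("transactions.csv")));
    }
}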


Trial III


The next challenge for Smooks makes the previous ones look like child's play. The goal: transform an XML stream to a CSV file that is eventually uploaded to an FTP server. The input:

The desired output:

Considering the CSV could be large in size, my requirement was for Smooks to write the transformed content to a PipedOutputStream. An FTP library would read from the PipedOutputStream's connected PipedInputStream, and write the streamed content to a file. To this end, I wrote the class running the transformation as follows:
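A rough sketch of such a class, assuming Smooks 1.5.2 and Apache Commons Net as the FTP library; the config, file, host, and credential values are placeholders:

import java.io.FileInputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

import org.apache.commons.net.ftp.FTPClient;
import org.milyn.Smooks;

public class AccountExtractUploader {

    public void transformAndUpload() throws Exception {
        final PipedOutputStream csvOut = new PipedOutputStream();
        PipedInputStream csvIn = new PipedInputStream(csvOut);

        // Run the Smooks transformation on its own thread so the FTP client can
        // consume the piped CSV stream concurrently.
        Thread transformer = new Thread(new Runnable() {
            public void run() {
                try {
                    Smooks smooks = new Smooks("xml-to-csv.xml");
                    smooks.filterSource(new StreamSource(new FileInputStream("accounts.xml")),
                            new StreamResult(csvOut));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    try { csvOut.close(); } catch (Exception ignored) { }
                }
            }
        });
        transformer.start();

        // The FTP client reads from csvIn until the transformer closes the pipe.
        FTPClient ftp = new FTPClient();
        ftp.connect("ftp.example.org");
        ftp.login("user", "password");
        ftp.storeFile("accounts.csv", csvIn);
        ftp.logout();
        ftp.disconnect();

        transformer.join();
    }
}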

My focus then turned to the XML-to-CSV mapping configuration. After deliberation, I reluctantly settled on using the FreeMarker visitor for writing the CSV. As an alternative, I considered developing a visitor specialised for this type of transformation, but time constraints made this unfeasible. The FreeMarker visitor, like the database one, cannot read directly off the XML stream. Instead, it can read from DOMs and POJOs. So I decided to use the DOM visitor such that it creates DOMs from record elements found within the input stream:

I then configured the FreeMarker visitor to apply the CSV template on seeing the element record in the stream:

Below is a simplified version of what I had in real life in account.ftl (note the last line of the template must be a newline):

An additional complexity I had to consider was the CSV's header and footer. Apart from being structured differently from the rest of the records, the header had to contain the current date whereas the footer had to contain the total record count. What I did for the header was to bind the current date from my Java code to Smooks's bean context (lines 27-30 and 38):

The date is then referenced from the Smooks config (lines 9-12):

With respect to the above config, at the start of the XML stream, FreeMarker writes the header to the output stream (i.e., PipedOutputStream):

000000Card Extract   [current date]

<?TEMPLATE-SPLIT-PI?> is an embedded Smooks instruction that applies account.ftl to record elements after the header.

Adding the record count to the footer is just a matter of configuring the Calculator visitor to maintain a counter in the bean context and referencing that counter from the template:


Trial IV


The final challenge Smooks had to go up against was to read from a java.util.Iterator of maps and, like the previous task, write the transformed output to a stream in CSV format. Unlike with the InputStreams Smooks read from in the other tasks, Smooks doesn't have a reader capable of producing a properly structured XML document from an iterator of maps. So I was left with writing my own reader:

The custom reader is hooked into Smooks as follows (line 5):

Finally, passing the iterator to Smooks for transformation consists of setting a JavaSource parameter, holding the iterator, on filterSource(...)  (line 27):


1: The Smooks version I used was 1.5.2.
2: You might be wondering how I know for certain the XML document shown is the one actually produced by Smooks. I know because of Smooks's HtmlReportGenerator class.