Wednesday, 9 December 2015

Messaging for ETL Anti-Pattern

This is a trap I've observed numerous professionals in the software industry fall into. After all, quite a few people I talk to like to think of messaging as the golden hammer. The sales folks surely want us to believe that this is the case. So many organisations have dug themselves into a hole by using Messaging for ETL that I'm classifying this problem as an anti-pattern and giving it a brief overview.

Context

The business mandates end-of-day reports. The data required for these reports is locked up in CSV files hosted on an FTP server. Each file can range from hundreds to thousands of MBs (i.e., GBs) of records. Records need to be cleansed, massaged, enriched, and transformed from one format to another. Furthermore, some record sets need to be joined with others. At the final stages of the process, the target records have to be written to files and uploaded to a CRM.

Problem

The business decides to use Messaging for ETL. The rationale behind such a decision can vary. One argument might be that some messaging solutions are suited for ETL tasks because they come with a broad set of protocol adaptors and have sophisticated transformation capabilities. The messaging solution could be an ESB, even though the term appears to have fallen out of fashion with the marketing crowd nowadays.

Predictably, the development team models each record as a message. Messaging patterns are used to solve common recurring problems: message queues with competing consumers to process the records concurrently; message translators for cleansing, massaging, enriching, and transforming the data; aggregators to join records. Applying these patterns is sufficiently easy if the messaging solution has them baked in.


Consequences

Loosely speaking, the primitives offered by messaging solutions are too low-level and too general for ETL operations. Taking the context above, reasoning about the application becomes hard when you have more than a handful of joins. Aggregators think in terms of correlation keys while we tend to think in the higher-level terms of join columns. Similarly, message queues and competing consumers are a low-level way of concurrently processing records. It's more useful for us to think in terms of partitioning the record stream in order to achieve concurrency, without having to worry about queues, consumers, and so on.
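To make the contrast concrete, here is a minimal sketch in plain Java of what thinking in join columns and partitions can look like. Everything in it is hypothetical: the Customer and Order types, their fields, and the JoinSketch class are made up for illustration, and the data fits in memory, which real ETL input often won't.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical types and names, for illustration only.
class JoinSketch {

    static final class Customer {
        final String id;
        final String name;
        Customer(String id, String name) { this.id = id; this.name = name; }
    }

    static final class Order {
        final String customerId;
        final double amount;
        Order(String customerId, double amount) { this.customerId = customerId; this.amount = amount; }
    }

    // Inner join on the join column (the customer ID): index one side by the
    // column, then stream the other side and look each record up.
    static List<String> join(List<Customer> customers, List<Order> orders) {
        Map<String, Customer> customersById = customers.stream()
                .collect(Collectors.toMap(c -> c.id, Function.identity()));

        return orders.parallelStream()            // the runtime partitions the records for us
                .filter(o -> customersById.containsKey(o.customerId))
                .map(o -> customersById.get(o.customerId).name + "," + o.amount)
                .collect(Collectors.toList());    // encounter order is preserved for a List source
    }

    public static void main(String[] args) {
        List<Customer> customers = Arrays.asList(
                new Customer("c1", "Widgets Inc."), new Customer("c2", "Acme"));
        List<Order> orders = Arrays.asList(
                new Order("c1", 120.5), new Order("c2", 80.0), new Order("c9", 10.0));
        join(customers, orders).forEach(System.out::println);
    }
}
```

Note there isn't a queue, a consumer, or a correlation key in sight: the join is expressed in terms of the column, and concurrency is a property of how the stream is partitioned.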

Conceptual dissonance is one aspect of the problem of Messaging for ETL. Another aspect is performance. Treating each record as a packet of data and processing them all in a single go leads to a high rate of message traffic that is uniform over time. From my experience, this often causes a significant, if not drastic, drop in throughput simply because most messaging solutions can't reliably cope with this pattern of traffic. Lock contention is a key factor here. To illustrate the point, consider the message ID. Several messaging solutions generate a UUID, representing the message ID, and add it to the message before going on to publish it. Generating a UUID can involve obtaining a global lock, for example when the generator draws from a shared, synchronised source of random numbers. While the reader produces hundreds of thousands of messages as it churns through the CSV files, the aggregator is concurrently combining individual messages to produce messages with new UUIDs. Given that the stream of messages is constant and without any respite, the result is a high rate of lock contention caused by the reader and the aggregator fighting each other for the lock to generate UUIDs.

Refactored solution

One way to untangle this anti-pattern is to migrate the data-intensive logic to another tool. A staging database may be a good initial candidate where you can leverage SQL for the heavy lifting. Other candidates include tools specifically built for ETL. This doesn't mean you're stuck with having to purchase a proprietary ETL tool. Open-source alternatives such as Pentaho do exist. If the data you're transforming is in the realm of "Big Data", where you need to distribute its processing across a cluster of nodes, distributed processing frameworks such as Apache Spark or Apache Hadoop should be considered.

Monday, 9 November 2015

Implementing a Replicated Token Service with JSON Web Tokens

Last week I observed one of the 8 fallacies of distributed systems in action:
"Topology doesn't change"
A client of mine deployed the latest versions of his web services to a highly-available QA environment. Sanity tests gave initial confirmation that the system was behaving as expected. But then, the QA team reported weird behaviour in the system's offline functionality. So I was called in to figure out the problem. The logs showed an application getting random HTTP 401s from the system's token service.

This token service is a Java web application that creates and verifies JSON Web Tokens (JWTs). A client receives an HTTP 200 OK from the service for a token passing verification. Otherwise, it receives an HTTP 401 Unauthorized. On startup, the token service creates a public/private key pair (PPK) in-memory for signing and verifying these tokens. I knew the token service in QA was replicated and requests to replicas were load-balanced in a round-robin fashion. This quickly led me to the realisation that the issue occurred when (1) a replica of the token service verified a token with its own public key and (2) the same token had been created and signed by a different replica with its own private key. This issue wasn't caught in the developer's testing environment because services weren't replicated there.

I'm going to describe a solution I implemented for this problem because, though it's simple to program, such a solution might not be obvious. All shown code is in Java or SQL but it should be relatively easy to adapt the code to the technologies of your choice. 

At an abstract level, the solution is to have each token service replica's public key and key ID visible to all other replicas. In addition to making the key ID visible to the set of replicas, the signer embeds the key ID in the created token before signing it with its own private key. This allows the verifier to know which public key to use for verifying the token. When the token service accepts a request to verify a token, it extracts the key ID from the token to look up the public key to use for verifying it. Security-wise, this approach enables us to keep each private key secret with respect to the other token service replicas.
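Before getting into the real thing, the idea can be sketched with nothing but the JDK. This is not the token service's code and the token is not a JWT: it is a simplified keyId.payload.signature string, the in-memory map stands in for whatever store the replicas share, and the KeyIdSketch and Replica names are hypothetical. The payload is assumed to contain no dots.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Base64;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical names; a simplified stand-in for JWT signing and verification.
class KeyIdSketch {

    // Stands in for the store shared by all replicas: key ID -> public key.
    static final Map<String, PublicKey> REGISTRY = new ConcurrentHashMap<>();

    static final class Replica {
        final String keyId = UUID.randomUUID().toString();
        private final KeyPair keyPair;   // the private key never leaves the replica

        Replica() {
            try {
                KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
                generator.initialize(2048);
                keyPair = generator.generateKeyPair();
            } catch (GeneralSecurityException e) {
                throw new IllegalStateException(e);
            }
            REGISTRY.put(keyId, keyPair.getPublic());   // publish only the public half
        }

        // Embeds the key ID in the token, then signs key ID and payload together.
        String sign(String payload) {
            String signedPart = keyId + "." + payload;
            try {
                Signature signer = Signature.getInstance("SHA256withRSA");
                signer.initSign(keyPair.getPrivate());
                signer.update(signedPart.getBytes(StandardCharsets.UTF_8));
                return signedPart + "." + Base64.getUrlEncoder().withoutPadding().encodeToString(signer.sign());
            } catch (GeneralSecurityException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    // Any replica can run this: the key ID in the token selects the public key.
    static boolean verify(String token) {
        String[] parts = token.split("\\.");
        if (parts.length != 3) return false;
        PublicKey publicKey = REGISTRY.get(parts[0]);
        if (publicKey == null) return false;            // unknown or already removed key
        try {
            Signature verifier = Signature.getInstance("SHA256withRSA");
            verifier.initVerify(publicKey);
            verifier.update((parts[0] + "." + parts[1]).getBytes(StandardCharsets.UTF_8));
            return verifier.verify(Base64.getUrlDecoder().decode(parts[2]));
        } catch (GeneralSecurityException | IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Replica a = new Replica();
        new Replica();                                  // a second replica with its own keys
        System.out.println(verify(a.sign("alice")));
    }
}
```

Whichever replica receives the verification request, the lookup by key ID lands on the signer's public key rather than the verifier's own.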

Now let's delve into the details. Given that the token service replicas share a database, I re-use the database to share the public keys and key IDs between replicas. In a relational database context, the schema for holding such information might look like this:

  1. nodeId is a UUID representing the replica owning the table row. This enables me to delete the row owned by a replica when it gracefully shuts down, reducing, but not eliminating, the likelihood of orphan records.

  2. name identifies the type of configuration. Although in this solution I'm only storing the public key in the table, you might want to store other configurations.

  3. value_ is where I store the actual public key along with the key ID.
In the token service, I use jose4j 0.4.4, an open-source Java implementation of JWT, for generating and verifying tokens. Before I can go on to generate or verify a token, I first need to create a PPK and register the public key, including its key ID, so that it can be read by other replicas:

The above code is executed at startup and merits a brief explanation:

Line 7-8: RsaJwkGenerator.generateJwk(2048) returns a 2048-bit PPK. The key ID for the PPK is set to the node ID which is simply a UUID created as well at startup.

Line 9: ConfigurationDataMapper.insertConfiguration(...) registers the public key by adding a record to the database table Configuration. Its parameters map to the table columns nodeId, name, and value_, respectively.

Line 9: rsaJsonWebKey.toJson() does the job of serialising the public key and key ID to JSON for us. Note the toJson() method does NOT include the private key in the returned JSON.

Line 10: Finally, the PPK is saved in the service's context in order to read the private key later on for signing the token.

As mentioned above, the token service creates signed tokens for clients. The code for this is implemented in the createToken(...) method:

I blatantly copied the code from jose4j's excellent examples page, where you can find an explanation of what it does. However, I want to highlight that I'm passing the PPK I saved earlier in the service context to createToken(...). Additionally, observe that on line 16 I'm setting the token's key ID to the PPK's key ID, which is the node ID.

On receiving a request to verify a token, the service fetches all registered public keys and key IDs from the database before verifying the token [1]:

In the above method, the public keys are (1) re-constructed from the JSON persisted to the database and (2) added to a list. The list is passed to the isValid(...) method along with the token. isValid(...) returns true if a token passes verification, otherwise false:

In isValid(...), I pass the list of public keys to the JwksVerificationKeyResolver class constructor to create an object that resolves the public key to use for verifying the token according to the key ID extracted from the received token. The rest of the code builds a JwtConsumer object to verify the token.

The last item to tackle is to have a token service replica that is shutting down gracefully delete its public key from the configuration table:

This is required because the replica's private key and node ID are kept in-memory and therefore lost on shutdown. Of course, this isn't a foolproof way of eliminating orphan records. Furthermore, it's possible that a token signed by a replica is still in circulation after the replica has shut down, causing the token to fail verification. I'll leave these problems as exercises for the reader to solve.

1: createToken(...) definitely has room for improvement in terms of performance. 

Friday, 25 September 2015

Describing API Key Authentication in RAML

I've finally figured out how to say in RAML that API operations are protected by an API key query parameter:

Saturday, 19 September 2015

Retiring Kafka Web Console

I've been a busy bee the past few months. The lack of activity on my blog and GitHub is a testament to this. Given my current priorities, I've taken the decision to retire Kafka Web Console. Don't despair! Kafka Manager appears to be a more sophisticated alternative to what I've developed, and besides, it's maintained by Yahoo.

Monday, 10 August 2015

Dynamically Create Rules using Drools & Rule Templates

Rules are used for a variety of purposes in the systems we build. Most often these rules are hard-coded in our application logic. The trouble is that sometimes we want to give the end-user the ability to define his own rules. Imagine an order processing system. The supplier wants to be notified of any number of events as they occur throughout the system, but the notification rules are not known ahead of time. Such a rule could be for a late payment or a highly lucrative order event. In Java, the latter rule can be modelled as follows [1]:

Supporting conjoined conditions in a rule requires us to tweak the previous example:

I consider it a risky proposition to write your own primitive rules engine to evaluate rules like the above. I much prefer a solution leveraging Drools 6 in combination with Rule Templates. Rule Templates is an awesome Drools feature giving you the ability to define abstract rules at design-time. At run-time, a Drools compiler runs through the rule template and evaluates expressions to generate concrete rules. Given an event type class (e.g., OrderEvent) and a Rule object (e.g., highValueOrderWidgetsIncRule), we can conceive the following rule template:

A couple of things to observe:
  • Line 1: Declares that the DRL file is a rule template.
  • Line 3-4: rule and eventType are template parameters.
  • Line 8: alertDecision is a global variable to which we write the outcome should the rule evaluate to true.
  • Line 12: @{row.rowNumber} is an in-built expression that makes the rule ID unique. This is useful for situations when you don't know how many rules you're going to have ahead of time. Note that this doesn't apply to our example.
  • Line 14: @{eventType} and @{rule} are MVEL expressions that are substituted with the template parameters at run-time.
  • Line 16: Sets the property doAlert to true to signal the application that the notification rule was fired.
Generating a rule from the template is a matter of instantiating ObjectDataCompiler and passing as parameters:
  1. A map consisting of a Rule object (e.g., highValueOrderWidgetsIncRule) and the name of the event class the Rule object pertains to (e.g., org.ossandme.event.OrderEvent)

  2. The template.drl file

Drools cannot evaluate a Rule object in its current POJO form. In order to evaluate it, we override the Rule class's toString() method to transform the POJO into a formal statement:
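As a rough illustration of the idea, here is a self-contained sketch of a rule POJO whose toString() renders its conditions as a statement. The RuleSketch, Condition, and Operator names and the and(...) builder method are assumptions made for this sketch, not necessarily how the Rule class in the complete application is written. String values are wrapped in single quotes without any escaping, which a real implementation would need to handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of a rule and its conditions, for illustration only.
class RuleSketch {

    enum Operator {
        GREATER_THAN(">"), EQUAL_TO("==");

        final String symbol;
        Operator(String symbol) { this.symbol = symbol; }
    }

    static final class Condition {
        final String field;
        final Operator operator;
        final Object value;

        Condition(String field, Operator operator, Object value) {
            this.field = field;
            this.operator = operator;
            this.value = value;
        }

        // Renders e.g. price > 5000.0; strings are quoted, other values are not.
        @Override
        public String toString() {
            String literal = value instanceof String ? "'" + value + "'" : String.valueOf(value);
            return field + " " + operator.symbol + " " + literal;
        }
    }

    static final class Rule {
        private final List<Condition> conditions = new ArrayList<>();

        Rule and(String field, Operator operator, Object value) {
            conditions.add(new Condition(field, operator, value));
            return this;
        }

        // Conjoins all conditions into a single statement.
        @Override
        public String toString() {
            return conditions.stream()
                    .map(Condition::toString)
                    .collect(Collectors.joining(" && "));
        }
    }

    public static void main(String[] args) {
        Rule highValueOrderWidgetsIncRule = new Rule()
                .and("price", Operator.GREATER_THAN, 5000.0)
                .and("customer", Operator.EQUAL_TO, "Widgets Inc.");
        System.out.println(highValueOrderWidgetsIncRule);
    }
}
```

Keeping the rendering in toString() means the rule object can be handed over as a template parameter as-is, with no separate conversion step.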

Before running the data through the template, Drools calls toString() on the template parameters. Calling toString() on highValueOrderWidgetsIncRule returns the statement: price > 5000.0 && customer == 'Widgets Inc.'. Going even further, if we apply the template to the statement and event type OrderEvent, we would get the following generated rule:

The last step is to evaluate the rule:

Finally, let's put this all together. Don't worry, a copy of the complete application can be found on GitHub:


1: I'm ignoring the fact that most likely the rule is retrieved from a data store.

Friday, 15 May 2015

A Primer to AS2

Check out my latest guest post about AS2 on ModusBox's blog.