klaus baumecker

groovy, camel and curry

21 Dec 2010

In a recent project I had to assemble Camel routes from a config file. As a fan of the Groovy language, I chose Groovy both for the config format and for the implementation of the route builder.
Conceptually, the code creates routes from a timer to a bean that collects data and finally to a consumer of the bean output (e.g. a store). Furthermore, the bean had to be stateless since the same bean was potentially used in multiple routes.
Each bean has an init method to set itself up and to create a context object for further calls. When this is done, the collection of the data begins, using the context object.
We split this task into two routes per collector. The first calls the init method with a map of the key/value pairs from the config file, excluding the reserved keys polling and type; those key/value pairs are bean-specific. Once init is complete, that route is stopped and a second route for the actual collection is started.

OK, enough theory. This is an example of the config file:

myCollectionTask {
    type = 'myBean'
    user = 'usr'
    password = 'passwd'
    polling {
        collect = 2000 // 2 seconds
    }
}
 
yourCollectionTask {
    type = 'yourBean'
    host = 'yourhost.com'
    port = '4711'
    user = 'root'
    password = 'secret'
    polling {
        collect = 20 * 1000 // 20 seconds
    }
}

This is how you read it in:

// File.toURL() is deprecated; go through toURI() instead
def config = new ConfigSlurper().parse(new File("collectors.cfg").toURI().toURL())
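To get a feel for what ConfigSlurper hands back, here is a minimal sketch that parses the same kind of config from an inline string rather than from collectors.cfg. The inline string is only an illustration so the snippet runs on its own.

```groovy
// Inline stand-in for collectors.cfg (illustration only).
def text = '''
myCollectionTask {
    type = 'myBean'
    user = 'usr'
    password = 'passwd'
    polling {
        collect = 2000 // 2 seconds
    }
}
'''
def config = new ConfigSlurper().parse(text)

// Nested blocks become nested, map-like ConfigObject instances.
assert config.myCollectionTask.type == 'myBean'
assert config.myCollectionTask.polling.collect == 2000

// Top-level block names are the keys we later loop over.
def names = config.keySet() as List
assert names == ['myCollectionTask']
```

Because the result behaves like a map of maps, the usual Groovy map methods such as each and findAll work on it.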

Now we want to create camel routes automatically. Let’s loop over the config:

config.each{ collector, values ->
 
  // collector is one of {myCollectionTask,yourCollectionTask}

  // filter out reserved keys
  def params = values.findAll { param ->
    param.key != 'type' &&
    param.key != 'polling'
  }

  // the init route (try this each second)
  from("timer://${collector}-initiator?period=1000")
    .routeId("${collector}-initiator")
    .setBody(constant(params))
    .to("bean:${values.type}?method=init")
    .process(/* switch routes: this one off and collect on */)

  // the collect route
  from("timer://${collector}-collect?period=${values.polling.collect}")
    .routeId("${collector}-collect").noAutoStartup()
    .process(/* set the context in the body of the exchange */)
    .to("bean:${values.type}?method=collect")
    .to("bean:store");
}
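The key filtering at the top of the loop can be tried in isolation. The sketch below uses a plain map as a stand-in for one collector's config values; the reserved list is a name introduced here for illustration and is not part of the original code.

```groovy
// Plain map standing in for one collector's config values (illustration only).
def values = [type: 'myBean', user: 'usr', password: 'passwd', polling: [collect: 2000]]

// Keys the route builder consumes itself and must not pass on to the bean.
def reserved = ['type', 'polling']

// findAll on a map hands each entry to the closure and returns a new map
// containing only the entries for which the closure returned true.
def params = values.findAll { entry -> !(entry.key in reserved) }

assert params == [user: 'usr', password: 'passwd']
```

Keeping the reserved keys in a list makes it easy to add further builder-only settings later without touching the filter.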

Switching the routes from init to collect is done in a Camel process step. The same goes for setting the context in the collect route. The problem is that you cannot simply write a processor that refers to the collector parameter of the enclosing each closure: the loop runs during route construction, but the processor object is executed later, when the route is running. By then you would be referring to a rather undefined value of collector. What we need is a way to describe a processor object that is tied to the correct collector name. This is where Groovy closures and currying come in very handy. Here is what the closures look like:

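// contexts is assumed to be a shared map (collector name -> context object) defined elsewhere
def routeSwitcher = { collector, Exchange exchange ->
  if (exchange.in.body) {
    contexts[collector] = exchange.in.body // get the context coming from init
    exchange.context.stopRoute("${collector}-initiator")
    exchange.context.startRoute("${collector}-collect") // must match the routeId of the collect route
  }
}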

def contextSetter = { collector, Exchange exchange ->
  exchange.in.body = contexts[collector] // set the context as argument for collect
}

The first argument is the collector name, which we bind to the closure using Closure.curry(…). curry returns a new closure in which one or more of the leading parameters are bound to values, so the new closure only requires the remaining parameters. In our case, we bind the collector parameter and leave the exchange parameter open, to be filled in when the route executes. In context, the processing steps then look like this:
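As a quick illustration of what curry does, independent of Camel, here is a small sketch. The describe closure and its string arguments are made up for this example.

```groovy
// Two-parameter closure; the first parameter is the one we want to bind up front.
def describe = { String collector, String event -> "${collector} got ${event}".toString() }

// curry binds the leftmost parameter and returns a new one-parameter closure.
def forMine = describe.curry('myCollectionTask')
def forYours = describe.curry('yourCollectionTask')

assert forMine('init') == 'myCollectionTask got init'
assert forYours('collect') == 'yourCollectionTask got collect'

// The original closure is untouched and still takes both parameters.
assert describe('someTask', 'init') == 'someTask got init'
```

Each call to curry yields an independent closure, which is what lets every route carry its own collector name.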

from("timer://${collector}-initiator?period=1000")
  .routeId("${collector}-initiator")
  .setBody(constant(params))
  .to("bean:${values.type}?method=init")
  .process(routeSwitcher.curry(collector) as Processor)

// the collect route
from("timer://${collector}-collect?period=${values.polling.collect}")
  .routeId("${collector}-collect").noAutoStartup()
  .process(contextSetter.curry(collector) as Processor)
  .to("bean:${values.type}?method=collect")
  .to("bean:store");
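To see the pattern work without Camel on the classpath, here is a rough simulation. It rests on several assumptions: plain maps stand in for Camel exchanges, java.util.function.Consumer stands in for Processor, and the contexts map and the context strings are invented for the example. The route stopping and starting is left out.

```groovy
import java.util.function.Consumer

// Shared store for per-collector contexts (assumed; the post does not show it).
def contexts = [:]

// Same shape as the closures in the post, with a Map instead of an Exchange.
def routeSwitcher = { String collector, Map exchange ->
    if (exchange.body) {
        contexts[collector] = exchange.body // remember the context from init
    }
}
def contextSetter = { String collector, Map exchange ->
    exchange.body = contexts[collector] // hand the context to collect
}

// "Route construction": build one pair of processors per collector.
def processors = [:]
['myCollectionTask', 'yourCollectionTask'].each { collector ->
    processors[collector] = [
        init   : (routeSwitcher.curry(collector) as Consumer),
        collect: (contextSetter.curry(collector) as Consumer)
    ]
}

// "Route execution": happens later, after the loop has finished.
processors.myCollectionTask.init.accept([body: 'ctx-mine'])
processors.yourCollectionTask.init.accept([body: 'ctx-yours'])

def later = [body: null]
processors.yourCollectionTask.collect.accept(later)
assert later.body == 'ctx-yours'
```

Each processor keeps the collector name it was curried with, so the contexts of the two collectors do not get mixed up even though both share the same closures.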

That’s it. Although this is not production-ready code, I hope it shows the idea.
