Richard Searle's Blog

Thoughts about software

Integrating Futures with the real world

Posted by eggsearle on January 28, 2012

The previous post described how to unit test a Futures based choreography implementation.

This post will examine how such implementation might be connected to the external world and thus perform useful work.

The Camel akka module permits integration of Camel routes with Akka actors and hence with Futures. Camel provides a wide range of components, supporting pretty much transport that might be required.

Two seperate Camel implementations are required:

  1. Expose the service to external clients
  2. Provide access to the lower level services required by this service.

The latter will always use a Request-Response MEP, while latter is determined by the needs of the client.

A JMS based service implementation that reads from one queue and responds on another is quite common. This might be implemented as follows

case class OWFlow[A, R](in: String, out: String, flow: A => Future[R]) {

  val inActor = Actor.actorOf(new InActor).start
  val outActor = Actor.actorOf(new OutActor).start

  private class OutActor extends Actor with Producer with Oneway {
    val endpointUri = out
  }

  private class InActor extends Actor with Consumer {
    val endpointUri = in

    def receive = {
      case akka.camel.Message(a: A, _) => (Future(a).flatMap(flow)).onComplete { outActor ! _ }
    }
  }
}

Where in and out are the Camel uris that reference the appropriate transport end-points. flow is the service to the connected between in and out.

The OutActor simply provides an oneway endpoint to Camel over which messages are sent.

The InActor

  1. Receives message from Camel endpoint.
  2. Creates a Future whose value is that message. The Future executes asynchronously, so the actor is not blocked.
  3. The Future executes the flow
  4. Upon completion, the result is send to the outActor
  5. The outActor delivers the result to the Camel endpoint.

An example using the JMS component

 val slbOW = new OWFlow("jms:slbIn", "jms:slbOut", SingleLineBalance.apply)

The lower level service can be implemented using a simple case class, wrapping a Map containing the data of interest.

case class Responder[K, V](map: Map[K, V]) {
  def apply(a: Any) = map(a.asInstanceOf[K])
}

This case class is then wired into a Camel route using the bean component and the Scala DSL.

val context = CamelContextManager.mandatoryContext
context.addRoutes(new RouteBuilder { "seda:num".bean(Responder(Map(
      Id(123) -> List(Num("124-555-1234"), Num("333-555-1234")))) })

The service can then be tested by sending a Num via the jms queue.

val producer = CamelContextManager.mandatoryContext.createProducerTemplate
producer.sendBody("jms:slbIn", Num("124-555-1234"))

The complete test can be found on github. That code uses seda components rather than jms.

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

 
%d bloggers like this: