Richard Searle's Blog

Thoughts about software

Archive for January, 2012

Integrating Futures with the real world

Posted by eggsearle on January 28, 2012

The previous post described how to unit test a Futures-based choreography implementation.

This post examines how such an implementation might be connected to the external world and thus perform useful work.

The Akka Camel module permits integration of Camel routes with Akka actors and hence with Futures. Camel provides a wide range of components, supporting pretty much any transport that might be required.

Two separate Camel integrations are required:

  1. Expose the service to external clients
  2. Provide access to the lower level services required by this service.

The latter will always use a request-response MEP, while the former is determined by the needs of the client.

A JMS-based service implementation that reads from one queue and responds on another is quite common. This might be implemented as follows:

case class OWFlow[A, R](in: String, out: String, flow: A => Future[R]) {

  val inActor = Actor.actorOf(new InActor).start
  val outActor = Actor.actorOf(new OutActor).start

  private class OutActor extends Actor with Producer with Oneway {
    val endpointUri = out
  }

  private class InActor extends Actor with Consumer {
    val endpointUri = in

    def receive = {
      case akka.camel.Message(a: A, _) => (Future(a).flatMap(flow)).onComplete { outActor ! _ }
    }
  }
}

Here in and out are the Camel URIs that reference the appropriate transport endpoints, and flow is the service to be connected between in and out.

The OutActor simply provides a one-way endpoint to Camel over which messages are sent.

The InActor

  1. Receives a message from the Camel endpoint.
  2. Creates a Future whose value is that message. The Future executes asynchronously, so the actor is not blocked.
  3. The Future executes the flow.
  4. Upon completion, the result is sent to the outActor.
  5. The outActor delivers the result to the Camel endpoint.
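
The steps above can be sketched without Akka actors or Camel. This is only an illustration under assumptions: it uses the standard library scala.concurrent.Future rather than the Akka 1.x Future, and the names OneWayFlowSketch, deliver and run are hypothetical. A plain callback stands in for the outActor.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical sketch: scala.concurrent stands in for the Akka 1.x Future,
// and the `out` callback stands in for the one-way producer actor.
object OneWayFlowSketch {

  // Steps 2-4: wrap the message in a Future, run the flow, hand the outcome to `out`.
  def deliver[A, R](message: A)(flow: A => Future[R])(out: Try[R] => Unit): Unit =
    Future(message).flatMap(flow).onComplete(out)

  // Drives one message through a toy flow and waits for whatever reaches `out`.
  def run(): Try[Int] = {
    val delivered = Promise[Try[Int]]()
    deliver("hello")(s => Future(s.length))(r => delivered.success(r))
    Await.result(delivered.future, 5.seconds)
  }
}
```

Note that the callback receives failures as well as successes, so a real out endpoint would need to decide what to do with a failed flow.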

An example using the JMS component:

 val slbOW = new OWFlow("jms:slbIn", "jms:slbOut", SingleLineBalance.apply)

The lower level service can be implemented using a simple case class, wrapping a Map containing the data of interest.

case class Responder[K, V](map: Map[K, V]) {
  def apply(a: Any) = map(a.asInstanceOf[K])
}
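
A small usage sketch of the Responder may help. The wrapper object ResponderSketch, the sample data and the get method are hypothetical additions for illustration. Because K is erased at run time, the asInstanceOf cast does not check anything, so an unknown key of any type simply fails the Map lookup.

```scala
// Hypothetical sketch: same Responder shape as above, plus an Option-returning helper.
object ResponderSketch {

  case class Responder[K, V](map: Map[K, V]) {
    // Throws NoSuchElementException when the key is absent.
    def apply(a: Any): V = map(a.asInstanceOf[K])

    // Hypothetical addition: returns None instead of throwing for unknown keys.
    def get(a: Any): Option[V] = map.get(a.asInstanceOf[K])
  }

  val responder = Responder(Map(123 -> List("124-555-1234", "333-555-1234")))

  val hit: List[String] = responder(123)                // found
  val miss: Option[List[String]] = responder.get(999)   // absent key
  val wrongType: Option[List[String]] = responder.get("not an id") // cast is unchecked
}
```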

This case class is then wired into a Camel route using the bean component and the Scala DSL.

val context = CamelContextManager.mandatoryContext
context.addRoutes(new RouteBuilder { "seda:num".bean(Responder(Map(
      Id(123) -> List(Num("124-555-1234"), Num("333-555-1234"))))) })

The service can then be tested by sending a Num via the JMS queue.

val producer = CamelContextManager.mandatoryContext.createProducerTemplate
producer.sendBody("jms:slbIn", Num("124-555-1234"))

The complete test can be found on github. That code uses seda components rather than jms.


Posted in Akka, Scala

The future of Futures in Scala

Posted by eggsearle on January 28, 2012

This SIP provides a concise and clear description of how and why Futures will form an important part of concurrent programming.

Posted in Akka, Scala

Unit testing Akka future based choreography

Posted by eggsearle on January 17, 2012

The previous post described a choreography implementation using Futures.

All technologies need to consider testability, which is thus the topic of this post.

The first step is a completely self-contained unit test mechanism. This allows the business logic embodied in the service to be tested without the complexity of the concurrent runtime environment.

A specs2 test for the Balance service might be

"balance" in {
    Balance(Acct("alpha")).get must beEqualTo(Bal(124.5F))
  }

Note the get, which blocks until the Future returns its result.

The Balance service requires a suitable mock implementation of Lookup[Acct, Bal].

implicit val balLook: Lookup[Acct, Bal] = Service(Map(Acct("alpha") -> Bal(124.5F)))


private abstract class MapService[K, V](map: Map[K, V]) {
    self: Function1[K, Future[V]] =>
    def apply(a: K) = Future(map(a))
}

private case class Service[K, V](values: Map[K, V]) extends MapService(values) with Lookup[K, V]

The implicits eliminate the need to explicitly reference the balLook, much like Spring auto wiring by type.
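
A minimal sketch of that wiring, assuming the standard library scala.concurrent.Future in place of the Akka Future, and using String and Float as hypothetical stand-ins for Acct and Bal. The compiler selects the implicit value by its type, so the two calls below are equivalent.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical sketch: String and Float stand in for Acct and Bal.
object ImplicitWiringSketch {

  trait Lookup[A, R] extends Function1[A, Future[R]] {
    def apply(arg: A): Future[R]
  }

  object Balance {
    def apply(acct: String)(implicit balLook: Lookup[String, Float]): Future[Float] =
      balLook(acct)
  }

  // The mock lookup; Future.successful needs no execution context.
  implicit val balLook: Lookup[String, Float] = new Lookup[String, Float] {
    def apply(acct: String): Future[Float] = Future.successful(124.5F)
  }

  // The compiler supplies balLook because its type matches the implicit parameter.
  val wiredImplicitly: Float = Await.result(Balance("alpha"), 1.second)

  // The same call with the argument spelled out.
  val wiredExplicitly: Float = Await.result(Balance("alpha")(balLook), 1.second)
}
```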

Posted in Uncategorized

Choreography using Akka Futures

Posted by eggsearle on January 16, 2012

A previous series of posts covered a choreography implementation using recursive PartialFunctions. That implementation was quite complex and required the workflow designer to have a good understanding of the underlying implementation. In other words, an interesting engineering solution but not really practical.

Both Akka and Finagle have new Future based toolkits that permit a much simpler implementation.

The following discussion uses the Akka future, with the complete code available here.

The choreography problem domain involves the implementation of a service using lower level services that expose a request-response message exchange pattern (R-R MEP). The service generally includes non-trivial state and the combination of responses from several invocations. The latency seen by the client must be minimized, requiring the exploitation of any available concurrency in the invocation of the lower level services.

This combination of characteristics does not match a straightforward message-passing implementation, which might perhaps be trivially implemented using Camel. It can be implemented using a full-blown workflow engine, such as Tibco BusinessWorks or Aqualogic Service Bus.

The R-R MEP can be directly mapped to a send-And-Receive-Future actor invocation. The returned Future directly supports the desired concurrency. Directly referencing the Akka actor is an unnecessary lock-in and complicates unit testing. The Lookup trait provides an abstraction of the lower level service, exposing a function that takes a single argument and returns a Future that will ultimately return the result.

trait Lookup[A, R] extends Function1[A, Future[R]] {
  def apply(arg: A): Future[R]
}
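
As an illustration, a Lookup can be backed by an in-memory Map. This is a sketch under assumptions: it uses scala.concurrent.Future from the standard library rather than the Akka Future, and MapLookup, the Acct definition and balanceOf are hypothetical names. A missing key surfaces as a failed Future rather than an exception thrown at the call site.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical sketch: an in-memory Lookup built on scala.concurrent.
object LookupSketch {

  trait Lookup[A, R] extends Function1[A, Future[R]] {
    def apply(arg: A): Future[R]
  }

  case class Acct(name: String)

  // The Map access runs inside the Future, so a missing key fails the Future.
  class MapLookup[A, R](data: Map[A, R]) extends Lookup[A, R] {
    def apply(arg: A): Future[R] = Future(data(arg))
  }

  val balLook: Lookup[Acct, Float] =
    new MapLookup[Acct, Float](Map(Acct("alpha") -> 124.5F))

  // Blocks for the result; only sensible in tests and demos.
  def balanceOf(name: String): Try[Float] =
    Try(Await.result(balLook(Acct(name)), 5.seconds))
}
```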

A simple service that returns the balance of an account might then have the form

object Balance {
  def apply(acct: Acct)(implicit balLook: Lookup[Acct, Bal]): Future[Bal] = 
     balLook(acct)
}

The following service applies a discount to the balance of certain accounts. Two concurrent calls are made to determine the account balance and whether the account qualifies for the discount.

object Discount {
  def apply(acct: Acct)(implicit balLook: Lookup[Acct, Bal],
                                 specialLook: Lookup[Acct, Boolean]): Future[Bal] = {
    val balance = balLook(acct)
    val special = specialLook(acct)
    for {
      bal <- balance
      spec <- special
    } yield if (spec) bal * 0.9F else bal
  }
}
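
The concurrency comes from creating both Futures before the for comprehension, which only combines their results. The following sketch shows the same shape with the standard library scala.concurrent.Future; the lookups balanceOf and isSpecial and their sample data are hypothetical. Had the two calls been written inside the generators instead, the second lookup would not start until the first had completed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch: String and Float stand in for Acct and Bal.
object DiscountSketch {

  def balanceOf(acct: String): Future[Float] = Future { 100.0F }
  def isSpecial(acct: String): Future[Boolean] = Future { acct == "alpha" }

  def discount(acct: String): Future[Float] = {
    // Both lookups are started here, so they can run concurrently.
    val balance = balanceOf(acct)
    val special = isSpecial(acct)
    // The comprehension only waits for and combines the two results.
    for {
      bal <- balance
      spec <- special
    } yield if (spec) bal * 0.9F else bal
  }

  // Blocks for the result; only sensible in tests and demos.
  def result(acct: String): Float = Await.result(discount(acct), 5.seconds)
}
```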

Determine the discounted balance, given the phone number of the account. Note how easily the two services are composed.

object DiscountByPhone {
  def apply(pn: Num)(implicit acctLook: Lookup[Num, Acct],
                              balLook: Lookup[Acct, Bal], 
                              specialLook: Lookup[Acct, Boolean]): Future[Bal] =
    acctLook(pn) flatMap { Discount(_) }
}

Determine the sum of the discounted balances, given an Id (e.g. SSN) that maps to many accounts.

object DiscountById {
  def apply(id: Id)(implicit numLook: Lookup[Id, List[Num]], 
                             acctLook: Lookup[Num, Acct], 
                             balLook: Lookup[Acct, Bal], 
                             specialLook: Lookup[Acct, Boolean]): Future[Bal] =
    numLook(id).flatMap { Future.traverse(_)(DiscountByPhone(_)) }
      .map { _.reduce(_ + _) }
}

This might be clearer when split into several lines with typed temporary variables:

object DiscountById {
  def apply(id: Id)(implicit numLook: Lookup[Id, List[Num]], 
                             acctLook: Lookup[Num, Acct],
                             balLook: Lookup[Acct, Bal],
                             specialLook: Lookup[Acct, Boolean]): Future[Bal] = {
    val numbers: Future[List[Num]] = numLook(id)
    val balances: Future[List[Bal]] = numbers flatMap
             { ns: List[Num] => Future.traverse(ns) { 
                 n: Num => DiscountByPhone(n) } }
    balances map { _.reduce(_ + _) }
  }
}
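
The traverse step can be tried in isolation. The sketch below assumes scala.concurrent.Future from the standard library, whose Future.traverse has the same overall shape, and uses hypothetical sample data with String and Float standing in for Num and Bal. It sums with foldLeft rather than reduce, since reduce throws on an empty list, which matters if an Id maps to no numbers.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical sketch: String and Float stand in for Num and Bal.
object TraverseSketch {

  private val balances = Map("124-555-1234" -> 10.0F, "333-555-1234" -> 5.5F)

  def discountByPhone(n: String): Future[Float] = Future(balances(n))

  // One Future per number, gathered into a single Future of the list, then summed.
  def total(numbers: List[String]): Future[Float] =
    Future.traverse(numbers)(n => discountByPhone(n)).map(_.foldLeft(0.0F)(_ + _))

  // Blocks for the result; only sensible in tests and demos.
  def totalNow(numbers: List[String]): Float = Await.result(total(numbers), 5.seconds)
}
```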

The above does require knowledge of Scala (and its functional idioms) and the Akka Futures API. An internal DSL could permit a simpler representation, hiding some of the implementation details.

Nonetheless, this is still quite straightforward and very powerful.

Posted in Akka, Scala

Surprising initialization of abstract Scala vals

Posted by eggsearle on January 11, 2012

Consider the problem of computing deltas between entries in a stream of events. Each event is paired with the previous value and delivered to a derived implementation. This requires an initial value to be paired with the first event, which might be called the zero.

package function
trait Delta[T] {
   def zero: T
   private var before: T = zero
   def update(current: T) {
      if (before != current) {
         delta(before, current)
         before = current
      }
   }
   def delta(before: T, current: T)
}

In the above code, the zero is provided by an abstract method.

The implementation might then be

case class StringDelta() extends Delta[String] {
   def zero = ""
   def delta(before: String, current: String) {
      println(before, current)
   }
}

 

This works fine, but looks a little odd. The zero is a constant, which one might expect to be defined using a val. Changing def zero to val zero compiles (Scala permits abstract vals), but the run-time value is actually null, because the trait's initializer reads zero before the subclass has assigned it.

Changing the definition to be lazy, i.e. lazy val zero = "", restores the desired behavior. The underlying implementation of lazy is rather similar to def, in that it is evaluated on first reference to the name.
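
The three variants can be compared side by side. This is a reduced sketch rather than the Delta code itself: Base, WithDef, WithVal and WithLazyVal are hypothetical names, and captured plays the role of before.

```scala
// Hypothetical sketch of the initialization order; not the original Delta trait.
object InitOrderSketch {

  trait Base {
    def zero: String
    // Evaluated while Base is being initialized, before any subclass field is assigned.
    val captured: String = zero
  }

  class WithDef extends Base { def zero = "" }           // method call: always available
  class WithVal extends Base { val zero = "" }           // field not yet assigned when read
  class WithLazyVal extends Base { lazy val zero = "" }  // computed on first access

  def main(args: Array[String]): Unit = {
    println(new WithDef().captured == "")
    println(new WithVal().captured == null)
    println(new WithLazyVal().captured == "")
  }
}
```

All three comparisons hold: only the plain val leaves captured as null.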

 

 

Posted in Uncategorized