Richard Searle's Blog

Thoughts about software

Archive for April, 2011

Source code on github

Posted by eggsearle on April 5, 2011

The source code is available on GitHub.

Posted in Uncategorized | Leave a Comment »

Outlining the flow implementation infrastructure

Posted by eggsearle on April 4, 2011

This entry expands on the implementation of the Actor within which the flow will execute. The complete implementation is fairly complex, especially once structured to permit the usage of different actor libraries.

We will thus start with a simple implementation that illustrates the principles. It does not use any actors and is based on the InlineProcessor used for synchronous unit tests. Only linear flows (without any parallelism) can be evaluated.

finalResult captures the value contained within Result and is the value computed by the completed workflow.

pfs contains the RPF that is awaiting responses from sub-services that would permit it to execute. A serial flow (as in the examples covered in earlier entries) has only one outstanding RPF at a time, so a single reference suffices here. A list would be required to permit parallel invocation of sub-services.

The core receive method expects one of two messages:

  • A tuple containing a CI and the response value from the sub-service that has completed its processing. The pfs RPF is expected to match the CI, and its contained function is evaluated with the response value to compute the next RPF. A Result value indicates the completion of the flow and its value is recorded for future reference (by the initiating test code). Any other value is an RPF for which a response is expected.
  • The first RPF of the flow to be executed. This is simply assigned to pfs. This RPF is computed by evaluating the flow.
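class SimpleExecutor{
  var finalResult:Any = _
  var pfs:RPF = _
  def receive:PartialFunction[Any,Unit] = {
    case (ci:CI,r:Any) => process(ci,r)   // response from a sub-service
    case f:RPF => pfs = f                 // first RPF of the flow
  }
  private def process(ci:CI,in:Any):Unit = {
    // evaluate the pending RPF with the response to obtain the next step
    pfs.apply(ci)(in) match {
      case Result(r) => finalResult = r   // flow complete
      case r:RPF => pfs = r               // await the next response
    }
  }
}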

The above code is passive, so we need a runtime to put it through its paces.

response records the result of the most recent sub-service invocation. Doubler illustrates how that interaction occurs.

The object creates an instance of SimpleExecutor, feeding it the result of evaluating the flow (which returns the first RPF). Each response from the sub-services is then fed to the instance to drive its flow, until it computes the final result.

A copy of response is required, since processing it in receive can cause response to be indirectly modified (the next Lookup call overwrites it).

object InlineProcessor {

  var response:(CI,Int) = _

  def apply[A](flow:A=>RPF,initial:A) = {
    val processor = new SimpleExecutor
    // SimpleExecutor.receive matches a bare RPF, so the first RPF is passed as-is
    processor.receive(flow(initial))

    while(processor.finalResult == null){
      val copy = response
      processor.receive(copy)
    }
    processor.finalResult
  }

  object Doubler extends Lookup[Int,Int]{
    protected def call(arg:Int):CI = {
      val ci = CorrelationAllocator()
      response = ci->arg*2   // record the response for the driver loop
      ci
    }
  }
}


A simple example flow that uses a Lookup to map an Int into some other Int.

def flow(lookup:Lookup[Int,Int]) = {arg:Int => lookup(arg){Result(_)}}

Use the above infrastructure to execute the flow with the Doubler lookup for the value 12.
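
Putting the pieces together, the run might look like the sketch below. RPF and Result are not defined in this entry, so the shapes used here are assumptions: RPF as a PartialFunction from CI to a function yielding the next RPF, and Result as an RPF that matches no CI. The counter-based CorrelationAllocator is also only a stand-in. The first RPF is handed to receive directly, matching the `case f:RPF` branch of SimpleExecutor.

```scala
// Assumed shapes for types from earlier entries (not shown in this post).
case class CI(id: Int)
trait RPF extends PartialFunction[CI, Any => RPF]
case class Result(value: Any) extends RPF {
  def isDefinedAt(ci: CI): Boolean = false   // a finished flow awaits nothing
  def apply(ci: CI): Any => RPF = sys.error("flow already complete")
}

// Hypothetical allocator: a plain counter is unique enough for one thread.
object CorrelationAllocator {
  private var last = 0
  def apply(): CI = { last += 1; CI(last) }
}

class Pending[T](correlationId: CI, fn: T => RPF) extends RPF {
  def isDefinedAt(ci: CI): Boolean = ci == correlationId
  def apply(ci: CI): Any => RPF = (a: Any) => fn(a.asInstanceOf[T])
}

trait Lookup[A, R] {
  def apply(arg: A)(fn: R => RPF): RPF = new Pending(call(arg), fn)
  protected def call(arg: A): CI
}

class SimpleExecutor {
  var finalResult: Any = null
  private var pfs: RPF = null
  def receive: PartialFunction[Any, Unit] = {
    case (ci: CI, r) => process(ci, r)   // response from a sub-service
    case f: RPF      => pfs = f          // first RPF of the flow
  }
  private def process(ci: CI, in: Any): Unit =
    pfs(ci)(in) match {
      case Result(r) => finalResult = r  // flow complete
      case next      => pfs = next       // await the next response
    }
}

object InlineProcessor {
  var response: (CI, Int) = null

  def apply[A](flow: A => RPF, initial: A): Any = {
    val processor = new SimpleExecutor
    processor.receive(flow(initial))
    while (processor.finalResult == null) {
      val copy = response                // receive may overwrite response
      processor.receive(copy)
    }
    processor.finalResult
  }

  object Doubler extends Lookup[Int, Int] {
    protected def call(arg: Int): CI = {
      val ci = CorrelationAllocator()
      response = ci -> arg * 2
      ci
    }
  }
}

def flow(lookup: Lookup[Int, Int]): Int => RPF =
  (arg: Int) => lookup(arg)(Result(_))

// Doubler doubles 12, and the flow wraps the response in Result.
val result = InlineProcessor(flow(InlineProcessor.Doubler), 12)   // 24
```

Note that the loop relies on finalResult being null until the flow completes, so a flow whose legitimate result is null would never terminate under this sketch.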



Implementing Lookup

Posted by eggsearle on April 3, 2011

The previous blog entry was a broad-brush description of the implementation concepts. This entry covers the details of actually implementing the Lookup trait.

Lookup uses a curried apply to simplify the DSL, minimizing the visual clutter.

call is the actual implementation of the sub-service, launching the request-response interaction with arg and returning a CI instance that will appear in the corresponding response.

Pending provides a (Recursive)PartialFunction that matches the CI instance. Its apply ignores the CI instance since its actual value has no impact on the processing. It returns a function that casts the response value to the expected type and evaluates the function originally provided in the call to Lookup.apply.

trait Lookup[A,R] {
  def apply(arg:A)(fn:R =>RPF):RPF = new Pending(call(arg),fn)
  protected def call(arg:A):CI
}

private class Pending[T](correlationId:CI,fn:T=>RPF) extends RPF{
    def isDefinedAt(ci:CI) = ci == correlationId
    def apply(ci:CI) =  {a:Any=>fn(a.asInstanceOf[T])}
}

case class CI(id:Int)
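
To make the matching behaviour concrete, here is a small sketch that exercises Pending on its own. RPF and Result are not defined in this entry, so their shapes below are assumptions, and the CI values 7 and 8 are arbitrary.

```scala
// Assumed shapes for RPF and Result (defined in an earlier entry).
case class CI(id: Int)
trait RPF extends PartialFunction[CI, Any => RPF]
case class Result(value: Any) extends RPF {
  def isDefinedAt(ci: CI): Boolean = false
  def apply(ci: CI): Any => RPF = sys.error("flow already complete")
}

class Pending[T](correlationId: CI, fn: T => RPF) extends RPF {
  def isDefinedAt(ci: CI): Boolean = ci == correlationId
  def apply(ci: CI): Any => RPF = (a: Any) => fn(a.asInstanceOf[T])
}

// A Pending waiting on CI(7) whose continuation adds one to the response.
val pending = new Pending[Int](CI(7), n => Result(n + 1))

val matchesOwn   = pending.isDefinedAt(CI(7))   // true: same correlation id
val matchesOther = pending.isDefinedAt(CI(8))   // false: someone else's response
val next         = pending(CI(7))(41)           // continuation yields Result(42)
```

Because T is erased at runtime, the cast is unchecked. A response of the wrong type would surface as a ClassCastException only when fn actually uses the value.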

A trivial example Lookup that doubles the integer request.

object DoubleLookup extends Lookup[Int,Int]{
   var responseQueue:List[(CI,Int)] = Nil   // start empty; `_` would leave it null
   protected def call(arg:Int) = {
      val ci = CorrelationAllocator()
      responseQueue = ci->arg*2 :: responseQueue   // queue the response for later delivery
      ci
   }
}

Note responseQueue, which provides a trivial implementation of the mechanism required to asynchronously deliver the response. It captures the CI (to match the Pending instance that triggered the request) and the actual value that will be processed via that Pending.
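
As an illustration of how queued responses could be delivered, the sketch below drains responseQueue to drive a two-step serial flow. The drive helper is hypothetical, the shapes of RPF and Result are assumptions (they come from an earlier entry), and the counter-based CorrelationAllocator is a stand-in.

```scala
// Assumed shapes for RPF and Result (defined in an earlier entry).
case class CI(id: Int)
trait RPF extends PartialFunction[CI, Any => RPF]
case class Result(value: Any) extends RPF {
  def isDefinedAt(ci: CI): Boolean = false
  def apply(ci: CI): Any => RPF = sys.error("flow already complete")
}

// Hypothetical allocator: a plain counter.
object CorrelationAllocator {
  private var last = 0
  def apply(): CI = { last += 1; CI(last) }
}

class Pending[T](correlationId: CI, fn: T => RPF) extends RPF {
  def isDefinedAt(ci: CI): Boolean = ci == correlationId
  def apply(ci: CI): Any => RPF = (a: Any) => fn(a.asInstanceOf[T])
}

trait Lookup[A, R] {
  def apply(arg: A)(fn: R => RPF): RPF = new Pending(call(arg), fn)
  protected def call(arg: A): CI
}

object DoubleLookup extends Lookup[Int, Int] {
  var responseQueue: List[(CI, Int)] = Nil
  protected def call(arg: Int): CI = {
    val ci = CorrelationAllocator()
    responseQueue = (ci -> arg * 2) :: responseQueue
    ci
  }
}

// Hypothetical driver: deliver queued responses until the flow yields a Result.
def drive(current: RPF): Any = current match {
  case Result(r) => r
  case pending =>
    val (ci, value) = DoubleLookup.responseQueue.head
    DoubleLookup.responseQueue = DoubleLookup.responseQueue.tail
    require(pending.isDefinedAt(ci), "response does not match the pending RPF")
    drive(pending(ci)(value))
}

// Two chained lookups: 3 is doubled to 6, then 6 is doubled to 12.
val quadrupled = drive(DoubleLookup(3) { d => DoubleLookup(d)(Result(_)) })   // 12
```

Prepending makes the queue last-in, first-out. That is harmless for a serial flow, where at most one response is outstanding, but a parallel flow would need to pick the entry whose CI matches a pending RPF.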

CorrelationAllocator is an object that creates sufficiently unique CI values.
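
One possible shape for such an allocator is sketched below. This is an assumption rather than the implementation from the repository: it uses an AtomicInteger so that allocation stays safe if several threads (or actors) request ids concurrently.

```scala
import java.util.concurrent.atomic.AtomicInteger

case class CI(id: Int)

// Hypothetical allocator: each call returns a CI with the next counter value.
object CorrelationAllocator {
  private val counter = new AtomicInteger(0)
  def apply(): CI = CI(counter.incrementAndGet())
}

val first  = CorrelationAllocator()
val second = CorrelationAllocator()   // distinct from first
```

Ids produced this way are unique only within one JVM, and only until the Int counter wraps around, which is likely what "sufficiently unique" amounts to for a test harness.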
