Richard Searle's Blog

Thoughts about software

Outlining the flow implementation infrastructure

Posted by eggsearle on April 4, 2011

This entry expands on the implementation of the Actor within which the flow will execute. The complete implementation is fairly complex, especially once structured to permit the usage of different actor libraries.

We will thus start with a simple implementation that illustrates the principles. It does not use any actors and is based on the InlineProcessor used for synchronous unit tests. Only linear flows (without any parallelism) can be evaluated.

finalResult captures the value contained within Result and is the value computed by the completed workflow.

pfs contains the RPF that is awaiting responses from sub-services that would permit it to execute. A serial flow (as in examples covered in early entries) would have only one outstanding RPF. A list is required to permit parallel invocation of sub-services.

The core receive method expects

  • tuple containing a CI and response value from the sub-service that has completed its processingThe pfs RPF is expected to match the CI and its contained function is evaluated with the response value to compute the next RPF. A Result value indicates the completion of the flow and its value is recorded for future reference (by the initiating test code). Any other value is an RPF for which a response is expected.
  • The first RPF of the flow to be executed.This is simply assigned to pfs. This RPF is computed by evaluating the flow.
class SimpleExecutor{
  var finalResult:Any = _
  var pfs:RPF = _
  def receive:PartialFunction[Any,Unit] = {
         case (ci:CI,r:Any) => process(ci,r)
         case f:RPF => pfs = f
   private def process(ci:CI,in:Any){
     pfs.apply(ci)(in) match {
        case Result(r) => finalResult = r
        case r:RPF => pfs = r

The above code is passive, so we need a runtime to drive it through its pace.

response records the result of the most recent sub-service invocation. Doubler illustrates how that interaction occurs.

The object creates an instance of SimpleExecutor, feeding it the result of evaluating the flow (which returns the first RPF). Each response from the sub-services are then feed to the instance to drive its flow, until it computes the final result.

A copy of response is required since process receive will cause response to be indirectly modified.

object InlineProcessor {

  var response:(CI,Int) = _

   def apply[A](flow:A=>RPF,initial:A) = {
      val processor = new SimpleExecutor
      processor receive Trigger(flow(initial))

      while(processor.finalResult == null){
         val copy = response
         processor receive copy

   object Doubler extends Lookup[Int,Int]{
       protected def call(arg:Int):CI = {
           val ci = CorrelationAllocator()
           response = ci->arg*2


A simple example flow that uses a Lookup to map an Int into some other Int.

def flow(lookup:Lookup[Int,Int]) = {arg:Int => lookup(arg){Result(_)}}

Use the above infrastructure to execute the flow with the Doubler lookup for the value 12.


Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )


Connecting to %s

%d bloggers like this: