Richard Searle's Blog

Thoughts about software

Archive for February, 2012

Data enrichment “forward” in Akka actor

Posted by eggsearle on February 28, 2012

The Akka forward operation redirects a message to another actor, with any response flowing back to the originator of the message. This suffices when another actor can better process the message and the original message still fully describes the desired operation. That is not always the case:

  1. The second actor might require additional context from the forwarding actor.
  2. The forwarding actor might provide data enrichment.

The tell operation actually accepts a second argument that specifies the actor to which the response will be sent. That is normally the actor that invokes tell, but it can be any actor. The tell operation can therefore be used to send on an augmented message while retaining the original sender.

The following code provides a (silly) example of how this might be used:

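import akka.actor._

// A case class requires a parameter list, so a plain class is used here.
class DataEnrichmentActor extends Actor {
  def receive = {
    // Prepend the enrichment and pass the original sender along, so replies bypass this actor.
    case msg => context.actorFor("/user/second").tell("enrichment" + msg, sender)
  }
}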
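For comparison, here is a minimal sketch that puts a plain forward next to the enriching tell. It assumes the same Akka 2.0 actor API as the code above. The Enriched wrapper, the EnrichingActor name and the target constructor parameter are hypothetical and exist only for illustration.

```scala
import akka.actor._

// Hypothetical wrapper carrying the added context alongside the original message.
case class Enriched(context: String, payload: Any)

// Hypothetical actor: `target` is whichever actor should do the real work.
class EnrichingActor(target: ActorRef) extends Actor {
  def receive = {
    // Already enriched: pass it on unchanged. forward keeps the original sender.
    case e: Enriched => target forward e
    // Otherwise wrap the message first. Passing `sender` explicitly means
    // replies from target still go to the originator, not to this actor.
    case msg => target.tell(Enriched("added by EnrichingActor", msg), sender)
  }
}
```

In both branches the target sees the originator as the sender, so a reply never passes back through the enriching actor.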

Posted in Akka, Scala

Testing Akka system IO

Posted by eggsearle on February 25, 2012

All code needs tests, and the contents of the previous post are no exception.

The tests are surprisingly easy to define and certainly help to clarify how the implementation incrementally consumes the input as it becomes available.

import org.junit._
import Assert._
import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }
class LengthBoundedTest {
  import IO._

  // A single complete message is consumed entirely.
  @Test
  def one {
    val i1 = LengthBoundedServer.readMessage(Chunk(ByteString("0001A")))
    assertEquals((Done("A"), Chunk(ByteString())), i1)
    assertEquals("A", i1._1.get)
  }

  @Test
  def two {
    val i1 = LengthBoundedServer.readMessage(Chunk(ByteString("0002AB")))
    assertEquals((Done("AB"), Chunk(ByteString())), i1)
    assertEquals("AB", i1._1.get)
  }

  // Only the first message is read; the second stays in the remaining chunk.
  @Test
  def oneTwo {
    val i1 = LengthBoundedServer.readMessage(Chunk(ByteString("0001A0002AB")))
    // 48, 48, 48, 50, 65, 66 are the ASCII bytes of "0002AB".
    assertEquals((Done("A"), Chunk(ByteString(48, 48, 48, 50, 65, 66))), i1)
    assertEquals("A", i1._1.get)
  }

  // A length prefix without its payload leaves the iteratee waiting for more input.
  @Test
  def onePartial {
    val i1 = LengthBoundedServer.readMessage(Chunk(ByteString("0001")))
    assertEquals("(Cont(<function1>,None),Chunk(ByteString()))", i1.toString)
    assertEquals("", i1._1.get)
    // Feeding the missing byte completes the message.
    val i2 = i1._1(Chunk(ByteString("A")))
    assertEquals((Done("A"), Chunk(ByteString())), i2)
    assertEquals("A", i2._1.get)
  }
}
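The incremental behaviour these tests exercise can also be sketched without Akka. The following is a simplified model, not the IO.Iteratee implementation: it works on String rather than ByteString, and the names IncrementalSketch, Step, Done and Cont are chosen here only to mirror the tests.

```scala
object IncrementalSketch {
  // Result of feeding input: a finished message plus leftover input,
  // or a continuation holding what has been buffered so far.
  sealed trait Step
  case class Done(message: String, rest: String) extends Step
  case class Cont(buffered: String) extends Step {
    // Resume with the next chunk appended to the buffered input.
    def apply(chunk: String): Step = readMessage(buffered + chunk)
  }

  // Reads one frame: 4 ASCII digits giving the length, then that many characters.
  def readMessage(input: String): Step =
    if (input.length < 4) Cont(input)
    else {
      val len = input.take(4).toInt
      if (input.length < 4 + len) Cont(input)
      else Done(input.substring(4, 4 + len), input.substring(4 + len))
    }
}
```

As in onePartial, readMessage("0001") yields a Cont, and applying that Cont to "A" yields Done("A", "").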

Posted in Akka, Scala

Simpler Akka IO example

Posted by eggsearle on February 23, 2012

Akka 2.0 provides a clean mechanism for non-blocking I/O, abstracting away all the details of NIO.

The example provided is very detailed but a little large to serve as an introduction.

The code below implements a very simple network service: read a length-delimited ASCII string and print it. The length is a 4-digit, human-readable number, for ease of testing.

The code can then be exercised using netcat:

nc localhost 9999 <<!
0001A0002XY
!
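Input like the line above can be generated rather than typed by hand. This is a small sketch of the framing rule only; FrameSketch and frame are hypothetical names and are not part of the server.

```scala
object FrameSketch {
  // Prefixes an ASCII payload with its length as a zero-padded 4-digit number.
  def frame(payload: String): String = {
    require(payload.length <= 9999, "payload too long for a 4-digit length")
    "%04d".format(payload.length) + payload
  }
}
```

For instance, FrameSketch.frame("A") + FrameSketch.frame("XY") produces the same 0001A0002XY string sent through netcat above.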

import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }
import java.net.InetSocketAddress

class LengthBoundedServer(port: Int) extends Actor {
  import IO._

  val state = IterateeRef.Map.async[IO.Handle]()(context.dispatcher)

  override def preStart {
    IOManager(context.system) listen new InetSocketAddress(port)
  }

  def receive = {

    case NewClient(server) =>
      val socket = server.accept()
      state(socket) flatMap (_ => LengthBoundedServer.printMessage)

    case Read(socket, bytes) =>
      state(socket)(Chunk(bytes))

    case Closed(socket, cause) =>
      state(socket)(EOF(None))
      state -= socket
  }

}

object LengthBoundedServer {
  import IO._

  def ascii(bytes: ByteString): String = bytes.decodeString("US-ASCII").trim

  def printMessage: IO.Iteratee[Unit] =
    repeat {
      for {
        string <- readMessage
      } yield {
        println(string)
      }
    }

  def readMessage: IO.Iteratee[String] =
    for {
      lengthBytes <- take(4)
      len = ascii(lengthBytes).toInt
      bytes <- take(len)
    } yield {
      ascii(bytes)
    }
}

object Main extends App {
  val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 9999
  val system = ActorSystem()
  val server = system.actorOf(Props(new LengthBoundedServer(port)))
}

Posted in Akka, Scala

Akka dataflow example

Posted by eggsearle on February 18, 2012

Akka dataflow has a rather exotic form, especially given the need for a special compiler switch to enable the Delimited Continuations plugin.

This presentation includes an interesting and realistic example.
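The compiler switch mentioned above is typically set in the build. The fragment below is a sketch of an sbt configuration. It assumes sbt 0.13 or later syntax and a Scala 2.9/2.10 era compiler, where the continuations plugin is published with the same version as the compiler; adjust it for other setups.

```scala
// build.sbt fragment (assumed layout, not taken from the post)

// Let sbt pass declared compiler plugins to scalac.
autoCompilerPlugins := true

// The delimited continuations plugin, versioned like the compiler itself.
libraryDependencies += compilerPlugin(
  "org.scala-lang.plugins" % "continuations" % scalaVersion.value)

// The switch that actually enables the plugin.
scalacOptions += "-P:continuations:enable"
```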

Posted in Akka, Scala

Play 2 expands the reach of Future based implementations

Posted by eggsearle on February 13, 2012

Play 2 provides another variant on the concept. Its iteratees provide an interesting mechanism for handling streams of data. Akka provides access to a broader range of functionality (as would be expected now that Play falls under the Typesafe banner).

Posted in Uncategorized

Adding typing to Monitor

Posted by eggsearle on February 5, 2012

The phantom type idiom can help to solve the difficulty of distinguishing Monitors that wrap the same type. Type erasure unfortunately limits its usefulness.
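As a rough illustration of the phantom type idiom and its erasure limit, consider the sketch below. It is not the Monitor from the earlier post: Tagged, the Orders and Invoices tags, and the StringBuilder payload are all hypothetical stand-ins.

```scala
object PhantomSketch {
  // Hypothetical tags: never instantiated, they exist only at compile time.
  sealed trait Orders
  sealed trait Invoices

  // A monitor-like wrapper whose second type parameter is a phantom tag.
  final class Tagged[T, Tag](private val v: T) {
    def apply[R](f: T => R): R = synchronized { f(v) }
  }

  // Accepts only wrappers tagged as Orders.
  def orderLength(m: Tagged[StringBuilder, Orders]): Int = m(_.length)

  val orders = new Tagged[StringBuilder, Orders](new StringBuilder("abc"))
  val invoices = new Tagged[StringBuilder, Invoices](new StringBuilder("de"))

  val ok: Int = orderLength(orders)
  // orderLength(invoices) // rejected by the compiler: wrong tag

  // Erasure: at runtime both values are just Tagged, so a pattern match
  // cannot tell an Orders wrapper from an Invoices wrapper.
  def describe(m: Any): String = m match {
    case _: Tagged[_, _] => "some Tagged"
    case _               => "something else"
  }
}
```

The tag protects statically typed call sites, but anything that loses the static type (an actor's untyped receive, for example) can no longer separate the two wrappers.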

A better approach is to create a case class for each type, using inheritance to provide the boilerplate.

abstract class BaseMonitor[T, S](implicit p: S => T) {
  def s: S
  private val v = p(s)
  def apply[R](f: T => R) = synchronized { f(v) }
}

Example

    implicit def toDom(s: String): Document = (DocumentBuilderFactory.newInstance().newDocumentBuilder()).parse(new InputSource(new StringReader(s)))

    case class XA(s: String) extends BaseMonitor[org.w3c.dom.Document, String]

    // The argument must be well-formed XML; the element name here is illustrative.
    val mdoc = XA("<doc>value</doc>")
    mdoc { _.getDocumentElement.getTextContent } must beEqualTo("value")

Note that the only field in the case class must be named s to match the abstract method in BaseMonitor.

The common BaseMonitor type structure can be extracted as follows

 type XML = BaseMonitor[org.w3c.dom.Document, String]
 case class XA(s: String) extends XML
 case class XB(s: String) extends XML

Posted in Akka, Scala

Making mutable Java fit the workflow

Posted by eggsearle on February 5, 2012

The previous posts have all used Scala case classes to implement the illustrative domain objects. Their key attribute is immutability, which makes it possible to ignore where and how they are referenced. A production implementation will be brownfield and will have to deal with existing Java code, which will not be immutable. The signature JavaBean is explicitly mutable! The wide usage of XML means references to org.w3c.dom.Document, whose implementations are definitely not thread safe.

One might convert case classes to and from the external mutable implementations. For example, Camel’s translators could be added to the routes. The cost of such translators (in both development and runtime) could be unacceptable.
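To give a feel for the development cost, here is a minimal sketch of one such translation pair. CustomerBean and Customer are hypothetical; the bean stands in for existing Java code, and scala.beans.BeanProperty assumes Scala 2.10 or later.

```scala
import scala.beans.BeanProperty

object TranslatorSketch {
  // Hypothetical mutable JavaBean-style class standing in for existing Java code.
  class CustomerBean {
    @BeanProperty var name: String = ""
    @BeanProperty var age: Int = 0
  }

  // Immutable counterpart used inside the workflow.
  case class Customer(name: String, age: Int)

  // Copies the bean's current state into an immutable value.
  def fromBean(b: CustomerBean): Customer = Customer(b.getName, b.getAge)

  // Builds a fresh bean for handing back to the Java side.
  def toBean(c: Customer): CustomerBean = {
    val b = new CustomerBean
    b.setName(c.name)
    b.setAge(c.age)
    b
  }
}
```

Every domain class needs its own pair of functions like these, and every message crossing the boundary pays for a copy.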

The mutable data could be hidden inside a wrapper class that only allows access under a monitor, much like Collections.synchronized*.

A particular service often uses only a very small subset of the available functionality. That makes it sensible to have the wrapper accept functions to manipulate the object and return immutable results.

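trait Monitor[T] {
  // Runs f against the wrapped value while holding the monitor's lock.
  def apply[R](f: T => R): R
}

object Monitor {
  // Converts the source S to the mutable T via the implicit p and wraps it at once.
  // The explicit result type keeps MonitorImpl out of the public signature.
  implicit def apply[S, T](s: S)(implicit p: S => T): Monitor[T] = new MonitorImpl(p(s))

  private[Monitor] class MonitorImpl[T](private val v: T) extends Monitor[T] {
    def apply[R](f: T => R) = synchronized { f(v) }
  }
}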

For example

    def content(d: Document) = d.getDocumentElement.getTextContent
    implicit def toDom(s: String): Document = (DocumentBuilderFactory.newInstance().newDocumentBuilder()).parse(new InputSource(new StringReader(s)))
    // The string must be well-formed XML; the element name here is illustrative.
    val mdoc: Monitor[Document] = "<doc>value</doc>"
    mdoc { content } must beEqualTo("value")

This creates an instance from the serialized XML and then extracts the complete text content of the XML.

The Monitor.apply uses an implicit function to ensure the mutable data is not exposed. It also provides a particularly tidy usage, as seen in the above example. The gruesome details of the parsing are hidden away and the actual usage is stripped to bare essentials.

This approach does require discipline to ensure only immutable data is returned by any function executed by the Monitor.
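The difference between a disciplined and an undisciplined function can be sketched as follows. The Monitor class here is a simplified stand-in that wraps a value directly, without the implicit conversion, and the ArrayList payload is only an example of a mutable Java object.

```scala
object MonitorDisciplineSketch {
  // Simplified stand-in for the Monitor above: wraps a value directly.
  final class Monitor[T](private val v: T) {
    def apply[R](f: T => R): R = synchronized { f(v) }
  }

  val names = new Monitor(new java.util.ArrayList[String]())

  // Fine: the mutation happens under the lock and only a Boolean escapes.
  val added: Boolean = names(_.add("a"))

  // Fine: the contents are copied into an immutable List while the lock is held.
  val snapshot: List[String] = names(l => l.toArray(new Array[String](0)).toList)

  // Broken: the function hands out the mutable list itself.
  val leaked: java.util.ArrayList[String] = names(l => l)

  // This mutation bypasses the monitor entirely.
  leaked.add("b")
}
```

Nothing in the types prevents the leaking call, which is why the discipline has to come from the code that uses the Monitor.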

The Monitor works quite well for unique domain classes, including XMLBeans created from XML. Wrappers of org.w3c.dom.Document are essentially untyped, since they cannot be distinguished one from the other. This is unfortunate when a single service implementation might reference a dozen different XML documents.

Posted in Akka, Scala