Richard Searle's Blog

Thoughts about software

Archive for the ‘Akka’ Category

Server Sent Events and Akka IO performance

Posted by eggsearle on November 11, 2012

The experiment was extended with an Akka IO implementation, using https://gist.github.com/4051806 as the basis.

Latency is 16 ms and CPU load is 12%.
The latency is a little worse than the alternatives, and the CPU load falls between them.
The IO implementation is merely a sample, so there is likely some performance improvement to be had.

 

Posted in Akka, Scala

Akka system IO client

Posted by eggsearle on March 10, 2012


Network clients can also be implemented using Akka System IO.

The following provides a simple client that connects to the LengthBoundedServer, simplistically modified to echo the received text back to all attached clients.

Note that all the operations are asynchronous, including the connection creation (the IOManager connect call in the Attach case). The API returns a SocketHandle even if the connection cannot be made. For clarity of system state, we wait for the Connected message before registering a handle.

The server's Read case becomes:

case Read(socket, bytes) =>
  // feed the iteratee for this socket, as before
  state(socket)(Chunk(bytes))
  // echo the raw bytes to every attached client
  state.foreach { p => p._1.asWritable.write(bytes) }

The client itself:

import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }
import java.net.InetSocketAddress

class LengthBoundedClient(port: Int) extends Actor {
  import IO._

  var handle: Option[SocketHandle] = None
  var respondTo: Option[ActorRef] = None

  def receive = {
    case Read(socket, bytes) =>
      respondTo.foreach { _ ! bytes }

    case Closed(socket, cause) =>
      handle = None

    case Connected(h, _) => handle = Some(h)

    case s: String => handle.foreach { _ write ByteString("%04d%s".format(s.length, s)) }

    case Attach(r) =>
      respondTo = Some(r);
      IOManager(context.system).connect(new InetSocketAddress(port))

    case Detach => handle.foreach { _.close }; handle = None
  }

}

case class Attach(respondTo: ActorRef)
case object Detach

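// a plain class: a case class needs a parameter list, and Props[RespondTo] only needs a no-arg constructor
class RespondTo extends Actor {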
  def receive = {
    case bytes: ByteString => println(bytes.decodeString("US-ASCII"))
  }
}

object Client extends App {
  val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 9999
  val system = ActorSystem()
  val client = system.actorOf(Props(new LengthBoundedClient(port)))
  val respondTo = system.actorOf(Props[RespondTo])

  while (true) {
    Console.readLine() match {
      case "A" => client ! Attach(respondTo)
      case "D" => client ! Detach
      case s => client ! s
    }
  }

}
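
The wire format the client writes can be looked at in isolation. The following is a minimal sketch using only the standard library; the Framing object, MaxLength and the ASCII check are illustrative assumptions and not part of the client above, which simply formats with "%04d%s".

```scala
import java.nio.charset.StandardCharsets.US_ASCII

object Framing {
  // Largest payload a 4-digit decimal length field can describe.
  val MaxLength = 9999

  /** Prefix `s` with its length as four ASCII digits, e.g. "AB" becomes "0002AB". */
  def frame(s: String): Array[Byte] = {
    require(s.length <= MaxLength,
      s"payload of ${s.length} characters does not fit a 4-digit length")
    // Assumption: restrict to ASCII so the character count equals the byte count.
    require(s.forall(_ < 128), "payload must be plain ASCII")
    "%04d%s".format(s.length, s).getBytes(US_ASCII)
  }
}
```

The two require calls make explicit the limits that the simple format string leaves implicit: a longer payload would produce a five-digit header that the server would misread.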



				

Posted in Akka, Scala | 3 Comments

Data enrichment “forward” in Akka actor

Posted by eggsearle on February 28, 2012

The Akka forward operation redirects a message to another actor, with any response flowing back to the originator of the message. This is enough when another actor can better process the message and the original message still fully describes the desired operation. That is not always the case:

  1. The second actor might require additional context from the forwarding actor.
  2. The forwarding actor might provide data enrichment.

The tell operation accepts a second argument that specifies the actor to which the response will be sent. That is normally the actor that invokes tell, but it can be any actor. The tell operation can therefore send on an augmented message while retaining the originating actor as the sender.

The following code provides a (silly) example of how this might be used:

class DataEnrichmentActor extends Actor {
  def receive = {
    // send on an enriched message, keeping the original sender as the reply target
    case msg ⇒ context.actorFor("/user/second").tell("enrichment" + msg, sender)
  }
}
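
The routing of the reply can be seen without an actor system. The following is a toy model, not Akka: Node, Originator, Second and Enricher are hypothetical names, and tell here is an ordinary synchronous method call. It only illustrates that the reply goes to whoever is passed as the sender.

```scala
object ForwardSketch {
  // Toy stand-in for an actor: a message arrives together with the node to reply to.
  trait Node {
    def tell(msg: String, sender: Node): Unit
  }

  // Records whatever is sent back to it.
  final class Originator extends Node {
    var replies: List[String] = Nil
    def tell(msg: String, sender: Node): Unit = {
      replies = replies :+ msg
    }
  }

  // Replies to whichever node was named as the sender.
  final class Second extends Node {
    def tell(msg: String, sender: Node): Unit =
      sender.tell("handled:" + msg, this)
  }

  // Enriches the message and passes it on, keeping the original sender.
  final class Enricher(next: Node) extends Node {
    def tell(msg: String, sender: Node): Unit =
      next.tell("enrichment" + msg, sender)
  }
}
```

Sending "X" to an Enricher on behalf of an Originator leaves the reply in the Originator's list; the Enricher never sees it, which is the behaviour the second argument of tell provides.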

Posted in Akka, Scala

Testing akka system IO

Posted by eggsearle on February 25, 2012

All code needs tests, and the contents of the previous post are no exception.

The tests are surprisingly easy to define and certainly help to clarify how the implementation incrementally consumes the input as it becomes available.

import org.junit._
import Assert._
import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }

class LengthBoundedTest {
  import IO._

  // a single complete message is consumed entirely
  @Test
  def one {
    val i1 = LengthBoundedServer.readMessage(Chunk(ByteString("0001A")))
    assertEquals((Done("A"), Chunk(ByteString())), i1)
    assertEquals("A", i1._1.get)
  }

  @Test
  def two {
    val i1 = LengthBoundedServer.readMessage(Chunk(ByteString("0002AB")))
    assertEquals((Done("AB"), Chunk(ByteString())), i1)
    assertEquals("AB", i1._1.get)
  }

  // only the first message is read; the bytes of "0002AB" remain as unconsumed input
  @Test
  def oneTwo {
    val i1 = LengthBoundedServer.readMessage(Chunk(ByteString("0001A0002AB")))
    assertEquals((Done("A"), Chunk(ByteString(48, 48, 48, 50, 65, 66))), i1)
    assertEquals("A", i1._1.get)
  }

  // a header without its payload yields a continuation, completed by the next chunk
  @Test
  def onePartial {
    val i1 = LengthBoundedServer.readMessage(Chunk(ByteString("0001")))
    assertEquals("(Cont(<function1>,None),Chunk(ByteString()))", i1.toString)
    assertEquals("", i1._1.get)
    val i2 = i1._1(Chunk(ByteString("A")))
    assertEquals((Done("A"), Chunk(ByteString())), i2)
    assertEquals("A", i2._1.get)
  }
}
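
The incremental behaviour these tests pin down can also be modelled without Akka. The following is a rough sketch over plain Strings rather than ByteStrings; IncrementalSketch and its readMessage are hypothetical names that only mirror the shape of the results above (a message plus the remaining input, or nothing yet). Unlike the server, it does not trim the payload.

```scala
object IncrementalSketch {
  /**
   * Try to read one "NNNNpayload" frame from the buffered input.
   * Returns Some((message, unconsumedRest)) when a complete frame is present,
   * or None when more input is needed. A non-numeric header throws
   * NumberFormatException, as toInt would in the server.
   */
  def readMessage(buffer: String): Option[(String, String)] =
    if (buffer.length < 4) None
    else {
      val len = buffer.substring(0, 4).trim.toInt
      if (buffer.length < 4 + len) None
      else Some((buffer.substring(4, 4 + len), buffer.substring(4 + len)))
    }
}
```

A caller keeps appending received text to its buffer and retries after each chunk, which is roughly the bookkeeping the iteratee's continuation does on the server's behalf.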

Posted in Akka, Scala

Simpler Akka IO example

Posted by eggsearle on February 23, 2012

Akka 2.0 provides a clean mechanism for non-blocking I/O, abstracting all the details of NIO.

The example provided is very detailed, but a little large to serve as an introduction.

The code below implements a very simple network service: read a length-delimited ASCII string and print it. The length is a 4-digit, human-readable number, for ease of testing.

The code can then be exercised using netcat:

nc localhost 9999 <<!
0001A0002XY
!

import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }
import java.net.InetSocketAddress

class LengthBoundedServer(port: Int) extends Actor {
  import IO._

  val state = IterateeRef.Map.async[IO.Handle]()(context.dispatcher)

  override def preStart {
    IOManager(context.system) listen new InetSocketAddress(port)
  }

  def receive = {

    case NewClient(server) =>
      val socket = server.accept()
      state(socket) flatMap (_ => LengthBoundedServer.printMessage)

    case Read(socket, bytes) =>
      state(socket)(Chunk(bytes))

    case Closed(socket, cause) =>
      state(socket)(EOF(None))
      state -= socket
  }

}

object LengthBoundedServer {
  import IO._

  def ascii(bytes: ByteString): String = bytes.decodeString("US-ASCII").trim

  // read and print messages until the input ends
  def printMessage: IO.Iteratee[Unit] =
    repeat {
      for {
        string <- readMessage
      } yield {
        println(string)
      }
    }

  // a 4-digit length header followed by that many bytes of payload
  def readMessage: IO.Iteratee[String] =
    for {
      lengthBytes <- take(4)
      len = ascii(lengthBytes).toInt
      bytes <- take(len)
    } yield {
      ascii(bytes)
    }
}

object Main extends App {
  val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 9999
  val system = ActorSystem()
  val server = system.actorOf(Props(new LengthBoundedServer(port)))
}
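
For contrast, here is a rough sketch of reading the same wire format with blocking java.io, which is the style the iteratee version avoids. BlockingReader, readMessage and readAll are hypothetical names and not part of the server above; the thread sits inside readFully until enough bytes have arrived.

```scala
import java.io.{DataInputStream, EOFException, InputStream}
import java.nio.charset.StandardCharsets.US_ASCII

object BlockingReader {
  /** Read one frame, blocking until it is complete; None once the stream ends. */
  def readMessage(in: DataInputStream): Option[String] =
    try {
      val header = new Array[Byte](4)
      in.readFully(header)                 // blocks for the 4-digit length
      val len = new String(header, US_ASCII).trim.toInt
      val body = new Array[Byte](len)
      in.readFully(body)                   // blocks for the payload
      Some(new String(body, US_ASCII).trim)
    } catch {
      case _: EOFException => None         // stream ended before a full frame arrived
    }

  /** Read frames until the stream ends. */
  def readAll(in: InputStream): List[String] = {
    val data = new DataInputStream(in)
    Iterator.continually(readMessage(data)).takeWhile(_.isDefined).flatten.toList
  }
}
```

Fed the netcat input shown earlier, readAll stops at the trailing newline, because fewer than four header bytes remain. One thread per connection is tied up this way, which is the cost the Akka IO version is meant to remove.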

Posted in Akka, Scala | 5 Comments

Akka dataflow example

Posted by eggsearle on February 18, 2012

Akka dataflow has a rather exotic form, especially given the need for a special compiler switch to enable the Delimited Continuations plugin.

This presentation includes an interesting and realistic example.

Posted in Akka, Scala

Adding typing to Monitor

Posted by eggsearle on February 5, 2012

The phantom type idiom can help to solve the difficulty of distinguishing Monitors that wrap the same type. Type erasure unfortunately limits its usefulness.
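
As a rough illustration of that idiom and its limit, the following sketch tags a monitor-like wrapper with a marker type. Tagged, Invoice, Order and textLength are hypothetical names, and a String stands in for the wrapped document.

```scala
object PhantomSketch {
  // Marker types that exist only at compile time.
  sealed trait Invoice
  sealed trait Order

  // Tag never appears in the body: it is a phantom type parameter.
  final class Tagged[T, Tag](private val value: T) {
    def apply[R](f: T => R): R = synchronized { f(value) }
  }

  // Accepts only wrappers tagged as Invoice.
  def textLength(m: Tagged[String, Invoice]): Int = m(_.length)

  val invoice = new Tagged[String, Invoice]("<invoice/>")
  val order = new Tagged[String, Order]("<order/>")

  // textLength(order) is rejected by the compiler, which is the benefit.

  // After erasure both values have the same runtime class, so a pattern match
  // such as `case _: Tagged[_, Invoice]` cannot tell them apart.
  def sameRuntimeClass: Boolean = invoice.getClass == order.getClass
}
```

The compile-time check works, but anything that relies on runtime type information, such as matching on messages received by an actor, sees only Tagged.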

A better approach is to create a case class for each type, using inheritance to provide the boilerplate.

abstract class BaseMonitor[T, S](implicit p: S => T) {
  def s: S
  private val v = p(s)
  def apply[R](f: T => R) = synchronized { f(v) }
}

Example

    implicit def toDom(s: String): Document = (DocumentBuilderFactory.newInstance().newDocumentBuilder()).parse(new InputSource(new StringReader(s)))

    case class XA(s: String) extends BaseMonitor[org.w3c.dom.Document, String]

    val mdoc = XA("value")
    mdoc { _.getDocumentElement.getTextContent } must beEqualTo("value")

Note that the only field in the case class must be named s to match the abstract method in BaseMonitor.

The common BaseMonitor type structure can be extracted as follows:

 type XML = BaseMonitor[org.w3c.dom.Document, String]
 case class XA(s: String) extends XML
 case class XB(s: String) extends XML

Posted in Akka, Scala

Making mutable Java fit the workflow

Posted by eggsearle on February 5, 2012

The previous posts have all used Scala case classes to implement the illustrative domain objects. Their key attribute is immutability, which allows us to ignore where and how they are referenced. A production implementation will be brownfield and will have to deal with existing Java code, which will not implement immutability. The archetypal JavaBean is explicitly mutable! The wide usage of XML means references to org.w3c.dom.Document, whose implementation is definitely not thread safe.

One might convert case classes to and from the external mutable implementations. For example, Camel’s translators could be added to the routes. The costs of such translators (in both development and runtime) could be unacceptable.

Alternatively, the mutable data could be hidden inside a wrapper class that only allows access under a monitor, much like Collections.synchronized*.

A particular service often uses only a very small subset of the available functionality. That makes it sensible to have the wrapper accept functions to manipulate the object and return immutable results.

trait Monitor[T] {
  def apply[R](f: T => R): R
}

object Monitor {
  implicit def apply[S, T](s: S)(implicit p: S => T) = new MonitorImpl(p(s))

  private[Monitor] class MonitorImpl[T](private val v: T) extends Monitor[T] {
    def apply[R](f: T => R) = synchronized { f(v) }
  }
}

For example

    def content(d: Document) = d.getDocumentElement.getTextContent
    implicit def toDom(s: String): Document = (DocumentBuilderFactory.newInstance().newDocumentBuilder()).parse(new InputSource(new StringReader(s)))
    val mdoc: Monitor[Document] = "value"
    mdoc { content } must beEqualTo("value")

This creates a Monitor from the serialized XML and then extracts the complete text content of the document.

The Monitor.apply uses an implicit function to ensure the mutable data is not exposed. It also provides a particularly tidy usage, as seen in the above example. The gruesome details of the parsing are hidden away and the actual usage is stripped to bare essentials.

This approach does require discipline to ensure only immutable data is returned by any function executed by the Monitor.
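
A small sketch of what that discipline means in practice. It uses a simplified stand-in for the Monitor above (no implicit conversion) and a java.lang.StringBuilder as the mutable Java object; MonitorDiscipline, snapshot, leak and append are hypothetical names.

```scala
object MonitorDiscipline {
  // Simplified stand-in for the Monitor shown above.
  final class Monitor[T](private val v: T) {
    def apply[R](f: T => R): R = synchronized { f(v) }
  }

  val buffer = new Monitor(new java.lang.StringBuilder("abc"))

  // Safe: the function returns an immutable String copied out under the lock.
  def snapshot(): String = buffer(_.toString)

  // Safe: the mutation happens inside the monitor and nothing mutable escapes.
  def append(s: String): Unit = buffer { sb => sb.append(s); () }

  // Unsafe: this compiles, but hands the mutable object to the caller,
  // who can then change it without holding the lock.
  def leak(): java.lang.StringBuilder = buffer(sb => sb)
}
```

Nothing in the types prevents leak from being written, which is why the rule has to be enforced by convention or review.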

The Monitor works quite well for unique domain classes, including XMLBeans created from XML.
Wrappers of org.w3c.dom.Document are essentially untyped, since they cannot be distinguished from one another. This is unfortunate when a single service implementation might reference a dozen different XML documents.

Posted in Akka, Scala

Integrating Futures with the real world

Posted by eggsearle on January 28, 2012

The previous post described how to unit test a Futures-based choreography implementation.

This post will examine how such an implementation might be connected to the external world and thus perform useful work.

The Camel akka module permits integration of Camel routes with Akka actors and hence with Futures. Camel provides a wide range of components, supporting pretty much any transport that might be required.

Two separate Camel implementations are required:

  1. Expose the service to external clients
  2. Provide access to the lower level services required by this service.

The latter will always use a Request-Response MEP (message exchange pattern), while the former is determined by the needs of the client.

A JMS-based service implementation that reads from one queue and responds on another is quite common. This might be implemented as follows:

case class OWFlow[A, R](in: String, out: String, flow: A => Future[R]) {

  val inActor = Actor.actorOf(new InActor).start
  val outActor = Actor.actorOf(new OutActor).start

  private class OutActor extends Actor with Producer with Oneway {
    val endpointUri = out
  }

  private class InActor extends Actor with Consumer {
    val endpointUri = in

    def receive = {
      case akka.camel.Message(a: A, _) => (Future(a).flatMap(flow)).onComplete { outActor ! _ }
    }
  }
}

Here in and out are the Camel URIs that reference the appropriate transport endpoints, and flow is the service to be connected between in and out.

The OutActor simply provides a one-way endpoint to Camel over which messages are sent.

The InActor:

  1. Receives a message from the Camel endpoint.
  2. Creates a Future whose value is that message. The Future executes asynchronously, so the actor is not blocked.
  3. The Future executes the flow.
  4. Upon completion, the result is sent to the outActor.
  5. The outActor delivers the result to the Camel endpoint.
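
The steps above can be sketched without Camel or actors. This is only an approximation under stated assumptions: it uses the standard library's scala.concurrent.Future rather than the Akka Future of this post, and OneWayFlow, onMessage and the out callback are hypothetical stand-ins for OWFlow, the InActor's receive and the outActor.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object FlowSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // flow is the service; out stands in for the one-way endpoint.
  final class OneWayFlow[A, R](flow: A => Future[R], out: Try[R] => Unit) {
    // Steps 1 to 4: take the incoming message, lift it into a Future,
    // run the flow asynchronously, and hand the outcome to `out`.
    def onMessage(a: A): Unit =
      Future(a).flatMap(flow).onComplete(out)
  }
}
```

onMessage returns immediately; the outcome, success or failure, reaches out later on a pool thread, which corresponds to the actor not being blocked while the flow runs.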

An example using the JMS component:

 val slbOW = new OWFlow("jms:slbIn", "jms:slbOut", SingleLineBalance.apply)

The lower level service can be implemented using a simple case class, wrapping a Map containing the data of interest.

case class Responder[K, V](map: Map[K, V]) {
  def apply(a: Any) = map(a.asInstanceOf[K])
}
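
One property of this Responder is worth spelling out: K is erased at runtime, so asInstanceOf[K] does not actually check the key, and a missing or wrongly typed key surfaces as a NoSuchElementException from the Map. A hedged variant that returns an Option instead is sketched below; SafeResponder is a hypothetical name and not part of the original code.

```scala
object ResponderSketch {
  final case class SafeResponder[K, V](map: Map[K, V]) {
    // The cast is unchecked because K is erased; Map.get turns any
    // unknown key, of whatever type, into None instead of an exception.
    def apply(a: Any): Option[V] = map.get(a.asInstanceOf[K])
  }
}
```

Whether an Option or an exception is the better reply depends on how the calling route is expected to handle an unknown key.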

This case class is then wired into a Camel route using the bean component and the Scala DSL.

val context = CamelContextManager.mandatoryContext
context.addRoutes(new RouteBuilder { "seda:num".bean(Responder(Map(
      Id(123) -> List(Num("124-555-1234"), Num("333-555-1234")))) })

The service can then be tested by sending a Num via the JMS queue.

val producer = CamelContextManager.mandatoryContext.createProducerTemplate
producer.sendBody("jms:slbIn", Num("124-555-1234"))

The complete test can be found on GitHub. That code uses seda components rather than JMS.

Posted in Akka, Scala

The future of Futures in Scala

Posted by eggsearle on January 28, 2012

This SIP provides a concise and clear description of how and why Futures will form an important part of concurrent programming.

Posted in Akka, Scala