Richard Searle's Blog

Thoughts about software

Akka system IO client

Posted by eggsearle on March 10, 2012


Network clients can also be implemented using Akka System IO.

The following provides a simple client that contents to the LengthCountedServer,  simplistically modified to echo back the received text to all attached clients

Note that all the operations are asynchronous, including the connection creation on line 24. The API returns a SocketHandle, even if the connection cannot be made. The clarity of system state we wait for the Connected operation before registering a handle.


case Read(socket, bytes) =>
 state(socket)(Chunk(bytes))
 state.foreach{p => p._1.asWritable.write(bytes)}

import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }
import java.net.InetSocketAddress

class LengthBoundedClient(port: Int) extends Actor {
  import IO._

  var handle: Option[SocketHandle] = None
  var respondTo: Option[ActorRef] = None

  def receive = {
    case Read(socket, bytes) =>
      respondTo.foreach { _ ! bytes }

    case Closed(socket, cause) =>
      handle = None

    case Connected(h, _) => handle = Some(h)

    case s: String => handle.foreach { _ write ByteString("%04d%s".format(s.length, s)) }

    case Attach(r) =>
      respondTo = Some(r);
      IOManager(context.system).connect(new InetSocketAddress(port))

    case Detach => handle.foreach { _.close }; handle = None
  }

}

case class Attach(respondTo: ActorRef)
case object Detach

case class RespondTo extends Actor {
  def receive = {
    case bytes: ByteString => println(bytes.decodeString("US-ASCII"))
  }
}

object Client extends App {
  val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 9999
  val system = ActorSystem()
  val client = system.actorOf(Props(new LengthBoundedClient(port)))
  val respondTo = system.actorOf(Props[RespondTo])

  while (true) {
    Console.readLine() match {
      case "A" => client ! Attach(respondTo)
      case "D" => client ! Detach
      case _@ s => client ! s
    }
  }

}



					
Advertisements

3 Responses to “Akka system IO client”

  1. Greetings… I was just looking at some of your blog posts and they are exactly what I need to build what I’m planning on building, especially your size-framed text message socket server. I am attempting to convert that into a server that deals with google protobuf messages and I’m struggling trying to get the toByteArray() array of bytes from the protobuf message into an Akka actor util ByteString … Can you provide a little direction on that?? That would be extremely helpful…Thanks!

    • eggsearle said

      Line 20 creates the ByteString from a String: ByteString("%04d%s".format(s.length, s)
      Line 36 extracts a String from the ByteString: bytes.decodeString("US-ASCII")

      ByteString can be created from a byte[] using the companion object: def apply(bytes: Array[Byte]): ByteString

      The bytes within a ByteString can be extracted:
      copyToArray which populates an existing byte[]
      toArray which allocates a new byte[]

      See the API for ByteString.

      Note that the ByeString implementation was extended and optimized in the newer Akka releases.

  2. […] the code from https://cognitiveentity.wordpress.com/2012/03/10/akka-system-io-client/ and https://cognitiveentity.wordpress.com/2012/02/23/simpler-akka-io-example/ to send data […]

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

 
%d bloggers like this: