Akka system IO client
Posted by eggsearle on March 10, 2012
Network clients can also be implemented using Akka System IO.
The following provides a simple client that connects to the LengthCountedServer, which has been simplistically modified to echo the received text back to all attached clients.
Note that all the operations are asynchronous, including the connection creation on line 24. The API returns a SocketHandle even if the connection cannot be made. For clarity of system state, we wait for the Connected message before recording the handle.
case Read(socket, bytes) =>
  state(socket)(Chunk(bytes))
  state.foreach { p => p._1.asWritable.write(bytes) }
import akka.actor._
import akka.util.ByteString
import java.net.InetSocketAddress

class LengthBoundedClient(port: Int) extends Actor {

  import IO._

  var handle: Option[SocketHandle] = None
  var respondTo: Option[ActorRef] = None

  def receive = {
    // Forward data received from the server to the attached listener
    case Read(socket, bytes) => respondTo.foreach { _ ! bytes }
    // Connection closed: forget the handle
    case Closed(socket, cause) => handle = None
    // Connection established: only now record the handle
    case Connected(h, _) => handle = Some(h)
    // Send the text with a four-digit length prefix
    case s: String => handle.foreach { _ write ByteString("%04d%s".format(s.length, s)) }
    // Remember the listener, then start the asynchronous connect
    case Attach(r) =>
      respondTo = Some(r)
      IOManager(context.system).connect(new InetSocketAddress(port))
    // Close the connection, if any
    case Detach => handle.foreach { _.close() }; handle = None

  }
}

case class Attach(respondTo: ActorRef)
case object Detach

class RespondTo extends Actor {
  def receive = {
    case bytes: ByteString => println(bytes.decodeString("US-ASCII"))
  }
}

object Client extends App {
  val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 9999
  val system = ActorSystem()
  val client = system.actorOf(Props(new LengthBoundedClient(port)))
  val respondTo = system.actorOf(Props[RespondTo])

  // "A" attaches (and connects), "D" detaches, anything else is sent as text
  while (true) {
    Console.readLine() match {
      case "A" => client ! Attach(respondTo)
      case "D" => client ! Detach
      case s => client ! s
    }
  }
}
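As a rough illustration of the framing the client relies on, the sketch below encodes a message with the same four-digit length prefix and splits a received buffer back into messages. It uses only the standard library. The `LengthCountedFrames` object and its `encode`/`decode` helpers are hypothetical names, not part of the original server or client, and ASCII text is assumed.

```scala
import java.nio.charset.StandardCharsets.US_ASCII

// Hypothetical helper, for illustration only.
object LengthCountedFrames {

  // Encode one message as a four-digit decimal length followed by the text,
  // using the same "%04d%s" format as the client. Assumes ASCII text, so
  // that the character count equals the byte count.
  def encode(s: String): Array[Byte] = {
    require(s.length <= 9999, "length must fit in four decimal digits")
    "%04d%s".format(s.length, s).getBytes(US_ASCII)
  }

  // Split a buffer into the complete messages it contains, plus the
  // unconsumed tail. The tail matters because a single read may end
  // in the middle of a frame.
  def decode(buf: Array[Byte]): (List[String], Array[Byte]) = {
    @annotation.tailrec
    def loop(rest: Array[Byte], acc: List[String]): (List[String], Array[Byte]) =
      if (rest.length < 4) (acc.reverse, rest)
      else {
        val len = new String(rest, 0, 4, US_ASCII).toInt
        if (rest.length < 4 + len) (acc.reverse, rest)
        else loop(rest.drop(4 + len), new String(rest, 4, len, US_ASCII) :: acc)
      }
    loop(buf, Nil)
  }
}
```

A caller would typically prepend the returned tail to the next chunk of bytes before calling `decode` again.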
Kevin Hoffman said
Greetings… I was just looking at some of your blog posts and they are exactly what I need to build what I’m planning on building, especially your size-framed text message socket server. I am attempting to convert that into a server that deals with google protobuf messages and I’m struggling trying to get the toByteArray() array of bytes from the protobuf message into an Akka actor util ByteString … Can you provide a little direction on that?? That would be extremely helpful…Thanks!
eggsearle said
Line 20 creates the ByteString from a String:
ByteString("%04d%s".format(s.length, s))
Line 36 extracts a String from the ByteString:
bytes.decodeString("US-ASCII")
ByteString can be created from a byte[] using the companion object:
def apply(bytes: Array[Byte]): ByteString
The bytes within a ByteString can be extracted:
– copyToArray, which populates an existing byte[]
– toArray, which allocates a new byte[]
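A minimal sketch of the round trip, assuming akka-actor is on the classpath. The `ByteStringArrays` object and its method names are hypothetical, and the `payload` in the usage note merely stands in for the result of a protobuf message's `toByteArray()`.

```scala
import akka.util.ByteString

// Hypothetical helper, for illustration only.
object ByteStringArrays {

  // byte[] -> ByteString, via the companion object's apply
  def toByteString(bytes: Array[Byte]): ByteString = ByteString(bytes)

  // ByteString -> newly allocated byte[]
  def toNewArray(bs: ByteString): Array[Byte] = bs.toArray

  // ByteString -> existing byte[]; copies at most target.length bytes
  def copyInto(bs: ByteString, target: Array[Byte]): Unit =
    bs.copyToArray(target, 0, target.length)
}
```

For example, with `val payload = Array[Byte](8, 42, 18, 2, 104, 105)`, calling `ByteStringArrays.toByteString(payload)` gives a ByteString that can be passed to the handle's `write`, and `toNewArray` recovers the same bytes on the receiving side.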
See the API for ByteString.
Note that the ByteString implementation was extended and optimized in the newer Akka releases.
Akka IO using byte[] « Richard Searle's Blog said
[…] the code from https://cognitiveentity.wordpress.com/2012/03/10/akka-system-io-client/ and https://cognitiveentity.wordpress.com/2012/02/23/simpler-akka-io-example/ to send data […]