Connecting akka actors to scalaz-streams

I've been using scalaz-stream more and more lately, and I'm finding it quite pleasant to work with. Unfortunately, I still run into akka actor systems often, as they seem to be ubiquitous. Luckily, there is a nice queueing mechanism in the scalaz.stream.async package which makes it easy to create a process that asynchronously receives data from somewhere such as an actor. This asynchronous queueing mechanism has two components: a queue which you can write stream elements to, and a Process[Task,A] which emits the values that were enqueued.

We'll start by creating an unbounded queue of strings. We can then call dequeue on the queue to get a Process[Task,String] which will stream the Strings that were enqueued:

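  import scalaz.concurrent.Task
  import scalaz.stream.{Process, async}
  import scalaz.stream.async.mutable.Queue

  // create a queue
  val queue: Queue[String] = async.unboundedQueue[String]
  // get the process fed by this queue
  val strings: Process[Task,String] = queue.dequeue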
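
To see the two halves working together before any actors get involved, here is a minimal sketch. It assumes the same scalaz-stream API used in the rest of this post (enqueueOne, close, dequeue); the names QueueSketch and roundTrip are made up for illustration and are not part of the demo project. It writes two strings, closes the queue, and then collects whatever the Process emits:

```scala
import scalaz.concurrent.Task
import scalaz.stream.async
import scalaz.stream.async.mutable.Queue

// QueueSketch and roundTrip are hypothetical names used only for this sketch
object QueueSketch {
  def roundTrip(): IndexedSeq[String] = {
    val queue: Queue[String] = async.unboundedQueue[String]

    // describe the writes: two elements, then close the queue
    val writes: Task[Unit] = for {
      _ <- queue.enqueueOne("Hello world")
      _ <- queue.enqueueOne("0123456789")
      _ <- queue.close
    } yield ()

    // a Task is only a description; nothing is enqueued until it is run
    writes.run

    // because the queue was closed, the Process halts once the elements
    // already enqueued have been emitted, so runLog can complete
    queue.dequeue.runLog.run
  }

  def main(args: Array[String]): Unit =
    roundTrip() foreach println
}
```

Closing the queue matters here: without it the Process would keep waiting for more elements and runLog would never return.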

Now we can hand our queue to an actor, which can asynchronously write to it. Here are the messages we plan to send to our actor:

/** A string to enqueue */
case class Str(s: String)
/** A signal to terminate the process normally */
case object End
/** A signal to fail the process with an error */
case object AbEnd

The implementation of our actor is simple for this demonstration: it just feeds the queue based on the messages it receives. When it receives a string, it enqueues it, which makes it available to the Process backed by the queue. When it receives an End message, it closes the queue, which causes the Process to halt normally. When it receives an AbEnd message, it fails the queue, which causes the Process it backs to halt with the Exception passed to fail.

class EnqueueActor(queue: Queue[String]) extends Actor {
  def receive: Receive = {
    case Str(s) =>
      // add the string to the queue
      val enq: Task[Unit] = queue.enqueueOne(s)
      enq.run

    case End =>
      // close the queue which will halt the Process normally
      val close: Task[Unit] = queue.close
      close.run

    case AbEnd =>
      // fail the queue which will halt the Process with an error
      val fail: Task[Unit] = queue.fail(new Exception("fail"))
      fail.run
  }
}

Now we need something that sends messages to the actor. For this demo, we are going to start a Thread which reads lines from stdin and sends a message to the actor for each line of input. To do that, we create a Sink which looks for the special input lines "bye" or "die" as a signal to terminate the process; any other input line is passed to the actor.

  // a Sink which will pass messages to our akka actor
  def toActor(recv: ActorRef): Sink[Task,String] = io.channel { str =>
    str match {
      case "bye" => Task.delay {
        recv ! End
        throw Cause.Terminated(Cause.End)
      }
      case "die" => Task.delay {
        recv ! AbEnd
        throw Cause.Terminated(Cause.End)
      }
      case x => Task.delay {
        recv ! Str(x)
      }
    }
  }

Our Thread for driving input to the actor has a simple run method which creates the actor, then hooks up stdin to our sink:

class ConsoleInput(queue: Queue[String]) extends Runnable {
  val system = ActorSystem("queue-demo")

  override def run(): Unit = {
    val actor = system.actorOf(Props(classOf[EnqueueActor], queue))
    (io.stdInLines to toActor(actor)).run.run
    system.shutdown()
  }
}

Here's our demo's Main class, all put together. It creates the queue, starts the input thread, then writes lines coming out of our output process to stdout:

object QueueDemo extends App {
  val queue: Queue[String] = async.unboundedQueue[String]
  val strings: Process[Task,String] = queue.dequeue

  val t = new Thread(new ConsoleInput(queue))
  t.start()

  val counted = strings map (str => s"${str.length} chars")
  (counted to io.printLines(System.out)).run.run

  t.join()
}

Here's a sample session with our application. I typed two strings, "Hello world" and "0123456789", before sending "bye", which is the signal to terminate the process.

❯ sbt run
[info] Set current project to queue-demo (in build file:/Users/stew/devel/queuedemo/)
[info] Running queue.QueueDemo 
Hello world
11 chars
0123456789
10 chars
bye
[success] Total time: 19 s, completed Dec 7, 2014 1:50:19 PM

It's worth noting here that we used an unboundedQueue. This is a queue which will always accept more inputs, storing them in memory, so there is no form of backpressure if things are being enqueued faster than they are being read from the Process. One can also create a boundedQueue which, when the queue is full, will not complete the enqueue Task until there is room in the queue. The queue implementation itself can be found here.
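
As a rough sketch of what the bounded variant looks like in use, the snippet below swaps in async.boundedQueue with a bound of 2 and feeds it from a plain Thread, much like the demo feeds its queue. It assumes the same scalaz-stream API as the rest of this post; the names BoundedQueueSketch and roundTrip, and the bound of 2, are made up for illustration. Since each enqueueOne Task is run synchronously, the producer thread waits whenever the queue is full:

```scala
import scalaz.stream.async
import scalaz.stream.async.mutable.Queue

// BoundedQueueSketch and roundTrip are hypothetical names used only for this sketch
object BoundedQueueSketch {
  def roundTrip(): IndexedSeq[Int] = {
    // once the queue is full, enqueue Tasks do not complete until there is room
    val queue: Queue[Int] = async.boundedQueue[Int](2)

    val producer = new Thread(new Runnable {
      override def run(): Unit = {
        // run blocks this thread until the enqueue Task completes,
        // so the producer cannot race far ahead of the consumer
        (1 to 10) foreach (i => queue.enqueueOne(i).run)
        // close the queue so the consuming Process halts normally
        queue.close.run
      }
    })
    producer.start()

    // reading from the Process makes room, which lets pending enqueues complete
    val received = queue.dequeue.runLog.run
    producer.join()
    received
  }

  def main(args: Array[String]): Unit =
    println(roundTrip().mkString(", "))
}
```

The same caveat would apply inside the actor above: with a bounded queue, running the enqueue Task synchronously in receive means the actor waits while the queue is full.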

I've posted the source code as a working project on GitHub so that you can clone it and play with it yourself.