Reactive Programming with Akka and Scala

During our lab, we wanted to implement an application with Akka and Scala because we are evaluating high-performance, scalable software architectures on the JVM. In this blog post, we describe how to set up an Akka app and walk through a few simple demo applications.

Bootstrapping an Akka/Scala app

The basic setup of the application is simple. We use sbt as our build tool, so we need to create a build.sbt and add the required Akka artifacts as dependencies:

name := "The Akka Lab"

version := "0.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.2.3",
  "com.typesafe.akka" %% "akka-testkit" % "2.2.3",
  "org.scalatest" %% "scalatest" % "2.0" % "test"
)

You can easily import the project into IntelliJ or use sbt plugins to generate project files for your preferred IDE.

Simple Message passing

After importing the project, we can implement our first ActorSystem. Its structure is shown below:

[Figure: Simple actor system structure]

We want to create a single ActorSystem called Routing that contains a Receiver actor called single next to a RoundRobinRouter called router with 10 children of type Receiver. We just need to instantiate the ActorSystem and create the two children single and router. The RoundRobinRouter creates its children by itself:

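    import akka.actor.{ActorSystem, Props}
    import akka.routing.RoundRobinRouter
    import akka.util.Timeout
    import scala.concurrent.duration._

    val duration = 3.seconds
    implicit val timeout = Timeout(duration)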

    val sys = ActorSystem("Routing")

    val single = sys.actorOf(Props[Receiver](new Receiver(2.seconds.toMillis)), "single")
    val router = sys.actorOf(Props[Receiver].withRouter(RoundRobinRouter(nrOfInstances = 10)), "router")

The Receiver receives messages of type Message(String) and prints its message parameter. After receiving a message, we toggle the state of our receiver by using Akka’s become mechanism. So here is the definition of our Receiver actor:

  class Receiver(timeout: Long) extends Actor with ActorLogging {
    import demo.RoutingStrategies.Receiver._

    def this() = this(1000)

    override def receive = fastReceive

    def fastReceive: Receive = {
      case Message(m) => {
        log.info(m)

        context.become(slowReceive)
      }
    }

    def slowReceive: Receive = {
      case Message(m) => {
        // Blocks the actor's thread on purpose to simulate a slow operation
        Thread.sleep(timeout)

        log.info(s"Slow: $m")

        context.become(fastReceive)
      }
    }
  }

As mentioned before, the actor simply prints the message. After it’s received a message it toggles its state from fastReceive to slowReceive and vice versa to simulate a more complex and time-consuming operation. Now that our system is complete, we can start sending messages to single and router:

// Sending a message by using the actor's path
sys.actorSelection("user/single") ! Message("Hello You, by path! [fast]") // fast really?

// Sending a message by using ActorRef
single ! Message("Hello You! [slow]") // slow really?
single ! Message("Hello You! [fast]") // fast really?

// Sending a message by using Router
router ! Message("Hello Anybody! [fast]")   // route message to next Receiver actor
router ! Broadcast(Message("Hello World! [1xslow, 9xfast]")) // route message to all Receiver actors

Right here we hit our first problem. We expected Akka to preserve the order of the messages, and it does, as long as you don’t mix sending messages by ActorRef and by ActorSelection. In that case, the only guarantee is that all messages sent to an ActorRef have a defined order and all messages sent via an ActorSelection have a defined order, too. Between these two addressing mechanisms, there is no guaranteed order.

The last thing we want to try is shutting down the ActorSystem after all messages have been processed. Because we’re in a multi-threaded environment, we cannot simply shut down the system at the end of the main method. We could call sys.shutdown() and then sys.awaitTermination() to wait until all currently active operations are finished, but we wouldn’t know whether all messages have been processed. For this reason, Akka provides the gracefulStop pattern: it enqueues a special message, the PoisonPill, in the actor’s mailbox. All messages before the PoisonPill are processed normally. When the PoisonPill is processed, the actor terminates and a Terminated message is sent. After we’ve picked up all Terminated messages, we can shut down the system safely:

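import akka.actor.PoisonPill
import akka.pattern.gracefulStop
import akka.routing.Broadcast
import sys.dispatcher // implicit ExecutionContext for the Future callbacks

for {
  routerTerminated <- gracefulStop(router, duration, Broadcast(PoisonPill))
  singleTerminated <- gracefulStop(single, duration)
} {
  sys.shutdown()
}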
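One caveat with the snippet above: the future returned by gracefulStop fails if the actor does not terminate within the timeout, and in that case the body of the for comprehension never runs, so the system is never shut down. The following is only a sketch of one way around that, written against plain scala.concurrent futures so it runs without Akka. The names ShutdownSketch and shutdownAfter are made up for this illustration and are not part of the demo or of Akka.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ShutdownSketch {
  /** Runs `shutdown` once every stop future has completed, whether it succeeded or failed. */
  def shutdownAfter(stops: Seq[Future[Boolean]])(shutdown: () => Unit)
                   (implicit ec: ExecutionContext): Future[Unit] = {
    // Map every failure (e.g. a stop timeout) to `false` so it cannot skip the shutdown.
    val settled = stops.map(_.recover { case _ => false })
    Future.sequence(settled).map(_ => shutdown())
  }
}

// Hypothetical usage with the actors from this post:
// ShutdownSketch.shutdownAfter(Seq(
//   gracefulStop(router, duration, Broadcast(PoisonPill)),
//   gracefulStop(single, duration)
// ))(() => sys.shutdown())
```

Unlike the for comprehension, this variant starts both stops at the same time instead of one after the other. Whether that is acceptable depends on the application.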

PingPong: Remote Messages

To try remoting in Akka, we decided to play actor ping-pong. The basic actor code is quite simple (simplified version):

object PingPongActor {
  case class Ping(message : String)
  case class Pong(message : String)
}

class PingPongActor extends Actor with ActorLogging {
  import demo.PingPongActor.{Pong, Ping}
  def receive = {
    case Ping(m) => {
      log.info(s"Ping: $m")
      sender ! Pong(m)
    }

    case Pong(m) => {
      log.info(s"Pong: $m")
      sender ! Ping(m)
    }
  }
}

Based on an Akka Remote Hello-World example, we wrote a “client” and a “server” application and configured them using Typesafe Config. One of the actors just needs to kick off the game, and then both ping-pong happily ever after.

As the message protocol is very simple, the application is well suited for measuring Akka message latencies. Hence, we attached a timestamp to each message using System#nanoTime(). However, as stated in the Javadoc of System#nanoTime(), it is only suited for time measurements within a single JVM. So, instead of measuring the one-way latency from one actor to the other, we decided to measure the round-trip latency, which allows us to use System#nanoTime() safely. To measure it, both messages are extended by a timestamp property and receive is changed accordingly:

def receive = {
    case Ping(m, ts) => {
        log.info(s"Ping: $m")
        //just forward the timestamp
        sender ! Pong(m, ts)
    }

    case Pong(m, ts) => {
        val roundTripTime = System.nanoTime() - ts
        log.info(s"Pong: $m with round trip time $roundTripTime ns.")
        // start the next round trip with a fresh timestamp
        sender ! Ping(m, System.nanoTime())
    }
}
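The extended message definitions are not shown above. A minimal sketch of what they might look like follows. The field name timestampNanos and the helpers reply and roundTripNanos are assumptions made for this illustration, not the demo's actual code.

```scala
object TimedPingPong {
  // Assumed message shapes: the original Ping/Pong plus a System.nanoTime() reading.
  final case class Ping(message: String, timestampNanos: Long)
  final case class Pong(message: String, timestampNanos: Long)

  /** The responder echoes the sender's timestamp unchanged. */
  def reply(ping: Ping): Pong = Pong(ping.message, ping.timestampNanos)

  /** Round-trip time; both readings must come from the same JVM's clock. */
  def roundTripNanos(pong: Pong, nowNanos: Long): Long =
    nowNanos - pong.timestampNanos
}
```

Because the remote side only echoes the timestamp and never compares it with its own clock, the two JVMs' nanoTime origins never get mixed.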

Our takeaways for this example:

  • Actor distribution is easily possible but it is not immediately obvious how actors are distributed (i.e. we have to write a client and server application in our case)
  • Time measurement in a distributed system requires some thought but we got away with a very simple solution to measure roundtrip latencies

Aside: Typesafe Config

We found Typesafe Config noteworthy because it has an easy syntax, its API is easy to use from Scala and Java, and it is Akka’s configuration mechanism. Typesafe Config has a JSON-like syntax called HOCON that supports different data types, e.g. numbers, strings, arrays or nested “objects”. It also has built-in support for placeholder replacement. You can use it to override Akka defaults and tune your Akka application without changing a single line of code, or to provide custom configuration for your own application. Here’s a structural excerpt from our application config:

  # overriding Akka defaults
  akka {
    ...
  }

  # server-side Akka overrides
  server {
    akka {
      ...
    }
  }

  # client-side Akka overrides
  client {
    ...
  }

In the server application, we’re loading the configuration as follows:

  import com.typesafe.config.ConfigFactory

  // load akka defaults, ignore others
  val akkaConf = ConfigFactory.load("application-remoting.conf").withOnlyPath("akka")
  // load server default
  val serverConf = ConfigFactory.load("application-remoting.conf").getConfig("server")

  // merge server and akka config
  val conf = serverConf.withFallback(akkaConf)

First, we load the default configuration into akkaConf and then the dedicated server configuration into serverConf. Finally, we merge them into a single configuration called conf. When we read a property from conf, we get the value from the akka block in the server section if it is present, and the value from the root akka block otherwise. In the same way, Akka reads defaults from reference.conf and overrides them with properties from application.conf if such a file is present on the application’s classpath. If you want to know Akka’s default configuration, you can take a look at reference.conf or the Akka documentation.
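To make the fallback behaviour concrete, here is a small sketch that parses an inline HOCON string instead of loading application-remoting.conf. The two settings (loglevel and log-dead-letters) and their values are made up for the illustration and are not taken from our real configuration.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigFallbackSketch {
  // Made-up settings for illustration; not the demo's real configuration.
  val root: Config = ConfigFactory.parseString(
    """
      |akka {
      |  loglevel = "INFO"
      |  log-dead-letters = 10
      |}
      |server {
      |  akka {
      |    loglevel = "DEBUG"
      |  }
      |}
    """.stripMargin)

  // Keep only the root akka block, then layer the server section on top of it.
  val akkaConf: Config = root.withOnlyPath("akka")
  val serverConf: Config = root.getConfig("server")
  val conf: Config = serverConf.withFallback(akkaConf)
}
```

Here conf.getString("akka.loglevel") yields "DEBUG" because the server section overrides it, while conf.getInt("akka.log-dead-letters") yields 10 from the root akka block.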

The Trading App

Finally, we wanted to try a more involved example that needs more domain modeling and a more sophisticated messaging protocol. The example is inspired by the Akka trading performance example, but we deviated from it in multiple aspects. In its current state, the trading application has some quirks, lacks a lot of features, and even lets money evaporate in certain circumstances. That’s not nice, but we were able to prototype some concepts, and it is a starting point for further experiments and enhancements, which we discuss in more detail later.

The Domain

The purpose of the application is to simulate market participants who want to buy securities. Each participant can place orders: buyers place a bid, sellers place an ask. Bids and asks are matched in an orderbook (one per security) and a trade is made. The algorithm is based on Akka’s OrderBook.scala. It basically tries to match the highest bids with the lowest asks as long as possible. If an order cannot be fulfilled entirely, it is split. All participants’ goods are tracked in accounts: securities are kept in a depot, cash is kept in a deposit. Each account is charged as soon as an order is placed to avoid overcommitment. Upon fulfillment of an order the goods are credited.
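The matching rule described above can be sketched in a few lines of plain Scala. This is not the code from OrderBook.scala or from our demo. The types Order and Trade are hypothetical, prices are whole numbers (e.g. cents), and executing each trade at the ask price is an assumption made purely for this example.

```scala
import scala.annotation.tailrec

object OrderMatchingSketch {
  // Hypothetical types for illustration; prices are in whole units such as cents.
  final case class Order(owner: String, price: Long, volume: Long)
  final case class Trade(buyer: String, seller: String, price: Long, volume: Long)

  /** Returns the trades plus the bids and asks that remain open. */
  def matchOrders(bids: List[Order], asks: List[Order]): (List[Trade], List[Order], List[Order]) = {
    @tailrec
    def loop(bs: List[Order], as: List[Order], acc: List[Trade]): (List[Trade], List[Order], List[Order]) =
      (bs, as) match {
        case (b :: bt, a :: at) if b.price >= a.price =>
          val vol = math.min(b.volume, a.volume)
          // Assumption: the trade executes at the ask price.
          val trade = Trade(b.owner, a.owner, a.price, vol)
          // Split an order that is only partially filled and keep the remainder.
          val restBids = if (b.volume > vol) b.copy(volume = b.volume - vol) :: bt else bt
          val restAsks = if (a.volume > vol) a.copy(volume = a.volume - vol) :: at else at
          loop(restBids, restAsks, trade :: acc)
        case _ => (acc.reverse, bs, as)
      }

    // Highest bids first, lowest asks first.
    loop(bids.sortBy(o => -o.price), asks.sortBy(_.price), Nil)
  }
}
```

The loop stops as soon as the best remaining bid is below the best remaining ask, which is the "as long as possible" part of the description.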

Modeling an Akka app

The application consists of two actors which are coupled by a custom router:

  • MarketParticipant: A market participant periodically places orders. It randomly decides whether to place a bid or an ask and also randomly decides on the offered price which is based on the current market price of the security including a random spread.
  • OrderBook: There is one OrderBook actor for each security within the system to match trades. It takes orders and periodically matches them. Afterwards, it notifies the involved MarketParticipants of the successful trade.
  • OrderRouter: We decided to couple MarketParticipants and OrderBooks via a custom router. During startup the router creates OrderBook actors. When an order arrives, it decides which OrderBook is responsible and forwards the order.

[Figure: System structure of the trading system]

The diagram below shows the message flow of a trade through the system. The market participants place a bid and an ask through the OrderRouter which forwards the messages to the corresponding OrderBook for this security. It matches the orders and replies to both parties with BidResponse and AskResponse on success. They can in turn adjust their account balances accordingly.

[Figure: Message flow through the application]

Implementation

The simulation exists in two flavours: a single-node implementation, which is bootstrapped in TradingSimulationApp, and a distributed implementation, which consists of RemoteClientApp, which simulates the market, and RemoteServerApp, which simulates the order books. To configure various aspects of the application, such as the securities or the number of market participants, we used Typesafe Config. The specific implementations are wired together with the Cake pattern.

Open Issues

We were able to try a lot of Akka features such as become/unbecome, stashing, custom routers and remoting. However, the domain leaves room to expand the example application in many different directions, which we briefly describe below.

Domain

Regarding the domain we see the following areas of improvement:

  • Losing money: The application holds money back in case an order is split or even evaporates it if the buying price differs from the bidding price. This hasn’t been much of an issue for our short-running simulation but it clearly is a showstopper for a real application. This issue can be solved in different ways. For example, we could cancel orders after a specific amount of time if they cannot be fulfilled or just reserve money instead of really charging the deposit.
  • Acknowledgements: Acknowledgements of order receipts would allow for easier state tracking by market participants.
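The idea of reserving money instead of charging the deposit right away could look roughly like the sketch below. It is an untested design idea, not something the demo implements. The Deposit type and its methods are hypothetical, and amounts are whole numbers (e.g. cents).

```scala
object ReservationSketch {
  /** Hypothetical deposit: `balance` is settled cash, `reserved` is cash held for open bids. */
  final case class Deposit(balance: Long, reserved: Long = 0L) {
    def available: Long = balance - reserved

    /** Hold funds for a bid; None if that would overcommit the account. */
    def reserve(amount: Long): Option[Deposit] =
      if (amount > 0 && amount <= available) Some(copy(reserved = reserved + amount))
      else None

    /** Settle a trade: release the hold and charge only the price actually paid. */
    def settle(held: Long, paid: Long): Deposit = {
      require(held <= reserved && paid <= held, "cannot settle more than was reserved")
      copy(balance = balance - paid, reserved = reserved - held)
    }

    /** Cancel an unfilled order and release its hold. */
    def release(held: Long): Deposit = {
      require(held <= reserved, "cannot release more than was reserved")
      copy(reserved = reserved - held)
    }
  }
}
```

With this shape, a bid of 500 that trades at 480 leaves the remaining 20 in the deposit instead of evaporating it, and a cancelled order simply releases its hold.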

Technological

  • Replication and fault tolerance: Currently, if an OrderBook actor fails, all open trades and the market valuation are lost. Using a dedicated supervisor node and a replication mechanism for each OrderBook would make the application far more reliable.
  • Monitoring: The demo could include a monitoring mechanism to visualize different business metrics such as current market prices, number of open orders, or aggregated revenue and also technical metrics such as messages delivered, message latency and throughput of the system.
  • Performance: The system is not tuned for performance at all. Based on the monitoring and different scenarios we could look into bottlenecks and tweak the system based on the vast configuration options of Akka.

Final Thoughts

Although our three lab days were very productive we barely scratched the surface of what’s possible with Akka. As you may have guessed from reading this blog post we struggled with some aspects but that’s a good sign: After all, we only learn something new by struggling first. We had a lot of fun in this lab and we’re looking forward to the next one to explore further aspects of Akka.

The sources of the demo can be found in the comSyto GitHub repo.
