Akka Streaming without backpressure

Recently I bumped into an interesting issue that started like this:

curl: (18) transfer closed with outstanding read data remaining

After googling a bit, it turned out that the response entity never reached the client in full: curl was expecting more data than it received, the connection was terminated, and curl failed.

How the hell can this ever happen?

It can happen when there is a network interruption somewhere between the client and the service, or when part of the data is dropped along the way and therefore never received.

Internet-facing services are often just a gateway for more services behind them, so it could happen anywhere. A single curl call against the public endpoint might actually be processed by several services, each responding according to its responsibility:

  • one pushes logs (Log Processing Service; yeah I know, this is not the proper way to collect logs)
  • one takes care of the fraud and gives us the OK/KO to process the request (Fraud Detection Service)
  • the other one builds and pushes the actual response (Streaming Data Service), after taking it first from the Data Source

Maybe the Data Source is the culprit. However, if you run curl against this service directly, it works perfectly. Surprisingly, the same curl against the Streaming Data Service doesn’t work, and the logs repeatedly show the following warning coming from the Data Source service:

Dropping the new element because buffer is full and overflowStrategy is [DropHead]

Essentially, if we call Data Source directly, all good. If we call the Streaming Data Service, we get just some data + the warning message above.

We dig into the code, and we find that it’s using akka-http to transfer Chunked HTTP responses. The server side code does something like:

val route =
  path("data") {
    val data: Source[MessageType, NotUsed] = getData
    complete(data)
  }

Aha! Behind the scenes, the Source is created like:

Source.actorRef[Any](bufferSize, OverflowStrategy.dropHead)

and used somewhere else to receive and forward messages.

The MessageSender actor sends messages to the actorRef behind the Source. However, in this scenario, there is no way to tell the MessageSender to slow down. It just takes a short network problem between the Streaming Data Service and the Data Source to drop part of the response.
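To make the data loss visible, here is a minimal sketch, assuming Akka Streams 2.5.x or 2.6.x on the classpath. It is not the code of the services above: the object name, the buffer size, the message count and the throttle rate are all made up for illustration. A fast producer pushes into a Source.actorRef with dropHead while a slow consumer drains it, and the consumer ends up with only a fraction of what was sent.

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object DropHeadSketch {

  // Returns how many of the 1000 sent elements actually reached the consumer.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("drop-head-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val bufferSize = 8 // hypothetical value, deliberately tiny

    try {
      val (ref, received) =
        Source.actorRef[Int](bufferSize, OverflowStrategy.dropHead)
          // Simulate a slow consumer: at most 10 elements per second
          .throttle(10, 1.second, 1, ThrottleMode.Shaping)
          .toMat(Sink.seq)(Keep.both)
          .run()

      // The producer has no feedback channel, so it just keeps pushing
      (1 to 1000).foreach(ref ! _)
      // Complete the stream once the buffered elements have been emitted
      ref ! Status.Success("done")

      Await.result(received, 30.seconds).size
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"sent 1000, received ${run()}")
}
```

Most of the elements never arrive, because the buffer silently discards them while the consumer is still busy.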

Our streaming application should respect the contract between producer and consumer, so that if the consumer is slow, the producer does not just keep pushing data.

One solution to this problem would be to use a Source.actorRefWithAck, so that the MessageSender only sends a message after receiving an Ack from the Source. In other cases, a Source.queue would be more appropriate, as it lets you buffer messages and backpressure the producer through the result of each offer (Enqueued, Dropped, and so on).

For example, the code below tries to send the next piece of data every time the queue is able to enqueue the message sent.

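// queue.offer returns a Future, so an implicit ExecutionContext
// (for example: import context.dispatcher) must be in scope for map.

override def preStart(): Unit = {
  // Kick off the transfer with the first piece of data
  self ! DataToSend(initialData)
}

def receive: Receive = {
  case data: DataToSend =>
    queue.offer(data).map {
      // Only ask for the next piece once this one has been accepted
      case QueueOfferResult.Enqueued    => self ! data.next()
      case QueueOfferResult.Dropped     => // nothing to do
      case QueueOfferResult.Failure(ex) => // handle failure
      case QueueOfferResult.QueueClosed => // handle queue closed
    }
}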

This code should not be used for production 🙂 It’s just an example of what you can do with queue.offer(…).
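For a version that runs outside of an actor, here is a minimal sketch under the same assumption (Akka Streams 2.5.x or 2.6.x). The object name, buffer size, element count and throttle rate are hypothetical. It uses Source.queue with OverflowStrategy.backpressure and offers the next element only after the previous offer has completed, so the producer moves at the consumer's pace.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object QueueBackpressureSketch {

  // Returns how many of the 20 offered elements reached the consumer.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("queue-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    try {
      val (queue, received) =
        Source.queue[Int](4, OverflowStrategy.backpressure)
          // Simulate a slow consumer: at most 10 elements per second
          .throttle(10, 1.second, 1, ThrottleMode.Shaping)
          .toMat(Sink.seq)(Keep.both)
          .run()

      // Offer one element at a time; the Future returned by offer only
      // completes once there is room in the buffer for the element.
      def offerAll(items: List[Int]): Future[Done] = items match {
        case Nil => Future.successful(Done)
        case head :: tail =>
          queue.offer(head).flatMap {
            case QueueOfferResult.Enqueued => offerAll(tail)
            case other =>
              Future.failed(new IllegalStateException(s"Offer rejected: $other"))
          }
      }

      offerAll((1 to 20).toList).onComplete {
        case Success(_)  => queue.complete() // drain the buffer, then complete
        case Failure(ex) => queue.fail(ex)
      }

      Await.result(received, 30.seconds).size
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"sent 20, received ${run()}")
}
```

The trade-off compared to dropHead is that a slow consumer now slows the producer down instead of losing data, which is the behaviour we want for a response that must arrive complete.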

Now our Streaming Data Service will be able to consume all messages, as the producer sends messages at the speed chosen by the consumer.

And the external curl will work out of the box, because all data will be transferred from one node to the other according to each consumer’s speed.

Akka metrics and traces with kamon.io

Lately I have been working again with Akka, a fantastic framework to build concurrent, fault tolerant systems.

At first, it came as a surprise to me that besides Lightbend telemetry there was almost nothing “officially developed” for something that I consider essential to build a reactive system.

As you may have seen in the previous post, responsiveness without numbers is a bit weird:

– we are cool

– do you have numbers to say that?

– no

– then you are not cool. Not at all 🙂

I am not entirely sure how much a Lightbend subscription would cost: you need to get in touch with sales, and the contract probably depends on the volume of your apps, the number of nodes, etc. I am quite sure it would not be as expensive as one might think. Still, I would prefer not to pay for something that I consider basic; for me, this is not ancillary.

Enter Kamon.io.

Kamon.io is a powerful set of open source libraries that you can plug into your JVM-based projects to gather metrics, traces, spans, and so forth. It’s based on AspectJ, which may not be the most standard way to do things in Scala, but we have to admit that Akka is another kind of beast. In Scala you might use stackable traits to provide metrics, but in Akka they feel like hacks (see here, for example): it’s not fun that you can’t really “stack” the receive method. Even then, how would you intercept messages going through system actors? You couldn’t; that would have to be done by the Akka core team.

Now, the library is quite easy to integrate (see the quickstart); it takes more time to understand what you actually want to measure. I am going to skip this part, because it’s already documented.

What I would like to show you is how we collect custom metrics, as I could not find this documented anywhere.

Custom Metrics

As we are going to need Kamon.io to collect metrics, it might be a good idea to use the same approach based on AspectJ, so that the final result is like an extension of the original library that we create based on our needs.

Be aware that you could write something like this every time you want to add something to your metrics:

Kamon.counter("app.orders.sent").increment()

but eventually you’ll get tired of it, considering it will bloat your actors’ code. It’s like logging a line for each new request your web server handles: most web frameworks provide filters that you can apply before or after certain events, so there is no need to add a “log.info” statement every time; you just create and apply a filter. If you have many actors and many events to record, extracting the handling part might be a better option.

Now, all you need to do is the following: create a new module in your project to have a dedicated resource handling custom metrics. Create the aspect that will handle the interception of the events plus the relative action to take (in this case, simply increment some metric):


package metrics.instrumentation

import com.typesafe.config.ConfigFactory
import org.aspectj.lang.annotation._
import org.slf4j.LoggerFactory
import models.messages.CustomActorEvent
import metrics.CustomMetrics

@Aspect
class CustomActorInstrumentation {

  private val config = ConfigFactory.load()

  // Matches CustomActor.aroundReceive(receive, msg) and binds the second argument to msg
  @Pointcut("execution(* org.mypackage.actors.CustomActor.aroundReceive(..)) && args(*, msg)")
  def onCustomActorMessagePointcut(msg: Any): Unit = {}

  @Before("onCustomActorMessagePointcut(msg)")
  def onCustomActorMessageHandler(msg: Any): Unit = {
    val customMetrics = CustomMetrics.forSystem("my-system")
    msg match {
      case e: CustomActorEvent =>
        customMetrics.customEvent.increment()
      case _ =>
        // Not an event we measure. Without this case, any other
        // message would make the advice throw a MatchError.
    }
  }
}

Then add the CustomMetrics object that wraps all the metrics you want to record; you can find an interesting way to do it here.
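As a rough idea of what such a wrapper could look like, here is a minimal sketch. It assumes the Kamon 1.x API (the same generation as the Kamon.counter(...).increment() call shown earlier), where a counter can be narrowed with refine. The metric name "app.custom-events" and the "system" tag are made up for illustration, and the original CustomMetrics may well be organised differently.

```scala
import kamon.Kamon
import kamon.metric.Counter
import scala.collection.concurrent.TrieMap

// Hypothetical wrapper: groups every custom instrument for one actor system.
final class CustomMetrics(system: String) {
  // One counter per actor system, distinguished by a tag (Kamon 1.x refine)
  val customEvent: Counter =
    Kamon.counter("app.custom-events").refine("system" -> system)
}

object CustomMetrics {
  // Cache the wrappers so the aspect does not rebuild them on every message
  private val instances = TrieMap.empty[String, CustomMetrics]

  def forSystem(system: String): CustomMetrics =
    instances.getOrElseUpdate(system, new CustomMetrics(system))
}
```

Keeping all instruments behind one object means the aspect only needs to know method names such as customEvent, not metric names or tags.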

Now, CustomActorEvent is a trait. Why do I use pattern matching on a trait, instead of the real message that is received by the actor? As mentioned here:

  • It is a good practice to put an actor’s associated messages in its companion object. This makes it easier to understand what type of messages the actor expects and handles.

Therefore, we define messages inside the companion object that extend a trait that can be easily put into another package, so that we don’t have a tight coupling between the metric-handler and the actor itself.
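The arrangement described above can be sketched as follows. This is an illustration only: the OrderSent message and its field are hypothetical, and both definitions are shown in one snippet although the trait would live in models.messages and the actor in org.mypackage.actors.

```scala
import akka.actor.{Actor, Props}

// Would live in models.messages: a marker trait with no dependency on any actor
trait CustomActorEvent

// Would live in org.mypackage.actors
object CustomActor {
  // Hypothetical message; extending the marker trait is what makes the aspect count it
  final case class OrderSent(orderId: String) extends CustomActorEvent

  def props: Props = Props(new CustomActor)
}

class CustomActor extends Actor {
  import CustomActor._

  def receive: Receive = {
    case OrderSent(orderId) =>
      // Business logic only; no metrics code is needed here
      ()
  }
}
```

The metrics module then depends only on the trait, so new messages can be measured simply by extending CustomActorEvent.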

One last thing worth mentioning: don’t forget to create the corresponding aop.xml file in your new module with the content you need:


<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
<aspectj>
  <aspects>
    <aspect name="metrics.instrumentation.CustomActorInstrumentation"/>
  </aspects>
  <weaver>
    <include within="metrics.instrumentation..*"/>
  </weaver>
</aspectj>

You can find very useful information about this configuration in the AspectJ documentation.

Good to Know

You will need the following plugins in your plugins.sbt if you plan to use the approach described above:


addSbtPlugin("io.kamon" % "sbt-aspectj-runner" % "1.1.0")
addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4")
addSbtPlugin("com.lightbend.sbt" % "sbt-aspectj" % "0.11.0")

Now, a question I would like to ask you is the following: what metrics are you collecting?