Akka Streaming without backpressure

Recently I bumped into an interesting issue that started like this:

curl: (18) transfer closed with outstanding read data remaining

After some googling, it seemed that the response entity never entirely reached the client issuing the request: curl expected more data than it received, so it failed when the connection was closed.

How the hell can this ever happen?

It can happen when there is a network interruption somewhere between the client and the service, or when packets are dropped along the way and therefore never received.

Internet-facing services are often just a gateway for more services behind them, so it could happen anywhere. For example, a curl like:
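curl -v https://gateway.example.com/data   # hypothetical endpoint; the real one is not shown in the post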

could mean:
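(a sketch of the topology implied by the list below; the original diagram is not shown)

curl → Gateway → Log Processing Service
               → Fraud Detection Service
               → Streaming Data Service → Data Source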

which means that one request might actually be processed by several services, each responding according to its responsibility:

  • one pushes logs (Log Processing Service; yeah I know, this is not the proper way to collect logs)
  • one takes care of the fraud and gives us the OK/KO to process the request (Fraud Detection Service)
  • the other one builds and pushes the actual response (Streaming Data Service), after first fetching the data from the Data Source

Maybe the Data Source is the culprit. However, if you run curl against this service directly, it works perfectly. Surprisingly, the same curl against the Streaming Data Service doesn't work, and in the logs we repeatedly see the following warning coming from the Data Source service:

Dropping the new element because buffer is full and overflowStrategy is [DropHead]

Essentially, if we call the Data Source directly, all good. If we call the Streaming Data Service, we get only part of the data, plus the warning message above.

We dig into the code, and we find that it's using akka-http to transfer chunked HTTP responses. The server-side code does something like:

val route =
  path("data") {
    // completing with a Source streams it to the client as a chunked response
    val data: Source[MessageType, NotUsed] = getData
    complete(data)
  }

Aha! Behind the scenes, the Source is created like:

Source.actorRef[Any](bufferSize, OverflowStrategy.dropHead)

and used somewhere else to receive and forward messages:
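(The original snippet is not shown; below is a minimal sketch of the idea, where messageSender and SetTarget are hypothetical names.)

import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

case class SetTarget(ref: ActorRef) // hypothetical registration message

val data: Source[MessageType, NotUsed] =
  Source
    .actorRef[MessageType](bufferSize = 100, OverflowStrategy.dropHead)
    .mapMaterializedValue { sourceActor: ActorRef =>
      // hand the materialized actor over to the producer; from here on,
      // MessageSender fire-and-forgets messages to it, with no feedback
      // about how fast the downstream client actually consumes them
      messageSender ! SetTarget(sourceActor)
      NotUsed
    }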

The MessageSender actor sends messages to the actorRef behind the Source. However, in this scenario, there is no way to tell the MessageSender to slow down. It just takes a short network problem between the Streaming Data Service and the Data Source to drop part of the response.

Our streaming application should respect the contract between producer and consumer: if the consumer is slow, the producer should not just keep pushing data.

One solution to this problem would be to use Source.actorRefWithAck, so that the MessageSender sends the next message only after receiving an Ack from the Source. In other cases, a Source.queue is more appropriate, as it buffers messages and backpressures the producer through offer results like Enqueued, etc. (see the sketch below).
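As a minimal sketch, the queue used by the producer below could be materialized like this (element type, buffer size and names are illustrative):

import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}

val source: Source[DataToSend, SourceQueueWithComplete[DataToSend]] =
  Source.queue[DataToSend](bufferSize = 100, OverflowStrategy.backpressure)
// running the stream materializes the SourceQueueWithComplete that the
// producer actor offers its messages to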

For example, the code below sends the next piece of data only once the queue has accepted the previous one.

override def preStart(): Unit = {
  // kick off the send loop with the first piece of data
  self ! DataToSend(initialData)
}

def receive: Receive = {
  case data: DataToSend =>
    // offer returns a Future[QueueOfferResult]; mapping on it needs an
    // ExecutionContext, e.g. import context.dispatcher
    queue.offer(data).map {
      case QueueOfferResult.Enqueued    => self ! data.next() // send the next piece
      case QueueOfferResult.Dropped     => // nothing to do
      case QueueOfferResult.Failure(ex) => // handle failure
      case QueueOfferResult.QueueClosed => // handle queue closed
    }
}

This code should not be used in production 🙂 It's just an example of what you can do with queue.offer(…).

Now our Streaming Data Service will be able to consume all messages, because the producer sends them at the pace chosen by the consumer.

And the external curl will work out of the box, because all data will be transferred from one node to the other according to each consumer's speed.

Akka metrics and traces with kamon.io

Lately I have been working again with Akka, a fantastic framework for building concurrent, fault-tolerant systems.

At first, it came as a surprise to me that besides Lightbend Telemetry there was almost nothing "officially developed" for something that I consider essential to building a reactive system.

As you may have seen in the previous post, responsiveness without numbers is a bit weird:

– we are cool

– do you have numbers to say that?

– no

– then you are not cool. Not at all 🙂

I am not entirely sure how much a Lightbend subscription would cost – you need to get in touch with sales, and the contract probably depends on the volume of your apps, the number of nodes, etc. – but I am quite sure it is not as expensive as one might think. Still, I would prefer not to pay for something that I consider basic – for me this is not ancillary.

Enter Kamon.io.

Kamon.io is a quite powerful set of open-source libraries that you can plug into your JVM-based projects to gather metrics, traces, spans, and so forth. It's based on AspectJ, which may not be the most standard way to do things in Scala, but we have to admit that Akka is another kind of beast. In Scala you might use stackable traits to provide metrics, but in Akka they feel like hacks (see here, for example) – it's not fun that you can't really "stack" the receive method. Even then, how would you intercept messages going through system actors? You couldn't do that – it would have to be done by the Akka core team.

Now, the library is quite easy to integrate with – it takes more time to understand what you actually want to measure – see the quickstart. I am going to skip this part, because it's already documented.

What I would like to show you is how we collect custom metrics – as this is not documented anywhere.

Custom Metrics

As we are going to use Kamon.io to collect metrics, it might be a good idea to follow the same AspectJ-based approach, so that the final result is effectively an extension of the original library, tailored to our needs.

Be wary: you could end up writing something like this every time you want to add something to your metrics:

Kamon.counter("app.orders.sent").increment()

but eventually you'll get tired of it, considering it will bloat your actors' code. It's like having a log line for each new request your web server handles – most of the time, web frameworks provide filters that you can apply before/after certain events, so there is no need to add a single "log.info" statement for that – just create and apply a filter. If you have many actors and many events to record, extracting the handling part might be a better option.

Now, all you need to do is the following: create a new module in your project, so that a dedicated resource handles custom metrics. Then create the aspect that intercepts the events and takes the corresponding action (in this case, simply incrementing a metric):


package metrics.instrumentation

import org.aspectj.lang.annotation._

import metrics.CustomMetrics
import models.messages.CustomActorEvent

@Aspect
class CustomActorInstrumentation {

  // matches every message going through CustomActor's aroundReceive
  @Pointcut("execution(* org.mypackage.actors.CustomActor.aroundReceive(..)) && args(*, msg)")
  def onCustomActorMessagePointcut(msg: Any): Unit = {}

  @Before("onCustomActorMessagePointcut(msg)")
  def onCustomActorMessageHandler(msg: Any): Unit = {
    val customMetrics = CustomMetrics.forSystem("my-system")
    msg match {
      case _: CustomActorEvent => customMetrics.customEvent.increment()
      case _                   => // not a message we want to record
    }
  }
}

and the CustomMetrics object that wraps all the metrics you want to record – you can find some interesting ways to do it here.
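As a hypothetical sketch, consistent with the calls made in the aspect above (the metric name is made up):

package metrics

import kamon.Kamon

final class CustomMetrics private (system: String) {
  // counter incremented every time CustomActor handles a CustomActorEvent
  val customEvent = Kamon.counter(s"$system.custom-actor.events")
}

object CustomMetrics {
  def forSystem(system: String): CustomMetrics = new CustomMetrics(system)
}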

Now, CustomActorEvent is a trait. Why do I pattern match on a trait, instead of on the actual message type received by the actor? As mentioned here:

  • It is a good practice to put an actor's associated messages in its companion object. This makes it easier to understand what type of messages the actor expects and handles.

Therefore, we define messages inside the companion object, extending a trait that can easily live in another package, so that there is no tight coupling between the metrics handler and the actor itself.
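For example, a minimal sketch of this layout (the message names here are made up):

// in a shared package (models.messages), away from the actor:
trait CustomActorEvent

// in the actor's companion object:
object CustomActor {
  final case class OrderSent(orderId: String) extends CustomActorEvent
  final case class OrderFailed(orderId: String, reason: String) extends CustomActorEvent
}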

One last thing worth mentioning: don't forget to create the relative aop.xml file in your new module with the content you need:


<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
<aspectj>
  <aspects>
    <aspect name="metrics.instrumentation.CustomActorInstrumentation"/>
  </aspects>
  <weaver>
    <include within="metrics.instrumentation..*"/>
  </weaver>
</aspectj>

You can find very useful information about this configuration in the AspectJ documentation.

Good to Know

You will need the following plugins if you plan to use the approach described above:


addSbtPlugin("io.kamon" % "sbt-aspectj-runner" % "1.1.0")
addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4")
addSbtPlugin("com.lightbend.sbt" % "sbt-aspectj" % "0.11.0")

Now, a question I would like to ask you is the following: what metrics are you collecting?

Reactive Systems – Responsiveness

This article is the first of a series that I plan to write about Reactive Systems.

A brief analysis: it's 2018 now, and we often read words like "react", "reactive", and similar. As if the word "reactive" itself was not ambiguous enough, someone even started baking frameworks named "react.js" – unfortunately completely unrelated to the concept of reactive systems as we conceive them.

Ambiguities aside, a few years ago a group of people decided it was necessary to put together a few non-functional requirements they had learnt, over their careers, to be essential to building good software – and they named this document the Reactive Manifesto.

Granted that I dislike manifestos, because they scream for attention and, due to their PowerPoint-like nature, tend to be misunderstood and often overlooked, I have to admit that in this case, if you are truly led by curiosity about what Reactive Systems are, without preconceptions (oh, yet another manifesto, …), you will surely agree that Reactive is the correct term to use here. I still wouldn't have called it a manifesto, though, and I wouldn't have asked people to sign it – but these are personal preferences 🙂

So, this document – the manifesto – describes Reactive Systems as responsive, resilient, elastic and message-driven.

[Figure: the reactive traits – responsive, resilient, elastic, message-driven]

In this article I will focus primarily on the first principle: responsive.

What Does It Mean?

Responsive means that our systems need to respond in a timely manner to offer a smooth experience. What does timely mean? 1 second? 3 seconds? There are tons of non-academic studies showing that if your customers have to wait more than X seconds, Y% of them will move to a competitor. This is of course only one of the measurements you might be interested in. You could care about how quickly your brand new printer starts processing text once it receives a new job. Is it OK to have the user of your service/product wait for 10 seconds? Maybe. Everything depends on the use case.

Why Is It So Difficult?

The challenge here implies dealing with real systems – stuff that needs to be maintained, deployed, reviewed, etc., not proofs of concept or your Sunday experiments. For example, the following picture shows what a real-world architecture based on micro-services (at Netflix) looks like:

[Figure: Netflix micro-services topology]

Saying that your system is responsive means that you have solved many of the challenges responsiveness brings along, and they are not always easy to solve: legacy systems you still depend on, more micro-services than needed, network latency, performance overhead in the software and technologies used, unoptimized code, software running on old hardware. This is all part of responsiveness.

Old Hardware? What…?

Before we get into more details, let me share a little observation: since the Cloud has taken off, I have noticed that we developers are less and less careful about the resources that our software needs.

Nowadays, we think in terms of CPU units, which translate differently across cloud vendors (1 vCPU can be a full core or just half of one, more or less – there are lots of AWS/Azure/GCP comparisons out there). However, with these machines on demand, we barely know what processors they have! Who cares? Just give it a t2.large instance and that's it. Serverless architectures have widened this disconnect between developers and machines even further.

This is certainly part of a broader topic involving cost optimization, resource consumption, and so forth, yet I consider it important because it has an impact on responsiveness as well. If you run on old machines, you may be disappointed.

What Is Responsiveness About?

Responsiveness has the noble goal of providing the best usability – it's not fun to wait two minutes for something that we think could or should take less. Responsiveness is founded on reliability and availability, and these all serve the goal of creating a valid SLA.

SLA: your contract may state that certain API calls will not take more than 10 seconds on average per day. By having numbers that define upper boundaries (how long should a response take?), we can quickly decide whether some event is exceptional and deserves attention – for example, did the last 10 requests take more than 5 seconds? If so, send an alert.

Availability: the famous 99.many-nines% that lots of cloud vendors offer. This simply measures uptime/total-time, i.e. the percentage of time your service is in an operable state.

Reliability: often confused with availability, this is more related to stability and fault tolerance. It measures how long a system keeps performing its function within a given interval. For example, if a service is systematically down for 6 minutes every hour, its availability is 90%; however, its reliability is less than one hour, which could be far more interesting than the overall availability percentage.
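To make the arithmetic explicit: availability = uptime / total time = (60 - 6) / 60 = 54 / 60 = 90%, while the longest uninterrupted run is only 54 minutes, hence a reliability of less than one hour.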

A responsive system should be available and reliable, otherwise it can't stay responsive. Even responding with an error is certainly better than not responding at all. Also, when we have numbers we can act on error conditions, we can offer guarantees, and we can sell a service that always returns something.

Why Responsiveness?

In fact, responsiveness is often perceived as an optimization "feature", like security. The infamous misunderstanding of Donald Knuth's words, "premature optimization is the root of all evil", didn't help here.

Now, I love quality and I strongly believe it's the main differentiator between competing products – why choose X instead of Y, W, and Z. I also see the value in getting stuff done, though. So, why don't we build from the ground up a mentality that leads to high-quality products? A mentality that doesn't procrastinate, that is not lazy, and that believes the product under development will take off and be successful. More and more often I see that, due to this time-to-market madness, products lack a lot of non-functional features: security, quick responses, usability – depending on the domain, of course. For some reason, we tend to think that non-functional requirements are useless. However, the fact that we have multiple search engines, multiple e-commerce sites, etc., should tell us that time to market is important, yes, but in the long term what matters is also the set of non-functional requirements. You can't always assume that your customers will use your products because you were the first. Eventually someone will do the same thing better and add a non-functional feature to it, like security, which seems pretty important lately.

Responsiveness is also important considering that most people are connected to the internet via a mobile device, and when something is slow on a mobile phone it feels twice as slow as on a laptop – probably because attention is more focused on the small screen.

Long story short: plan for responsiveness as early as possible in your product roadmap. Don't procrastinate; trust developers and define a threshold with them – it can be as stupid as a simple "this call has to take at most X seconds". Only if you have numbers can you brag about it; otherwise it's pure speculation.

Reality Check – The Role of Technology

There is a sort of myth about responsiveness telling us that one of the first steps toward responsive services is choosing a great technology. In fact, in the Web Services/SaaS world, technologies often seem to be chosen by trend. As if that wasn't enough, there are tons of benchmarks online, like https://www.techempower.com, that are often used as a starting point to choose the next framework or whatnot.

Now, it's stupidly simple to say "my API is responsive" if all your API does is return a canned response. No framework will disappoint you here; even some old CGI script can handle a gazillion calls per minute on a modern machine. There are also benchmarks offering some "dynamic" features, like querying. Still, the question I ask myself is: how relevant are those benchmarks for what we want to achieve?

I still believe it's good to have such informative websites, because they give a rough idea of the computing power needed (which could decrease costs – say, how big your EC2 instances have to be), yet we have to evaluate a technology properly before falling for it just because it's among the top-10 fastest/quickest/<superlative-positive-adjective> technologies. If you look at the charts on the website mentioned above, as of today, Django is in deep trouble compared to almost any other technology out there. However, there are dozens of highly responsive websites using Django, for example Instagram, Disqus, Pinterest – you can find more at https://stackshare.io/.

How Do We Achieve Responsiveness?

Having good technologies helps here. The same applies to good code, good design patterns, and so forth. However, if we manage to implement elasticity and resilience as well, we are most of the way there.

The next article will focus on those two principles.