Akka Streaming without backpressure

Posted on Oct 11, 2019

Recently I bumped into an interesting issue that started like this:

curl: (18) transfer closed with outstanding read data remaining

After googling a bit, it seems that the response entity never entirely reached the client issuing the request: curl failed because it expected more data than it received, and then the connection was terminated.

How the hell can this ever happen?

It can happen when there is a network interruption somewhere between the client and the service, or when packets are dropped and never sent, therefore never received.

Internet-facing services are often just a gateway to other services behind them, so the problem could happen anywhere. For example, a curl like:

[Diagram: Client-Service call]

could mean:

[Diagram: Client-Server with backend services]

which means that one request might actually be processed by several services, each responding according to its responsibility:

  • one pushes logs (Log Processing Service; yeah I know, this is not the proper way to collect logs)
  • one takes care of fraud checks and gives us the OK/KO to process the request (Fraud Detection Service)
  • another one builds and pushes the actual response (Streaming Data Service), after first fetching it from the Data Source

Maybe the Data Source is the culprit. However, if you run curl against this service directly, it works perfectly. Surprisingly, the same curl against the Streaming Data Service doesn't work, and we repeatedly see the following warning in the logs of the Data Source service:

Dropping the new element because buffer is full and overflowStrategy is [DropHead]

Essentially, if we call the Data Source directly, all is good. If we call the Streaming Data Service, we get only part of the data, plus the warning message above.
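To see what this warning means in isolation, here is a minimal, hypothetical reproduction (all names and numbers are assumptions, not the actual service code): a fast producer feeding a small buffer that is drained by a deliberately slow consumer.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("drophead-demo")
implicit val mat: ActorMaterializer = ActorMaterializer()

// Once the 16-element buffer is full, DropHead silently discards buffered
// elements, producing warnings like the one above.
val ref = Source
  .actorRef[Int](bufferSize = 16, overflowStrategy = OverflowStrategy.dropHead)
  .throttle(1, 100.millis, 1, ThrottleMode.Shaping) // deliberately slow consumer
  .to(Sink.foreach[Int](println))
  .run()

(1 to 1000).foreach(ref ! _) // fast producer: most elements are dropped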

We dig into the code and find that it's using akka-http to transfer chunked HTTP responses. The server-side code does something like:

val route =
  path("data") {
    // getData returns the stream of messages; akka-http renders the Source
    // as a chunked HTTP response
    val data: Source[MessageType, NotUsed] = getData
    complete(data)
  }
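As an aside, for complete(data) to render a Source of domain objects as a chunked response, akka-http needs streaming marshalling support in scope; a sketch, assuming a JSON marshaller exists for MessageType:

import akka.http.scaladsl.common.EntityStreamingSupport

// renders the Source as a chunked HTTP response, element by element
implicit val streamingSupport = EntityStreamingSupport.json()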

Aha! Behind the scenes, the Source is created like:

Source.actorRef[Any](bufferSize, OverflowStrategy.dropHead)

and used somewhere else to receive and forward messages:

[Diagram: Actor that receives and sends]

The MessageSender actor sends messages to the ActorRef materialized by the Source. However, in this scenario there is no way to tell the MessageSender to slow down: a short network hiccup between the Streaming Data Service and the Data Source is enough to drop part of the response.
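Put together, the problematic pattern looks roughly like this (a sketch based on the description above; MessageSender, SubscribeTo, messageSender and the buffer size are assumptions, and MessageType stands for the domain type):

import akka.NotUsed
import akka.actor.{Actor, ActorRef}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

case class SubscribeTo(streamRef: ActorRef)

// The route hands the ActorRef materialized by the Source to the producer...
val data: Source[MessageType, NotUsed] =
  Source.actorRef[MessageType](bufferSize = 100, OverflowStrategy.dropHead)
    .mapMaterializedValue { streamRef =>
      messageSender ! SubscribeTo(streamRef)
      NotUsed
    }

// ...which then fire-and-forgets everything into the stream. Nothing here
// can slow the producer down, so a slow downstream just loses old elements.
class MessageSender(messages: Iterator[MessageType]) extends Actor {
  def receive: Receive = {
    case SubscribeTo(streamRef) =>
      messages.foreach(streamRef ! _)
  }
}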

Our streaming application should respect the contract between producer and consumer: if the consumer is slow, the producer should not just keep pushing data.

One solution to this problem would be to use Source.actorRefWithAck, so that the MessageSender only sends the next message after receiving an Ack from the Source. In other cases, a Source.queue is more appropriate, as it allows buffering messages and backpressuring the producer through offer results such as Enqueued.
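With Source.actorRefWithAck, the protocol looks roughly like this (a sketch; Ack, streamRef and next() are assumptions):

import akka.actor.{Actor, ActorRef}
import akka.stream.scaladsl.Source

case object Ack

// The stream sends Ack back to the producer once it is ready for the next
// element, so the producer waits for Ack before sending again.
val ackedSource: Source[MessageType, ActorRef] =
  Source.actorRefWithAck[MessageType](ackMessage = Ack)

class MessageSender(streamRef: ActorRef) extends Actor {
  override def preStart(): Unit = streamRef ! next() // send the first message

  def receive: Receive = {
    case Ack => streamRef ! next() // send more only when the stream asks
  }

  private def next(): MessageType = ??? // produce the next message (assumed)
}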

For the Source.queue approach, the code below tries to send the next piece of data each time the queue confirms that the previous message was enqueued.

import akka.stream.QueueOfferResult

override def preStart(): Unit = {
  self ! DataToSend(initialData)
}

def receive: Receive = {
  case data: DataToSend =>
    import context.dispatcher // ExecutionContext for the Future returned by offer
    queue.offer(data).map {
      case QueueOfferResult.Enqueued    => self ! data.next() // stream accepted it; send the next one
      case QueueOfferResult.Dropped     => // nothing to do
      case QueueOfferResult.Failure(ex) => // handle failure
      case QueueOfferResult.QueueClosed => // handle queue closed
    }
}

This code should not be used in production :) It’s just an example of what you can do with queue.offer(…).
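For completeness, the queue used above would be materialized from something like this (a sketch; the buffer size and the downstream Sink are assumptions, and an implicit materializer is assumed to be in scope):

import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}

// OverflowStrategy.backpressure makes offer() complete only once the stream
// has actually accepted the element, which is what drives the protocol above.
val queue: SourceQueueWithComplete[DataToSend] =
  Source.queue[DataToSend](bufferSize = 100, OverflowStrategy.backpressure)
    .to(Sink.foreach(println)) // stand-in for the real downstream
    .run()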

Now our Streaming Data Service will be able to consume all messages, as the producer sends messages at the speed chosen by the consumer.

And the external curl will work out of the box, because all data will be transferred from one node to the other according to each consumer’s speed.
