domingo, 6 de enero de 2019

Use an Akka actor as stream source

This is my first english post, so please bear with me 😅

I've recently started toying with Akka Streams, by trying to add it to an existing Akka HTTP API that I have created to crawl and aggregate data from other APIs and websites. I thought it would be nice to finally put them in practice, and that it was a good place to start because the project involves some data transformation and processing. The first version of the project was fully actor based, so I wanted to integrate Akka Streams for minor data operations like cleaning and normalization, while keeping most of the core logic within the actors that I already had. So I eventually came to the point where I needed to send the data from an actor into the streams that I had defined.

To use a scenario similar to the one that I faced with, let's suppose that we have an actor that, upon receiving a message, will generate a sequence of numbers starting from 1, and send them to the actor who requested them, killing himself after doing so. We may have something like this:


It certainly does its job. But what if we want to use that list of numbers as a source for an Akka Streams flow? We can use the list as source, sure. But what if the list is too long that we may end up with a very large message? We can avoid this, and do something better by sending each number as an independent message directly to the flow. And we can do this by defining an actor as a source for the flow.

In order to do so, we will refactor our NumbersActor a bit as first step:
  • Instead of returning the numbers to the sender, we will send the numbers to another actor.
  • We will send the numbers one by one, to avoid sending a big message with a long list of numbers.
  • We will notify the recipient actor that all the numbers have been sent, by using a special message.

Note: I could send each number as is, but I decided to wrap them inside a special message case class, because you will usually won't send only simple types, but complex data.

After that, we will define our special actor source:


Now that we have our source, how do we use it? Well, we need to materialize it. In this case, we will connect the source directly to a sequence Sink, and run it.


This process will return a tuple that contains an ActorRef, and a Future that wraps the asynchronous execution of the stream. The special part here is the ActorRef, because this is the starting point of the source. We just need to send our messages to this actor, and they will be forwarded to the flow. For our example, we will create our NumbersActor and tell it to generate a sequence of numbers, and send it to the actor reference from our source.


Remember the special message that we used in the NumbersActor to notify that all the number messages were sent? Well, Status.Success is the message that we use to notify that the stream has been finished successfully.

Finally, at least for this example, we can wait for the results, and print them in the terminal, for completeness sake.


That is for the moment. You can find the full SBT project for this on GitHub, and the source code for this specific example here. If you clone the repository, you can run this specific example with the command:
sbt runActorSourceSample

Happy coding!

Related Articles

0 comentarios:

Publicar un comentario

Con la tecnología de Blogger.