Mongo Change Streams with Server-Sent Events in Spring Boot and Kotlin

Server-Sent Events (SSE) is a mechanism that can be used as an alternative to WebSockets to push streaming data to a client. There are many fundamental differences between the two, and a quick Google search for “SSE vs WebSockets” will give you plenty of detail if you want to understand both technologies more deeply. For the purposes of this tutorial, I’m just going to mention two important SSE characteristics to consider:

  • SSE is unidirectional, meaning that you can only push data to the client; the client can only listen and can’t send anything back.
  • SSE is an HTTP standard, which makes its implementation relatively simple. From the Spring Boot perspective, the implementation is very similar to any other REST endpoint, and you can even, for example, reuse the same authentication/authorization layer that you use for your other REST endpoints (see the example response right after this list).
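
To make that second point more concrete, this is roughly what the raw HTTP response of an SSE endpoint looks like on the wire. The JSON payloads here are purely illustrative, not real output from this project:

HTTP/1.1 200 OK
Content-Type: text/event-stream

data: {"productId":"123","inStage":"assembly","outStage":"paint"}

data: {"productId":"124","inStage":"paint","outStage":"packaging"}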

In this tutorial, we will see an example of a client frontend application that subscribes to data insertion events. For that, we will also use Mongo Change Streams, which will allow us to stream Collection changes down to our SSE endpoint.

All the code is hosted in this repository in case you want to skip the explanation and go straight to the implementation. The repo also includes a Getting Started guide if you want to run it and see how it works.

And before anything, a few disclaimers about this tutorial:

  • I’m using Spring WebFlux, so the example endpoints use reactive streams.
  • The project is written in Kotlin.
  • The project requires Docker to spin up a Mongo replica set.

The use case

So let’s get started by first painting the picture of the use case we want to implement for this example.

Let’s imagine that we have a factory with an assembly line. On this assembly line there is a robot that posts events about everything that is happening to a given “ServiceA”; this service receives each event and stores it in the database. Now, let’s imagine that there’s another department in the factory that needs to be notified of these events. They will have a separate application that displays the event information in real time, listening to these events via an SSE endpoint exposed by a different service called “ServiceB”.

The diagram below shows a high-level view of this scenario:

Project Overview

For this example, I decided to decouple the logic that adds events to the database from the logic that listens to those events, splitting them into two different services, because in real life these two services might scale differently and have different requirements, since they are used by different applications: one by the Robot and the other by a Frontend application.

When you download the project from the repository you will notice the following 3 modules:

  • serviceA: Exposes an endpoint to create and add events into a Mongo Database with POST /events.
  • serviceB: Exposes an SSE endpoint that allows clients to listen to any new events added to the database with GET /events/stream.
  • library: Contains common logic used by both services, such as the database connection and any logic related to the database, including the code that listens for database changes.

Setting up the database

As mentioned earlier, for this tutorial we will use Mongo Change Streams. For that, we not only need a Mongo Database, but it needs to be set up as a replica set, since Change Streams are only supported in that configuration. To accomplish that, I have used Docker Compose to spin up 3 Mongo containers configured as a replica set.

For simplicity, and because it’s not the goal of this tutorial, I’m not going to explain how to configure the replica set, but you can see an example docker-compose.yml in my repository.

There are many tutorials and resources that show how to configure a Mongo replica set with docker-compose. Just keep in mind that the docker-compose.yml from this example is for testing and demonstration purposes only and is not a recommended configuration for a production environment, because all the containers are hosted on the same machine.

Adding Events to the database

Consider the following Event class that represents a document in our events Mongo Collection:

import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document
import java.time.Instant
import java.util.UUID

@Document("events")
data class Event(
    @Id val id: String = UUID.randomUUID().toString(),
    val inStage: String,
    val outStage: String,
    val productId: String,
    val info: String?,
    val createdOn: Instant = Instant.now()
)

In our ServiceA, we will have the following Router that defines the POST endpoint to create the event.

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.server.*

@Configuration
class Router(private val handler: Handler) {

    @Bean("routes")
    fun routes(): RouterFunction<ServerResponse> =
        router {
            POST("/events", handler::createEvent)
        }
}

And the following Handler component shows how the request gets processed. Notice that I’m using a two-layer architecture, so I need to convert a DTO to a database object before inserting.

import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.*
import org.springframework.web.reactive.function.server.ServerResponse.ok
import reactor.core.publisher.Mono

@Component
class Handler(private val service: EventService) {

    fun createEvent(request: ServerRequest): Mono<ServerResponse> =
        request.bodyToMono<EventDTO>()
            .map { it.toEvent() }             // DTO -> database object
            .flatMap { service.addEvent(it) }
            .map { it.toDTO() }               // database object -> DTO
            .flatMap { ok().json().bodyValue(it) }
}
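
The EventDTO class and the toEvent()/toDTO() mapping functions are not shown in this article. As a rough sketch, assuming the DTO simply mirrors the Event fields, they could look like this (the real implementation is in the repository):

import java.time.Instant

// Sketch of a DTO that mirrors the Event document (assumed shape).
data class EventDTO(
    val inStage: String,
    val outStage: String,
    val productId: String,
    val info: String? = null,
    val createdOn: Instant? = null
)

// Assumed mapping helpers used by the Handler above.
fun EventDTO.toEvent() = Event(
    inStage = inStage,
    outStage = outStage,
    productId = productId,
    info = info
)

fun Event.toDTO() = EventDTO(
    inStage = inStage,
    outStage = outStage,
    productId = productId,
    info = info,
    createdOn = createdOn
)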

The code for the EventService class that adds the document to the database is pretty standard; I’m using the ReactiveMongoRepository interface. Because it’s not the goal of this example, I’m not going to walk through that code, but you can see the full implementation in my repository.
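
For reference, a minimal version could look something like the following sketch (the EventRepository name is an assumption; the actual code in the repository may differ):

import org.springframework.data.mongodb.repository.ReactiveMongoRepository
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

// Spring Data generates the implementation of this reactive repository.
interface EventRepository : ReactiveMongoRepository<Event, String>

@Service
class EventService(private val repository: EventRepository) {

    // Inserts the event and emits the saved document once Mongo confirms it.
    fun addEvent(event: Event): Mono<Event> = repository.save(event)
}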

Listening for changes in the Database

Ok, now in serviceB we need to know when changes happen in our events Collection. For that, I have implemented a class called EventListener that uses ReactiveMongoTemplate to listen to the Change Stream:

import org.springframework.data.mongodb.core.ReactiveMongoTemplate
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux

@Component
class EventListener(private val template: ReactiveMongoTemplate) {

    fun subscribe(): Flux<Event> {
        return template
            .changeStream(Event::class.java)
            .watchCollection("events")
            .listen()
            // Each ChangeStreamEvent carries the changed document in its body.
            .map { it.body }
    }
}

Change Streams support some cool features, so you could even do things such as filtering to react only to specific types of events:

import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.inValues

fun subscribe(stage: String): Flux<Event> {
    return template
        .changeStream(Event::class.java)
        .watchCollection("events")
        // Only emit events whose inStage matches the given stage.
        .filter(Criteria.where("inStage").inValues(stage))
        .listen()
        .map { it.body }
}

Or you can listen only for specific types of operations. For example, in this case we only want to listen for inserts in the events Collection, but you could also specify updates or deletes.

fun subscribe(): Flux<Event> {
    return template
        .changeStream(Event::class.java)
        .watchCollection("events")
        // `in` is a Kotlin keyword, so the Criteria method needs backticks.
        .filter(Criteria.where("operationType").`in`("insert"))
        .listen()
        .map { it.body }
}

This Spring Repo contains more information about how to implement Change Streams in Spring.

Stream Events with SSE

The last piece of the puzzle is to stream those Collection changes down to our SSE endpoint in serviceB. For that, we have to define another Router:

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.server.*

@Configuration
class Route(private val handler: Handler) {

    @Bean("routes")
    fun routes(): RouterFunction<ServerResponse> =
        router {
            GET("/events/stream", handler::subscribe)
        }
}

And here is the corresponding handler, which uses the EventListener class defined in the previous step to subscribe to the Collection changes:

import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.*
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono

@Component
class Handler(private val eventListener: EventListener) {

    fun subscribe(request: ServerRequest): Mono<ServerResponse> =
        request.toMono()
            .map { eventListener.subscribe() }
            // sse() sets the Content-Type to text/event-stream.
            .flatMap { ServerResponse.ok().sse().body(it) }
}

When you have both services running and you add events with ServiceA, you will see the events streaming from ServiceB in real time.
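
If you want to watch the stream without building a frontend, you can also consume it programmatically. Here is a small sketch using WebFlux’s WebClient, assuming serviceB listens on localhost:8081 (a made-up port, adjust it to your setup):

import org.springframework.http.MediaType
import org.springframework.web.reactive.function.client.WebClient

fun main() {
    WebClient.create("http://localhost:8081") // assumed serviceB address
        .get()
        .uri("/events/stream")
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        // Decode each SSE data payload into the shared Event class.
        .bodyToFlux(Event::class.java)
        .doOnNext { println(it) }
        // Block so the JVM stays alive while events keep arriving.
        .blockLast()
}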

That’s it! I hope you found this article at least somewhat useful.

Until next time and, happy coding! 🌮
