Introduction to Spring Data R2DBC with Kotlin

R2DBC (Reactive Relational Database Connectivity) is a specification for non-blocking access to relational databases, and Spring Data R2DBC is the Spring Data module built on top of it. R2DBC is still relatively new and, if like me you are working with Kotlin, it can be a little challenging to find examples. That’s why I wrote this tutorial: to show some of the most common use cases, such as custom repositories, batch operations and transactions.

Configuration

Keep in mind that the examples in this tutorial use Spring Boot version 2.4.2. This is important because other tutorials show examples of the DatabaseClient interface from a version released while Spring Data R2DBC was still experimental. In October 2020, DatabaseClient moved into Spring Framework itself (package org.springframework.r2dbc.core), and that newer version is the one used throughout this tutorial. You can read more about the differences between org.springframework.data.r2dbc.core.DatabaseClient and org.springframework.r2dbc.core.DatabaseClient here.

Adding dependencies

Besides the correct Spring Boot version, you will need to declare the following in your Gradle file:

  • Spring Data R2DBC
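  • The driver for the database you will be using, in my case PostgreSQL

// build.gradle
plugins {
   id("org.springframework.boot") version "2.4.2"
   // Supplies the versions for the version-less dependencies below
   id("io.spring.dependency-management") version "1.0.11.RELEASE"
}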

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
    runtimeOnly("io.r2dbc:r2dbc-postgresql")
}

Configuring the connection

Specify the URL and credentials of your database in your application file as shown below. Spring Boot uses these properties to create a default ConnectionFactory; you can also create and customize your own by declaring a ConnectionFactory bean in any configuration class.

#application.yml
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost/postgres
    username: postgres
    password: strongPassword
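
If the defaults are not enough, the custom ConnectionFactory mentioned above could look roughly like the sketch below. It is only an illustration: the class name R2dbcConfiguration is made up, the connection values simply mirror the application.yml above, and it assumes that the r2dbc-postgresql driver is on the classpath and that the project uses the kotlin-spring compiler plugin (so the configuration class is opened for Spring).

```kotlin
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

// Hypothetical configuration class; the name is not part of the original project.
@Configuration
class R2dbcConfiguration {

    // Declaring this bean makes Spring Boot back off from creating its default ConnectionFactory.
    @Bean
    fun connectionFactory(): ConnectionFactory =
        ConnectionFactories.get(
            ConnectionFactoryOptions.builder()
                .option(ConnectionFactoryOptions.DRIVER, "postgresql")
                .option(ConnectionFactoryOptions.HOST, "localhost")
                .option(ConnectionFactoryOptions.DATABASE, "postgres")
                .option(ConnectionFactoryOptions.USER, "postgres")
                .option(ConnectionFactoryOptions.PASSWORD, "strongPassword")
                .build()
        )
}
```

In a real project you would read these values from configuration properties rather than hard-coding them.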

Custom Repositories

Performing CRUD operations on a single table is relatively easy. A simple implementation only needs the default functions defined in the ReactiveCrudRepository interface.

Consider a colors table. If you wanted to get all the colors from that table, your code would look like this:

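import org.springframework.data.annotation.Id
import org.springframework.data.relational.core.mapping.Table
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import reactor.core.publisher.Flux

@Table("colors")
data class Color(
    @Id val id: Int?,
    val code: String,
    val name: String
)

interface ColorRepository: ReactiveCrudRepository<Color, Int>

// colorRepository is an injected ColorRepository, e.g. a constructor parameter of your service
fun getAllColors(): Flux<Color> {
    return colorRepository.findAll()
}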

However, if you want to perform more complex operations, like getting data from joined tables, you might want to use DatabaseClient instead, so you have more control over your queries and over how the results are mapped to your classes.

One way to add custom functions to your repository is via composition. In the example below I’m creating a new interface where I can define any functions I want to describe my custom behavior.

interface ColorRepository: ReactiveCrudRepository<Color, Int>, CustomColorRepository {}

interface CustomColorRepository {
    fun findByProduct(productId: Int): Flux<Color>
}

Continue to the next section to see what the implementation of this custom function looks like.

Read

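Continuing with the colors table example from earlier, consider the following relation with a products table: a product_colors join table links each product to its colors through its product_id and color_id columns.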

Now imagine that you want to get all the colors (including code and name) given a productId. The code snippet below shows a query to do that and how to execute it by using the sql function from DatabaseClient. You can also bind parameters to the query by using the bind function and passing the parameter name used in the query.

Notice this is the implementation of the custom interface CustomColorRepository that was declared above.

const val selectColorsByProduct = """
    SELECT co.id as color_id, co.code as color_code, co.name as color_name
    FROM colors as co
    INNER JOIN product_colors pc on co.id = pc.color_id
    WHERE pc.product_id = :productId
"""

// Custom Repository implementations need to end with "Impl"
// You can configure this suffix by overriding the default value
// of repositoryImplementationPostfix from EnableR2dbcRepositories 
class CustomColorRepositoryImpl(
    private val databaseClient: DatabaseClient,
    private val mapper: ColorMapper
): CustomColorRepository {
    override fun findByProduct(productId: Int): Flux<Color> {
        return databaseClient.sql(selectColorsByProduct)
            .bind("productId", productId)
            .map(mapper::apply)
            .all()
    }
}
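
The comment above about the "Impl" suffix can be made concrete with a small configuration sketch. The class name RepositoryConfiguration and the "Custom" postfix are made-up examples. With this in place, the implementation class would have to be named CustomColorRepositoryCustom, so most projects simply keep the default.

```kotlin
import org.springframework.context.annotation.Configuration
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories

// Hypothetical configuration class showing how the implementation suffix is overridden.
// Declaring @EnableR2dbcRepositories yourself replaces Spring Boot's automatic repository
// scanning; by default, the package of this class becomes the base package.
@Configuration
@EnableR2dbcRepositories(repositoryImplementationPostfix = "Custom")
class RepositoryConfiguration
```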

Mapping

You might have noticed that we are also mapping the results of the query using the map function. When using DatabaseClient you will usually want to do this; otherwise it has no way of knowing how to turn a row into your class, and fetch() only gives you each row as a generic Map of column names to values. The code below shows what that mapper looks like.

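import io.r2dbc.spi.Row
import java.util.function.BiFunction
import org.springframework.stereotype.Component

@Component
class ColorMapper: BiFunction<Row, Any, Color> {
    // The column names match the aliases used in the SELECT statement
    override fun apply(row: Row, o: Any): Color {
        return Color(
            row.get("color_id", Number::class.java)?.toInt(),
            row.get("color_code", String::class.java) ?: "",
            row.get("color_name", String::class.java) ?: ""
        )
    }
}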
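
To see why the mapper is worth having, here is a rough sketch of the same query without map. Everything in it is illustrative: ColorRow is a stand-in for the Color class without the Spring Data annotations, and findRawColors and toColorRow are hypothetical helpers that are not part of the original project.

```kotlin
import org.springframework.r2dbc.core.DatabaseClient
import reactor.core.publisher.Flux

// Stand-in for the article's Color entity, without the Spring Data annotations.
data class ColorRow(val id: Int?, val code: String, val name: String)

// Without map(), fetch() exposes every row as a Map keyed by column name.
fun findRawColors(client: DatabaseClient, sql: String, productId: Int): Flux<MutableMap<String, Any>> =
    client.sql(sql)
        .bind("productId", productId)
        .fetch()
        .all()

// Manual conversion from one raw row to a typed object (hypothetical helper).
fun toColorRow(row: Map<String, Any?>): ColorRow =
    ColorRow(
        id = (row["color_id"] as? Number)?.toInt(),
        code = row["color_code"] as? String ?: "",
        name = row["color_name"] as? String ?: ""
    )
```

You could then write findRawColors(client, selectColorsByProduct, productId).map(::toColorRow), which ends up doing by hand what the mapper already does inside the query.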

Insert

Inserting, updating and deleting data can be easily accomplished with the default functions from ReactiveCrudRepository; however, you can also use DatabaseClient to perform these operations. This section and the next two show an example of each, using the sql function to execute the statement and the bind function to add any required parameters.

const val insertProduct = """
   INSERT INTO products (code, description) VALUES (:code, :description)
"""
class CustomProductRepositoryImpl(
   private val databaseClient: DatabaseClient,
   private val mapper: ProductMapper
): CustomProductRepository {

   override fun save(product: Product): Mono<Product> {
       return databaseClient.sql(insertProduct)
           // Ask the driver to return the generated id so it can be read from the result
           .filter { statement, _ -> statement.returnGeneratedValues("id").execute() }
           .bind("code", product.code)
           .bind("description", product.description)
           .fetch()
           .first()
           .map { product.copy(id = it["id"] as Int) }
   }
}

Update

const val updateProduct = """
   UPDATE products SET code = :code, description = :description 
   WHERE id = :productId
"""
class CustomProductRepositoryImpl(
   private val databaseClient: DatabaseClient,
   private val mapper: ProductMapper
): CustomProductRepository {

    override fun update(product: Product): Mono<Int> {
       return databaseClient.sql(updateProduct)
           .bind("code", product.code)
           .bind("description", product.description)
           .bind("productId", product.id)
           .fetch().rowsUpdated()
    }
}
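
The update function returns the number of affected rows, and a result of 0 means that no product had the given id. If you would rather treat that as an error, one possible approach is sketched below. The helper requireUpdated is hypothetical, and the choice of NoSuchElementException is only an assumption about what your application might want.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical helper: turns "0 rows updated" into an error instead of a silent no-op.
fun <T : Any> requireUpdated(rowsUpdated: Mono<out Number>, entity: T): Mono<T> =
    rowsUpdated.flatMap { rows ->
        if (rows.toLong() == 0L) Mono.error<T>(NoSuchElementException("No row matched $entity"))
        else Mono.just(entity)
    }
```

A caller could use it as requireUpdated(productRepository.update(product), product) to get the product back on success.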

Delete

const val deleteAllProductColors = """
   DELETE FROM product_colors WHERE product_id = :productId
"""
class CustomProductRepositoryImpl(
   private val databaseClient: DatabaseClient,
   private val mapper: ProductMapper
): CustomProductRepository {

    override fun deleteAllProductColors(productId: Int): Mono<Int> {
        return databaseClient
           .sql(deleteAllProductColors)
           .bind("productId", productId)
           .fetch().rowsUpdated()
   }
}

Batch Operations

Batch operations are another good use case for DatabaseClient. Imagine that you want to associate a product with multiple colors, which means adding multiple entries to the product_colors table. This can be accomplished by using the inConnectionMany function, creating a statement on the connection and binding one set of values per color inside a loop.

const val insertProductColor = """
   INSERT INTO product_colors (product_id, color_id) VALUES ($1, $2)
"""
class CustomProductRepositoryImpl(
   private val databaseClient: DatabaseClient,
   private val mapper: ProductMapper
): CustomProductRepository {

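   override fun insertProductColors(productId: Int, colorIds: List<Int>):
   Flux<Number> {
      // A statement without any bindings cannot be executed, so skip empty input
      if (colorIds.isEmpty()) return Flux.empty()
      return databaseClient.inConnectionMany { connection ->
         val statement = connection.createStatement(insertProductColor)
         colorIds.forEachIndexed { index, colorId ->
            // add() closes the previous set of bindings, so call it between sets only
            if (index > 0) statement.add()
            statement.bind(0, productId).bind(1, colorId)
         }
         // A plain INSERT returns no rows, so emit the affected-row count of each statement
         Flux.from(statement.execute()).flatMap { result -> result.rowsUpdated }
      }
   }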
}

Transactions

Finally, another common need when working with relational databases is the ability to roll back a chain of changes if any of them fails.

Consider the example of a Product. When creating a product, we want to add the product to the products table and also associate colors and sizes by adding entries to the product_colors and product_sizes tables respectively. In order to do this, first, we need to enable transaction management and create a ReactiveTransactionManager as shown in the snippet below.

@Configuration
@EnableTransactionManagement
class DatabaseConfiguration {

   @Bean
   fun transactionManager(connectionFactory: ConnectionFactory):
   ReactiveTransactionManager {
      return R2dbcTransactionManager(connectionFactory)
   }
}

The last step is to add a @Transactional annotation to any function that chains multiple repository calls. In the example below, if any of the productRepository functions fails, none of the changes will be applied to the database.

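@Transactional
fun createProduct(product: Product): Mono<Product> {
   // Color.id is nullable, so keep only the ids that are present
   val colorIds = product.colors.mapNotNull { it.id }
   val sizeIds = product.sizes.mapNotNull { it.id }
   return productRepository
      .save(product)
      .flatMap { saved ->
         // Run both batch inserts in order, then emit the saved product
         productRepository.insertProductColors(saved.id, colorIds)
            .thenMany(productRepository.insertProductSizes(saved.id, sizeIds))
            .then(Mono.just(saved))
      }
}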
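
If you prefer not to rely on annotations, Spring also offers a programmatic alternative through TransactionalOperator. The sketch below is only an illustration of that option: the TransactionalRunner class and its inTransaction function are made-up names, and it assumes a ReactiveTransactionManager bean like the one configured earlier.

```kotlin
import org.springframework.transaction.ReactiveTransactionManager
import org.springframework.transaction.reactive.TransactionalOperator
import reactor.core.publisher.Mono

// Hypothetical helper class: runs any Mono pipeline inside a single transaction.
class TransactionalRunner(transactionManager: ReactiveTransactionManager) {
    private val operator = TransactionalOperator.create(transactionManager)

    // Commits when `work` completes and rolls back when it signals an error.
    fun <T : Any> inTransaction(work: Mono<T>): Mono<T> = operator.transactional(work)
}
```

With it, the body of createProduct could be passed to runner.inTransaction(...) instead of annotating the function with @Transactional.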

Conclusion

That’s it! I hope you find these examples useful. You can find the full code example in my GitHub repository. Happy coding! 👋
