4

Distributed Transactions in Microservices with Spring Boot

 3 years ago
source link: https://piotrminkowski.wordpress.com/2020/06/19/distributed-transactions-in-microservices-with-spring-boot/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Distributed Transactions in Microservices with Spring Boot

When I’m talking about microservices with other people they are often asking me about approach to distributed transactions. My advice is always the same – try to completely avoid distributed transactions in your microservices architecture. It is a very complex process with a lot of moving parts that can fail. That’s why it does not fit to the nature of microservices-based systems.

However, if for any reason you require to use distributed transactions, there are two popular approaches for that: Two Phase Commit Protocol and Eventual Consistency and Compensation also known as Saga pattern. You can read some interesting articles about it online. Most of them are discussing theoretical aspects related two those approaches, so in this article I’m going to present the sample implementation in Spring Boot. It is worth mentioning that there are some ready implementations of Saga pattern like support for complex business transaction provided by Axon Framework. The documentation of this solution is available here: https://docs.axoniq.io/reference-guide/implementing-domain-logic/complex-business-transactions.

Example

The source code with sample applications is as usual available on GitHub in the repository: https://github.com/piomin/sample-spring-microservices-transactions.git.

Architecture

First, we need to add a new component to our system. It is responsible just for managing distributed transactions across microservices. That element is described as transaction-server on the diagram below. We also use another popular component in microservices-based architecture discovery-server. There are three applications: order-service, account-service and product-service. The application order-service is communicating with account-service and product-service. All these applications are using Postgres database as a backend store. Just for simplification I have run a single database with multiple tables. In a normal situation we would have a single database per each microservice.

spring-microservice-transactions-arch1

Now, we will consider the following situation (it is visualized on the diagram below). The application order-service is creating an order, storing it in the database and then starting a new distributed transaction (1). After that it is communicating with application product-service to update the current number of stored products and get their price (2). In the same time product-service is sending information to transaction-server that it is participating int the transaction (3). Then order-service is trying to withdraw the required funds from customer account and transfer them into another account related to a seller (4). Finally, we are rolling back the transaction by throwing exception inside transaction method from order-service (6). This rollback should cause in rollback of the whole distributed transaction.

spring-microservices-transactions-arch2 (1)

Building transaction server

We are starting implementation from transaction-server. A transaction server is responsible for managing distributed transactions across all microservices in our sample system. It exposes REST API available for all other microservices for adding new transactions and updating their status. It also sends asynchronous broadcast events after receiving transaction confirmation or rollback from a source microservice. It using RabbitMQ message broker for sending events to other microservices via topic exchange. All other microservices are listening for incoming events, and after receiving them they are committing or rolling back transaction. We can avoid using message broker for exchanging events and use communication over HTTP endpoints, but that makes sense only if we have a single instance of every microservice. Here’s the picture that illustrates currently described architecture.

spring-microservice-transactions-server (1)

Let’s take a look on the list of required dependencies. It would be pretty the same for other applications. We need spring-boot-starter-amqp for integration with RabbitMQ, spring-boot-starter-web for exposing REST API over HTTP, spring-cloud-starter-netflix-eureka-client for integration with Eureka discovery server and some basic Kotlin libraries.

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

In the main class we are defining a topic exchange for events sent to microservices. The name of exchange is trx-events, and it is automatically created on RabbitMQ after application startup.

@SpringBootApplication
class TransactionServerApp {
@Bean
fun topic(): TopicExchange = TopicExchange("trx-events")
}
fun main(args: Array) {
runApplication(*args)
}

Here are domain model classes used by a transaction server. The same classes are used by the microservices during communication with transaction-server.

data class DistributedTransaction(var id: String? = null,var status: DistributedTransactionStatus,
val participants: MutableList<DistributedTransactionParticipant> = mutableListOf())
class DistributedTransactionParticipant(val serviceId: String, var status: DistributedTransactionStatus)
enum class DistributedTransactionStatus {
NEW, CONFIRMED, ROLLBACK, TO_ROLLBACK
}  

Here’s the controller class. It is using a simple in-memory implementation of repository and RabbitTemplate for sending events to RabbitMQ. The HTTP API provides methods for adding new transaction, finishing existing transaction with a given status (CONFIRM or ROLLBACK), searching transaction by id and adding participants (new services) into a transaction.

@RestController
@RequestMapping("/transactions")
class TransactionController(val repository: TransactionRepository,
val template: RabbitTemplate) {
@PostMapping
fun add(@RequestBody transaction: DistributedTransaction): DistributedTransaction =
repository.save(transaction)
@GetMapping("/{id}")
fun findById(@PathVariable id: String): DistributedTransaction? = repository.findById(id)
@PutMapping("/{id}/finish/{status}")
fun finish(@PathVariable id: String, @PathVariable status: DistributedTransactionStatus) {
val transaction: DistributedTransaction? = repository.findById(id)
if (transaction != null) {
transaction.status = status
repository.update(transaction)
template.convertAndSend("trx-events", DistributedTransaction(id, status))
}
}
@PutMapping("/{id}/participants")
fun addParticipant(@PathVariable id: String,
@RequestBody participant: DistributedTransactionParticipant) =
repository.findById(id)?.participants?.add(participant)
@PutMapping("/{id}/participants/{serviceId}/status/{status}")
fun updateParticipant(@PathVariable id: String,
@PathVariable serviceId: String,
@PathVariable status: DistributedTransactionStatus) {
val transaction: DistributedTransaction? = repository.findById(id)
if (transaction != null) {
val index = transaction.participants.indexOfFirst { it.serviceId == serviceId }
if (index != -1) {
transaction.participants[index].status = status
template.convertAndSend("trx-events", DistributedTransaction(id, status))
}
}
}
}  

Handling transactions in downstream services

Let’s analyze how our microservices are handling transaction on the example of account. Here’s the implementation of AccountService that is called by the controller for transfering funds from/to account. All methods here are @Transactional and here we need an attention – @Async. It means that each method is running in a new thread and is processing asynchronously. Why? That’s a key concept here. We will block the transaction in order to wait for confirmation from transaction-server, but the main thread used by the controller will not be blocked. It returns the response with current state of Account immediately.

@Service
@Transactional
@Async
class AccountService(val repository: AccountRepository,
var applicationEventPublisher: ApplicationEventPublisher) {
fun payment(id: Int, amount: Int, transactionId: String) =
transfer(id, amount, transactionId)
fun withdrawal(id: Int, amount: Int, transactionId: String) =
transfer(id, (-1) * amount, transactionId)
private fun transfer(id: Int, amount: Int, transactionId: String) {
val accountOpt: Optional<Account> = repository.findById(id)
if (accountOpt.isPresent) {
val account: Account = accountOpt.get()
account.balance += amount
applicationEventPublisher.publishEvent(AccountTransactionEvent(transactionId, account))
repository.save(account)
}
}
}

Here’s the implementation of @Controller class. As you see it is calling methods from AccountService, that are being processed asynchronously. The returned Account object is taken from EventBus bean. This bean is responsible for exchanging asynchronous events within the application scope. En event is sent by the AccountTransactionListener bean responsible for handling Spring transaction events.

@RestController
@RequestMapping("/accounts")
class AccountController(val repository: AccountRepository,
val service: AccountService,
val eventBus: EventBus) {
@PostMapping
fun add(@RequestBody account: Account): Account = repository.save(account)
@GetMapping("/customer/{customerId}")
fun findByCustomerId(@PathVariable customerId: Int): List<Account> =
repository.findByCustomerId(customerId)
@PutMapping("/{id}/payment/{amount}")
fun payment(@PathVariable id: Int, @PathVariable amount: Int,
@RequestHeader("X-Transaction-ID") transactionId: String): Account {
service.payment(id, amount, transactionId)
return eventBus.receiveEvent(transactionId)!!.account
}
@PutMapping("/{id}/withdrawal/{amount}")
fun withdrawal(@PathVariable id: Int, @PathVariable amount: Int,
@RequestHeader("X-Transaction-ID") transactionId: String): Account {
service.withdrawal(id, amount, transactionId)
return eventBus.receiveEvent(transactionId)!!.account
}
}

The event object exchanged between bean is very simple. It contains an id of transaction and the current Account object.

class AccountTransactionEvent(val transactionId: String, val account: Account)

Finally, let’s take a look on the implementation of AccountTransactionListener bean responsible for handling transactional events. We are using Spring @TransactionalEventListener for annotating method that should handle incoming events. There are 4 possible event types to handle: BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK and AFTER_COMPLETION. There is one very important thing in @TransactionalEventListener, which may be not very intuitive. It is being processed in the same thread as the transaction. So if you would to do something that should not block the thread with transaction you should annotate it with @Async. However, in our case this behaviour is required, since we need to block a transactional thread until we receive a confirmation or rollback from transaction-server for a given transaction. These events are sent by transaction-server through RabbitMQ, and they are also exchanged between beans using EventBus. If the status of received event is different than CONFIRMED we are throwing the exception to rollback transaction.
The AccountTransactionListener is also listening on AFTER_ROLLBACK and AFTER_COMPLETION. After receiving such event type it is changing the status of transaction by calling endpoint exposed by transaction-server.

@Component
class AccountTransactionListener(val restTemplate: RestTemplate,
val eventBus: EventBus) {
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
@Throws(AccountProcessingException::class)
fun handleEvent(event: AccountTransactionEvent) {
eventBus.sendEvent(event)
var transaction: DistributedTransaction? = null
for (x in 0..100) {
transaction = eventBus.receiveTransaction(event.transactionId)
if (transaction == null)
Thread.sleep(100)
else break
}
if (transaction == null || transaction.status != DistributedTransactionStatus.CONFIRMED)
throw AccountProcessingException()
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
fun handleAfterRollback(event: AccountTransactionEvent) {
restTemplate.put("http://transaction-server/transactions/transactionId/participants/{serviceId}/status/{status}",
null, "account-service", "TO_ROLLBACK")
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
fun handleAfterCompletion(event: AccountTransactionEvent) {
restTemplate.put("http://transaction-server/transactions/transactionId/participants/{serviceId}/status/{status}",
null, "account-service", "CONFIRM")
}
}

Here’s the implementation of the bean responsible for receiving asynchronous events from a message broker. As you see after receiving such event it is using EventBus to forward that event to another beans.

@Component
class DistributedTransactionEventListener(val eventBus: EventBus) {
@RabbitListener(bindings = [
QueueBinding(exchange = Exchange(type = ExchangeTypes.TOPIC, name = "trx-events"),
value = Queue("trx-events-account"))
])
fun onMessage(transaction: DistributedTransaction) {
eventBus.sendTransaction(transaction)
}
}

Integration with database

Of course our application is using Postgres as a backend store, so we need to provide integration. In fact, that is the simplest step of our implementation. First we need to add the following 2 dependencies. We will use Spring Data JPA for integration with Postgres.

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>

Our entity is very simple. Besides id field it contains two fields: customerId and balance.

@Entity
data class Account(@Id @GeneratedValue(strategy = GenerationType.AUTO) val id: Int,
val customerId: Int,
var balance: Int)

We are using well-known Spring Data repository pattern.

interface AccountRepository: CrudRepository<Account, Int> {
fun findByCustomerId(id: Int): List<Account>
}

Here’s the suggested list of configuration settings.

spring:
application:
name: account-service
datasource:
url: jdbc:postgresql://postgresql:5432/trx
username: trx
password: trx
hikari:
connection-timeout: 2000
initialization-fail-timeout: 0
jpa:
database-platform: org.hibernate.dialect.PostgreSQLDialect
hibernate:
ddl-auto: create
show-sql: true
properties:
hibernate:
format_sql: true
rabbitmq:
host: rabbitmq
port: 5672
connection-timeout: 2000

Building order-service

Ok, we have already finished the implementation of transaction-server, and two microservices account-service and product-service. Since the implementation of product-service is very similar to account-service, I have explained everything on the example of account-service. Now, we may proceed to the last part – the implementation of order-service. It is responsible for starting new transaction and marking it as finished. It also may finish it with rollback.Of course, rollback event may be sent by another two applications as well.
The implementation of @Controller class is visible below. I’ll describe it step by step. We are starting a new distributed transaction by calling POST /transactions endpoint exposed by transaction-server (1). Then we are storing a new order in database (2). When we are calling a transactional method from downstream service we need to set HTTP header X-Transaction-ID. The first transactional method that is called here is PUT /products/{id}/count/{count}(3). It updates the number of products in the store and calculates a final price (4). In the step it is calling another transaction method – this time from account-service (5). It is responsible for withdrawing money from customer account. We are enabling Spring transaction events processing (6). In the last step we are generating random number, and then basing on its value application is throwing exception to rollback transaction (7).

@RestController
@RequestMapping("/orders")
class OrderController(val repository: OrderRepository,
val restTemplate: RestTemplate,
var applicationEventPublisher: ApplicationEventPublisher) {
@PostMapping
@Transactional
@Throws(OrderProcessingException::class)
fun addAndRollback(@RequestBody order: Order) {
var transaction  = restTemplate.postForObject("http://transaction-server/transactions",
DistributedTransaction(), DistributedTransaction::class.java) // (1)
val orderSaved = repository.save(order) // (2)
val product = updateProduct(transaction!!.id!!, order) // (3)
val totalPrice = product.price * product.count // (4)
val accounts = restTemplate.getForObject("http://account-service/accounts/customer/{customerId}",
Array<Account>::class.java, order.customerId)
val account  = accounts!!.first { it.balance >= totalPrice}
updateAccount(transaction.id!!, account.id, totalPrice) // (5)
applicationEventPublisher.publishEvent(OrderTransactionEvent(transaction.id!!)) // (6)
val r = Random.nextInt(100) // (7)
if (r % 2 == 0)
throw OrderProcessingException()
}
fun updateProduct(transactionId: String, order: Order): Product {
val headers = HttpHeaders()
headers.set("X-Transaction-ID", transactionId)
val entity: HttpEntity<*> = HttpEntity<Any?>(headers)
val product = restTemplate.exchange("http://product-service/products/{id}/count/{count}",
HttpMethod.PUT, null, Product::class.java, order.id, order.count)
return product.body!!
}
fun updateAccount(transactionId: String, accountId: Int, totalPrice: Int): Account {
val headers = HttpHeaders()
headers.set("X-Transaction-ID", transactionId)
val entity: HttpEntity<*> = HttpEntity<Any?>(headers)
val account = restTemplate.exchange("http://account-service/accounts/{id}/withdrawal/{amount}",
HttpMethod.PUT, null, Account::class.java, accountId, totalPrice)
return account.body!!
}
}

Conclusion

Even a trivial implementation of distributed transactions in microservices, like the one demonstrated in this article, may be complicated. As you see we need to add a new element to our architecture, transaction-server, responsible only for distributed transaction management. We also have to add message broker in order to exchange events between our applications and transaction-server. However, many of you were asking me about distributed transactions in microservices world, so I decided to build these simple demo system. I’m waiting for you feedback and opinions.


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK