Distributed Transactions in Microservices with Spring Boot
source link: https://piotrminkowski.wordpress.com/2020/06/19/distributed-transactions-in-microservices-with-spring-boot/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Distributed Transactions in Microservices with Spring Boot
When I’m talking about microservices with other people they are often asking me about approach to distributed transactions. My advice is always the same – try to completely avoid distributed transactions in your microservices architecture. It is a very complex process with a lot of moving parts that can fail. That’s why it does not fit to the nature of microservices-based systems.
However, if for any reason you require to use distributed transactions, there are two popular approaches for that: Two Phase Commit Protocol and Eventual Consistency and Compensation also known as Saga pattern. You can read some interesting articles about it online. Most of them are discussing theoretical aspects related two those approaches, so in this article I’m going to present the sample implementation in Spring Boot. It is worth mentioning that there are some ready implementations of Saga pattern like support for complex business transaction provided by Axon Framework. The documentation of this solution is available here: https://docs.axoniq.io/reference-guide/implementing-domain-logic/complex-business-transactions.
Example
The source code with sample applications is as usual available on GitHub in the repository: https://github.com/piomin/sample-spring-microservices-transactions.git.
Architecture
First, we need to add a new component to our system. It is responsible just for managing distributed transactions across microservices. That element is described as transaction-server on the diagram below. We also use another popular component in microservices-based architecture discovery-server
. There are three applications: order-service
, account-service
and product-service
. The application order-service
is communicating with account-service
and product-service
. All these applications are using Postgres database as a backend store. Just for simplification I have run a single database with multiple tables. In a normal situation we would have a single database per each microservice.
Now, we will consider the following situation (it is visualized on the diagram below). The application order-service
is creating an order, storing it in the database and then starting a new distributed transaction (1). After that it is communicating with application product-service
to update the current number of stored products and get their price (2). In the same time product-service
is sending information to transaction-server
that it is participating int the transaction (3). Then order-service
is trying to withdraw the required funds from customer account and transfer them into another account related to a seller (4). Finally, we are rolling back the transaction by throwing exception inside transaction method from order-service
(6). This rollback should cause in rollback of the whole distributed transaction.
Building transaction server
We are starting implementation from transaction-server
. A transaction server is responsible for managing distributed transactions across all microservices in our sample system. It exposes REST API available for all other microservices for adding new transactions and updating their status. It also sends asynchronous broadcast events after receiving transaction confirmation or rollback from a source microservice. It using RabbitMQ message broker for sending events to other microservices via topic exchange. All other microservices are listening for incoming events, and after receiving them they are committing or rolling back transaction. We can avoid using message broker for exchanging events and use communication over HTTP endpoints, but that makes sense only if we have a single instance of every microservice. Here’s the picture that illustrates currently described architecture.
Let’s take a look on the list of required dependencies. It would be pretty the same for other applications. We need spring-boot-starter-amqp
for integration with RabbitMQ, spring-boot-starter-web
for exposing REST API over HTTP, spring-cloud-starter-netflix-eureka-client
for integration with Eureka discovery server and some basic Kotlin libraries.
<
dependency
>
<
groupId
>org.springframework.boot</
groupId
>
<
artifactId
>spring-boot-starter-amqp</
artifactId
>
</
dependency
>
<
dependency
>
<
groupId
>org.springframework.boot</
groupId
>
<
artifactId
>spring-boot-starter-web</
artifactId
>
</
dependency
>
<
dependency
>
<
groupId
>com.fasterxml.jackson.module</
groupId
>
<
artifactId
>jackson-module-kotlin</
artifactId
>
</
dependency
>
<
dependency
>
<
groupId
>org.jetbrains.kotlin</
groupId
>
<
artifactId
>kotlin-reflect</
artifactId
>
</
dependency
>
<
dependency
>
<
groupId
>org.jetbrains.kotlin</
groupId
>
<
artifactId
>kotlin-stdlib</
artifactId
>
</
dependency
>
<
dependency
>
<
groupId
>org.springframework.cloud</
groupId
>
<
artifactId
>spring-cloud-starter-netflix-eureka-client</
artifactId
>
</
dependency
>
In the main class we are defining a topic exchange for events sent to microservices. The name of exchange is trx-events
, and it is automatically created on RabbitMQ after application startup.
@
SpringBootApplication
class
TransactionServerApp {
@
Bean
fun topic()
:
TopicExchange
=
TopicExchange(
"trx-events"
)
}
fun main(args
:
Array) {
runApplication(*args)
}
Here are domain model classes used by a transaction server. The same classes are used by the microservices during communication with transaction-server
.
data
class
DistributedTransaction(
var
id
:
String?
=
null
,
var
status
:
DistributedTransactionStatus,
val
participants
:
MutableList<DistributedTransactionParticipant>
=
mutableListOf())
class
DistributedTransactionParticipant(
val
serviceId
:
String,
var
status
:
DistributedTransactionStatus)
enum
class
DistributedTransactionStatus {
NEW, CONFIRMED, ROLLBACK, TO
_
ROLLBACK
}
Here’s the controller class. It is using a simple in-memory implementation of repository and RabbitTemplate
for sending events to RabbitMQ. The HTTP API provides methods for adding new transaction, finishing existing transaction with a given status (CONFIRM
or ROLLBACK
), searching transaction by id
and adding participants (new services) into a transaction.
@
RestController
@
RequestMapping(
"/transactions"
)
class
TransactionController(
val
repository
:
TransactionRepository,
val
template
:
RabbitTemplate) {
@
PostMapping
fun add(
@
RequestBody transaction
:
DistributedTransaction)
:
DistributedTransaction
=
repository.save(transaction)
@
GetMapping(
"/{id}"
)
fun findById(
@
PathVariable id
:
String)
:
DistributedTransaction?
=
repository.findById(id)
@
PutMapping(
"/{id}/finish/{status}"
)
fun finish(
@
PathVariable id
:
String,
@
PathVariable status
:
DistributedTransactionStatus) {
val
transaction
:
DistributedTransaction?
=
repository.findById(id)
if
(transaction !
=
null
) {
transaction.status
=
status
repository.update(transaction)
template.convertAndSend(
"trx-events"
, DistributedTransaction(id, status))
}
}
@
PutMapping(
"/{id}/participants"
)
fun addParticipant(
@
PathVariable id
:
String,
@
RequestBody participant
:
DistributedTransactionParticipant)
=
repository.findById(id)?.participants?.add(participant)
@
PutMapping(
"/{id}/participants/{serviceId}/status/{status}"
)
fun updateParticipant(
@
PathVariable id
:
String,
@
PathVariable serviceId
:
String,
@
PathVariable status
:
DistributedTransactionStatus) {
val
transaction
:
DistributedTransaction?
=
repository.findById(id)
if
(transaction !
=
null
) {
val
index
=
transaction.participants.indexOfFirst { it.serviceId
==
serviceId }
if
(index !
=
-
1
) {
transaction.participants[index].status
=
status
template.convertAndSend(
"trx-events"
, DistributedTransaction(id, status))
}
}
}
}
Handling transactions in downstream services
Let’s analyze how our microservices are handling transaction on the example of account
. Here’s the implementation of AccountService
that is called by the controller for transfering funds from/to account. All methods here are @Transactional
and here we need an attention – @Async
. It means that each method is running in a new thread and is processing asynchronously. Why? That’s a key concept here. We will block the transaction in order to wait for confirmation from transaction-server
, but the main thread used by the controller will not be blocked. It returns the response with current state of Account
immediately.
@
Service
@
Transactional
@
Async
class
AccountService(
val
repository
:
AccountRepository,
var
applicationEventPublisher
:
ApplicationEventPublisher) {
fun payment(id
:
Int, amount
:
Int, transactionId
:
String)
=
transfer(id, amount, transactionId)
fun withdrawal(id
:
Int, amount
:
Int, transactionId
:
String)
=
transfer(id, (-
1
) * amount, transactionId)
private
fun transfer(id
:
Int, amount
:
Int, transactionId
:
String) {
val
accountOpt
:
Optional<Account>
=
repository.findById(id)
if
(accountOpt.isPresent) {
val
account
:
Account
=
accountOpt.get()
account.balance +
=
amount
applicationEventPublisher.publishEvent(AccountTransactionEvent(transactionId, account))
repository.save(account)
}
}
}
Here’s the implementation of @Controller
class. As you see it is calling methods from AccountService
, that are being processed asynchronously. The returned Account
object is taken from EventBus
bean. This bean is responsible for exchanging asynchronous events within the application scope. En event is sent by the AccountTransactionListener
bean responsible for handling Spring transaction events.
@
RestController
@
RequestMapping(
"/accounts"
)
class
AccountController(
val
repository
:
AccountRepository,
val
service
:
AccountService,
val
eventBus
:
EventBus) {
@
PostMapping
fun add(
@
RequestBody account
:
Account)
:
Account
=
repository.save(account)
@
GetMapping(
"/customer/{customerId}"
)
fun findByCustomerId(
@
PathVariable customerId
:
Int)
:
List<Account>
=
repository.findByCustomerId(customerId)
@
PutMapping(
"/{id}/payment/{amount}"
)
fun payment(
@
PathVariable id
:
Int,
@
PathVariable amount
:
Int,
@
RequestHeader(
"X-Transaction-ID"
) transactionId
:
String)
:
Account {
service.payment(id, amount, transactionId)
return
eventBus.receiveEvent(transactionId)!!.account
}
@
PutMapping(
"/{id}/withdrawal/{amount}"
)
fun withdrawal(
@
PathVariable id
:
Int,
@
PathVariable amount
:
Int,
@
RequestHeader(
"X-Transaction-ID"
) transactionId
:
String)
:
Account {
service.withdrawal(id, amount, transactionId)
return
eventBus.receiveEvent(transactionId)!!.account
}
}
The event object exchanged between bean is very simple. It contains an id of transaction and the current Account
object.
class
AccountTransactionEvent(
val
transactionId
:
String,
val
account
:
Account)
Finally, let’s take a look on the implementation of AccountTransactionListener
bean responsible for handling transactional events. We are using Spring @TransactionalEventListener
for annotating method that should handle incoming events. There are 4 possible event types to handle: BEFORE_COMMIT
, AFTER_COMMIT
, AFTER_ROLLBACK
and AFTER_COMPLETION
. There is one very important thing in @TransactionalEventListener
, which may be not very intuitive. It is being processed in the same thread as the transaction. So if you would to do something that should not block the thread with transaction you should annotate it with @Async
. However, in our case this behaviour is required, since we need to block a transactional thread until we receive a confirmation or rollback from transaction-server
for a given transaction. These events are sent by transaction-server
through RabbitMQ, and they are also exchanged between beans using EventBus
. If the status of received event is different than CONFIRMED
we are throwing the exception to rollback transaction.
The AccountTransactionListener
is also listening on AFTER_ROLLBACK
and AFTER_COMPLETION
. After receiving such event type it is changing the status of transaction by calling endpoint exposed by transaction-server
.
@
Component
class
AccountTransactionListener(
val
restTemplate
:
RestTemplate,
val
eventBus
:
EventBus) {
@
TransactionalEventListener(phase
=
TransactionPhase.BEFORE
_
COMMIT)
@
Throws(AccountProcessingException
::
class
)
fun handleEvent(event
:
AccountTransactionEvent) {
eventBus.sendEvent(event)
var
transaction
:
DistributedTransaction?
=
null
for
(x in
0
..
100
) {
transaction
=
eventBus.receiveTransaction(event.transactionId)
if
(transaction
==
null
)
Thread.sleep(
100
)
else
break
}
if
(transaction
==
null
|| transaction.status !
=
DistributedTransactionStatus.CONFIRMED)
throw
AccountProcessingException()
}
@
TransactionalEventListener(phase
=
TransactionPhase.AFTER
_
ROLLBACK)
fun handleAfterRollback(event
:
AccountTransactionEvent) {
restTemplate.put(
"http://transaction-server/transactions/transactionId/participants/{serviceId}/status/{status}"
,
null
,
"account-service"
,
"TO_ROLLBACK"
)
}
@
TransactionalEventListener(phase
=
TransactionPhase.AFTER
_
COMPLETION)
fun handleAfterCompletion(event
:
AccountTransactionEvent) {
restTemplate.put(
"http://transaction-server/transactions/transactionId/participants/{serviceId}/status/{status}"
,
null
,
"account-service"
,
"CONFIRM"
)
}
}
Here’s the implementation of the bean responsible for receiving asynchronous events from a message broker. As you see after receiving such event it is using EventBus
to forward that event to another beans.
@
Component
class
DistributedTransactionEventListener(
val
eventBus
:
EventBus) {
@
RabbitListener(bindings
=
[
QueueBinding(exchange
=
Exchange(
type
=
ExchangeTypes.TOPIC, name
=
"trx-events"
),
value
=
Queue(
"trx-events-account"
))
])
fun onMessage(transaction
:
DistributedTransaction) {
eventBus.sendTransaction(transaction)
}
}
Integration with database
Of course our application is using Postgres as a backend store, so we need to provide integration. In fact, that is the simplest step of our implementation. First we need to add the following 2 dependencies. We will use Spring Data JPA for integration with Postgres.
<
dependency
>
<
groupId
>org.springframework.boot</
groupId
>
<
artifactId
>spring-boot-starter-data-jpa</
artifactId
>
</
dependency
>
<
dependency
>
<
groupId
>org.postgresql</
groupId
>
<
artifactId
>postgresql</
artifactId
>
<
scope
>runtime</
scope
>
</
dependency
>
Our entity is very simple. Besides id
field it contains two fields: customerId
and balance
.
@Entity
data
class
Account(
@Id
@GeneratedValue
(strategy = GenerationType.AUTO) val id: Int,
val customerId: Int,
var balance: Int)
We are using well-known Spring Data repository pattern.
interface
AccountRepository: CrudRepository<Account, Int> {
fun findByCustomerId(id: Int): List<Account>
}
Here’s the suggested list of configuration settings.
spring:
application:
name: account-service
datasource:
url: jdbc:postgresql:
//postgresql:5432/trx
username: trx
password: trx
hikari:
connection-timeout:
2000
initialization-fail-timeout:
0
jpa:
database-platform: org.hibernate.dialect.PostgreSQLDialect
hibernate:
ddl-auto: create
show-sql:
true
properties:
hibernate:
format_sql:
true
rabbitmq:
host: rabbitmq
port:
5672
connection-timeout:
2000
Building order-service
Ok, we have already finished the implementation of transaction-server
, and two microservices account-service
and product-service
. Since the implementation of product-service
is very similar to account-service
, I have explained everything on the example of account-service
. Now, we may proceed to the last part – the implementation of order-service
. It is responsible for starting new transaction and marking it as finished. It also may finish it with rollback.Of course, rollback event may be sent by another two applications as well.
The implementation of @Controller
class is visible below. I’ll describe it step by step. We are starting a new distributed transaction by calling POST /transactions
endpoint exposed by transaction-server
(1). Then we are storing a new order in database (2). When we are calling a transactional method from downstream service we need to set HTTP header X-Transaction-ID
. The first transactional method that is called here is PUT /products/{id}/count/{count}
(3). It updates the number of products in the store and calculates a final price (4). In the step it is calling another transaction method – this time from account-service
(5). It is responsible for withdrawing money from customer account. We are enabling Spring transaction events processing (6). In the last step we are generating random number, and then basing on its value application is throwing exception to rollback transaction (7).
@
RestController
@
RequestMapping(
"/orders"
)
class
OrderController(
val
repository
:
OrderRepository,
val
restTemplate
:
RestTemplate,
var
applicationEventPublisher
:
ApplicationEventPublisher) {
@
PostMapping
@
Transactional
@
Throws(OrderProcessingException
::
class
)
fun addAndRollback(
@
RequestBody order
:
Order) {
DistributedTransaction(), DistributedTransaction
::
class
.java)
// (1)
val
orderSaved
=
repository.save(order)
// (2)
val
product
=
updateProduct(transaction!!.id!!, order)
// (3)
val
totalPrice
=
product.price * product.count
// (4)
Array<Account>
::
class
.java, order.customerId)
val
account
=
accounts!!.first { it.balance >
=
totalPrice}
updateAccount(transaction.id!!, account.id, totalPrice)
// (5)
applicationEventPublisher.publishEvent(OrderTransactionEvent(transaction.id!!))
// (6)
val
r
=
Random.nextInt(
100
)
// (7)
if
(r
%
2
==
0
)
throw
OrderProcessingException()
}
fun updateProduct(transactionId
:
String, order
:
Order)
:
Product {
val
headers
=
HttpHeaders()
headers.set(
"X-Transaction-ID"
, transactionId)
val
entity
:
HttpEntity<*>
=
HttpEntity<Any?>(headers)
HttpMethod.PUT,
null
, Product
::
class
.java, order.id, order.count)
return
product.body!!
}
fun updateAccount(transactionId
:
String, accountId
:
Int, totalPrice
:
Int)
:
Account {
val
headers
=
HttpHeaders()
headers.set(
"X-Transaction-ID"
, transactionId)
val
entity
:
HttpEntity<*>
=
HttpEntity<Any?>(headers)
HttpMethod.PUT,
null
, Account
::
class
.java, accountId, totalPrice)
return
account.body!!
}
}
Conclusion
Even a trivial implementation of distributed transactions in microservices, like the one demonstrated in this article, may be complicated. As you see we need to add a new element to our architecture, transaction-server
, responsible only for distributed transaction management. We also have to add message broker in order to exchange events between our applications and transaction-server
. However, many of you were asking me about distributed transactions in microservices world, so I decided to build these simple demo system. I’m waiting for you feedback and opinions.
Recommend
-
187
Testing Microservices — Java & Spring Boot Testing Microservices — Java & Spring Boot 19 Sep 2017 1 minute to read...
-
27
This article describes the basic ideas of the GRIT protocol, which was announced at the IEEE International Conference on Data Engineering (ICDE) 2019, and provides an example of using part of the protocol for implementing...
-
6
In this article I’m going to propose my list of “golden rules” for building Spring Boot applications, which are a part of microservices-based system. I’m basing on my experience in migrating monolithic SOAP applications runni...
-
6
Microservices with Spring Cloud Advanced Demo Project In this project I'm demonstrating you the most interesting features of Spring Cloud Project for building microservice-based...
-
6
spring-boot-microservices-best-practices Best Practices for Developing Rest-Based Microservices with Spring-Boot A service offers REST APIs for managing/consuming orders. Owner Ehab Qadah Tools an...
-
1
Reading Time: 4 minutes What is Spring Boot and Spring Cloud GCP? Spring Boot: Spring Boot is a java-based framework i.e. free and open-source. Using Spring boot you can create...
-
3
MicroServices using Spring Boot & Spring Cloud – Part 1 : OverviewSkip to contentShare this:
-
7
Kotlin Spring Boot Web-Service Integration With Database The p...
-
3
Microservices with Spring Boot 3 and Spring Cloud
-
4
...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK