Au travail actuel, nous écrivons dans Reactor. La technologie est cool, mais comme toujours, il y a beaucoup de MAIS. Certaines choses sont ennuyeuses, le code est plus difficile à écrire et à lire, et ThreadLocal est un vrai désastre. J'ai décidé de voir quels problèmes vont disparaître si vous passez à Kotlin Coroutines, et quels problèmes, au contraire, seront ajoutés.
Carte patient
J'ai écrit un petit projet pour l'article , reproduisant les problèmes que j'ai rencontrés au travail. Le code principal est ici . L'algorithme ne l'a délibérément pas divisé en méthodes distinctes, de sorte que les problèmes sont mieux perçus.
En un mot sur l'algorithme:
Nous transférons de l'argent d'un compte à un autre, enregistrant les transactions sur le fait du transfert.
La traduction est idempotente, donc si la transaction est déjà dans la base de données, nous répondons au client que tout va bien. Lors de l'insertion d'une transaction, une exception DataIntegrityViolationException peut être levée, cela signifie également que la transaction existe déjà.
Afin de ne pas entrer dans le négatif, il y a une vérification dans le code de verrouillage + Optimiste, qui ne permet pas la mise à jour compétitive des comptes. Il faut réessayer et gérer les erreurs supplémentaires pour le faire fonctionner.
Pour ceux qui n'aiment pas l'algorithme lui-même
L'algorithme du projet a été choisi pour reproduire les problèmes, non pour être efficace et correct d'un point de vue architectural. Au lieu d'une transaction, vous devez insérer des semi-conducteurs, un verrou optimiste n'est pas du tout nécessaire (au lieu de vérifier la positivité du compte en sql), select + insert doit être remplacé par upsert.
Plaintes des patients
Stacktrace ne montre pas comment nous sommes arrivés à la zone problématique.
Le code est clairement plus complexe qu'il ne le serait sur les technologies de blocage.
Imbrication de code en plusieurs étapes grâce à flatMap.
Traitement et lancement des erreurs peu pratiques.
Gestion complexe des comportements pour Mono.empty ().
Problèmes de journalisation, si vous avez besoin d'ajouter quelque chose de global au journal, par exemple traceId. (Je ne décris pas dans l'article, mais les mêmes problèmes avec d'autres variables ThreadLocal, par exemple SpringSecurity)
Il n'est pas pratique de déboguer.
API implicite pour la parallélisation.
Progression du traitement
PR Java Kotlin.
.
com.fasterxml.jackson.module:jackson-module-kotlin data org.jetbrains.kotlin.plugin.spring open .
suspend fun transfer(@RequestBody request: TransferRequest)
public Mono<Void> transfer(@RequestBody TransferRequest request)
suspend fun save(account: Account): Account
Mono<Account> save(Account account);
, , suspend , , Reactor .
runBlocking { … }
, suspend .
Retry kotlin-retry. , , ( PR).
, , . -.
:
public Mono<Void> transfer(String transactionKey, long fromAccountId,
long toAccountId, BigDecimal amount) {
return transactionRepository.findByUniqueKey(transactionKey)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(withMDC(foundTransaction -> {
if (foundTransaction.isPresent()) {
log.warn("retry of transaction " + transactionKey);
return Mono.empty();
}
return accountRepository.findById(fromAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(fromAccount -> accountRepository.findById(toAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(toAccount -> {
var transactionToInsert = Transaction.builder()
.amount(amount)
.fromAccountId(fromAccountId)
.toAccountId(toAccountId)
.uniqueKey(transactionKey)
.build();
var amountAfter = fromAccount.getAmount().subtract(amount);
if (amountAfter.compareTo(BigDecimal.ZERO) < 0) {
return Mono.error(new NotEnoghtMoney());
}
return transactionalOperator.transactional(
transactionRepository.save(transactionToInsert)
.onErrorResume(error -> {
//transaction was inserted on parallel transaction,
//we may return success response
if (error instanceof DataIntegrityViolationException
&& error.getMessage().contains("TRANSACTION_UNIQUE_KEY")) {
return Mono.empty();
} else {
return Mono.error(error);
}
})
.then(accountRepository.transferAmount(
fromAccount.getId(), fromAccount.getVersion(),
amount.negate()
))
.then(accountRepository.transferAmount(
toAccount.getId(), toAccount.getVersion(), amount
))
);
}));
}))
.retryWhen(Retry.backoff(3, Duration.ofMillis(1))
.filter(OptimisticLockException.class::isInstance)
.onRetryExhaustedThrow((__, retrySignal) -> retrySignal.failure())
)
.onErrorMap(
OptimisticLockException.class,
e -> new ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
)
.onErrorResume(withMDC(e -> {
log.error("error on transfer", e);
return Mono.error(e);
}));
}
:
suspend fun transfer(transactionKey: String, fromAccountId: Long,
toAccountId: Long, amount: BigDecimal) {
try {
try {
retry(limitAttempts(3) + filter { it is OptimisticLockException }) {
val foundTransaction = transactionRepository
.findByUniqueKey(transactionKey)
if (foundTransaction != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
val fromAccount = accountRepository.findById(fromAccountId)
?: throw AccountNotFound()
val toAccount = accountRepository.findById(toAccountId)
?: throw AccountNotFound()
if (fromAccount.amount - amount < BigDecimal.ZERO) {
throw NotEnoghtMoney()
}
val transactionToInsert = Transaction(
amount = amount,
fromAccountId = fromAccountId,
toAccountId = toAccountId,
uniqueKey = transactionKey
)
transactionalOperator.executeAndAwait {
try {
transactionRepository.save(transactionToInsert)
} catch (e: DataIntegrityViolationException) {
if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
throw e;
}
}
accountRepository.transferAmount(
fromAccount.id!!, fromAccount.version, amount.negate()
)
accountRepository.transferAmount(
toAccount.id!!, toAccount.version, amount
)
}
}
} catch (e: OptimisticLockException) {
throw ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
}
} catch (e: Exception) {
logger.error(e) { "error on transfer" }
throw e;
}
}
Stacktraces
, .
:
o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
at c.g.c.v.r.services.Ledger.lambda$transfer$5(Ledger.java:75)
...
Caused by: c.g.c.v.r.OptimisticLockException: null
at c.g.c.v.r.repos.AccountRepositoryImpl.lambda$transferAmount$0(AccountRepositoryImpl.java:27)
at r.c.p.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
...
:
error on transfer o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:70)
at c.g.c.v.r.services.Ledger$transfer$1.invokeSuspend(Ledger.kt)
...
Caused by: c.g.c.v.r.OptimisticLockException: null
at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
...
at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
(Coroutine boundary)
at o.s.t.r.TransactionalOperatorExtensionsKt.executeAndAwait(TransactionalOperatorExtensions.kt:31)
at c.g.c.v.r.services.Ledger$transfer$3.invokeSuspend(Ledger.kt:56)
at com.github.michaelbull.retry.RetryKt$retry$3.invokeSuspend(Retry.kt:38)
at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:35)
at c.g.c.v.r.controllers.LedgerController$transfer$2$1.invokeSuspend(LedgerController.kt:20)
at c.g.c.v.r.controllers.LedgerController$transfer$2.invokeSuspend(LedgerController.kt:19)
at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)
at o.s.c.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)
(Coroutine creation stacktrace)
at k.c.i.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)
at k.c.i.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
...
Caused by: c.g.c.v.r.OptimisticLockException: null
at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
...
at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
...
, ( , ).
Java . , . . . Kotlin .
, - . ? . , - traceId (thread name ) .
Kotlin , , . ( : ).
flatMap. - try catch, .
:
return accountRepository.findById(fromAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(fromAccount -> accountRepository.findById(toAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound()))
.flatMap(toAccount -> {
...
})
:
val fromAccount = accountRepository.findById(fromAccountId)
?: throw AccountNotFound()
val toAccount = accountRepository.findById(toAccountId)
?: throw AccountNotFound()
...
try catch, .
:
return transactionRepository.findByUniqueKey(transactionKey)
...
.onErrorMap(
OptimisticLockException.class,
e -> new ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
)
:
try {
val foundTransaction = transactionRepository
.findByUniqueKey(transactionKey)
...
} catch (e: OptimisticLockException) {
throw ResponseStatusException(
BANDWIDTH_LIMIT_EXCEEDED,
"limit of OptimisticLockException exceeded", e
)
}
throw, . Reactor :
.flatMap(foo -> {
if (foo.isEmpty()) {
return Mono.error(new IllegalStateException());
} else {
return Mono.just(foo);
}
})
, , . - .
Mono.empty()
. null . ¨C5C.
Ide , mono . . , - .
Kotlin not null , , . nullable - .
:
:
return transactionRepository.findByUniqueKey(transactionKey)
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(foundTransaction -> {
if (foundTransaction.isPresent()) {
log.warn("retry of transaction " + transactionKey);
return Mono.empty();
}
...
:
val foundTransaction = transactionRepository
.findByUniqueKey(transactionKey)
if (foundTransaction != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
...
, - Reactor, .
, traceId . ThreadLocal , MDC ( ). ?
. Reactor Coroutines immutable, MDC ( ).
Java , traceId :
@Component
public class TraceIdFilter implements WebFilter {
@Override
public Mono<Void> filter(
ServerWebExchange exchange, WebFilterChain chain
) {
var traceId = Optional.ofNullable(
exchange.getRequest().getHeaders().get("X-B3-TRACEID")
)
.orElse(Collections.emptyList())
.stream().findAny().orElse(UUID.randomUUID().toString());
return chain.filter(exchange)
.contextWrite(context ->
LoggerHelper.addEntryToMDCContext(context, "traceId", traceId)
);
}
}
, - , traceId MDC:
public static <T, R> Function<T, Mono<R>> withMDC(
Function<T, Mono<R>> block
) {
return value -> Mono.deferContextual(context -> {
Optional<Map<String, String>> mdcContext = context
.getOrEmpty(MDC_ID_KEY);
if (mdcContext.isPresent()) {
try {
MDC.setContextMap(mdcContext.get());
return block.apply(value);
} finally {
MDC.clear();
}
} else {
return block.apply(value);
}
});
}
, Mono. .. , Mono. :
.onErrorResume(withMDC(e -> {
log.error("error on transfer", e);
return Mono.error(e);
}))
Kotlin . , traceId MDC:
@Component
class TraceIdFilter : WebFilter {
override fun filter(
exchange: ServerWebExchange, chain: WebFilterChain
): Mono<Void> {
val traceId = exchange.request.headers["X-B3-TRACEID"]?.first()
MDC.put("traceId", traceId ?: UUID.randomUUID().toString())
return chain.filter(exchange)
}
}
withContext(MDCContext()) { … }
, MDC traceId. .
Java Reactor , : , , breakpoints ...
: stepOver, , ( ).
, suspend . issue. , , Java Reactor evaluate , .
, , .
:
return Mono.zip(
transactionRepository.findByUniqueKey(transactionKey)
.map(Optional::of)
.defaultIfEmpty(Optional.empty()),
accountRepository.findById(fromAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound())),
accountRepository.findById(toAccountId)
.switchIfEmpty(Mono.error(new AccountNotFound())),
).flatMap(withMDC(fetched -> {
var foundTransaction = fetched.getT1();
var fromAccount = fetched.getT2();
var toAccount = fetched.getT3();
if (foundTransaction.isPresent()) {
log.warn("retry of transaction " + transactionKey);
return Mono.empty();
}
...
}
:
coroutineScope {
val foundTransactionAsync = async {
logger.info("async fetch of transaction $transactionKey")
transactionRepository.findByUniqueKey(transactionKey)
}
val fromAccountAsync = async {
accountRepository.findById(fromAccountId)
}
val toAccountAsync = async {
accountRepository.findById(toAccountId)
}
if (foundTransactionAsync.await() != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
...
}
Kotlin “ ”, “ ” Reactor.
, -. Reactor , . - foundTransactionAsync.await(). , transactionRepository.findByUniqueKey() , , accountRepository.findById() ( ).
. , Reactor :
coroutineScope {
val foundTransactionAsync = async {
logger.info("async fetch of transaction $transactionKey")
transactionRepository.findByUniqueKey(transactionKey)
}
val fromAccountAsync = async {
accountRepository.findById(fromAccountId)
}
val toAccountAsync = async {
accountRepository.findById(toAccountId)
}
if (foundTransactionAsync.await() != null) {
logger.warn("retry of transaction $transactionKey")
return@retry
}
val transactionToInsert = Transaction(
amount = amount,
fromAccountId = fromAccountId,
toAccountId = toAccountId,
uniqueKey = transactionKey
)
transactionalOperator.executeAndAwait {
try {
transactionRepository.save(transactionToInsert)
} catch (e: DataIntegrityViolationException) {
if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
throw e;
}
}
val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
if (fromAccount.amount - amount < BigDecimal.ZERO) {
throw NotEnoghtMoney()
}
accountRepository.transferAmount(
fromAccount.id!!, fromAccount.version, amount.negate()
)
accountRepository.transferAmount(
toAccount.id!!, toAccount.version, amount
)
}
}
. .. , . , , ( ).
, , .
context scope
, :
scope. , , .
context. .
Spring , :
@PutMapping("/transfer")
suspend fun transfer(@RequestBody request: TransferRequest) {
coroutineScope {
withContext(MDCContext()) {
ledger.transfer(request.transactionKey, request.fromAccountId,
request.toAccountId, request.amount)
}
}
}
, regexp , . - .
AOP suspend
, , . aspect suspend .
. , .
, ( ).
. , .
, .
, JetBrains . , - , .
Plus important encore, avec les coroutines, vous n'avez pas besoin de garder à l'esprit toutes les fonctionnalités de Reactor et de sa puissante API. Vous écrivez simplement le code.