Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ import scala.concurrent.Future
* 2. Wrap it in a non-closing proxy (commit/rollback/close are no-ops).
* 3. Store the proxy in requestProxyLocal (IOLocal) only — currentProxy (TTL) is
* NOT set here to avoid leaving compute threads dirty.
* 4. Run validateRequest + routes.run inside withRequestTransaction.
* 4. Run validateOnly (auth, roles, entity lookups) — outside the transaction, on
* auto-commit vendor connections. On Left: return error response, no transaction
* opened. On Right (GET/HEAD): run routes.run directly on auto-commit connections.
* On Right (POST/PUT/DELETE/PATCH): open the transaction and run routes.run inside it.
* 5. Each IO.fromFuture call site uses RequestScopeConnection.fromFuture, which in
* a single synchronous IO.defer block on compute thread T:
* a. Sets currentProxy (TTL) on T.
Expand Down Expand Up @@ -101,16 +104,11 @@ object RequestScopeConnection extends MdcLoggable {
else method.invoke(real, args: _*)
if (result == null || method.getReturnType == Void.TYPE) null else result
} catch {
case e: java.lang.reflect.InvocationTargetException
if Option(e.getCause).exists(_.isInstanceOf[java.sql.SQLException]) =>
case e: java.lang.reflect.InvocationTargetException =>
val cause = Option(e.getCause).getOrElse(e)
logger.error(
s"[RequestScopeProxy] method=${method.getName} failed on closed/returned connection. " +
s"This means the request-scoped proxy was handed to code that ran AFTER withRequestTransaction " +
s"committed and closed the underlying connection. " +
s"Likely cause: v7 path fell through to Http4sLiftWebBridge without a transaction scope — " +
s"currentProxy was still set on this thread from a previous fiber or was not cleared. " +
s"Cause: ${e.getCause.getMessage}",
e.getCause
s"[RequestScopeProxy] method=${method.getName} failed: ${cause.getClass.getName}: ${cause.getMessage}",
cause
)
throw e
}
Expand Down Expand Up @@ -162,7 +160,7 @@ class RequestAwareConnectionManager(delegate: ConnectionManager) extends Connect
if (proxy != null) {
// Guard: if the underlying connection is already closed, the proxy is stale — it
// was captured in a TtlRunnable submitted during a prior request and that request's
// withRequestTransaction has already committed and closed the real connection.
// withBusinessDBTransaction has already committed and closed the real connection.
// Returning a stale proxy would throw "Connection is closed" inside the caller's
// DB.use and, if that caller is inside authenticate, would be caught as Left(_)
// and silently turned into a 401 response.
Expand All @@ -184,15 +182,15 @@ class RequestAwareConnectionManager(delegate: ConnectionManager) extends Connect
}

/**
* If conn is our request proxy, skip release — it is managed by withRequestTransaction.
* If conn is our request proxy, skip release — it is managed by withBusinessDBTransaction.
* Otherwise delegate to the original vendor (which does HikariCP ProxyConnection.close()).
*
* Reference equality is safe: one proxy instance per request, same object throughout.
*/
override def releaseConnection(conn: Connection): Unit = {
val proxy = RequestScopeConnection.currentProxy.get()
if (proxy != null && (conn eq proxy.asInstanceOf[AnyRef])) {
// Skip release — this connection is managed by withRequestTransaction.
// Skip release — this connection is managed by withBusinessDBTransaction.
} else {
delegate.releaseConnection(conn)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,25 @@ object ResourceDocMiddleware extends MdcLoggable {
case Some(resourceDoc) =>
val ccWithDoc = ResourceDocMatcher.attachToCallContext(cc, resourceDoc)
val pathParams = ResourceDocMatcher.extractPathParams(req.uri.path, resourceDoc)
// Wrap in a request-scoped transaction, then run full validation chain
OptionT(withRequestTransaction(
validateRequest(req, resourceDoc, pathParams, ccWithDoc, routes)
).map(Option(_)))
// Validate first (read-only, outside any transaction), then run business logic.
// GET/HEAD are safe methods — no writes, no transaction needed; they run on
// auto-commit vendor connections (same as validation). All other methods
// (POST/PUT/DELETE/PATCH) wrap routes.run in withBusinessDBTransaction.
OptionT(
validateOnly(req, resourceDoc, pathParams, ccWithDoc).flatMap {
case Left(errorResponse) =>
IO.pure(Option(errorResponse))
case Right(enrichedReq) =>
val routeIO =
routes.run(enrichedReq)
.map(ensureJsonContentType)
.getOrElseF(IO.pure(ensureJsonContentType(Response[IO](org.http4s.Status.NotFound))))
val executed =
if (req.method == Method.GET || req.method == Method.HEAD) routeIO
else withBusinessDBTransaction(routeIO)
executed.map(Option(_))
}
)

case None =>
// No matching ResourceDoc: fallback to original route (NO transaction scope opened).
Expand All @@ -117,61 +132,69 @@ object ResourceDocMiddleware extends MdcLoggable {
}

/**
* Wraps an IO[Response[IO]] in a request-scoped DB transaction.
* Wraps the business-logic IO in a request-scoped DB transaction.
*
* Called only for mutating methods (POST/PUT/DELETE/PATCH) after validateOnly succeeds.
* GET/HEAD bypass this entirely and run on auto-commit vendor connections, avoiding
* a pool borrow + empty-commit overhead on every read request.
*
* Borrows a Connection from HikariCP, wraps it in a non-closing proxy (so Lift's
* internal DB.use lifecycle cannot commit or return it to the pool prematurely),
* and stores it in requestProxyLocal (IOLocal — fiber-local source of truth).
* Borrows a Connection from HikariCP via Resource.make so close() is guaranteed
* even if commit/rollback throws or the fiber is cancelled. The proxy prevents
* Lift's internal DB.use lifecycle from committing or returning the connection
* prematurely.
*
* currentProxy (TTL) is NOT set here. Every DB call goes through
* RequestScopeConnection.fromFuture, which atomically sets + submits + clears the
* TTL within a single IO.defer block on the compute thread, so the thread is never
* left dirty after the fromFuture call returns.
*
* On success: commits and closes the real connection.
* On exception: rolls back and closes the real connection.
* On success: commits, then Resource finalizer closes.
* On error/cancellation: rolls back (errors swallowed to preserve original cause),
* then Resource finalizer closes.
*
* Metric writes (IO.blocking in recordMetric) run on the blocking pool where
* currentProxy is not set — they get their own pool connection and commit
* independently, matching v6 behaviour.
*/
private def withRequestTransaction(io: IO[Response[IO]]): IO[Response[IO]] = {
for {
realConn <- IO.blocking(APIUtil.vendor.HikariDatasource.ds.getConnection())
proxy = RequestScopeConnection.makeProxy(realConn)
_ <- RequestScopeConnection.requestProxyLocal.set(Some(proxy))
// Note: currentProxy (TTL) is NOT set here. Every DB call goes through
// RequestScopeConnection.fromFuture, which atomically sets + submits + clears
// the TTL within a single IO.defer block on the compute thread. Setting it
// here would leave the compute thread's TTL dirty if guaranteeCase runs on a
// different thread.
result <- io.guaranteeCase {
private def withBusinessDBTransaction(io: IO[Response[IO]]): IO[Response[IO]] =
Resource.make(
IO.blocking(APIUtil.vendor.HikariDatasource.ds.getConnection())
)(conn =>
IO.blocking { try { conn.close() } catch { case _: Exception => () } }
).use { realConn =>
val proxy = RequestScopeConnection.makeProxy(realConn)
for {
_ <- RequestScopeConnection.requestProxyLocal.set(Some(proxy))
// Note: currentProxy (TTL) is NOT set here. Every DB call goes through
// RequestScopeConnection.fromFuture, which atomically sets + submits + clears
// the TTL within a single IO.defer block on the compute thread.
result <- io.guaranteeCase {
case Outcome.Succeeded(_) =>
RequestScopeConnection.requestProxyLocal.set(None) *>
IO.blocking { try { realConn.commit() } finally { realConn.close() } }
IO.blocking { realConn.commit() }
case _ =>
RequestScopeConnection.requestProxyLocal.set(None) *>
IO.blocking { try { realConn.rollback() } finally { realConn.close() } }
IO.blocking { try { realConn.rollback() } catch { case _: Exception => () } }
}
} yield result
}
} yield result
}

/**
* Executes the full validation chain for the request.
* Returns either an error Response or enriched request routed to the handler.
* Runs the full validation chain (auth → roles → bank → account → view → counterparty)
* and returns either an error Response or an enriched Request ready for the handler.
*
* All steps are read-only and execute outside any DB transaction, so no locks are
* held during validation. The caller opens a transaction only after this returns Right.
*/
private def validateRequest(
req: Request[IO],
resourceDoc: ResourceDoc,
pathParams: Map[String, String],
cc: CallContext,
routes: HttpRoutes[IO]
): IO[Response[IO]] = {
private def validateOnly(
req: Request[IO],
resourceDoc: ResourceDoc,
pathParams: Map[String, String],
cc: CallContext
): IO[Either[Response[IO], Request[IO]]] = {

// Initial context with just CallContext
val initialContext = ValidationContext(callContext = cc)

// Compose all validation steps using EitherT
val result: Validation[ValidationContext] = for {
context <- authenticate(req, resourceDoc, initialContext)
context <- authorizeRoles(resourceDoc, pathParams, context)
Expand All @@ -181,23 +204,19 @@ object ResourceDocMiddleware extends MdcLoggable {
context <- validateCounterparty(pathParams, context)
} yield context

// Convert Validation result to Response
result.value.flatMap {
case Left(errorResponse) => IO.pure(ensureJsonContentType(errorResponse)) // Ensure all error responses are JSON
result.value.map {
case Left(errorResponse) =>
Left(ensureJsonContentType(errorResponse))
case Right(validCtx) =>
// Enrich request with validated CallContext
val enrichedReq = req.withAttribute(
Right(req.withAttribute(
Http4sRequestAttributes.callContextKey,
validCtx.callContext.copy(
bank = validCtx.bank,
bankAccount = validCtx.account,
view = validCtx.view,
counterparty = validCtx.counterparty
)
)
routes.run(enrichedReq)
.map(ensureJsonContentType) // Ensure routed response has JSON content type
.getOrElseF(IO.pure(ensureJsonContentType(Response[IO](org.http4s.Status.NotFound))))
))
}
}

Expand All @@ -207,8 +226,8 @@ object ResourceDocMiddleware extends MdcLoggable {
logger.debug(s"[ResourceDocMiddleware] needsAuthentication for ${resourceDoc.partialFunctionName}: $needsAuth")

val io =
if (needsAuth) RequestScopeConnection.fromFuture(APIUtil.authenticatedAccess(ctx.callContext))
else RequestScopeConnection.fromFuture(APIUtil.anonymousAccess(ctx.callContext))
if (needsAuth) IO.fromFuture(IO(APIUtil.authenticatedAccess(ctx.callContext)))
else IO.fromFuture(IO(APIUtil.anonymousAccess(ctx.callContext)))

EitherT(
io.attempt.flatMap {
Expand Down Expand Up @@ -266,7 +285,7 @@ object ResourceDocMiddleware extends MdcLoggable {
pathParams.get("BANK_ID") match {
case Some(bankId) =>
EitherT(
RequestScopeConnection.fromFuture(NewStyle.function.getBank(BankId(bankId), Some(ctx.callContext)))
IO.fromFuture(IO(NewStyle.function.getBank(BankId(bankId), Some(ctx.callContext))))
.attempt.flatMap {
case Right((bank, Some(updatedCC))) => IO.pure(Right(ctx.copy(bank = Some(bank), callContext = updatedCC)))
case Right((bank, None)) => IO.pure(Right(ctx.copy(bank = Some(bank))))
Expand All @@ -284,7 +303,7 @@ object ResourceDocMiddleware extends MdcLoggable {
(pathParams.get("BANK_ID"), pathParams.get("ACCOUNT_ID")) match {
case (Some(bankId), Some(accountId)) =>
EitherT(
RequestScopeConnection.fromFuture(NewStyle.function.getBankAccount(BankId(bankId), AccountId(accountId), Some(ctx.callContext)))
IO.fromFuture(IO(NewStyle.function.getBankAccount(BankId(bankId), AccountId(accountId), Some(ctx.callContext))))
.attempt.flatMap {
case Right((acc, Some(updatedCC))) => IO.pure(Right(ctx.copy(account = Some(acc), callContext = updatedCC)))
case Right((acc, None)) => IO.pure(Right(ctx.copy(account = Some(acc))))
Expand All @@ -302,7 +321,7 @@ object ResourceDocMiddleware extends MdcLoggable {
(pathParams.get("BANK_ID"), pathParams.get("ACCOUNT_ID"), pathParams.get("VIEW_ID")) match {
case (Some(bankId), Some(accountId), Some(viewId)) =>
EitherT(
RequestScopeConnection.fromFuture(ViewNewStyle.checkViewAccessAndReturnView(ViewId(viewId), BankIdAccountId(BankId(bankId), AccountId(accountId)), ctx.user.toOption, Some(ctx.callContext)))
IO.fromFuture(IO(ViewNewStyle.checkViewAccessAndReturnView(ViewId(viewId), BankIdAccountId(BankId(bankId), AccountId(accountId)), ctx.user.toOption, Some(ctx.callContext))))
.attempt.flatMap {
case Right(view) => IO.pure(Right(ctx.copy(view = Some(view))))
case Left(e: APIFailureNewStyle) => ErrorResponseConverter.createErrorResponse(e.failCode, e.failMsg, ctx.callContext).map(Left(_))
Expand All @@ -319,7 +338,7 @@ object ResourceDocMiddleware extends MdcLoggable {
(pathParams.get("BANK_ID"), pathParams.get("ACCOUNT_ID"), pathParams.get("COUNTERPARTY_ID")) match {
case (Some(bankId), Some(accountId), Some(counterpartyId)) =>
EitherT(
RequestScopeConnection.fromFuture(NewStyle.function.getCounterpartyTrait(BankId(bankId), AccountId(accountId), counterpartyId, Some(ctx.callContext)))
IO.fromFuture(IO(NewStyle.function.getCounterpartyTrait(BankId(bankId), AccountId(accountId), counterpartyId, Some(ctx.callContext))))
.attempt.flatMap {
case Right((cp, Some(updatedCC))) => IO.pure(Right(ctx.copy(counterparty = Some(cp), callContext = updatedCC)))
case Right((cp, None)) => IO.pure(Right(ctx.copy(counterparty = Some(cp))))
Expand Down
36 changes: 35 additions & 1 deletion obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import code.api.util.{APIUtil, ApiRole, ApiVersionUtils, CallContext, CustomJson
import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateEntitlementAtOneBank, canDeleteEntitlementAtAnyBank, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetCardsForBank, canGetConnectorHealth, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMigrations}
import code.api.util.ApiTag._
import code.api.util.ErrorMessages._
import code.api.util.http4s.{ErrorResponseConverter, Http4sRequestAttributes, ResourceDocMiddleware}
import code.api.util.http4s.{ErrorResponseConverter, Http4sRequestAttributes, RequestScopeConnection, ResourceDocMiddleware}
import code.api.util.http4s.Http4sRequestAttributes.{EndpointHelpers, RequestOps}
import code.api.util.newstyle.ViewNewStyle
import code.api.v1_3_0.JSONFactory1_3_0
Expand Down Expand Up @@ -1091,6 +1091,40 @@ object Http4s700 {

// ── End Phase 1 batch 3 ──────────────────────────────────────────────────

// ── Test-only rollback endpoint ───────────────────────────────────────────
// Enabled only in Lift test mode (Props.testMode == true, i.e. -Drun.mode=test).
// Props.testMode is set from the JVM system property before any props file loads,
// so it is reliably available at object-initialization time unlike file-based props.
// POST /obp/v7.0.0/test/rollback-check: writes one entitlement to DB via
// RequestScopeConnection.fromFuture, then raises IO.raiseError so the middleware
// hits Outcome.Errored → rollback. Used by Http4s700TransactionTest to verify
// that data written inside a failed request is never committed.
if (net.liftweb.util.Props.testMode) {
val testRollbackEndpoint: HttpRoutes[IO] = HttpRoutes.of[IO] {
case req @ POST -> `prefixPath` / "test" / "rollback-check" =>
val cc = req.callContext
cc.user.toOption match {
case Some(user) =>
RequestScopeConnection.fromFuture(
Future(Entitlement.entitlement.vend.addEntitlement("", user.userId, "TestRollbackSentinel"))
).flatMap(_ => IO.raiseError[Response[IO]](new RuntimeException("[test] intentional rollback")))
case None =>
IO.pure(Response[IO](Status.Unauthorized))
}
}
resourceDocs += ResourceDoc(
null,
implementedInApiVersion,
"testRollbackEndpoint",
"POST", "/test/rollback-check", "Test rollback", "Test-only: write then throw to verify rollback",
EmptyBody, EmptyBody,
List($AuthenticatedUserIsRequired, UnknownError),
Nil,
None,
http4sPartialFunction = Some(testRollbackEndpoint)
)
}

// All routes combined (without middleware - for direct use).
//
// Routes are sorted automatically by URL template specificity (segment count,
Expand Down
Loading
Loading