Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions delayedqueue-jvm/api/delayedqueue-jvm.api
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ public final class org/funfix/delayedqueue/jvm/JdbcDriver : java/lang/Enum {
public static final field HSQLDB Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public static final field MariaDB Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public static final field MsSqlServer Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public static final field Oracle Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public static final field PostgreSQL Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public static final field Sqlite Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public final fun getClassName ()Ljava/lang/String;
Expand Down
3 changes: 3 additions & 0 deletions delayedqueue-jvm/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ dependencies {
testImplementation(libs.jdbc.mssql)
testImplementation(libs.jdbc.postgresql)
testImplementation(libs.jdbc.mariadb)
testImplementation(libs.jdbc.oracle)
testImplementation(platform(libs.junit.bom))
testImplementation(libs.junit.jupiter)
testImplementation(platform(libs.testcontainers.bom))
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.junit.jupiter)
testImplementation(libs.testcontainers.mssqlserver)
testImplementation(libs.testcontainers.postgresql)
testImplementation(libs.testcontainers.mariadb)
testImplementation(libs.testcontainers.oracle)
testRuntimeOnly(libs.junit.platform.launcher)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.filtersForDriver
import org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb.HSQLDBMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.mariadb.MariaDBMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.mssql.MsSqlServerMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.oracle.OracleMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.postgres.PostgreSQLMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SqliteMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.withConnection
Expand Down Expand Up @@ -613,6 +614,7 @@ private constructor(
JdbcDriver.MsSqlServer ->
MsSqlServerMigrations.getMigrations(config.tableName)
JdbcDriver.MariaDB -> MariaDBMigrations.getMigrations(config.tableName)
JdbcDriver.Oracle -> OracleMigrations.getMigrations(config.tableName)
}

val executed = MigrationRunner.runMigrations(connection, migrations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ public enum class JdbcDriver(public val className: String) {
MariaDB("org.mariadb.jdbc.Driver"),

/** PostgreSQL driver. */
PostgreSQL("org.postgresql.Driver");
PostgreSQL("org.postgresql.Driver"),

/** Oracle Database driver. */
Oracle("oracle.jdbc.OracleDriver");

public companion object {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.funfix.delayedqueue.jvm.JdbcDriver
import org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb.HSQLDBAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.mariadb.MariaDBAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.mssql.MsSqlServerAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.oracle.OracleAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.postgres.PostgreSQLAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SqliteAdapter
import org.funfix.delayedqueue.jvm.internals.utils.Raise
Expand Down Expand Up @@ -471,6 +472,7 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t
JdbcDriver.MsSqlServer -> MsSqlServerAdapter(driver, tableName)
JdbcDriver.PostgreSQL -> PostgreSQLAdapter(driver, tableName)
JdbcDriver.MariaDB -> MariaDBAdapter(driver, tableName)
JdbcDriver.Oracle -> OracleAdapter(driver, tableName)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.funfix.delayedqueue.jvm.JdbcDriver
import org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb.HSQLDBFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.mariadb.MariaDBFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.mssql.MSSQLFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.oracle.OracleFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.postgres.PostgreSQLFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SQLiteFilters

Expand Down Expand Up @@ -80,4 +81,5 @@ internal fun filtersForDriver(driver: JdbcDriver): RdbmsExceptionFilters =
JdbcDriver.Sqlite -> SQLiteFilters
JdbcDriver.MariaDB -> MariaDBFilters
JdbcDriver.PostgreSQL -> PostgreSQLFilters
JdbcDriver.Oracle -> OracleFilters
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,21 @@ context(_: Raise<SQLException>)
internal inline fun <T> runSQLOperation(block: () -> T): T = block()

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
internal fun <T> Database.withConnection(block: (SafeConnection) -> T): T =
runBlockingIO {
runSQLOperation {
source.connection.let {
internal fun <T> Database.withConnection(block: (SafeConnection) -> T): T = runBlockingIO {
runSQLOperation {
source.connection.let {
try {
block(SafeConnection(it, driver))
} finally {
try {
block(SafeConnection(it, driver))
} finally {
try {
it.close()
} catch (e: SQLException) {
logger.warn("While closing JDBC connection", e)
}
it.close()
} catch (e: SQLException) {
logger.warn("While closing JDBC connection", e)
}
}
}
}
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
internal fun <T> Database.withTransaction(block: (SafeConnection) -> T) =
Expand Down Expand Up @@ -170,6 +169,7 @@ internal object ConnectionPool {
* used as identifiers.
* - MariaDB: uses backticks (`)
* - PostgreSQL, HSQLDB, SQLite: use double quotes (")
* - Oracle: uses double quotes (")
* - MS SQL Server: uses square brackets ([])
*
* @param name The identifier to quote
Expand All @@ -182,6 +182,7 @@ internal fun JdbcDriver.quote(name: String): String =
JdbcDriver.PostgreSQL -> "\"$name\""
JdbcDriver.Sqlite -> "\"$name\""
JdbcDriver.MsSqlServer -> "[$name]"
JdbcDriver.Oracle -> "\"$name\""
}

/** Quotes a database identifier using the connection's driver. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package org.funfix.delayedqueue.jvm.internals.jdbc.oracle

import java.sql.SQLException
import java.time.Duration
import java.time.Instant
import org.funfix.delayedqueue.jvm.JdbcDriver
import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRow
import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRowWithId
import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.SafeConnection
import org.funfix.delayedqueue.jvm.internals.jdbc.prepareStatement
import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId
import org.funfix.delayedqueue.jvm.internals.utils.Raise

/** Oracle-specific adapter. */
internal class OracleAdapter(driver: JdbcDriver, tableName: String) :
SQLVendorAdapter(driver, tableName) {

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean {
val sql =
"""
INSERT INTO "$tableName"
(
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"createdAt"
)
SELECT ?, ?, ?, ?, ?, ?
FROM dual
WHERE NOT EXISTS (
SELECT 1
FROM "$tableName"
WHERE "pKey" = ? AND "pKind" = ?
)
"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, row.pKey)
stmt.setString(2, row.pKind)
stmt.setBytes(3, row.payload)
stmt.setEpochMillis(4, row.scheduledAt)
stmt.setEpochMillis(5, row.scheduledAtInitially)
stmt.setEpochMillis(6, row.createdAt)
stmt.setString(7, row.pKey)
stmt.setString(8, row.pKind)
stmt.executeUpdate() > 0
}
Comment on lines +19 to +54
Copy link

Copilot AI Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insertOneRow is documented to return false when the key already exists, but this Oracle implementation can still throw a duplicate-key SQLException under concurrency (the NOT EXISTS check isn’t fully race-free). Consider catching duplicate-key exceptions here (using the existing filtersForDriver(driver).duplicateKey) and returning false, or at least add an explicit comment like the MS-SQL adapter does so callers don’t rely on the return value alone.

Copilot uses AI. Check for mistakes.
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun selectForUpdateOneRow(
conn: SafeConnection,
kind: String,
key: String,
): DBTableRowWithId? {
val sql =
"""
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM "$tableName"
WHERE "pKey" = ? AND "pKind" = ? AND ROWNUM <= 1
FOR UPDATE
"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, key)
stmt.setString(2, kind)
stmt.executeQuery().use { rs ->
if (rs.next()) {
rs.toDBTableRowWithId()
} else {
null
}
}
}
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun selectByKey(conn: SafeConnection, kind: String, key: String): DBTableRowWithId? {
val sql =
"""
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM "$tableName"
WHERE "pKey" = ? AND "pKind" = ? AND ROWNUM <= 1
"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, key)
stmt.setString(2, kind)
stmt.executeQuery().use { rs ->
if (rs.next()) {
rs.toDBTableRowWithId()
} else {
null
}
}
}
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun selectFirstAvailableWithLock(
conn: SafeConnection,
kind: String,
now: Instant,
): DBTableRowWithId? {
val sql =
"""
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM "$tableName"
WHERE ROWID IN (
SELECT ROWID
FROM "$tableName"
WHERE "pKind" = ? AND "scheduledAt" <= ?
ORDER BY "scheduledAt"
FETCH FIRST 1 ROWS ONLY
)
FOR UPDATE SKIP LOCKED
Comment on lines +139 to +147
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Skip locked rows before limiting the Oracle pick

The Oracle query chooses a single candidate row in the subquery (FETCH FIRST 1) before FOR UPDATE SKIP LOCKED is applied. If that single row is already locked by another worker, Oracle will skip it in the outer query and return no rows, even when other ready rows exist. This can make selectFirstAvailableWithLock return null and stall processing under contention. To preserve the intended behavior (like the PostgreSQL adapter), apply FOR UPDATE SKIP LOCKED to the ordered selection itself (e.g., ORDER BY ... FETCH FIRST 1 ROWS ONLY FOR UPDATE SKIP LOCKED) or otherwise include SKIP LOCKED in the row selection step so locked rows are skipped before limiting.

Useful? React with 👍 / 👎.

"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, kind)
stmt.setEpochMillis(2, now)
stmt.executeQuery().use { rs ->
if (rs.next()) {
rs.toDBTableRowWithId()
} else {
null
}
}
}
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun acquireManyOptimistically(
conn: SafeConnection,
kind: String,
limit: Int,
lockUuid: String,
timeout: Duration,
now: Instant,
): Int {
require(limit > 0) { "Limit must be > 0" }
val expireAt = now.plus(timeout)

val selectSql =
"""
SELECT "id"
FROM "$tableName"
WHERE ROWID IN (
SELECT ROWID
FROM "$tableName"
WHERE "pKind" = ? AND "scheduledAt" <= ?
ORDER BY "scheduledAt"
FETCH FIRST $limit ROWS ONLY
)
FOR UPDATE SKIP LOCKED
"""

val ids =
conn.prepareStatement(selectSql) { stmt ->
stmt.setString(1, kind)
stmt.setEpochMillis(2, now)
stmt.executeQuery().use { rs ->
val results = mutableListOf<Long>()
while (rs.next()) {
results.add(rs.getLong("id"))
}
results
}
}

if (ids.isEmpty()) return 0

val placeholders = ids.joinToString(",") { "?" }
val updateSql =
"""
UPDATE "$tableName"
SET
"lockUuid" = ?,
"scheduledAt" = ?
WHERE "id" IN ($placeholders)
"""

return conn.prepareStatement(updateSql) { stmt ->
stmt.setString(1, lockUuid)
stmt.setEpochMillis(2, expireAt)
ids.forEachIndexed { index, id -> stmt.setLong(index + 3, id) }
stmt.executeUpdate()
}
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun selectAllAvailableWithLock(
conn: SafeConnection,
lockUuid: String,
count: Int,
offsetId: Long?,
): List<DBTableRowWithId> {
val offsetClause = offsetId?.let { "AND \"id\" > ?" } ?: ""
val sql =
"""
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM (
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM "$tableName"
WHERE "lockUuid" = ? $offsetClause
ORDER BY "id"
)
WHERE ROWNUM <= $count
"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, lockUuid)
offsetId?.let { stmt.setLong(2, it) }
stmt.executeQuery().use { rs ->
val results = mutableListOf<DBTableRowWithId>()
while (rs.next()) {
results.add(rs.toDBTableRowWithId())
}
results
}
}
}
}
Loading
Loading