Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions delayedqueue-jvm/api/delayedqueue-jvm.api
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ public final class org/funfix/delayedqueue/jvm/JdbcDriver {
public static final field HSQLDB Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public static final field MariaDB Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public static final field MsSqlServer Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public static final field Oracle Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public static final field PostgreSQL Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public static final field Sqlite Lorg/funfix/delayedqueue/jvm/JdbcDriver;
public final fun getClassName ()Ljava/lang/String;
Expand Down
3 changes: 3 additions & 0 deletions delayedqueue-jvm/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ dependencies {
testImplementation(libs.jdbc.mssql)
testImplementation(libs.jdbc.postgresql)
testImplementation(libs.jdbc.mariadb)
testImplementation(libs.jdbc.oracle)
testImplementation(platform(libs.junit.bom))
testImplementation(libs.junit.jupiter)
testImplementation(platform(libs.testcontainers.bom))
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.junit.jupiter)
testImplementation(libs.testcontainers.mssqlserver)
testImplementation(libs.testcontainers.postgresql)
testImplementation(libs.testcontainers.mariadb)
testImplementation(libs.testcontainers.oracle)
testRuntimeOnly(libs.junit.platform.launcher)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.h2.H2Migrations
import org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb.HSQLDBMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.mariadb.MariaDBMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.mssql.MsSqlServerMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.oracle.OracleMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.postgres.PostgreSQLMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SqliteMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.withConnection
Expand Down Expand Up @@ -615,6 +616,7 @@ private constructor(
JdbcDriver.MsSqlServer ->
MsSqlServerMigrations.getMigrations(config.tableName)
JdbcDriver.MariaDB -> MariaDBMigrations.getMigrations(config.tableName)
JdbcDriver.Oracle -> OracleMigrations.getMigrations(config.tableName)
else ->
throw IllegalArgumentException(
"Unsupported JDBC driver: ${config.db.driver}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class JdbcDriver private constructor(public val className: String) {

@JvmField public val PostgreSQL: JdbcDriver = JdbcDriver("org.postgresql.Driver")

@JvmField public val Oracle: JdbcDriver = JdbcDriver("oracle.jdbc.OracleDriver")

@JvmStatic
public val entries: List<JdbcDriver> =
listOf(H2, HSQLDB, MariaDB, MsSqlServer, PostgreSQL, Sqlite)
Comment on lines 22 to 24
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Add Oracle to JdbcDriver.entries lookup list

The new JdbcDriver.Oracle constant is not included in entries, which is the backing list for JdbcDriver.invoke(className). As a result, any Java/Kotlin consumer that resolves a driver by class name (e.g., JdbcDriver("oracle.jdbc.OracleDriver")) will get null and may reject an otherwise valid Oracle configuration, even though Oracle is now supported elsewhere in the code. This makes Oracle behave inconsistently with the other drivers and will break class-name based configuration flows.

Useful? React with 👍 / 👎.

Copy link

Copilot AI Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JdbcDriver.Oracle is defined, but it is not added to JdbcDriver.entries. This breaks JdbcDriver.invoke(className) lookups for Oracle and means Oracle won’t be included anywhere entries is used. Add Oracle to the entries list (ideally in a consistent order with the other drivers).

Suggested change
listOf(H2, HSQLDB, MariaDB, MsSqlServer, PostgreSQL, Sqlite)
listOf(H2, HSQLDB, MariaDB, MsSqlServer, Oracle, PostgreSQL, Sqlite)

Copilot uses AI. Check for mistakes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.h2.H2Adapter
import org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb.HSQLDBAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.mariadb.MariaDBAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.mssql.MsSqlServerAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.oracle.OracleAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.postgres.PostgreSQLAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SqliteAdapter
import org.funfix.delayedqueue.jvm.internals.utils.Raise
Expand Down Expand Up @@ -473,6 +474,7 @@ internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val t
JdbcDriver.MsSqlServer -> MsSqlServerAdapter(driver, tableName)
JdbcDriver.PostgreSQL -> PostgreSQLAdapter(driver, tableName)
JdbcDriver.MariaDB -> MariaDBAdapter(driver, tableName)
JdbcDriver.Oracle -> OracleAdapter(driver, tableName)
else -> throw IllegalArgumentException("Unsupported JDBC driver: $driver")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.h2.H2Filters
import org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb.HSQLDBFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.mariadb.MariaDBFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.mssql.MSSQLFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.oracle.OracleFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.postgres.PostgreSQLFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SQLiteFilters

Expand Down Expand Up @@ -82,5 +83,6 @@ internal fun filtersForDriver(driver: JdbcDriver): RdbmsExceptionFilters =
JdbcDriver.Sqlite -> SQLiteFilters
JdbcDriver.MariaDB -> MariaDBFilters
JdbcDriver.PostgreSQL -> PostgreSQLFilters
JdbcDriver.Oracle -> OracleFilters
else -> throw IllegalArgumentException("Unsupported JDBC driver: ${driver.className}")
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ internal object ConnectionPool {
* used as identifiers.
* - MariaDB: uses backticks (`)
* - PostgreSQL, HSQLDB, H2, SQLite: use double quotes (")
* - Oracle: uses double quotes (")
* - MS SQL Server: uses square brackets ([])
*
* @param name The identifier to quote
Expand All @@ -195,6 +196,7 @@ internal fun JdbcDriver.quote(name: String): String =
JdbcDriver.PostgreSQL -> "\"$name\""
JdbcDriver.Sqlite -> "\"$name\""
JdbcDriver.MsSqlServer -> "[$name]"
JdbcDriver.Oracle -> "\"$name\""
else -> throw IllegalArgumentException("Unsupported JDBC driver: ${className}")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
package org.funfix.delayedqueue.jvm.internals.jdbc.oracle

import java.sql.SQLException
import java.time.Duration
import java.time.Instant
import org.funfix.delayedqueue.jvm.JdbcDriver
import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRow
import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRowWithId
import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.SafeConnection
import org.funfix.delayedqueue.jvm.internals.jdbc.prepareStatement
import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId
import org.funfix.delayedqueue.jvm.internals.utils.Raise

/** Oracle-specific adapter. */
internal class OracleAdapter(driver: JdbcDriver, tableName: String) :
SQLVendorAdapter(driver, tableName) {

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun insertOneRow(conn: SafeConnection, row: DBTableRow): Boolean {
// NOTE: this query can still throw an SQLException under concurrency,
// because the NOT EXISTS check is not atomic. But this is still fine,
// as we reduce the error rate, and the call-site does catch the SQLException.
val sql =
"""
INSERT INTO "$tableName"
(
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"createdAt"
)
SELECT ?, ?, ?, ?, ?, ?
FROM dual
WHERE NOT EXISTS (
SELECT 1
FROM "$tableName"
WHERE "pKey" = ? AND "pKind" = ?
)
"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, row.pKey)
stmt.setString(2, row.pKind)
stmt.setBytes(3, row.payload)
stmt.setEpochMillis(4, row.scheduledAt)
stmt.setEpochMillis(5, row.scheduledAtInitially)
stmt.setEpochMillis(6, row.createdAt)
stmt.setString(7, row.pKey)
stmt.setString(8, row.pKind)
stmt.executeUpdate() > 0
}
Comment on lines +19 to +54
Copy link

Copilot AI Feb 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insertOneRow is documented to return false when the key already exists, but this Oracle implementation can still throw a duplicate-key SQLException under concurrency (the NOT EXISTS check isn’t fully race-free). Consider catching duplicate-key exceptions here (using the existing filtersForDriver(driver).duplicateKey) and returning false, or at least add an explicit comment like the MS-SQL adapter does so callers don’t rely on the return value alone.

Copilot uses AI. Check for mistakes.
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun selectForUpdateOneRow(
conn: SafeConnection,
kind: String,
key: String,
): DBTableRowWithId? {
val sql =
"""
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM "$tableName"
WHERE "pKey" = ? AND "pKind" = ? AND ROWNUM <= 1
FOR UPDATE
"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, key)
stmt.setString(2, kind)
stmt.executeQuery().use { rs ->
if (rs.next()) {
rs.toDBTableRowWithId()
} else {
null
}
}
}
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun selectByKey(conn: SafeConnection, kind: String, key: String): DBTableRowWithId? {
val sql =
"""
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM "$tableName"
WHERE "pKey" = ? AND "pKind" = ? AND ROWNUM <= 1
"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, key)
stmt.setString(2, kind)
stmt.executeQuery().use { rs ->
if (rs.next()) {
rs.toDBTableRowWithId()
} else {
null
}
}
}
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun selectFirstAvailableWithLock(
conn: SafeConnection,
kind: String,
now: Instant,
): DBTableRowWithId? {
val sql =
"""
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM "$tableName"
WHERE ROWID IN (
SELECT ROWID
FROM "$tableName"
WHERE "pKind" = ? AND "scheduledAt" <= ?
ORDER BY "scheduledAt"
FETCH FIRST 1 ROWS ONLY
)
FOR UPDATE SKIP LOCKED
Comment on lines +139 to +147
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Skip locked rows before limiting the Oracle pick

The Oracle query chooses a single candidate row in the subquery (FETCH FIRST 1) before FOR UPDATE SKIP LOCKED is applied. If that single row is already locked by another worker, Oracle will skip it in the outer query and return no rows, even when other ready rows exist. This can make selectFirstAvailableWithLock return null and stall processing under contention. To preserve the intended behavior (like the PostgreSQL adapter), apply FOR UPDATE SKIP LOCKED to the ordered selection itself (e.g., ORDER BY ... FETCH FIRST 1 ROWS ONLY FOR UPDATE SKIP LOCKED) or otherwise include SKIP LOCKED in the row selection step so locked rows are skipped before limiting.

Useful? React with 👍 / 👎.

"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, kind)
stmt.setEpochMillis(2, now)
stmt.executeQuery().use { rs ->
if (rs.next()) {
rs.toDBTableRowWithId()
} else {
null
}
}
}
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun acquireManyOptimistically(
conn: SafeConnection,
kind: String,
limit: Int,
lockUuid: String,
timeout: Duration,
now: Instant,
): Int {
require(limit > 0) { "Limit must be > 0" }
val expireAt = now.plus(timeout)

val selectSql =
"""
SELECT "id"
FROM "$tableName"
WHERE ROWID IN (
SELECT ROWID
FROM "$tableName"
WHERE "pKind" = ? AND "scheduledAt" <= ?
ORDER BY "scheduledAt"
FETCH FIRST $limit ROWS ONLY
)
FOR UPDATE SKIP LOCKED
"""

val ids =
conn.prepareStatement(selectSql) { stmt ->
stmt.setString(1, kind)
stmt.setEpochMillis(2, now)
stmt.executeQuery().use { rs ->
val results = mutableListOf<Long>()
while (rs.next()) {
results.add(rs.getLong("id"))
}
results
}
}

if (ids.isEmpty()) return 0

val placeholders = ids.joinToString(",") { "?" }
val updateSql =
"""
UPDATE "$tableName"
SET
"lockUuid" = ?,
"scheduledAt" = ?
WHERE "id" IN ($placeholders)
"""

return conn.prepareStatement(updateSql) { stmt ->
stmt.setString(1, lockUuid)
stmt.setEpochMillis(2, expireAt)
ids.forEachIndexed { index, id -> stmt.setLong(index + 3, id) }
stmt.executeUpdate()
}
}

context(_: Raise<InterruptedException>, _: Raise<SQLException>)
override fun selectAllAvailableWithLock(
conn: SafeConnection,
lockUuid: String,
count: Int,
offsetId: Long?,
): List<DBTableRowWithId> {
val offsetClause = offsetId?.let { "AND \"id\" > ?" } ?: ""
val sql =
"""
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM (
SELECT
"id",
"pKey",
"pKind",
"payload",
"scheduledAt",
"scheduledAtInitially",
"lockUuid",
"createdAt"
FROM "$tableName"
WHERE "lockUuid" = ? $offsetClause
ORDER BY "id"
)
WHERE ROWNUM <= $count
"""

return conn.prepareStatement(sql) { stmt ->
stmt.setString(1, lockUuid)
offsetId?.let { stmt.setLong(2, it) }
stmt.executeQuery().use { rs ->
val results = mutableListOf<DBTableRowWithId>()
while (rs.next()) {
results.add(rs.toDBTableRowWithId())
}
results
}
}
}
}
Loading