Exposed: Support upsert functionality

Created on 26 Sep 2017 · 21 Comments · Source: JetBrains/Exposed

enhancement good-first-issue

Most helpful comment

Hey, are there any plans to support upsert out of the box in Exposed?

All 21 comments

Should be fixed together with #186

A workaround is to implement this locally with the help of "ON DUPLICATE KEY UPDATE":

class BatchInsertUpdateOnDuplicate(table: Table, val onDupUpdate: List<Column<*>>): BatchInsertStatement(table, false) {
    override fun prepareSQL(transaction: Transaction): String {
        val onUpdateSQL = if(onDupUpdate.isNotEmpty()) {
            " ON DUPLICATE KEY UPDATE " + onDupUpdate.joinToString { "${transaction.identity(it)}=VALUES(${transaction.identity(it)})" }
        } else ""
        return super.prepareSQL(transaction) + onUpdateSQL
    }
}

fun <T: Table, E> T.batchInsertOnDuplicateKeyUpdate(data: List<E>, onDupUpdateColumns: List<Column<*>>, body: T.(BatchInsertUpdateOnDuplicate, E) -> Unit): List<Int> {
    return data.takeIf { it.isNotEmpty() }?.let {
        val insert = BatchInsertUpdateOnDuplicate(this, onDupUpdateColumns)
        data.forEach {
            insert.addBatch()
            body(insert, it)
        }
        TransactionManager.current().exec(insert)
        columns.firstOrNull { it.columnType.isAutoInc }?.let { idCol ->
            insert.generatedKey?.mapNotNull {
                val value = it[idCol]
                when (value) {
                    is Long -> value.toInt()
                    is Int -> value
                    null -> null
                    else -> error("can't find primary key of type Int or Long; map['$idCol']='$value' (where map='$it')")
                }
            }
        }
    }.orEmpty()
}

// Usage sample
FooTable.batchInsertOnDuplicateKeyUpdate(listOf(fooObject), listOf(FooTable.barField)) { batch, foo ->
      batch[FooTable.id] = foo.id
      batch[FooTable.barField] = foo.bar
}
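
For readers who want to see the shape of the SQL this workaround aims for, here is a minimal sketch in plain Kotlin with no Exposed dependency. The helper name onDuplicateKeyUpdate, the table foo and the column bar_field are hypothetical, and plain strings stand in for the identifiers that transaction.identity(...) would return.

```kotlin
// Hypothetical helper: builds the MySQL-style suffix from plain column names.
// In the workaround above, transaction.identity(column) supplies the names.
fun onDuplicateKeyUpdate(columns: List<String>): String =
    if (columns.isEmpty()) ""
    else " ON DUPLICATE KEY UPDATE " + columns.joinToString { "$it=VALUES($it)" }

fun main() {
    // Hypothetical base statement, standing in for super.prepareSQL(transaction).
    val insert = "INSERT INTO foo (id, bar_field) VALUES (?, ?)"
    println(insert + onDuplicateKeyUpdate(listOf("bar_field")))
    // prints: INSERT INTO foo (id, bar_field) VALUES (?, ?) ON DUPLICATE KEY UPDATE bar_field=VALUES(bar_field)
}
```

Only the columns passed in the list are overwritten when a duplicate key is hit; an empty list leaves the insert statement unchanged.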

Here is another method that seems to work for PostgreSQL's upsert:

fun <T : Table> T.insertOrUpdate(key: Column<*>, body: T.(InsertStatement<Number>) -> Unit) =
        InsertOrUpdate<Number>(key, this).apply {
            body(this)
            execute(TransactionManager.current())
        }

class InsertOrUpdate<Key : Any>(private val key: Column<*>,
                                table: Table,
                                isIgnore: Boolean = false) : InsertStatement<Key>(table, isIgnore) {
    override fun prepareSQL(transaction: Transaction): String {
        val updateSetter = table.columns.joinToString { "${it.name} = EXCLUDED.${it.name}" }
        val onConflict = "ON CONFLICT (${key.name}) DO UPDATE SET $updateSetter"
        return "${super.prepareSQL(transaction)} $onConflict"
    }
}

table.columns may need to be mapped through transaction.identity if some names require quoting.
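
A rough sketch of why the quoting matters, written in plain Kotlin so it runs without Exposed. The identity function here is a hypothetical stand-in for transaction.identity, and the reserved-word list is illustrative only, not what Exposed actually uses.

```kotlin
// Illustrative, deliberately tiny reserved-word list (an assumption).
val reserved = setOf("user", "order", "group")

// Hypothetical stand-in for transaction.identity(column):
// quote the name only when it collides with a reserved word.
fun identity(name: String): String =
    if (name in reserved) "\"$name\"" else name

fun onConflictUpdate(key: String, columns: List<String>): String {
    val setter = columns.joinToString { "${identity(it)} = EXCLUDED.${identity(it)}" }
    return "ON CONFLICT (${identity(key)}) DO UPDATE SET $setter"
}

fun main() {
    println(onConflictUpdate("id", listOf("id", "order")))
    // prints: ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id, "order" = EXCLUDED."order"
}
```

With raw it.name, a column called order would be emitted unquoted and the statement would fail to parse.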

That upsert method didn't work for me. Instead of "insert or update" it behaved like "insert or replace", meaning that unrelated columns got changed. The following fix seemed to work (the difference is that we only update the columns that were set in the statement):

fun <T : Table> T.insertOrUpdate(vararg keys: Column<*>, body: T.(InsertStatement<Number>) -> Unit) =
    InsertOrUpdate<Number>(keys, this).apply {
        body(this)
        execute(TransactionManager.current())
    }

class InsertOrUpdate<Key : Any>(private val keys: Array< out Column<*>>,
                                table: Table,
                                isIgnore: Boolean = false
                                ) : InsertStatement<Key>(table, isIgnore) {
    override fun prepareSQL(transaction: Transaction): String {
        val updateSetter = super.values.keys.joinToString { "${it.name} = EXCLUDED.${it.name}" }
        val keyColumns = keys.joinToString(","){it.name}
        val onConflict = "ON CONFLICT ($keyColumns) DO UPDATE SET $updateSetter"
        return "${super.prepareSQL(transaction)} $onConflict"
    }
}
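
To make the difference between the two versions concrete, here is a small self-contained sketch (plain Kotlin, hypothetical column names) that prints the SET clause each one would generate. EXCLUDED is PostgreSQL's name for the row that was proposed for insertion.

```kotlin
// Builds the "col = EXCLUDED.col" list used after DO UPDATE SET.
fun setClause(columns: Collection<String>): String =
    columns.joinToString { "$it = EXCLUDED.$it" }

fun main() {
    val allColumns = listOf("id", "name", "email") // what table.columns would yield
    val assigned = listOf("id", "name")            // what values.keys would yield

    println(setClause(allColumns))
    // prints: id = EXCLUDED.id, name = EXCLUDED.name, email = EXCLUDED.email
    println(setClause(assigned))
    // prints: id = EXCLUDED.id, name = EXCLUDED.name
}
```

In the first form, email is overwritten with whatever the proposed row holds for it (its default or NULL when the body never set it), which matches the "insert or replace" behaviour described above. The second form leaves email untouched.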

This is for a single insert-or-update rather than a batch. Tested on MySQL.

fun <T : Table> T.insertOrUpdate(vararg onDuplicateUpdateKeys: Column<*>, body: T.(InsertStatement<Number>) -> Unit) =
        InsertOrUpdate<Number>(onDuplicateUpdateKeys,this).apply {
            body(this)
            execute(TransactionManager.current())
        }

class InsertOrUpdate<Key : Any>(
        private val onDuplicateUpdateKeys: Array< out Column<*>>,
        table: Table,
        isIgnore: Boolean = false
) : InsertStatement<Key>(table, isIgnore) {
    override fun prepareSQL(transaction: Transaction): String {
        val onUpdateSQL = if(onDuplicateUpdateKeys.isNotEmpty()) {
            " ON DUPLICATE KEY UPDATE " + onDuplicateUpdateKeys.joinToString { "${transaction.identity(it)}=VALUES(${transaction.identity(it)})" }
        } else ""
        return super.prepareSQL(transaction) + onUpdateSQL
    }
}

// Example:
//    CustomerTable.insertOrUpdate(
//            CustomerTable.favouriteColour
//    ) {
//        it[id] = customer.id
//        it[favouriteColour] = customerFavouriteColour
//    }

I made something similar to @carolosf's code; however, it takes an index or a column that is used as the constraint.
The code supports PostgreSQL, and I also just tested it with MariaDB 10.8.3.
No warranties.

Example usage:

object MyTable : Table() { 
    val id = integer("id").primaryKey().autoIncrement()
    val attribute = integer("attribute")
}

fun setAttribute(id: Int, attribute: Int) {
    MyTable.upsert(MyTable.id) { 
        // Qualify the columns: the bare names would resolve to the Int parameters.
        it[MyTable.id] = id
        it[MyTable.attribute] = attribute
    }
}

Do not call upsert without passing a column or an index.
The functions at the bottom can be used instead of index and uniqueIndex to keep a reference to the Index object as a property of the table: val pair_constraint = uniqueIndexR(null, col1, col2) (the first parameter is the optional index name).

Code:

import org.jetbrains.exposed.sql.Column
import org.jetbrains.exposed.sql.Index
import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.statements.InsertStatement
import org.jetbrains.exposed.sql.transactions.TransactionManager

class UpsertStatement<Key : Any>(table: Table, conflictColumn: Column<*>? = null, conflictIndex: Index? = null)
    : InsertStatement<Key>(table, false) {
    val indexName: String
    val indexColumns: List<Column<*>>

    init {
        when {
            conflictIndex != null -> {
                indexName = conflictIndex.indexName
                indexColumns = conflictIndex.columns
            }
            conflictColumn != null -> {
                indexName = conflictColumn.name
                indexColumns = listOf(conflictColumn)
            }
            else -> throw IllegalArgumentException()
        }
    }

    override fun prepareSQL(transaction: Transaction) = buildString {
        append(super.prepareSQL(transaction))

        val dialect = transaction.db.vendor
        if (dialect == "postgresql") {

            append(" ON CONFLICT(")
            append(indexName)
            append(") DO UPDATE SET ")

            values.keys.filter { it !in indexColumns }.joinTo(this) { "${transaction.identity(it)}=EXCLUDED.${transaction.identity(it)}" }

        } else {

            append (" ON DUPLICATE KEY UPDATE ")
            values.keys.filter { it !in indexColumns }.joinTo(this) { "${transaction.identity(it)}=VALUES(${transaction.identity(it)})" }

        }
    }

}

inline fun <T : Table> T.upsert(conflictColumn: Column<*>? = null, conflictIndex: Index? = null, body: T.(UpsertStatement<Number>) -> Unit) =
    UpsertStatement<Number>(this, conflictColumn, conflictIndex).apply {
        body(this)
        execute(TransactionManager.current())
    }

fun Table.indexR(customIndexName: String? = null, isUnique: Boolean = false, vararg columns: Column<*>): Index {
    val index = Index(columns.toList(), isUnique, customIndexName)
    indices.add(index)
    return index
}

fun Table.uniqueIndexR(customIndexName: String? = null, vararg columns: Column<*>): Index = indexR(customIndexName, true, *columns)

I modified @AllanWang's code to quote column names via identity() and support multiple vararg conflict columns for my own needs:

fun <T : Table> T.insertOrUpdate(vararg keys: Column<*>, body: T.(InsertStatement<Number>) -> Unit) =
    InsertOrUpdate<Number>(this, keys = *keys).apply {
        body(this)
        execute(TransactionManager.current())
    }

class InsertOrUpdate<Key : Any>(
    table: Table,
    isIgnore: Boolean = false,
    private vararg val keys: Column<*>
) : InsertStatement<Key>(table, isIgnore) {
    override fun prepareSQL(transaction: Transaction): String {
        val tm = TransactionManager.current()
        val updateSetter = table.columns.joinToString { "${tm.identity(it)} = EXCLUDED.${tm.identity(it)}" }
        val onConflict = "ON CONFLICT (${keys.joinToString { tm.identity(it) }}) DO UPDATE SET $updateSetter"
        return "${super.prepareSQL(transaction)} $onConflict"
    }
}

Would love to see official support in Exposed, though.

I used the solution from @Dico200 but had to modify it to work with an index (only tested on Postgres).
Exposed creates a constraint for the index, and the syntax for that is: ON CONFLICT ON CONSTRAINT

import org.jetbrains.exposed.sql.Column
import org.jetbrains.exposed.sql.Index
import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.statements.InsertStatement
import org.jetbrains.exposed.sql.transactions.TransactionManager

class UpsertStatement<Key : Any>(table: Table, conflictColumn: Column<*>? = null, conflictIndex: Index? = null) :
    InsertStatement<Key>(table, false) {

    private val indexName: String
    private val indexColumns: List<Column<*>>
    private val index: Boolean

    init {
        when {
            conflictIndex != null -> {
                index = true
                indexName = conflictIndex.indexName
                indexColumns = conflictIndex.columns
            }
            conflictColumn != null -> {
                index = false
                indexName = conflictColumn.name
                indexColumns = listOf(conflictColumn)
            }
            else -> throw IllegalArgumentException()
        }
    }

    override fun prepareSQL(transaction: Transaction) = buildString {
        append(super.prepareSQL(transaction))

        val dialect = transaction.db.vendor
        if (dialect == "postgresql") {
            if (index) {
                append(" ON CONFLICT ON CONSTRAINT ")
                append(indexName)
            } else {
                append(" ON CONFLICT(")
                append(indexName)
                append(")")
            }
            append(" DO UPDATE SET ")

            values.keys.filter { it !in indexColumns }
                .joinTo(this) { "${transaction.identity(it)}=EXCLUDED.${transaction.identity(it)}" }

        } else {

            append(" ON DUPLICATE KEY UPDATE ")
            values.keys.filter { it !in indexColumns }
                .joinTo(this) { "${transaction.identity(it)}=VALUES(${transaction.identity(it)})" }

        }
    }

}

inline fun <T : Table> T.upsert(
    conflictColumn: Column<*>? = null,
    conflictIndex: Index? = null,
    body: T.(UpsertStatement<Number>) -> Unit
) =
    UpsertStatement<Number>(this, conflictColumn, conflictIndex).apply {
        body(this)
        execute(TransactionManager.current())
    }

fun Table.indexR(customIndexName: String? = null, isUnique: Boolean = false, vararg columns: Column<*>): Index {
    val index = Index(columns.toList(), isUnique, customIndexName)
    indices.add(index)
    return index
}

fun Table.uniqueIndexR(customIndexName: String? = null, vararg columns: Column<*>): Index =
    indexR(customIndexName, true, *columns)
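
As a quick illustration of the two PostgreSQL conflict-target forms this version switches between, here is a sketch in plain Kotlin. The helper conflictTarget and the names used are hypothetical and only mirror the branch on index in the code above.

```kotlin
// Hypothetical helper: a named constraint uses ON CONFLICT ON CONSTRAINT,
// while a plain column list goes inside parentheses.
fun conflictTarget(columns: List<String>, constraintName: String? = null): String =
    if (constraintName != null) " ON CONFLICT ON CONSTRAINT $constraintName"
    else " ON CONFLICT (${columns.joinToString(", ")})"

fun main() {
    println(conflictTarget(listOf("id")))
    // prints:  ON CONFLICT (id)
    println(conflictTarget(listOf("col1", "col2"), constraintName = "pair_constraint"))
    // prints:  ON CONFLICT ON CONSTRAINT pair_constraint
}
```

Putting an index name inside the parentheses would make PostgreSQL look for a column with that name, which is presumably why the earlier version failed when given an index.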

+1 for official support

+1

I had to provide different values in the insert and update blocks, so I wrote my own implementation based on @carolosf's version. Thought it might be useful. Tested with MySQL and MariaDB.

/**
 * Insert or update (a.k.a. upsert) implementation
 *
 * Sample usage:
 *
 * object SampleTable : IntIdTable("whatever") {
 *     val identifier = varchar("identifier", 32).uniqueIndex()
 *     val value = varchar("value", 32)
 * }
 *
 * transaction {
 *     SampleTable.insertOrUpdate({
 *         it[SampleTable.identifier] = "some identifier"
 *         it[SampleTable.value] = "inserted"
 *     }){
 *         it[SampleTable.value] = "updated"
 *     }
 * }
 *
 * Which is equivalent of:
 *
 * INSERT INTO whatever(identifier, value) VALUES('some identifier', 'inserted')
 * ON DUPLICATE KEY UPDATE value = 'updated'
 */
fun <T : Table> T.insertOrUpdate(insert: T.(InsertStatement<Number>) -> Unit, update: T.(UpdateBuilder<Int>) -> Unit) {
    val updateStatement = UpsertUpdateBuilder(this).apply { update(this) }
    InsertOrUpdate<Number>(updateStatement,this).apply {
        insert(this)
        execute(TransactionManager.current())
    }
}

private class UpsertUpdateBuilder(table: Table) : UpdateBuilder<Int>(StatementType.OTHER, listOf(table)){

    val firstDataSet: List<Pair<Column<*>, Any?>> get() = values.toList()

    override fun arguments(): List<List<Pair<IColumnType, Any?>>> = QueryBuilder(true).run {
        values.forEach {
            registerArgument(it.key, it.value)
        }
        if (args.isNotEmpty()) listOf(args) else emptyList()
    }

    override fun prepareSQL(transaction: Transaction): String {
        throw IllegalStateException("prepareSQL in UpsertUpdateBuilder is not supposed to be used")
    }

    override fun PreparedStatementApi.executeInternal(transaction: Transaction): Int {
        throw IllegalStateException("executeInternal in UpsertUpdateBuilder is not supposed to be used")
    }
}

private class InsertOrUpdate<Key : Any>(
        val update: UpsertUpdateBuilder,
        table: Table,
        isIgnore: Boolean = false
) : InsertStatement<Key>(table, isIgnore) {

    override fun arguments(): List<List<Pair<IColumnType, Any?>>> {
        val updateArgs = update.arguments()
        return super.arguments().mapIndexed { index, list -> list + (updateArgs.getOrNull(index) ?: return@mapIndexed list) }
    }

    override fun prepareSQL(transaction: Transaction): String {
        val values = update.firstDataSet
        if(values.isEmpty())
            return super.prepareSQL(transaction)


        val originalStatement = super.prepareSQL(transaction)

        val updateStm = with(QueryBuilder(true)){
            values.appendTo(this) { (col, value) ->
                append("${transaction.identity(col)}=")
                registerArgument(col, value)
            }
            toString()
        }

        return "$originalStatement ON DUPLICATE KEY UPDATE $updateStm"
    }
}

I'd love to see this functionality natively in Exposed, too.

Unfortunately this does not work with PostgreSQL as it uses a different syntax ('ON CONFLICT'). Would love to see this covered by Exposed directly, also to keep your app code more independent.

Here's my implementation of upsert and batch upsert. I tried to take the best of everything above. I use this with PostgreSQL in production, and it has been tested against MySQL.

fun <T : Table> T.upsert(
    vararg keys: Column<*>,
    body: T.(InsertStatement<Number>) -> Unit
) = UpsertStatement<Number>(this, keys = keys).apply {
    body(this)
    execute(TransactionManager.current())
}

fun <T : Table, E> T.batchUpsert(
    data: Collection<E>,
    vararg keys: Column<*>,
    body: T.(BatchUpsertStatement, E) -> Unit
) {
    if (data.isEmpty()) {
        return
    }

    BatchUpsertStatement(this, keys = keys).apply {
        data.forEach {
            addBatch()
            body(this, it)
        }
        execute(TransactionManager.current())
    }
}

class UpsertStatement<Key : Any>(
    table: Table,
    isIgnore: Boolean = false,
    private vararg val keys: Column<*>
) : InsertStatement<Key>(table, isIgnore) {
    override fun prepareSQL(transaction: Transaction) = buildString {
        append(super.prepareSQL(transaction))
        append(transaction.onUpdateSql(values.keys, *keys))
    }
}

class BatchUpsertStatement(
    table: Table,
    isIgnore: Boolean = false,
    private vararg val keys: Column<*>
) : BatchInsertStatement(table, isIgnore) {
    override fun prepareSQL(transaction: Transaction) = buildString {
        append(super.prepareSQL(transaction))
        append(transaction.onUpdateSql(values.keys, *keys))
    }
}

private fun Transaction.onUpdateSql(values: Iterable<Column<*>>, vararg keys: Column<*>) = buildString {
    if (db.isPostgreSQL()) {
        append(" ON CONFLICT (${keys.joinToString(",") { identity(it) }})")
        values.filter { it !in keys }.takeIf { it.isNotEmpty() }?.let { fields ->
            append(" DO UPDATE SET ")
            fields.joinTo(this, ", ") { "${identity(it)} = EXCLUDED.${identity(it)}" }
        } ?: append(" DO NOTHING")
    } else {
        append(" ON DUPLICATE KEY UPDATE ")
        values.joinTo(this, ", ") { "${identity(it)} = VALUES(${identity(it)})" }
    }
}

And you'll need this helper:

fun Database.isPostgreSQL() = vendor == "postgresql"

Rocket science.

In September we can celebrate 4 years since this issue was opened, and it still hasn't been resolved.

When a column is updated by calculating a new value from its old value, "balance" always ends up with the default value.
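
The screenshot for this report is not available, so the following is only one possible reading: the helpers above always emit col = EXCLUDED.col, which overwrites the stored value with the one proposed for insertion and never reads the existing row. A sketch of the two SET forms in PostgreSQL syntax, using a hypothetical accounts table and balance column:

```kotlin
// What the helpers above generate: the stored value is replaced outright.
fun overwrite(column: String): String = "$column = EXCLUDED.$column"

// Hypothetical alternative: reference the existing row through the table name
// so the old value takes part in the calculation.
fun accumulate(table: String, column: String): String =
    "$column = $table.$column + EXCLUDED.$column"

fun main() {
    println(overwrite("balance"))
    // prints: balance = EXCLUDED.balance
    println(accumulate("accounts", "balance"))
    // prints: balance = accounts.balance + EXCLUDED.balance
}
```

Supporting the second form would need the update expressions to be passed separately from the insert values, as the insertOrUpdate(insert, update) variant earlier in the thread does for MySQL.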

Kinda sad to see such an issue unresolved for 4 years by the framework that pretends to be an alternative for some other popular solutions :(

For those using Postgres: a year or so ago I created a super simple library based on the comments here: https://github.com/LukasForst/exposed-upsert

There is no need to copy and paste code; the library is now available on Maven Central.

@jnfeinstein - any plans to make your solution into a library or create a PR to exposed? Would you mind attaching a license to that code so I can borrow it? Thanks!

If anyone is still interested in a library-based solution, here is an implementation that covers all dialects that support upsert queries: https://github.com/dzikoysk/exposed-upsert

  • Implements all dialects that support native upsert possibilities
  • Tested against real databases through dedicated Docker containers provided by Testcontainers
  • Licensed to public domain, you can do whatever you want with sources in this repository

The motivation behind this implementation is that, so far, nobody has covered both MySQL and PostgreSQL in one solution with expression support and separate insert and update bodies. LukasForst's library supports only PostgreSQL, and the issue about MySQL support didn't receive any feedback. I hope it helps someone who needs this feature.
