TiDB: Support Background Execution or CREATE EVENT

Created on 4 Nov 2020 · 18 Comments · Source: pingcap/tidb

Feature Request

Is your feature request related to a problem? Please describe:

TiDB has several commands that require a long-running connection, such as BACKUP, RESTORE, LOAD DATA, SELECT INTO, and in the future IMPORT and FLASHBACK TO TIMESTAMP. All of these operations are aborted if the connection is lost. Because network conditions can be unreliable, this requirement is fragile and not really suitable for robust backup/restore scenarios.

We would like TiDB itself to manage the connection by placing these operations "in the background", so that they live as long as the TiDB node/cluster itself.

(Further details described in comment)

(Note: This feature is filed mainly as a discussion on the syntax and semantics of async import in SQL.)



There are 3 approaches to background execution, increasing in generality.

1. ASYNC option

In SAP HANA, the BACKUP DATA statement (and only the BACKUP DATA statement) can request asynchronous execution with the ASYNCHRONOUS option:

-- synchronous (returns only after the backup completes)
BACKUP DATA USING FILE ('prefix_1_');

-- asynchronous (returns immediately)
BACKUP DATA USING FILE ('prefix_2_') ASYNCHRONOUS;

We could do the same for the BRIE statements, for example:

IMPORT DATABASE * FROM 'file:///data/source' ASYNC=TRUE;

When ASYNC is enabled, TiDB will effectively create a local background connection, and then execute the IMPORT command on that new connection.

2. ASYNC modifier

Asynchronous execution is not exclusive to BRIE statements though. We can add an ASYNC keyword similar to TRACE and EXPLAIN:

mysql> ASYNC IMPORT DATABASE * FROM 'file:///data/source';
+----+
| Id |
+----+
| 31 |
+----+
1 row in set (0.00 sec)

Similar to option 1, this will create a local background connection and execute the inner statement.

The following statements must not be async-able:

  • BEGIN: to avoid confusion with the compound statement syntax; TiDB does not support compound statements anyway.
  • Transaction statements (START TRANSACTION, COMMIT, etc.): transactions are useless without compound statements.
  • Prepared statements (PREPARE, EXECUTE, DEALLOCATE): prepared statements are not shared across connections, so without compound statements they are useless.
  • Cursor statements (DECLARE, OPEN, FETCH, CLOSE)

And the following statements must be async-able:

  • ADMIN CHECK
  • ANALYZE
  • BRIE statements (BACKUP, RESTORE, IMPORT)
  • SELECT INTO
  • DML statements (INSERT, UPDATE, DELETE, REPLACE)
  • FLASHBACK TO TIMESTAMP
  • LOAD DATA (and LOAD STATS?)
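A minimal Go sketch of how the two lists above could be enforced. It assumes a hypothetical ClassifyAsync helper that looks only at the statement's leading keywords; a real check would walk the parsed AST. ROLLBACK stands in for the "etc." in the transaction bullet, and LOAD STATS is included even though the list marks it as tentative. Statements that appear in neither list are reported as unspecified.

```go
package main

import "strings"

// AsyncPolicy says whether a statement may be wrapped in ASYNC.
type AsyncPolicy int

const (
	AsyncUnspecified AsyncPolicy = iota // not covered by either list
	AsyncForbidden
	AsyncRequired
)

// Leading keywords from the two lists ("etc." filled in with ROLLBACK).
var forbiddenPrefixes = []string{
	"BEGIN", "START TRANSACTION", "COMMIT", "ROLLBACK",
	"PREPARE", "EXECUTE", "DEALLOCATE",
	"DECLARE", "OPEN", "FETCH", "CLOSE",
}

var requiredPrefixes = []string{
	"ADMIN CHECK", "ANALYZE", "BACKUP", "RESTORE", "IMPORT",
	"INSERT", "UPDATE", "DELETE", "REPLACE",
	"FLASHBACK TO TIMESTAMP", "LOAD DATA", "LOAD STATS",
}

// hasKeywordPrefix matches whole words only, so "ADMIN CHECK" does not
// match "ADMIN CHECKSUM".
func hasKeywordPrefix(stmt, prefix string) bool {
	return stmt == prefix || strings.HasPrefix(stmt, prefix+" ")
}

// ClassifyAsync normalizes case and whitespace, then checks leading keywords.
func ClassifyAsync(stmt string) AsyncPolicy {
	s := strings.TrimSuffix(strings.TrimSpace(stmt), ";")
	s = strings.Join(strings.Fields(strings.ToUpper(s)), " ")
	for _, p := range forbiddenPrefixes {
		if hasKeywordPrefix(s, p) {
			return AsyncForbidden
		}
	}
	for _, p := range requiredPrefixes {
		if hasKeywordPrefix(s, p) {
			return AsyncRequired
		}
	}
	// SELECT ... INTO: a crude check that also matches SELECT ... INTO @var.
	if hasKeywordPrefix(s, "SELECT") && strings.Contains(s, " INTO ") {
		return AsyncRequired
	}
	return AsyncUnspecified
}
```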

Connections started by ASYNC can be KILLed like a normal connection.

KILL QUERY 31;

Async output

Output and errors from an ASYNC statement are lost. In MySQL, this can be worked around by using SELECT INTO or INSERT SELECT to capture the output, and by extracting errors from the error log. But the output of non-SELECT statements such as BRIE, EXPLAIN ANALYZE and ADMIN CHECKSUM cannot be captured this way. For BRIE, we should modify the "queue" to also store the results of completed tasks, so that this vital information can be recovered via SHOW BACKUPS and friends. For ADMIN CHECKSUM, we could introduce a function like:

mysql> SELECT tidb_checksum_table(test.a);
+-----------------------------------------------------------------------------------+
| tidb_checksum_table(test.a)                                                       |
+-----------------------------------------------------------------------------------+
| {"total_kvs": 54, "total_bytes": 1294, "checksum_crc64_xor": 8419007208716623477} |
+-----------------------------------------------------------------------------------+
1 row in set (0.09 sec)

which allows for SELECT INTO. It is still unclear how to handle EXPLAIN ANALYZE.
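A possible shape for the proposed tidb_checksum_table() result, assuming it simply serializes the three ADMIN CHECKSUM TABLE numbers into one JSON value so the whole result fits into a single column. The struct and helper names are hypothetical. Only the field names come from the example output.

```go
package main

import "encoding/json"

// checksumResult is a hypothetical shape for the tidb_checksum_table() value.
type checksumResult struct {
	TotalKVs         uint64 `json:"total_kvs"`
	TotalBytes       uint64 `json:"total_bytes"`
	ChecksumCRC64Xor uint64 `json:"checksum_crc64_xor"`
}

// encodeChecksum renders the checksum numbers as a single JSON string, so the
// result can be stored with SELECT INTO like any other scalar.
func encodeChecksum(kvs, bytes, crc uint64) (string, error) {
	b, err := json.Marshal(checksumResult{kvs, bytes, crc})
	if err != nil {
		return "", err
	}
	return string(b), nil
}
```

The value of checksum_crc64_xor exceeds 2^53, so clients that parse JSON numbers as doubles (e.g. JavaScript) would lose precision. Emitting it as a string may be worth considering.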

An alternative to these ad-hoc solutions is to always store at least 1 row of output in a shared table, and introduce the statement

SHOW ASYNC RESULT 31;

to extract it. This considerably complicates the design of ASYNC, so I don't think we should pursue it.

3. CREATE EVENT

MySQL and MariaDB have a built-in event scheduler, which allows background execution. The ASYNC statement above is equivalent to

CREATE EVENT `<unique_name>` ON SCHEDULE AT NOW() DO stmt;
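The equivalence can be pictured as a plain textual rewrite. The sketch below assumes a hypothetical rewriteAsync helper and a made-up naming scheme (__async_<node>_<counter>). A per-process counter alone is not unique across a cluster, which is why the node identifier is part of the name.

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

var asyncCounter uint64

// rewriteAsync turns "ASYNC <stmt>" into the equivalent one-shot event.
// It reports false when the statement does not start with ASYNC.
func rewriteAsync(sql, nodeID string) (string, bool) {
	const kw = "ASYNC "
	trimmed := strings.TrimSpace(sql)
	if len(trimmed) <= len(kw) || !strings.EqualFold(trimmed[:len(kw)], kw) {
		return "", false
	}
	inner := strings.TrimSpace(trimmed[len(kw):])
	name := fmt.Sprintf("__async_%s_%d", nodeID, atomic.AddUint64(&asyncCounter, 1))
	return fmt.Sprintf("CREATE EVENT `%s` ON SCHEDULE AT NOW() DO %s", name, inner), true
}
```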

MySQL is rather unique in providing a dedicated syntax for event scheduling. Other RDBMSs have the same concept but usually expose it via extensions or built-in stored procedures, e.g. Oracle's DBMS_SCHEDULER package, SQL Server Agent in SQL Server (sp_add_jobstep) and PostgreSQL's pg_cron extension.

(The advantage of supporting CREATE EVENT is that we could define cron jobs entirely in SQL:)

CREATE EVENT backup_daily 
    ON SCHEDULE EVERY 1 DAY STARTS '2020-01-01 04:30:00'
    DO BACKUP DATABASE * TO date_format(now(), 's3://mybackupbucket/%Y-%m-%d');

MySQL events are concrete database objects like tables, views and sequences. So unlike ASYNC statements, events have to be managed by the whole cluster.

A simple implementation is to run a loop in the DDL owner:

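// maxTime stands in for "never". Time.Sub saturates at the maximum Duration,
// so time.Until(maxTime) cannot overflow.
maxTime := time.Unix(1<<62, 0)
nextTriggerTime := maxTime
for {
    select {
    case <-eventListChanged:
    case <-time.After(time.Until(nextTriggerTime)):
    }

    if !m.IsOwner() {
        break // no longer the DDL owner: leave the loop
    }

    nextTriggerTime = maxTime
    if !variable.EventScheduler {
        continue // scheduler is OFF: wait until event_scheduler changes
    }

    now := time.Now()
    for _, event := range listAllEvents() {
        if !event.ExecuteAt.After(now) { // ExecuteAt <= now
            event.Trigger()
            event.Reschedule() // rescheduling may disable or drop the event
        }
        if event.ExecuteAt.Before(nextTriggerTime) {
            nextTriggerTime = event.ExecuteAt
        }
    }
}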

The eventListChanged channel is sent whenever one of the following happens:

  • The DDL owner is changed
  • CREATE EVENT
  • ALTER EVENT
  • DROP EVENT
  • DROP DATABASE (if the database contains an event)
  • SET GLOBAL event_scheduler

The @@GLOBAL.event_scheduler system variable toggles whether events are scheduled. Following MySQL's behavior, switching it from OFF to ON immediately schedules all overdue events. We should probably not support the DISABLED value, because a distributed cluster complicates that setting.

Dropping a running event will not kill the query. Multiple queries issued by the same event can run concurrently.

The event execution output and errors are lost, similar to the situation in ASYNC.

CREATE EVENT requires the PROCESS permission from the active role.

Executing node

Some statements, like LOAD STATS, IMPORT and (non-LOCAL) LOAD DATA, read files from the TiDB node's file system. Therefore it is important to pin these statements to the TiDB node they should run on. The rest of the statements, like BACKUP, mainly rely on TiKV storage or cloud storage.

CREATE EVENT e1 ON CURRENT INSTANCE ONLY ON SCHEDULE AT NOW() DO IMPORT ...;
CREATE EVENT e2 ON ANY INSTANCE ON SCHEDULE AT NOW() DO BACKUP ...;

The DDL owner, which schedules the event, and the TiDB node on which the event was created do not have to be the same node. Thus, the DDL owner has to send the event back to the original node to execute the statement. There are 4 approaches to this.

  1. Push mode, sending the SQL through the :4000 port. This requires TiDB to provide some way to log in as the DEFINER of the event, which may be a security hole if handled incorrectly. It also assumes that --advertise-addr is always filled in correctly.
  2. Push mode, sending the SQL through the :10080 port, with a new HTTP API to trigger an event. This is "safe" in the sense that if an attacker can access :10080, everything is lost anyway. But this still assumes --advertise-addr is correct.
  3. Push mode, with a new gRPC service. TiDB could define a new gRPC service among all nodes for triggering events. This is quite a big architectural change, however.
  4. Pull mode, watching etcd. The DDL owner would Put (WithLease) the event ID into etcd, with the key prefixed by the target TiDB's ddlCtx.uuid and a TTL of 30s (maybe shorter). Every TiDB watches its own part of this key space; when a key appears, the TiDB deletes it and executes the event.

In options 3 and 4, if the original TiDB node is restarted, the event will never start because the UUID has changed, while in options 1 and 2 the event may still execute because the IP is unchanged. I consider TiDB crashes to be rare events that should not be the optimization target. If reliability is required, using ON ANY INSTANCE with a distributed data source (e.g. IMPORT ... FROM 's3://...') is preferred.

Thus, it seems option 4 is the simplest solution.

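var triggerCounter uint64

// Trigger runs on the DDL owner when the event is due.
func (e *Event) Trigger() {
    if len(e.Instance) == 0 || e.Instance == d.uuid {
        // run locally: log in as the definer and restore charset/time_zone/sql_mode/...
        go ctx.Execute(e.Statement)
        return
    }
    // Hand the event to the target instance through etcd. The lease makes the
    // key expire after 30s if the target never picks it up.
    grant, err := etcdCli.Grant(ctx, 30)
    if err != nil {
        return // TODO: report the error
    }
    counter := atomic.AddUint64(&triggerCounter, 1)
    key := fmt.Sprintf("/tidb/events/exec/%s/%x", e.Instance, counter)
    var value [8]byte
    binary.LittleEndian.PutUint64(value[:], uint64(e.ID))
    etcdCli.Put(ctx, key, string(value[:]), clientv3.WithLease(grant.ID))
}

// Every TiDB node watches the keys addressed to its own UUID.
keyPrefix := fmt.Sprintf("/tidb/events/exec/%s/", d.uuid)
ch := etcdCli.Watch(ctx, keyPrefix, clientv3.WithPrefix(), clientv3.WithFilterDelete())
for resp := range ch {
    for _, ev := range resp.Events {
        // Execute only if this call actually removed the key, so a replayed
        // watch event cannot run the same trigger twice.
        del, err := etcdCli.Delete(ctx, string(ev.Kv.Key))
        if err != nil || del.Deleted == 0 || len(ev.Kv.Value) != 8 {
            continue
        }
        tbl, ok := is.TableByID(int64(binary.LittleEndian.Uint64(ev.Kv.Value)))
        if !ok {
            continue // the event was dropped in the meantime
        }
        go ctx.Execute(tbl.Meta().Event.Statement)
    }
}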
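The etcd key/value layout used in the pull-mode sketch can be isolated into two small helpers. This makes the addressing rule (the key must start with the watcher's own UUID) testable without etcd. The helper names are hypothetical; the key format and the 8-byte little-endian event ID follow the pull-mode sketch.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"strconv"
	"strings"
)

const eventExecPrefix = "/tidb/events/exec/"

// encodeTrigger builds the key/value written by the DDL owner. The key is
// scoped by the target instance's UUID and made unique with a counter; the
// value carries the event's object ID as 8 little-endian bytes.
func encodeTrigger(instance string, counter uint64, eventID int64) (key, value string) {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], uint64(eventID))
	return fmt.Sprintf("%s%s/%x", eventExecPrefix, instance, counter), string(buf[:])
}

// decodeTrigger is the watcher side: it checks that the key is addressed to
// this instance and recovers the event ID.
func decodeTrigger(self, key, value string) (int64, error) {
	prefix := eventExecPrefix + self + "/" // trailing slash: "uuid-1" must not match "uuid-10"
	if !strings.HasPrefix(key, prefix) {
		return 0, errors.New("key not addressed to this instance")
	}
	rest := strings.TrimPrefix(key, prefix)
	if _, err := strconv.ParseUint(rest, 16, 64); err != nil {
		return 0, fmt.Errorf("malformed counter %q: %w", rest, err)
	}
	if len(value) != 8 {
		return 0, errors.New("malformed event ID")
	}
	return int64(binary.LittleEndian.Uint64([]byte(value))), nil
}
```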

Syntax

Create event:

CREATE [OR REPLACE]
    [DEFINER = username]
    EVENT [IF NOT EXISTS] event_name
    [ON ANY INSTANCE | ON CURRENT INSTANCE ONLY]
    ON SCHEDULE { AT expr | EVERY expr time_unit [STARTS expr] [ENDS expr] }
    [ON COMPLETION [NOT] PRESERVE]
    [ENABLE | DISABLE | DISABLE ON {SLAVE | REPLICA}]
    [COMMENT 'comment']
    DO asyncable_stmt;

Alter event:

ALTER
    [DEFINER = username]
    EVENT event_name
    [ON ANY INSTANCE | ON CURRENT INSTANCE ONLY]
    [ON SCHEDULE { AT expr | EVERY expr time_unit [STARTS expr] [ENDS expr] }]
    [ON COMPLETION [NOT] PRESERVE]
    [RENAME TO new_event_name]
    [ENABLE | DISABLE | DISABLE ON {SLAVE | REPLICA}]
    [COMMENT 'comment']
    [DO asyncable_stmt];

Drop event:

DROP EVENT [IF EXISTS] event_name;

Show events:

SHOW EVENTS [{FROM | IN} schema_name] [LIKE 'name_pattern' | WHERE expr];

Show create event:

SHOW CREATE EVENT event_name;

The INFORMATION_SCHEMA.EVENTS table (view) should exist.

Model

Since events are stored objects, they have to be represented as a *model.TableInfo, just like views and sequences. The fields are used as follows:

  • ID: object ID
  • Name: event name
  • Charset: value of @@character_set_client when the event was created
  • Collate: value of @@collation_connection when the event was created
  • Comment: event's comment

An extra field, Event *model.EventInfo, is added to *model.TableInfo:

type EventEnableType uint8
const (
    EventDisabled EventEnableType = iota
    EventEnabled
    EventSlaveSideDisabled
)

type EventInfo struct {
    Definer       *auth.UserIdentity
    SQLMode       mysql.SQLMode // @@sql_mode when the event was created
    TimeZone      string // @@time_zone when the event was created
    Statement     string // the statement to execute
    SecureStmt    string // the statement to display in SHOW CREATE EVENT / SHOW EVENTS, with secret stuff wiped out
    Enable        EventEnableType
    Starts        uint64 // start TS in UTC
    Ends          uint64 // end TS in UTC
    IntervalValue string // repeat interval ("EVERY" clause)
    IntervalUnit  ast.TimeUnitType
    Preserve      bool
    Originator    int64 // server ID 
    Instance      string // UUID of the instance, empty string = any
}

Like sequences, an event stores some extra information in the meta KV storage:

  • ExecuteAt: the next timestamp (in UTC) at which the event will be executed. MaxUint64 = disabled.

Replication

When a CREATE EVENT or ALTER EVENT statement is replicated, the upstream (master) must change the replicated event's state to DISABLE ON SLAVE regardless of the upstream's state. The event's ORIGINATOR field must also be filled with the upstream's server ID.
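The replication rule amounts to a small transformation of the replicated definition. The sketch below applies it to a trimmed, hypothetical subset of EventInfo. Forcing DISABLE ON SLAVE keeps the event from firing on both the upstream and the replica.

```go
package main

// EventEnableType mirrors the enum in the model above.
type EventEnableType uint8

const (
	EventDisabled EventEnableType = iota
	EventEnabled
	EventSlaveSideDisabled
)

// replicatedEvent is a trimmed, hypothetical subset of EventInfo.
type replicatedEvent struct {
	Name       string
	Enable     EventEnableType
	Originator int64 // server ID
}

// forReplica returns the definition to apply downstream. Whatever the
// upstream state, the replica gets DISABLE ON SLAVE and the upstream's
// server ID as ORIGINATOR. The upstream copy is left untouched.
func forReplica(up replicatedEvent, upstreamServerID int64) replicatedEvent {
	down := up
	down.Enable = EventSlaveSideDisabled
	down.Originator = upstreamServerID
	return down
}
```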

(Note that neither MySQL nor MariaDB provides DISABLE ON REPLICA as an alias of DISABLE ON SLAVE, which seems to be an oversight.)

The description is impressive and I think the feature is reasonable. Thanks @kennytm

PTAL @zz-jason @ilovesoup

Very impressive. One suggestion: it would be even better to further consider cloud usage scenarios, especially DBaaS.

Very promising! Some random thoughts below:

I would like to see EVENT scheduling support heterogeneous clusters. Different kinds of EVENT have ideal instance specifications that differ greatly from each other and from those of the SQL compute instances. For example, a problem we are facing now is that IMPORT requires a high-CPU instance with fast local SSDs to achieve ideal performance, while the SQL compute nodes are better off as low-end machines without additional disks, for cost efficiency. It would save a lot of cost if we could schedule each IMPORT onto a dynamically provisioned high-CPU instance while continuing to use low-end machines to host the SQL compute instances. In addition, a Pending state for EVENT should be introduced to indicate that an EVENT cannot be scheduled yet, which allows infrastructure orchestration systems like k8s to fit in.

A possible workflow would look like this:

  1. ON INSTANCE keyword supports label selection, e.g. ON INSTANCE WHERE type=importer
  2. EVENT stays in Pending state if there is no instance with label type=importer
  3. The external orchestration system detects this, either via SQL or by watching PD/etcd
  4. External orchestration system provisions a proper instance labelled type=importer
  5. Once the instance is no longer needed, it can be released by the external orchestration system
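Steps 1 and 2 of this workflow could look like the sketch below. It assumes a hypothetical label selector that is a plain key=value equality match, plus a new PENDING state. Picking the lexicographically smallest matching UUID is only for determinism; a real scheduler would consider load.

```go
package main

import "sort"

// instance is a hypothetical TiDB node with user-defined labels.
type instance struct {
	UUID   string
	Labels map[string]string
}

// EventState includes the proposed Pending state.
type EventState string

const (
	StatePending   EventState = "PENDING"   // no instance matches the selector yet
	StateScheduled EventState = "SCHEDULED" // a target instance was chosen
)

// pickInstance implements "ON INSTANCE WHERE key=value" as an equality match.
// With no match the event stays Pending, which is what an external
// orchestrator would watch for before provisioning a new instance.
func pickInstance(nodes []instance, selector map[string]string) (string, EventState) {
	var matches []string
	for _, n := range nodes {
		ok := true
		for k, v := range selector {
			if n.Labels[k] != v {
				ok = false
				break
			}
		}
		if ok {
			matches = append(matches, n.UUID)
		}
	}
	if len(matches) == 0 {
		return "", StatePending
	}
	sort.Strings(matches)
	return matches[0], StateScheduled
}
```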

cc @gregwebs @IANTHEREAL

We can consider doing async by default without any additional keywords. The command would output the job ID. Alternatively, the user's command could still block until completion (by polling the async task), but interrupting the command would not kill the async job.

Another approach to async by default is to add a SYNCHRONOUS keyword for when async is not desired.

@aylei I don't think having someone specify ON INSTANCE at import time would work well in that situation. Rather we would want a hook to automatically handle the node scheduling of any import event.

@aylei Good suggestion, and I agree with @gregwebs.

I want to implement a distributed scheduling framework. With this framework, TiDB can first schedule import jobs according to a fixed resource quota allocation, and then schedule them according to a real-time evaluation of the cluster's status (without significantly affecting queries), as mentioned in this spec design.

@gregwebs

> We can consider doing async by default without any additional keywords. The command will output the job id. Alternatively the user's command can still even block until completion (by checking on the async task) but if the command is interrupted it does not kill the async job.

We tried doing async by default when implementing BACKUP and RESTORE (#15274). The user experience was extremely weird because the statement returned without any immediate effect. This also goes against the mainstream behavior of existing commands like SELECT INTO and LOAD DATA, and of other DBMSs like CockroachDB (IMPORT), all of which block and abort if the connection is closed. The only async examples I know of are:

  • MemSQL which uses the syntax CREATE PIPELINE ...
  • SAP HANA which uses the syntax BACKUP ... ASYNCHRONOUS

Both syntaxes clearly indicate that the command is async / partial. So as long as the syntax stays as IMPORT DATABASE * FROM 'x';, I am opposed to making it non-blocking.


@gregwebs @IANTHEREAL

Yes, the IMPORT command should eventually handle node scheduling automatically, just like BACKUP and RESTORE delegate their jobs to TiKV. That is a bit beyond the scope of this issue. CREATE EVENT's ON INSTANCE clause schedules where the command itself is executed, which matters for commands that are not distributed (like IMPORT now) or are bound to a node (like LOAD STATS).

+--------------+      +--------+      +--------+
|              |      |        |      |        |
| CREATE EVENT +----->+ BACKUP +--+-->+ TiKV-1 |
|              |      |        |  |   |        |
+--------------+      +--------+  |   +--------+
                                  |
             ON INSTANCE          |   +--------+
                                  |   |        |
                                  +-->+ TiKV-2 |
                                  |   |        |
                                  |   +--------+
                                  |
                                  |   +--------+
                                  |   |        |
                                  +-->+ TiKV-3 |
     command's internal scheduling    |        |
                                      +--------+

This is a more complicated issue. Let us consider the question: why do we need scheduling?

I think there are two main use cases:

  1. allocating sufficient resources for running tasks
  2. isolating resources to reduce the impact on other workloads

CREATE EVENT's ON INSTANCE is enough for case 1.

My main concern is case 2, which matters more to users. We could delegate the responsibility of scheduling to the user/app, but I don't think that is a good choice. A scheduling design that can control the impact may affect this design, but the architecture for it is still uncertain.

@kennytm

I think aylei's labeling idea works, although you could also just log into that isolated TiDB and execute CREATE EVENT ON CURRENT INSTANCE ONLY on that machine, since the SQL statement itself can only be executed by exactly one node at a time (labeling is still more flexible for recurring events).

Anything lower level than "which TiDB runs this SQL" is not the concern of this issue, since that pokes into how the command is sent to TiKV/TiFlash/PD/etc. Those should be designed on a case-by-case basis (e.g. using placement rules / transaction scopes in the case of DMLs).

That is, if IMPORT became distributed and the user wished to do their own resource allocation, it should be an option of IMPORT, not of CREATE EVENT, to decide which TiKV / "import unit" executes the import processes.

CREATE EVENT e3 ON ANY INSTANCE ON SCHEDULE AT NOW() DO -- doesn't matter which TiDB runs the IMPORT manager
IMPORT DATABASE * FROM 's3://bucket/prefix' PLACEMENT CONSTRAINTS='forimport'; -- IMPORT decides scheduling

There is no technical problem, but the technology cannot be separated from the use case. From the user's perspective, I would prefer not to expose scheduling syntax, which is also tied to how the feature's architecture evolves.

Yes, you can leave the PLACEMENT CONSTRAINTS out. The ON INSTANCE clause is also optional. What I mean is that CREATE EVENT / ASYNC / etc. do not care about how IMPORT works: they just create a connection when the time is up, execute the IMPORT as instructed, and that's all.

> Yes, you can leave the PLACEMENT CONSTRAINTS out. The ON INSTANCE clause is also optional. What I mean is that CREATE EVENT / ASYNC / etc. do not care about how IMPORT works: they just create a connection when the time is up, execute the IMPORT as instructed, and that's all.

Yes, I agree. The ON INSTANCE clause is optional; would the syntax confuse users?

> The ON INSTANCE clause is optional; would the syntax confuse users?

Since MySQL and MariaDB (being single-instance) do not have this clause, I think no users will be confused by its absence 😅.

When it is omitted, a default must be picked. ON ANY INSTANCE is the better default for flexibility, and it allows us to switch to any magical scheduling scheme in the future.

I very much like the idea of this! Auto-scheduling backups to S3 solves a major pain point for users.

I prefer labeling over ON INSTANCE. It is more flexible to topology changes, such as instances disappearing, or new ones being added (assuming that many types of jobs can eventually be split and run concurrently).

A DDL scheduling algorithm was previously proposed in https://github.com/pingcap/tidb/issues/19397 (I'm not sure of its current state) and then in https://github.com/pingcap/tidb/issues/19386. I am not sure whether a full job scheduling system is in scope for this issue, but it would be nice to work towards it. If the system is under load, I think it's good to slightly defer auto-analyze jobs, DDL jobs, etc. Similarly, if there is free capacity, it would be nice to accelerate DDL jobs. Regular OLTP queries have the highest priority, but backups need to fit in there too.

Just a note: #17649 recently introduced "global kill" into TiDB. It implemented cross-TiDB communication through the coprocessor.

I prefer the idea of CREATE EVENT and event scheduler, it can solve these problems:

  • for DBaaS users, periodically analyze table without contacting SRE to create crontab jobs.
  • for DBaaS users, periodically backup or restore the database without contacting SRE.
  • it's flexible: normal users can explicitly submit DDL jobs and close the client without waiting.