Presto: Introduce TableWriterMergeOperator

Created on 17 Jul 2019 · 1 Comment · Source: prestodb/presto

Recently fault-tolerance mechanisms were introduced for grouped execution: https://github.com/prestodb/presto/pull/12529.

If a Presto node participating in the query becomes unresponsive, Presto can reschedule its task on a different node. However, there is no guarantee that the unresponsive node will not still finish its execution, so the same data can be written more than once.

To ensure correctness, a partial commit operation was introduced: https://github.com/prestodb/presto/pull/12787, https://github.com/prestodb/presto/pull/12800

The TableWriterOperator produces metadata listing the files to be committed, along with a TableCommitContext that contains the "id" (lifespan) and the stage id of the data written. Based on this id pair (lifespan + stage), the coordinator keeps one copy of the written data and commits only a single set of files per lifespan, discarding the data written by other (failed) workers. This is similar to the atomic commit protocol in MapReduce. [1]
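The per-(stage, lifespan) deduplication described above can be sketched in Java. This is a minimal illustration under stated assumptions, not Presto's implementation: `CommitKey`, `CommitRequest`, `Coordinator` and `tryCommit` are hypothetical names, and the "first report for a key wins" policy is an assumption about how the single set of files is chosen. Cleanup of the discarded files is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Presto's API: the coordinator accepts the first file list
// reported for each (stageId, lifespan) pair and discards later reports for the same
// pair, e.g. from an unresponsive task that finished after being rescheduled.
class PartialCommitDemo {
    // Identifies one unit of recoverable work (hypothetical type).
    record CommitKey(int stageId, int lifespan) {}

    // Files written by one task attempt for one lifespan (hypothetical type).
    record CommitRequest(CommitKey key, String taskAttempt, List<String> files) {}

    static final class Coordinator {
        private final Map<CommitKey, CommitRequest> committed = new ConcurrentHashMap<>();

        // Returns true if this request was committed, false if it was discarded
        // because another attempt already committed the same key.
        boolean tryCommit(CommitRequest request) {
            return committed.putIfAbsent(request.key(), request) == null;
        }

        // All files that were actually committed, across all keys.
        List<String> committedFiles() {
            List<String> all = new ArrayList<>();
            committed.values().forEach(r -> all.addAll(r.files()));
            return all;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        CommitKey key = new CommitKey(1, 0);
        // The rescheduled attempt reports first and is committed.
        System.out.println(coordinator.tryCommit(
                new CommitRequest(key, "retry", List.of("f_retry_1"))));
        // The original, slow attempt reports later; its files are discarded.
        System.out.println(coordinator.tryCommit(
                new CommitRequest(key, "original", List.of("f_orig_1"))));
        System.out.println(coordinator.committedFiles());
    }
}
```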

To reduce commit latency, the TableFinishOperator commits as soon as a node has finished writing. TableFinishOperator infers that a node has finished execution from the assumption that the node produces exactly one TableCommitContext, right after writing is finished. This holds for bucketed table writes, because TableWriter concurrency is fixed to 1 in that case.

However, if TableWriter concurrency is higher than 1 (more than one TableWriterOperator runs per node), that assumption no longer holds: every TableWriterOperator produces its own TableCommitContext, which makes it impossible to know whether the node has finished writing or not.
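To make the problem concrete, the following hypothetical Java sketch applies the one-context-per-node assumption literally: it treats the first TableCommitContext seen for a (node, lifespan) pair as the signal that the node is done. `WriterOutput` and `commitOnFirstContext` are illustrative names, not Presto classes, and this is not what Presto does today (recoverable grouped execution is simply unsupported in this configuration). With one writer the commit is complete; with two writers on the same node, the second writer's files look like a late duplicate and are dropped.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the completion assumption; not Presto code.
class EarlyCommitDemo {
    // One context per TableWriterOperator instance (simplified, hypothetical shape).
    record WriterOutput(String node, int lifespan, List<String> files) {}

    // Commits as soon as the first context for a (node, lifespan) arrives,
    // assuming that context is the node's only and final output.
    static List<String> commitOnFirstContext(List<WriterOutput> arrivals) {
        Set<String> finished = new HashSet<>();
        List<String> committed = new ArrayList<>();
        for (WriterOutput out : arrivals) {
            String key = out.node() + "/" + out.lifespan();
            if (finished.add(key)) {
                // First context: treated as "this node is done writing".
                committed.addAll(out.files());
            }
            // Later contexts for the same key are indistinguishable from duplicates.
        }
        return committed;
    }

    public static void main(String[] args) {
        // Concurrency 1: one writer per node, so the first context is complete.
        System.out.println(commitOnFirstContext(List.of(
                new WriterOutput("node1", 0, List.of("a", "b")))));
        // Concurrency 2: two writers on node1 each emit a context; file "c" is lost.
        System.out.println(commitOnFirstContext(List.of(
                new WriterOutput("node1", 0, List.of("a", "b")),
                new WriterOutput("node1", 0, List.of("c")))));
    }
}
```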

Currently recoverable grouped execution is not supported if the table writer concurrency is higher than one.

The proposal is to merge the outputs of the TableWriterOperators into a single output before sending it to the coordinator. This guarantees that a single writer node produces only a single TableCommitContext.

The TableWriterMergeOperator will collect all Fragments (which store the names of the files to commit) into a single list and accumulate the row count and the column-level statistics. Once all TableWriterOperators are done, it will send the merged TableCommitContext to the coordinator, indicating that writing is finished for the given lifespan on the given node.
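A minimal sketch of the merge step in Java, assuming simplified types: `WriterOutput`, `ColumnStats` (only min, max and null count for a single column), `MergedCommit` and `MergeState` are hypothetical stand-ins for the real fragment, statistics and TableCommitContext structures. The sketch also assumes the number of local writers is known up front and uses it as the "all TableWriterOperators are done" signal; the actual operator may detect completion differently, for example through end of input from its upstream drivers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of a TableWriterMergeOperator-style merge step; not Presto code.
class TableWriterMergeSketch {
    // Simplified statistics for one column (assumption: only min, max, null count).
    record ColumnStats(long min, long max, long nullCount) {
        ColumnStats merge(ColumnStats other) {
            return new ColumnStats(
                    Math.min(min, other.min()),
                    Math.max(max, other.max()),
                    nullCount + other.nullCount());
        }
    }

    // Output of one local TableWriterOperator: fragments (file names), row count, stats.
    record WriterOutput(List<String> fragments, long rowCount, ColumnStats stats) {}

    // The single merged result sent to the coordinator for one lifespan on one node.
    record MergedCommit(int stageId, int lifespan, List<String> fragments,
                        long rowCount, ColumnStats stats) {}

    static final class MergeState {
        private final int stageId;
        private final int lifespan;
        private final int expectedWriters; // assumption: number of local writers is known
        private final List<String> fragments = new ArrayList<>();
        private long rowCount;
        private ColumnStats stats;          // null until the first writer reports
        private int finishedWriters;

        MergeState(int stageId, int lifespan, int expectedWriters) {
            this.stageId = stageId;
            this.lifespan = lifespan;
            this.expectedWriters = expectedWriters;
        }

        // Accumulates one writer's output. Returns the merged commit only when the
        // last expected writer has reported; before that it returns empty.
        Optional<MergedCommit> add(WriterOutput output) {
            if (finishedWriters >= expectedWriters) {
                throw new IllegalStateException("all writers already reported");
            }
            fragments.addAll(output.fragments());
            rowCount += output.rowCount();
            stats = (stats == null) ? output.stats() : stats.merge(output.stats());
            finishedWriters++;
            if (finishedWriters < expectedWriters) {
                return Optional.empty();
            }
            return Optional.of(new MergedCommit(
                    stageId, lifespan, List.copyOf(fragments), rowCount, stats));
        }
    }

    public static void main(String[] args) {
        MergeState state = new MergeState(1, 0, 2); // two writers on this node
        Optional<MergedCommit> first = state.add(
                new WriterOutput(List.of("f1"), 10, new ColumnStats(3, 9, 1)));
        System.out.println("after writer 1: " + first.isPresent());
        MergedCommit merged = state.add(
                new WriterOutput(List.of("f2", "f3"), 5, new ColumnStats(1, 7, 0)))
                .orElseThrow();
        System.out.println("after writer 2: " + merged);
    }
}
```

Because only the last `add` call returns a value, the coordinator would see exactly one merged result per lifespan per node, which is the property TableFinishOperator relies on.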

Current implementation:

[Figure: current implementation (before)]

Proposed implementation:

[Figure: proposed implementation (after)]

The proposed implementation will introduce a new plan node, TableWriterMergeNode, and a new operator, TableWriterMergeOperator.

[1] [MapReduce: Simplified Data Processing on Large Clusters](https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf)

CC: @wenleix @rschlussel @shixuan-fan @tdcmeehan @highker
