Cockroach: changefeedccl: implement `WITH diff` option

Created on 15 Aug 2018 · 15 comments · Source: cockroachdb/cockroach

This alters the emitted messages to include both the previous and new versions of each updated row instead of just the new one. It will probably require a follow-up scan to fetch the old data and so will be slower.

A-cdc C-enhancement

Most helpful comment

I think it could be pushed over the finish line in 1-2 days. Want to work on it tomorrow together since we'll be sitting next to each other?

Works for me!

All 15 comments

to be clear, this would add a before field to complement the existing after field, showing the state of the row before the KV change got written. I'm wondering if this would also enable us to suppress output of unchanged rows (e.g. the extra identical writes that currently show up during schema changes).
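For illustration, a consumer might model that envelope with something like the following Go type. The field names mirror the JSON examples later in this thread; the type itself is hypothetical, not CockroachDB code.

```go
// Hypothetical consumer-side shape for the changefeed envelope once a
// "before" field exists alongside "after". Field names follow the JSON
// examples in this thread; this is an illustration, not CockroachDB code.
package example

type envelope struct {
	Before   map[string]interface{} `json:"before,omitempty"`
	After    map[string]interface{} `json:"after,omitempty"`
	Updated  string                 `json:"updated,omitempty"`
	Resolved string                 `json:"resolved,omitempty"`
}
```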

More context because I had already typed it:

Is your feature request related to a problem? Please describe.

Changefeeds currently provide users with a mechanism to mirror table state by starting with a snapshot of the data and then emitting events corresponding to each update to the table. This is useful for continuous export into a data warehouse or for event processing for append-only tables (or append/delete tables).

Often systems would like to react to state changes, such as row updates which change the row from state A->B. Currently it is very difficult to use changefeeds for this purpose.

Describe the solution you'd like

Ideally there would be a changefeed option to request that the "before" state of a row be included alongside the "after" state.

Describe alternatives you've considered

In theory one could write code that performs an AS OF SYSTEM TIME query at updated.Prev(), but doing so is more painful than it sounds: one has to be aware of the schema of the table, convert the JSON to the appropriate SQL values for the SELECT's predicate, and then convert that SQL result back to JSON so that the previous value read looks like the "after" value.

Perhaps a SQL builtin to turn a tuple into a JSON object would reduce the effort enough to make this plausible.
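A rough sketch of that workaround, to make the pain concrete. This is not a real API: it assumes a table like office_dogs(id, name) from the example later in this thread, and the function name and timestamp handling are made up.

```go
// Illustrative sketch of the workaround described above: given the previous
// timestamp from a changefeed message, read the row's prior state with an
// AS OF SYSTEM TIME query and rebuild a JSON object that looks like the
// changefeed's "after" payload. Table and column names are assumptions.
package workaround

import (
	"database/sql"
	"encoding/json"
	"fmt"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func previousRow(db *sql.DB, prevTimestamp string, id int64) ([]byte, error) {
	// The caller has to know the table's schema to write this query, which is
	// part of what makes this approach painful.
	q := fmt.Sprintf(
		`SELECT id, name FROM office_dogs AS OF SYSTEM TIME '%s' WHERE id = $1`,
		prevTimestamp,
	)
	var (
		gotID int64
		name  sql.NullString
	)
	if err := db.QueryRow(q, id).Scan(&gotID, &name); err != nil {
		return nil, err
	}
	// Convert the SQL values back into JSON so the "before" state matches the
	// shape of the emitted "after" value.
	before := map[string]interface{}{"id": gotID, "name": nil}
	if name.Valid {
		before["name"] = name.String
	}
	return json.Marshal(before)
}
```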

I'm wondering if this would also enable us to suppress output of unchanged rows (e.g. the extra identical writes that currently show up during schema changes).

Oh, by skipping ones that are identical when converted to SQL? How interesting. You might even be able to distinguish between that and someone updating (via SQL) some column with a value that happens to be the old value, because in that case the KVs would be identical.

I haven't worked out all the details, but this passes the eye test and is probably worth exploring.

Btw, @nvanbenschoten had a flex Friday project to implement this before/after. Nathan, how far from mergeable was that? It might be worth getting this scheduled if it's not too much extra work.

The missing parts of that prototype were testing and actually exposing a WITH diff option that's threaded all the way down to rangefeed. The latter is a little tricky because we'll need to synchronize with the raft processing goroutine in addition to the rangefeed goroutine. That's not impossible though, and we'll also need to solve that problem to efficiently support per-key rangefeeds. We could also always include the diff in rangefeed publications and avoid some of this complication, but that would likely have a performance impact that we'd like to avoid.

I think it could be pushed over the finish line in 1-2 days. Want to work on it tomorrow together since we'll be sitting next to each other? This was the remaining blocker for @benesch so I'm sure he'd be happy to have it hooked up.

Often systems would like to react to state changes, such as row updates which change the row from state A->B. Currently it is very difficult to use changefeeds for this purpose.

FWIW, this is my use case for this.

I think it could be pushed over the finish line in 1-2 days. Want to work on it tomorrow together since we'll be sitting next to each other?

Works for me!

@danhhz I got the plumbing hooked up, so this is mostly just missing testing and polish at this point. See https://github.com/cockroachdb/cockroach/pull/41793.

Also, while we're here, @benesch do you mind writing down what you mentioned in person about improving the Avro encoding for Materialize's use case? I believe your suggestions were split between re-ordering the fields in the Avro encoding to allow for less buffering (ceddb83638dc7f9fe1973a1c5afd564c83ab7ed1) and creating an Avro-style union type between the resolved messages and the before/after/updated messages (not implemented). Do you know which of these are backward-compatible changes?

This was the remaining blocker for @benesch so I'm sure he'd be happy to have it hooked up.

Oh man, I'd be over the moon!

Also, while we're here, @benesch do you mind writing down what you mentioned in person about improving the Avro encoding for Materialize's use case? I believe your suggestions were split between re-ordering the fields in the Avro encoding to allow for less buffering (ceddb83) and creating an Avro-style union type between the resolved messages and the before/after/updated messages (not implemented). Do you know which of these are backward-compatible changes?

Yeah, the first one was the most important. It's totally backwards compatible assuming that the change will cause a new schema to be generated with the new field ordering... which, now that I think about it, isn't obviously going to happen. @danhhz, how does CDC decide when to publish a new schema? Is it maintaining a hash map from schema -> schema registry ID, or does it do something more complicated based on the schema of the underlying table?

The second one is most definitely not backwards compatible, and more of a philosophical point than anything else. Cockroach currently pushes records with entirely different shapes into the same Kafka topic, namely, the actual data records and the watermark (metadata) records. AFAICT this is fairly nonstandard use of the schema registry, which goes to great lengths to allow users to specify the backwards/forward compatibility requirements for all schemas in a given subject (see: https://docs.confluent.io/current/schema-registry/avro.html).

I haven't tried this, but I wonder if Cockroach would have trouble if the schema registry it was communicating with required global forward compatibility? There's a test that claims this is handled https://github.com/cockroachdb/cockroach/blob/c0e8390a83a3de692f0178b3af651ea79d1ab7e5/pkg/cmd/roachtest/cdc.go#L360-L361 but I haven't tracked down the code around it.

So yeah, it seems to me that it might be preferable to have a single schema used for the topic that has a top-level union type with two variants, one for the data records and one for the metadata records. Given that the metadata variant won't evolve, the compatibility requirements will naturally apply to the data variant and therefore the union as a whole.

Perhaps the variant would be too annoying, though, for the users who don't want to constantly unpack watermarks? Then again, you don't get the watermarks unless you request them. One trick we've been using at Materialize is to stuff watermarks in records like this:

```json
{
  "before": null,
  "after": null,
  "arbitrary_metadata": {
    "watermark": 12345
  }
}
```

The {"before": null, "after": null} is a bit weird, but has the nice property of being pretty obviously a no-op.

In any case, I'm sure @danhhz has thought about this; curious to hear your thoughts, Dan!

@danhhz, how does CDC decide when to publish a new schema? Is it maintaining a hash map from schema -> schema registry ID, or does it do something more complicated based on the schema of the underlying table?

Our internal caching is based on the table id and version, but the schema registry itself does it based on some fingerprint of the final schema. I think this might end up being okay. If a node switches the avro "meta-schema" (the change you're talking about here), it will necessarily be because the node restarted with a new binary version, which means its in-memory cache will be cold. It fills it by talking to the schema registry, which should do the right thing here.

This will cause the topic to be a strange mixture of the two meta-schemas as the nodes are rolled over to the new version, but this is probably fine for an experimental feature.

https://github.com/cockroachdb/cockroach/blob/77764a5e5655dde20535d7095b3c59003dfea817/pkg/ccl/changefeedccl/encoder.go#L319
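A minimal sketch of that caching split as I understand the description above; this is not the actual encoder code, and the types and method names are made up.

```go
// Illustrative sketch only, not the actual changefeed encoder: the node
// caches registered schema IDs keyed by table descriptor ID and version,
// while the registry itself dedupes by schema fingerprint, so re-registering
// an identical schema after a restart hands back the same ID.
package sketch

type schemaKey struct {
	tableID int64
	version int
}

// registryClient is a stand-in for a Confluent schema registry client.
type registryClient interface {
	// Register returns the existing ID when the registry has already seen an
	// identical schema under this subject.
	Register(subject, schemaJSON string) (int32, error)
}

type schemaCache struct {
	reg registryClient
	ids map[schemaKey]int32
}

func (c *schemaCache) idFor(key schemaKey, subject, schemaJSON string) (int32, error) {
	if id, ok := c.ids[key]; ok {
		return id, nil // warm in-memory cache, keyed by (table ID, version)
	}
	id, err := c.reg.Register(subject, schemaJSON)
	if err != nil {
		return 0, err
	}
	c.ids[key] = id
	return id, nil
}
```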

(Side Q since I've paged all this out: Does reordering fields in an avro schema matter? Can you not read records written with the same fields in a different order?)

The second one is most definitely not backwards compatible, and more of a philosophical point than anything else. Cockroach currently pushes records with entirely different shapes into the same Kafka topic, namely, the actual data records and the watermark (metadata) records. AFAICT this is fairly nonstandard use of the schema registry, which goes to great lengths to allow users to specify the backwards/forward compatibility requirements for all schemas in a given subject (see: https://docs.confluent.io/current/schema-registry/avro.html).

I haven't tried this, but I wonder if Cockroach would have trouble if the schema registry it was communicating with required global forward compatibility? There's a test that claims this is handled https://github.com/cockroachdb/cockroach/blob/c0e8390a83a3de692f0178b3af651ea79d1ab7e5/pkg/cmd/roachtest/cdc.go#L360-L361 but I haven't tracked down the code around it.

So yeah, it seems to me that it might be preferable to have a single schema used for the topic that has a top-level union type with two variants, one for the data records and one for the metadata records. Given that the metadata variant won't evolve, the compatibility requirements will naturally apply to the data variant and therefore the union as a whole.

Perhaps the variant would be too annoying, though, for the users who don't want to constantly unpack watermarks? Then again, you don't get the watermarks unless you request them. One trick we've been using at Materialize is to stuff watermarks in records like this:

```json
{
  "before": null,
  "after": null,
  "arbitrary_metadata": {
    "watermark": 12345
  }
}
```

The {"before": null, "after": null} is a bit weird, but has the nice property of being pretty obviously a no-op.

I thought pretty hard about being a good citizen of the schema registry, so I'm interested in digging into this. IIRC, we do actually offer full backward and forward compatibility through unions with null. There are two variants, one for the resolved timestamp messages and one for the data rows.

Basically, as far as I can tell what we're doing is fundamentally what you're describing. We do flatten the metadata down into the top-level instead of its own "arbitrary_metadata" record, which I'm hesitant to change since it currently exactly mirrors the json format. Is that important for some reason I'm not seeing?

@benesch How about this, can you give me a concrete example of something actually output by CockroachDB (one data entry and one metadata entry) and the same example formatted in the way you're looking for?

This will cause the topic to be a strange mixture of the two meta-schemas as the nodes are rolled over to the new version, but this is probably fine for an experimental feature.

Sweet, that sounds perfect.

(Side Q since I've paged all this out: Does reordering fields in an avro schema matter? Can you not read records written with the same fields in a different order?)

So it doesn't matter for correctness, only for performance. Since the fields of an Avro record are identified in the schema by name, not by position, Avro decoders will give you the same object back regardless of the ordering of the fields in the record schema. For example, both of these Avro schemas

```json
{
  "type": "record",
  "name": "row",
  "fields": [
    {"name": "a", "type": "int"},
    {"name": "b", "type": "int"}
  ]
}
```

```json
{
  "type": "record",
  "name": "row",
  "fields": [
    {"name": "b", "type": "int"},
    {"name": "a", "type": "int"}
  ]
}
```

can be decoded into the same struct:

```c
struct row {
    int a;
    int b;
};
```
The on-the-wire encoding is order sensitive, of course, and we play some tricks if the record is laid out in exactly the right order, namely a nullable before field, followed by a nullable after field, followed by anything. For example, decoding the before/after fields via a full-fledged Avro decoder results in an allocation for each field; our specialized decoder can avoid that allocation.

The more compelling point, and the reason I feel it's reasonable to request this change in Cockroach, is that Avro doesn't allow efficient skipping of fields. If the unimportant fields come first, you have no choice but to decode them to get to the data that you care about. If the important fields come first, you can leave the unimportant fields undecoded at the end of the record.

@benesch How about this, can you give me a concrete example of something actually output by CockroachDB (one data entry and one metadata entry) and the same example formatted in the way you're looking for?

For sure! Here's some output from the office dogs example.

{"updated": {"string": "1571844805329302000.0000000000"}, "after": {"office_dogs": {"id": {"long": 1}, "name": {"string": "Petee H"}}}}
{"resolved": {"string": "1571844807817082000.0000000000"}}

The first record has this schema:

```json
{
  "type": "record",
  "name": "office_dogs_envelope",
  "fields": [
    {
      "name": "updated",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        {
          "type": "record",
          "name": "office_dogs",
          "fields": [
            {
              "name": "id",
              "type": [
                "null",
                "long"
              ],
              "default": null,
              "__crdb__": "id INT8 NOT NULL"
            },
            {
              "name": "name",
              "type": [
                "null",
                "string"
              ],
              "default": null,
              "__crdb__": "name STRING NULL"
            }
          ]
        }
      ],
      "default": null
    }
  ]
}
```

while the second record has this schema:

```json
{
  "type": "record",
  "name": "office_dogs_envelope",
  "fields": [
    {
      "name": "resolved",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}
```

Now that I look at it, I guess the fact that resolved, before, after, and updated are all nullable means that these two schemas are forwards and backwards compatible with one another. I guess I just would have expected one schema that unions the two schema versions together like this:

```json
[
  {
    "type": "record",
    "name": "office_dogs_envelope",
    "fields": [
      {
        "name": "updated",
        "type": [
          "null",
          "string"
        ]
      },
      {
        "name": "before",
        "type": [
          "null",
          {
            "type": "record",
            "name": "office_dogs",
            "fields": [
              {
                "name": "id",
                "type": [
                  "null",
                  "long"
                ],
                "default": null,
                "__crdb__": "id INT8 NOT NULL"
              },
              {
                "name": "name",
                "type": [
                  "null",
                  "string"
                ],
                "default": null,
                "__crdb__": "name STRING NULL"
              }
            ]
          }
        ]
      },
      { "name": "after", "type": "office_dogs" }
    ]
  },
  {
    "type": "record",
    "name": "cockroachdb-timestamp-metadata",
    "fields": [
      {
        "name": "resolved",
        "type": "string"
      }
    ]
  }
]
```

If avro is fine decoding a struct that was encoded using a schema with the same fields in a different order and it gets the same result, then I'm absolutely fine making the field reordering change.

If we were doing this all from scratch, I think I'd agree with you that a union of the two resolved/row records is more natural. I like that a user would basically have a switch/match that hands one or the other to whatever code is responsible for processing it. However, having it mirror the JSON format also seems important to me. If it's better for avro, I don't see why it wouldn't also be better for json, so I would do it for both. This would require adding a new format= option and changing the default for newly created changefeeds from envelope to this new one. TBH, I'm extremely hesitant to do that (again, we changed the default format type in 19.2) without more design partners than just y'all. Luckily this is always something we can do later and it also sounds like the one you care about less.

If avro is fine decoding a struct that was encoded using a schema with the same fields in a different order and it gets the same result, then I'm absolutely fine making the field reordering change.

Ok, great! And yeah, every Avro library I've seen is like goavro in that a record is encoded to/decoded from a map[string]interface{}, and so the ordering of the fields is totally invisible to the user.
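A minimal sketch of that behavior with goavro (my own illustration, not CockroachDB or Materialize code): the record decodes back to a map keyed by field name, so the schema's field order never surfaces to the consumer.

```go
// Illustrative: goavro hands records back as map[string]interface{} keyed by
// field name, so the ordering of fields in the writer's schema is invisible
// to the consumer.
package main

import (
	"fmt"

	"github.com/linkedin/goavro/v2"
)

func main() {
	// Same two fields as the earlier example, just listed b before a.
	codec, err := goavro.NewCodec(`{
	  "type": "record",
	  "name": "row",
	  "fields": [
	    {"name": "b", "type": "int"},
	    {"name": "a", "type": "int"}
	  ]
	}`)
	if err != nil {
		panic(err)
	}
	// The binary layout follows the schema's field order (b, then a)...
	bin, err := codec.BinaryFromNative(nil, map[string]interface{}{"a": int32(1), "b": int32(2)})
	if err != nil {
		panic(err)
	}
	// ...but decoding returns a map keyed by name, so field order is invisible.
	native, _, err := codec.NativeFromBinary(bin)
	if err != nil {
		panic(err)
	}
	rec := native.(map[string]interface{})
	fmt.Println(rec["a"], rec["b"]) // 1 2
}
```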

If we were doing this all from scratch, I think I'd agree with you that a union of the two resolved/row records is more natural. I like that a user would basically have a switch/match that hands one or the other to whatever code is responsible for processing it...

Cool, sounds like we're on the same page. The current format is totally workable; it was just a bit surprising/confusing to see two different schema IDs, since the Avro decoding code I wrote expected multiple schema IDs to mean that a schema change was happening and started to warn the user. Now we have special logic to determine whether the new schema ID we're seeing is just a special CockroachDB resolved timestamp.
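For concreteness, the kind of check described above might look roughly like this; it is an illustrative sketch, not the actual Materialize code.

```go
// Illustrative sketch (not actual Materialize code) of the special-casing
// described above: when a record arrives under an unfamiliar schema ID, peek
// at the schema and decide whether it is just the resolved-timestamp envelope
// rather than a real schema change.
package sketch

import "encoding/json"

func isResolvedEnvelope(schemaJSON string) bool {
	var schema struct {
		Fields []struct {
			Name string `json:"name"`
		} `json:"fields"`
	}
	if err := json.Unmarshal([]byte(schemaJSON), &schema); err != nil {
		return false
	}
	// The resolved-timestamp envelope shown earlier has exactly one field,
	// named "resolved".
	return len(schema.Fields) == 1 && schema.Fields[0].Name == "resolved"
}
```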

I'm still a bit confused by what you mean when you say it should mirror the JSON format. Do you mean that with the union-style schema, the JSON records would look like this:

{"office_dogs_envelope": {"updated": {"string": "1571844805329302000.0000000000"}, "after": {"office_dogs": {"id": {"long": 1}, "name": {"string": "Petee H"}}}}
{"cockroachdb-timestamp-metadata": {"resolved": {"string": "1571844807817082000.0000000000"}}}

(I guess that makes sense!)

I'm still a bit confused by what you mean when you say it should mirror the JSON format. Do you mean that with the union-style schema, the JSON records would look like this:

{"office_dogs_envelope": {"updated": {"string": "1571844805329302000.0000000000"}, "after": {"office_dogs": {"id": {"long": 1}, "name": {"string": "Petee H"}}}}
{"cockroachdb-timestamp-metadata": {"resolved": {"string": "1571844807817082000.0000000000"}}}

(I guess that makes sense!)

Yeah, that's exactly what I'm saying. Though I'd probably call the fields something that doesn't vary per table/topic.

Should this be labeled as 20.1?
