Since we announced CDC support targeting Kafka, we've had a handful of requests to support different message queue systems for CDC: Google Cloud pubsub, NATS.io, AWS Kinesis, RabbitMQ, ActiveMQ, and others. @gedw99 pointed me to the Go CDK pubsub implementation, an extensible module which aims to support a variety of sinks across cloud providers, including many of the ones we've gotten questions about. We'd need to do more analysis to make sure the delivery and ordering guarantees offered by individual sinks are appropriate for CockroachDB, but aside from that overhead this seems like a relatively cheap and painless way to add new sink types.
This would be super useful as we think google pubsub would be enough in our microservices infrastructure. Mainly because:
Do you think this is something we could push forward?
+1 for this
@danhhz Would you mind taking a look at the GCP pubsub ordering documentation to confirm whether it'd be possible to reliably use pubsub to materialize a table sent through CDC? It doesn't appear that pubsub natively provides any ordering guarantees, but they have a metric that the consumer can monitor to determine how caught-up a pubsub feed is. I _think_ this means that you could buffer data until that metric advances, then reorder the data based on updated/resolved timestamps and process it in strongly-ordered batches. Do you agree? Or are there subtleties that I'm missing that would prevent this from working as expected? (To be clear, this question is specifically about the GCP pubsub product, not the go-cloud pubsub library, which provides an interface to various cloud and self-hosted messaging systems.)
I think you'd need some way to tie that "how caught-up a pubsub feed is" metric back to our resolved timestamps. Basically, when some consumer sees a resolved timestamp message, it needs a way to wait for every message before that one to have been provably flushed out of the pubsub. To do this with the metric you mention (an age in seconds), we'd need a way for a pubsub consumer to compute now() - a possibly new timestamp on the resolved message vs the pubsub age metric. The tricky part here is that now() and a possibly new timestamp on the resolved message have to be somehow comparable to whatever google clock is computing the age metric. Or am I missing something?
That said, I have two thoughts here.
1) We already send partial progress updates from the nodes that emit row update messages to the single node that emits resolved timestamp messages. If each producer attached its node id, a sequence number, and a per-job-restart session id, then we could plumb the sequence numbers down in the partial progress updates. Then, the resolved timestamp message would include a job session id along with a high-water sequence number per node. If a consumer could prove that it had processed, for every node, every message with a sequence number from 0 to the high-water, then it would have the resolved timestamp guarantee. Unless I'm missing something, this would work in any pubsub that can guarantee as least once delivery (which is many/most of them). The big downside here is that actually proving the 0 to high-water thing involves a lot of distributed state and potentially unbounded buffering.
(Interestingly, all of the above metadata is already now present for the cloud storage sink.)
cc @ajwerner any thoughts on above? ^^
2) One thing that I realize is that if we do ever figure out how to do (1), the way we'd emit the updated row messages would be identical to the naive way (modulo adding new metadata to the messages, which is backward compatible). This means that right now, we could build a google pubsub integration that would give at least once delivery guarantees of all row changes, including timestamps. It's likely that this would even be a vanilla enough use of pubsub that we could do it using a generic interface over pubsubs like is mentioned in the issue. We'd have to document that it intentionally doesn't include resolved timestamps because the pubsubs it targets don't offer the ordering guarantees we need for them, but it sounds like there's demand for this even without that.
If a consumer could prove that it had processed, for every node, every message with a sequence number from 0 to the high-water, then it would have the resolved timestamp guarantee.
Sounds right to me.
The big downside here is that actually proving the 0 to high-water thing involves a lot of distributed state and potentially unbounded buffering.
Technically this is true given the complete lack of ordering guarantees. I suspect that in the common case the amount of state which needs to be tracked will be quite small. Most pub sub systems provide some sort of latency expectations. I think it's reasonable to think of these pub sub systems as at least offering something of a synchrony guarantee. I don't expect a message to take 5 days to arrive and then eventually arrive. Maybe a message doesn't arrive at all, that'd be a problem, but that'd be a problem for all of the sinks.
The state tracking is sort of a pain. It effectively boils down to keeping track of the frontier except you probably need to do it in a distributed way. It wouldn't be too bad to do if the client was talking to a scalable, consistent SQL database in the course of processing the message.
I think it's a good idea to play around with implementing the GCP sink on top of the generic interface with a prototype, maybe side-stepping the guarantees on a first pass just to get a feel for it. Then we should add on the necessary metadata to do frontier tracking and verify that we can make it work. I agree with your assessment about the additional metadata that it would take and that we already have it. My guess is while the code is non-trivial the amount of state you practically need to maintain isn't going to be that large in use cases which don't have high throughput.
Quoting @glerchundi :
We don't require ordering (just a timestamp)
Quoting @danhhz:
We'd have to document that it intentionally doesn't include resolved timestamps because the pubsubs it targets don't offer the ordering guarantees we need for them, but it sounds like there's demand for this even without that.
He's right, there are a lot of usecases where you don't need ordering, just "as least once delivery" guarantee.
Thanks for pushing this forward @rolandcrosby
Technically this is true given the complete lack of ordering guarantees. I suspect that in the common case the amount of state which needs to be tracked will be quite small.
Yeah, I think you're right that I'm being unfairly pessimistic here. The worst case is unbounded but that's certainly not the common case.
It wouldn't be too bad to do if the client was talking to a scalable, consistent SQL database in the course of processing the message.
I had the same thought! 馃槅 I guess by definition you do have access to one.
I think it's a good idea to play around with implementing the GCP sink on top of the generic interface with a prototype, maybe side-stepping the guarantees on a first pass just to get a feel for it. Then we should add on the necessary metadata to do frontier tracking and verify that we can make it work.
Agreed, I also think this is the next steps. The library mentioned in the issue even has an in-mem implementation, so maybe it's easy to hook up to testFeed.
Quoting directly from PubSub documentation:
Typically, Cloud Pub/Sub delivers each message once and in the order in which it was published. However, messages may sometimes be delivered out of order or more than once. In general, accommodating more-than-once delivery requires your subscriber to be idempotent when processing messages. You can achieve exactly once processing of Cloud Pub/Sub message streams using Cloud Dataflow PubsubIO. PubsubIO de-duplicates messages on custom message identifiers or those assigned by Cloud Pub/Sub. You can also achieve ordered processing with Cloud Dataflow by using the standard sorting APIs of the service. Alternatively, to achieve ordering, the publisher of the topic to which you subscribe can include a sequence token in the message. See Message Ordering for more information.
https://cloud.google.com/pubsub/docs/subscriber
So there would be a couple of options here:
Having worked a lot with RabbitMQ in the past, within the Cloud Foundry realm, I'm a big fan and would love to see it as an option here.
Not sure if y'all have seen this or not https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/v2/cdc-parent#deploying-the-connector could be a helpful example on how to setup CDC with dataflow. Specifically I'm trying to make sure that I can get the changes that are in cockroachdb into big query
Any news on this?
We are using EXPERIMENTAL CHANGEFEED FOR and not considering the switch to CREATE CHANGEFEED until this is implemented.
I don't know if the proposed solution would vary with the ordering guarantees they provide in Pre-GA.
Just pinging you all so it can be taken into account.
Thanks for the update. We can take this into account when we do finish this off.
Here's a first draft of a CDC -> PubSub bridge that uses an HTTP feed: https://github.com/bobvawter/cdc-pubsub