This only exists in the current master branch (db102de6753d8d541379479fd1920af6ff6fad02) with the new client balancer.
Once the balancer has been constructed with multiple endpoints, subsequent dialer targets are fixed to the first endpoint. This breaks the maintenance APIs (endpoint status, snapshot, etc.), which need to reach a specific member.
```go
func TestMaintenanceStatus(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	clus.WaitLeader(t)

	eps := make([]string, 3)
	for i := 0; i < 3; i++ {
		eps[i] = clus.Members[i].GRPCAddr()
	}

	cli, err := clientv3.New(clientv3.Config{Endpoints: eps, DialOptions: []grpc.DialOption{grpc.WithBlock()}})
	if err != nil {
		t.Fatal(err)
	}
	defer cli.Close()

	prevID, leaderFound := uint64(0), false
	for i := 0; i < 3; i++ {
		resp, err := cli.Status(context.TODO(), eps[i])
		if err != nil {
			t.Fatal(err)
		}
		if prevID == 0 {
			prevID, leaderFound = resp.Header.MemberId, resp.Header.MemberId == resp.Leader
			continue
		}
		if prevID == resp.Header.MemberId {
			t.Errorf("#%d: status returned duplicate member ID with %016x", i, prevID)
		}
		if leaderFound && resp.Header.MemberId == resp.Leader {
			t.Errorf("#%d: leader already found, but found another %016x", i, resp.Header.MemberId)
		}
		if !leaderFound {
			leaderFound = resp.Header.MemberId == resp.Leader
		}
	}
	if !leaderFound {
		t.Fatal("no leader found")
	}
}
```
Here we pass the correct target endpoints:
https://github.com/coreos/etcd/blob/688043a7c2ac8bbb0e73ca5694c7815275865e24/clientv3/client.go#L381-L385
But before this, we create our own custom dialer:
https://github.com/coreos/etcd/blob/688043a7c2ac8bbb0e73ca5694c7815275865e24/clientv3/client.go#L339-L342
where we still pass the correct dial target.
The problem is that the upstream `grpc.DialContext` takes this custom dialer from etcd but, in `handleResolvedAddrs`, overwrites the dial target with the address chosen by its default load balancer.
The fix: if the dial target passed in from gRPC differs from the one the user gave, use the user's endpoint, and pass our custom balancer to `grpc.DialContext`.
```diff
--- a/clientv3/client.go
+++ b/clientv3/client.go
@@ -247,7 +247,7 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
 	f := func(dialEp string, t time.Duration) (net.Conn, error) {
 		proto, host, _ := endpoint.ParseEndpoint(dialEp)
-		if host == "" && ep != "" {
+		if host != ep {
 			// dialing an endpoint not in the balancer; use
 			// endpoint passed into dial
 			proto, host, _ = endpoint.ParseEndpoint(ep)
@@ -370,6 +370,7 @@ func (c *Client) dial(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, er
 	}
 	opts = append(opts, c.cfg.DialOptions...)
+	opts = append(opts, grpc.WithBalancerName(roundRobinBalancerName))
```
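The effect of the changed condition can be sketched in isolation. In the snippet below, `parseEndpoint` and `resolveDialTarget` are simplified stand-ins of my own (not etcd's actual `endpoint.ParseEndpoint` or `dialSetupOpts`); the point is only that when gRPC hands the dialer an address whose host differs from the endpoint the caller asked for, the caller's endpoint wins.

```go
package main

import (
	"fmt"
	"strings"
)

// parseEndpoint is a simplified stand-in for etcd's
// endpoint.ParseEndpoint: it splits "proto://host" into its parts,
// defaulting the protocol to "tcp" when no scheme is present.
func parseEndpoint(ep string) (proto, host string) {
	if parts := strings.SplitN(ep, "://", 2); len(parts) == 2 {
		return parts[0], parts[1]
	}
	return "tcp", ep
}

// resolveDialTarget mirrors the patched closure in dialSetupOpts:
// when the host gRPC hands to the dialer differs from the endpoint
// the user asked to dial (ep), prefer the user-supplied endpoint.
func resolveDialTarget(dialEp, ep string) (proto, host string) {
	proto, host = parseEndpoint(dialEp)
	if host != ep {
		// dialing an endpoint not in the balancer; use the
		// endpoint passed into dial
		proto, host = parseEndpoint(ep)
	}
	return proto, host
}

func main() {
	// gRPC's balancer resolved the first endpoint, but the user
	// asked for a different member: the user's endpoint wins.
	proto, host := resolveDialTarget("tcp://127.0.0.1:2379", "127.0.0.1:22379")
	fmt.Println(proto, host)
}
```

With the old `host == "" && ep != ""` check, a fully-formed balancer address would never be overridden, so maintenance calls to other members were silently redirected to the first endpoint.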
This fixes https://github.com/coreos/etcd/issues/9915.
The proposed fix above surfaces another issue in our balancer.
```sh
ETCD_CLIENT_DEBUG=1 go test -v -run TestBalancerUnderBlackholeKeepAliveWatch
...
--- FAIL: TestBalancerUnderBlackholeKeepAliveWatch (5.73s)
    black_hole_test.go:83: took too long to receive watch events
```
A simpler reproduction:
```go
func TestTest(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
		Size:                 2,
		GRPCKeepAliveMinTime: 1 * time.Millisecond, // avoid too_many_pings
	})
	defer clus.Terminate(t)

	eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}

	ccfg := clientv3.Config{
		Endpoints:            []string{eps[0]},
		DialTimeout:          1 * time.Second,
		DialOptions:          []grpc.DialOption{grpc.WithBlock()},
		DialKeepAliveTime:    1 * time.Second,
		DialKeepAliveTimeout: 500 * time.Millisecond,
	}
	cli, err := clientv3.New(ccfg)
	if err != nil {
		t.Fatal(err)
	}
	defer cli.Close()

	wch := cli.Watch(context.Background(), "foo", clientv3.WithCreatedNotify())
	if _, ok := <-wch; !ok {
		t.Fatalf("watch failed on creation")
	}

	// endpoint can switch to eps[1] when it detects the failure of eps[0]
	cli.SetEndpoints(eps...)

	fmt.Println("BLACKHOLE 1", eps[0])
	clus.Members[0].Blackhole()
	fmt.Println("BLACKHOLE 2", eps[0])

	if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
		t.Fatal(err)
	}

	select {
	case <-wch:
	case <-time.After(3 * time.Second):
		t.Error("took too long to receive watch events")
	}
}
```
Once eps[0] gets blackholed, the balancer keeps calling `regeneratePicker` but fails right here:
TBD...
Instead of calling `bb.Picker = picker.NewErr(balancer.ErrTransientFailure)` to reset the balancer picker, what about updating the picker with `picker.NewRoundrobinBalanced` using some starting index for the round-robin? We could set the initial value of `rrBalanced.next` so that it doesn't get stuck on the failed endpoint when given multiple endpoints.
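As a sketch of that idea (the `rrBalanced` type, `next` field, and constructor below are illustrative, not etcd's actual picker code): a round-robin picker whose starting cursor is randomized would not repeatedly retry the endpoint at index zero after a picker reset.

```go
package main

import (
	"fmt"
	"math/rand"
)

// rrBalanced is an illustrative round-robin picker. Randomizing the
// starting index means a freshly reset picker does not always try
// the same (possibly blackholed) endpoint first.
type rrBalanced struct {
	eps  []string
	next int // cursor for the next endpoint to hand out
}

func newRRBalanced(eps []string) *rrBalanced {
	// seed the cursor randomly instead of always starting at 0
	return &rrBalanced{eps: eps, next: rand.Intn(len(eps))}
}

// Pick returns the current endpoint and advances the cursor.
func (rr *rrBalanced) Pick() string {
	ep := rr.eps[rr.next]
	rr.next = (rr.next + 1) % len(rr.eps)
	return ep
}

func main() {
	rr := newRRBalanced([]string{"ep0", "ep1", "ep2"})
	for i := 0; i < 4; i++ {
		fmt.Println(rr.Pick())
	}
}
```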
@jpbetz Could you look into this?
@gyuho Thanks for all the analysis and reproduction cases. I'll have a closer look in the morning. At first glance, it feels like it would be ideal to allow the clients to send maintenance requests to any etcd member, and let the member that receives the request forward it on as needed to other cluster members, but that we need the client to be backward compatible with older server versions, so the approach of fixing the balancer/picker is necessary, I'll dig in.
Sorry for the delay, this is my top priority for today.
@jpbetz No problem. I think this will take awhile :)
If I'm understanding the code correctly, we might want to remove the `host == "" && ep != ""` conditional entirely when we create the `dialer.DialContext` in `dialSetupOpts`, since it circumvents whatever decision the balancer made. We could instead modify the maintenance APIs to use a different balancer that pins us to a specific endpoint like we want. This seems cleaner: instead of overloading one balancer to handle multiple needs, we create a different balancer for each need.
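A minimal sketch of that "one balancer per need" idea (purely illustrative; none of these names are etcd's): maintenance calls could use a picker pinned to the single endpoint the caller named, behind the same interface a round-robin policy would implement.

```go
package main

import "fmt"

// picker is the shared behavior: regular KV calls could use a
// round-robin implementation, maintenance calls a pinned one.
type picker interface {
	Pick() string
}

// pinned always returns the single endpoint it was created with,
// regardless of what other endpoints the client knows about.
type pinned struct {
	ep string
}

func newPinned(ep string) picker { return &pinned{ep: ep} }

// Pick hands back the pinned endpoint every time.
func (p *pinned) Pick() string { return p.ep }

func main() {
	p := newPinned("127.0.0.1:2379")
	for i := 0; i < 3; i++ {
		fmt.Println(p.Pick()) // always 127.0.0.1:2379
	}
}
```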
Agree that we should update our picker to now get stuck on a failed endpoint by trying the one at index zero repeatedly when it fails. If we don't already randomize the endpoint list (or the initial index) we should do that too. I'll follow up on that.
> We could instead modify the maintenance APIs to use a different balancer that pins us to a specific endpoint like we want.

This is an interesting idea. So, there would need to be another balancer policy, while still letting users pass multiple endpoints to the maintenance API?
I'll prototype it real quick and see whether it works out to be simpler/cleaner or not.
https://github.com/coreos/etcd/pull/9945 attempts to fix this by removing the special-casing logic and performing direct gRPC dials for the maintenance APIs.