etcd: Lost watch events

Created on 7 May 2019 · 9 Comments · Source: etcd-io/etcd

- which version

v3.3.13

- steps to reproduce the problem

  1. Run the etcd command: ./etcd &
  2. Run the test program:
  • Create 20000 goroutines that put values into etcd concurrently.

  • Wait for events on the watch channel, and start a new goroutine to handle each response.

  • I expect to receive 20000 events from the watch channel, but some events seem to be missing.

  3. The test source code:
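package main

import (
        "context"
        "fmt"
        "strconv"
        "strings"
        "sync/atomic"
        "time"

        // v3.3 client import path; newer releases use go.etcd.io/etcd/client/v3
        "github.com/coreos/etcd/clientv3"
)

func main() {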
        etcdHost := "127.0.0.1:2379"
        keys := 20000

        fmt.Println("connecting to etcd - " + etcdHost)

        etcd, err := clientv3.New(clientv3.Config{
                Endpoints:   []string{"http://" + etcdHost},
                DialTimeout: 5 * time.Second,
        })
        if err != nil {
                panic(err)
        }

        fmt.Println("connected to etcd - " + etcdHost)

        defer etcd.Close()

        watchChan := etcd.Watch(context.Background(), "user/", clientv3.WithPrefix())

        data := make([]int64, keys)

        go func() {
                for i := 0; i < keys; i++ {
                        go func(index int) {
                                now := time.Now()
                                k := strconv.Itoa(index)
                                v := now.String()

                                data[index] = now.UnixNano()

                                etcd.Put(context.Background(), "user/"+k, v)
                        }(i)
                }
        }()

        var recv int32
        go func() {
                for watchResp := range watchChan {
                        go func(resp clientv3.WatchResponse) {
                                for _, event := range resp.Events {
                                        i, _ := strconv.Atoi(strings.TrimPrefix(string(event.Kv.Key), "user/"))
                                        data[i] = time.Now().UnixNano() - data[i]
                                }
                        }(watchResp)
                        atomic.AddInt32(&recv, 1)
                }
        }()

        for int(atomic.LoadInt32(&recv)) != keys {
                time.Sleep(time.Second)
                fmt.Printf("recv: %d\n", atomic.LoadInt32(&recv))
        }
}
  4. The output
    recv: 17766
    recv: 17766
    recv: 17766
    ……

All 9 comments

There is an error in your example code:

atomic.AddInt32(&recv, 1)

The line above, inside the for range watchChan loop, is incorrect; you should use

atomic.AddInt32(&recv, int32(len(watchResp.Events)))

because Events is a slice, and a single watch response may carry multiple events.
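To illustrate the batching point, here is a minimal sketch that uses a hypothetical watchResponse stand-in type (not the real clientv3.WatchResponse) so it runs without an etcd server. Counting responses and counting len(resp.Events) give different totals as soon as one response carries more than one event.

```go
package main

import "fmt"

// watchResponse is a hypothetical stand-in for clientv3.WatchResponse.
// The real Events field is a []*clientv3.Event; plain key strings are used
// here so the sketch runs without an etcd server.
type watchResponse struct {
	Events []string
}

// countResponsesAndEvents drains ch and reports how many responses arrived
// and how many events they carried in total.
func countResponsesAndEvents(ch <-chan watchResponse) (responses, events int) {
	for resp := range ch {
		responses++                // what atomic.AddInt32(&recv, 1) counted
		events += len(resp.Events) // what the test actually needs to count
	}
	return responses, events
}

func main() {
	ch := make(chan watchResponse, 3)
	// Three responses that together carry six events.
	ch <- watchResponse{Events: []string{"user/1", "user/2", "user/3"}}
	ch <- watchResponse{Events: []string{"user/4"}}
	ch <- watchResponse{Events: []string{"user/5", "user/6"}}
	close(ch)

	responses, events := countResponsesAndEvents(ch)
	fmt.Printf("responses: %d, events: %d\n", responses, events)
}
```

Run as is, it prints "responses: 3, events: 6", the same kind of gap as between recv and keys in the original program.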

@lsytj0413
Thanks very much. :)
I will fix the bug and test again.


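package main

import (
        "context"
        "fmt"
        "strconv"
        "strings"
        "sync/atomic"
        "time"

        // v3.3 client import path; newer releases use go.etcd.io/etcd/client/v3
        "github.com/coreos/etcd/clientv3"
)

func main() {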
        etcdHost := "127.0.0.1:2379"
        keys := 20000

        fmt.Println("connecting to etcd - " + etcdHost)

        etcd, err := clientv3.New(clientv3.Config{
                Endpoints:   []string{"http://" + etcdHost},
                DialTimeout: 5 * time.Second,
        })
        if err != nil {
                panic(err)
        }

        fmt.Println("connected to etcd - " + etcdHost)

        defer etcd.Close()

        watchChan := etcd.Watch(context.Background(), "user/", clientv3.WithPrefix())

        data := make([]int64, keys)

        go func() {
                for i := 0; i < keys; i++ {
                        go func(index int) {
                                now := time.Now()
                                k := strconv.Itoa(index)
                                v := now.String()

                                data[index] = now.UnixNano()

                                etcd.Put(context.Background(), "user/"+k, v)
                        }(i)
                }
        }()

        var recv int32
        go func() {
                for watchResp := range watchChan {
                        go func(resp clientv3.WatchResponse) {
                                for _, event := range resp.Events {
                                        i, _ := strconv.Atoi(strings.TrimPrefix(string(event.Kv.Key), "user/"))
                                        data[i] = time.Now().UnixNano() - data[i]
                                        atomic.AddInt32(&recv, 1)
                                }
                        }(watchResp)
                }
        }()

        for int(atomic.LoadInt32(&recv)) != keys {
                time.Sleep(time.Second)
                fmt.Printf("recv: %d\n", atomic.LoadInt32(&recv))
        }
}

I fixed the bug and tested again. I still couldn't receive 20000 events.

The output is:
recv: 14565
recv: 14565
……

Did some error occur in etcd.Put? Can you capture the returned err and log it when err != nil?


Yes, I got the error "etcdserver: too many requests". I think I have found the root cause. In this situation, should I retry the failed request until it succeeds? Or can I increase the size of the server's receive buffer?

The solution depends on your use case. Alternatively, you can limit the request rate; golang.org/x/time/rate can help with that.


Thanks for your quick response. :-)
I intend to use etcd as a configuration dispatcher which can receive a configuration message and broadcast it to all watchers. I need low latency between the sender and the receivers (watchers).
etcd meets my requirements except for the "etcdserver: too many requests" errors.

"Too many requests" can happen when etcd's backend cannot keep up with applying the requested changes. Slow applies can have many causes; a good first step is to check whether the disk is slow, for example via the disk-related metric backend_commit_duration_seconds (exported as etcd_disk_backend_commit_duration_seconds).

Ref:

  1. #8576
  2. https://github.com/etcd-io/etcd/blob/e899023f3f542ffd7a114536705f3909ff44ad3f/etcdserver/v3_server.go#L35-L40
  3. https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/hardware.md#disks


Thanks. I will check these later.
