Grpc-go: Implementing a custom naming.Resolver for client side RoundRobin LoadBalancing

Created on 15 May 2017 · 4 Comments · Source: grpc/grpc-go

Why not provide an implementation of naming.Resolver, something like a StaticResolver, for client-side load balancing? It could be initialized with a list of addresses and passed to the RoundRobin function.

I did implement something, but I took it too far and overcomplicated the shit out of it.

package staticresolver

import (
    "errors"
    "sync"

    "google.golang.org/grpc/naming"
)

type balancer struct {
    name      string
    addresses map[string]bool //Will be exposed via a function
    removed   map[string]bool
    update    chan bool
    updater   *sync.Once
    mutex     *sync.Mutex
}

type StaticResolver interface {
    Add(string) error
    Remove(string) error
    naming.Resolver
    naming.Watcher
}

func NewClientSideBalancer(domain string, addresses []string) (StaticResolver, error) {
    b := &balancer{
        name:      domain,
        addresses: make(map[string]bool),
        removed:   make(map[string]bool),
        updater:   &sync.Once{},
        update:    make(chan bool, 1),
        mutex:     &sync.Mutex{},
    }
    for _, addr := range addresses {
        if _, ok := b.addresses[addr]; ok {
            return nil, errors.New("Duplicate addresses found")
        }
        b.addresses[addr] = false
    }
    b.signalUpdate()
    return b, nil
}

func (b *balancer) signalUpdate() {
    b.updater.Do(func() {
        b.update <- true
    })
}

func (b *balancer) Add(address string) error {
    b.mutex.Lock()
    defer b.mutex.Unlock()
    if _, ok := b.addresses[address]; ok {
        return errors.New("Address already exists")
    }
    b.addresses[address] = false
    b.signalUpdate()
    return nil
}

func (b *balancer) Remove(address string) error {
    b.mutex.Lock()
    defer b.mutex.Unlock()
    if _, ok := b.addresses[address]; !ok {
        return errors.New("Address does not exist")
    }

    delete(b.addresses, address)
    b.removed[address] = false
    b.signalUpdate()
    return nil
}

func (b *balancer) Resolve(target string) (naming.Watcher, error) {
    if b.name != target {
        return nil, errors.New("Invalid target domain")
    }
    return b, nil
}

func (b *balancer) Close() {
    b.addresses = nil
    b.removed = nil
    close(b.update)
}

func (b *balancer) Next() ([]*naming.Update, error) {
    _, ok := <-b.update
    b.mutex.Lock()
    defer func() {
        b.updater = &sync.Once{}
        b.removed = make(map[string]bool)
        b.mutex.Unlock()
    }()

    if !ok {
        return nil, errors.New("Resolver closed")
    }
    updates := make([]*naming.Update, 0)
    for k, v := range b.addresses {
        if !v {
            updates = append(updates, &naming.Update{Op: naming.Add, Addr: k})
            b.addresses[k] = true
        }
    }
    for k, v := range b.removed {
        if !v {
            updates = append(updates, &naming.Update{Op: naming.Delete, Addr: k})
            b.removed[k] = true
        }
    }
    return updates, nil
}
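
A side note on signalUpdate above: the sync.Once that Next re-arms is there to collapse several Add/Remove calls into a single wake-up. A common alternative, sketched here under the assumption that only "something changed" needs to be signalled, is a non-blocking send on a channel with capacity 1. The notifier type and its method names are hypothetical and not part of grpc-go.

```go
package main

import "fmt"

// notifier (hypothetical type) coalesces any number of signals into at most
// one pending wake-up, using a buffered channel of capacity 1.
type notifier struct {
	ch chan struct{}
}

func newNotifier() *notifier {
	return &notifier{ch: make(chan struct{}, 1)}
}

// signal never blocks: if a wake-up is already pending, the new one is dropped.
func (n *notifier) signal() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

// wait blocks until at least one signal has been sent since the last wait.
func (n *notifier) wait() { <-n.ch }

func main() {
	n := newNotifier()
	n.signal()
	n.signal()
	n.signal() // three signals collapse into one pending wake-up
	n.wait()
	fmt.Println(len(n.ch)) // number of wake-ups still pending
}
```

With this shape there is no Once to reset, so the signalling side does not depend on Next having run in between.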

Labels: Requires Reporter Clarification, Question

All 4 comments

Sorry, can you clarify your question?

The balancer you implemented is both a Balancer and a Resolver, which seems weird...
If you need a roundrobin with a static list of addresses, you have answered your own question: create a StaticResolver and use it to initialize the RoundRobin balancer.

Are you suggesting we should have this implemented in the repo? Or asking for an example?

@menghanl I was saying that something like this should be in either the docs or the repo. Also, I'm not sure whether my implementation is the best way to do it... maybe an example?

In your implementation, balancer and resolver (and watcher) are mixed together into one struct.
Balancer and resolver should be implemented independently.

A static resolver could look like:

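// Needs the "fmt" and "google.golang.org/grpc/naming" imports.

// staticWatcher hands out whatever update batches are queued on its channel.
type staticWatcher struct {
    updates chan []*naming.Update
}

// Next blocks until a batch is available or the watcher is closed.
func (w *staticWatcher) Next() ([]*naming.Update, error) {
    u, ok := <-w.updates
    if ok {
        return u, nil
    }
    return nil, fmt.Errorf("watcher closed")
}

// Close unblocks any pending Next call; it must be called at most once.
func (w *staticWatcher) Close() {
    close(w.updates)
}

// staticResolver ignores the target and always reports one fixed address.
type staticResolver struct{}

func (r *staticResolver) Resolve(target string) (naming.Watcher, error) {
    ch := make(chan []*naming.Update, 1)
    ch <- []*naming.Update{{Op: naming.Add, Addr: "127.0.0.1"}}
    return &staticWatcher{ch}, nil
}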

Use it with RoundRobin:

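b := grpc.RoundRobin(&staticResolver{})
// Dial takes the target as its first argument; it is passed to Resolve, which ignores it here.
grpc.Dial("target", grpc.WithBalancer(b))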
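
To get closer to the original request (a resolver initialized with a list of addresses), the same idea could be extended roughly as follows. This is only a sketch: Operation, Update and Watcher below are local stand-ins that mirror the naming package types so the example builds with the standard library alone, and listResolver, listWatcher and the addresses are hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the naming package types, assumed here so the sketch
// compiles without grpc-go. Real code would use naming.Update and friends.
type Operation uint8

const (
	Add Operation = iota
	Delete
)

type Update struct {
	Op   Operation
	Addr string
}

type Watcher interface {
	Next() ([]*Update, error)
	Close()
}

// listWatcher (hypothetical name) delivers one batch of Add updates, then
// blocks in Next until Close is called.
type listWatcher struct {
	updates chan []*Update
}

func (w *listWatcher) Next() ([]*Update, error) {
	u, ok := <-w.updates
	if !ok {
		return nil, errors.New("watcher closed")
	}
	return u, nil
}

// Close must be called at most once; closing twice would panic.
func (w *listWatcher) Close() { close(w.updates) }

// listResolver (hypothetical name) resolves every target to the same fixed
// address list.
type listResolver struct {
	addrs []string
}

func (r *listResolver) Resolve(target string) (Watcher, error) {
	if len(r.addrs) == 0 {
		return nil, errors.New("no addresses configured")
	}
	batch := make([]*Update, 0, len(r.addrs))
	for _, a := range r.addrs {
		batch = append(batch, &Update{Op: Add, Addr: a})
	}
	ch := make(chan []*Update, 1)
	ch <- batch // buffered, so Resolve does not block
	return &listWatcher{updates: ch}, nil
}

func main() {
	r := &listResolver{addrs: []string{"10.0.0.1:50051", "10.0.0.2:50051"}}
	w, err := r.Resolve("ignored-target")
	if err != nil {
		panic(err)
	}
	updates, _ := w.Next()
	for _, u := range updates {
		fmt.Println(u.Addr)
	}
	w.Close()
	_, err = w.Next()
	fmt.Println(err)
}
```

With the real naming types swapped in, a value like &listResolver{addrs: ...} could be passed to grpc.RoundRobin in the same way as staticResolver above. Supporting Add and Remove at runtime would mean pushing further batches onto the watcher's channel, which this sketch leaves out.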

We are also planning on splitting balancer and resolver in gRPC, so there will be separate APIs to set resolver and to set balancer. With the new set of APIs, we will have default resolvers implemented in the repo, and more documentation on resolvers. Hopefully this will be less confusing.

@menghanl That's great.