visit
package grpcclient
import (
"errors"
"fmt"
"net/url"
"strings"
"utils/pkg/connection"
"utils/pkg/consul"
"github.com/hashicorp/consul/api"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/resolver"
)
// resolverSchemeConsul is the target scheme handled by the consul resolver
// builder; it is both compared against incoming targets and reported by Scheme.
const resolverSchemeConsul = "consul"
// errUnknownScheme is returned by Build when the target scheme is anything
// other than "consul".
var errUnknownScheme = errors.New("unknown scheme. Only 'consul' is applicable")
// consulBuilder creates consul-backed address resolvers for the gRPC dialer.
// It only carries the dependencies that newConsulBuilder was given.
type consulBuilder struct {
	// consul is the cluster handle stored by newConsulBuilder.
	consul *consul.Cluster
	// serviceWatcher is handed to every resolver created by Build.
	serviceWatcher connection.Watcher
}
// newConsulBuilder returns a consulBuilder holding the given cluster handle
// and service watcher. Neither argument is validated or copied.
func newConsulBuilder(consul *consul.Cluster, serviceWatcher connection.Watcher) *consulBuilder {
	b := new(consulBuilder)
	b.consul = consul
	b.serviceWatcher = serviceWatcher
	return b
}
// Build creates the consul address resolver for a gRPC target.
//
// It rejects targets whose scheme is not "consul" with errUnknownScheme,
// extracts the service name and tag from the target endpoint, and returns a
// consulResolver that has already subscribed to changes of that service.
// Errors from parsing the target or from starting the watch are wrapped and
// returned; in that case no resolver is returned.
//
// When the target carries no tag (or an empty one), the watcher receives a
// nil tag list instead of a one-element list holding the empty string.
func (c consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	if target.Scheme != resolverSchemeConsul {
		return nil, errUnknownScheme
	}

	serviceName, tag, err := parseTarget(target)
	if err != nil {
		return nil, fmt.Errorf("parse target: %w", err)
	}

	// parseTarget reports tag == "" when the "tag" query parameter is absent
	// or empty; do not turn that into a filter on the empty tag.
	var tags []string
	if tag != "" {
		tags = []string{tag}
	}

	cr := newConsulResolver(c.serviceWatcher, cc)
	if err := cr.watch(serviceName, tags); err != nil {
		return nil, fmt.Errorf("watch: %w", err)
	}
	return cr, nil
}
// parseTarget extracts the service name and the tag from a resolver target.
//
// The target endpoint is parsed as a URL: the path with its surrounding
// slashes removed is the service name, and the "tag" query parameter is the
// tag ("" when the parameter is absent). For example, the endpoint
// "127.0.0.1:8500/service?tag=master" yields ("service", "master").
//
// An error is returned when the endpoint is not a valid URL or when it does
// not contain a service name.
func parseTarget(target resolver.Target) (string, string, error) {
	u, err := url.Parse(fmt.Sprintf("%s://%s", resolverSchemeConsul, target.Endpoint))
	if err != nil {
		return "", "", fmt.Errorf("parse: %w", err)
	}

	serviceName := strings.Trim(u.Path, "/")
	if serviceName == "" {
		// Without a path there is nothing to resolve; fail here instead of
		// letting the caller subscribe to an empty service name.
		return "", "", errors.New("service name is empty")
	}
	return serviceName, u.Query().Get("tag"), nil
}
// Scheme reports the target scheme this builder handles, i.e. the value of
// resolverSchemeConsul.
func (consulBuilder) Scheme() string {
	return resolverSchemeConsul
}
// consulResolver is the resolver returned by consulBuilder.Build. It forwards
// the addresses of a watched consul service to a gRPC client connection.
type consulResolver struct {
	// cc receives the refreshed address list in onServiceChanged.
	cc resolver.ClientConn
	// serviceWatcher is used by watch to subscribe to service changes.
	serviceWatcher connection.Watcher
	// unwatchFunc is the function returned by WatchService; watch stores it
	// and Close calls it.
	unwatchFunc func() error
}
// newConsulResolver returns a consulResolver bound to the given watcher and
// client connection. It does not start watching; call watch for that.
func newConsulResolver(serviceWatcher connection.Watcher, cc resolver.ClientConn) *consulResolver {
	r := new(consulResolver)
	r.cc = cc
	r.serviceWatcher = serviceWatcher
	return r
}
// onServiceChanged is the callback that watch registers with the service
// watcher. It converts every service entry into a resolver address using
// connection.BuildAddr and pushes the complete list to the client connection.
// A failed state update is logged and otherwise ignored.
func (c *consulResolver) onServiceChanged(entries []*api.ServiceEntry) {
	addrs := make([]resolver.Address, 0, len(entries))
	for _, entry := range entries {
		addrs = append(addrs, resolver.Address{Addr: connection.BuildAddr(entry)})
	}

	state := resolver.State{Addresses: addrs}
	if err := c.cc.UpdateState(state); err != nil {
		logrus.WithError(err).Error("update gRPC consul resolver addresses")
	}
}
// watch subscribes onServiceChanged to changes of the given service and tags
// and keeps the returned unwatch function for Close.
//
// NOTE(review): the literal true passed to WatchService presumably restricts
// results to healthy instances — confirm against connection.Watcher.
func (c *consulResolver) watch(serviceName string, tags []string) error {
	unwatch, err := c.serviceWatcher.WatchService(serviceName, tags, true, c.onServiceChanged)
	// The returned function is stored whether or not the call succeeded.
	c.unwatchFunc = unwatch
	if err != nil {
		return fmt.Errorf("watch service: %w", err)
	}
	return nil
}
// ResolveNow is intentionally a no-op: addresses are pushed to the client
// connection from onServiceChanged whenever the watched service changes, so
// there is nothing to re-resolve on demand.
func (c *consulResolver) ResolveNow(options resolver.ResolveNowOptions) {
}
// Close stops the service watch started by watch. Close has no error return,
// so a failure to unwatch is logged instead of being reported to the caller.
//
// It does nothing when no unwatch function has been stored (watch was never
// called, or WatchService returned a nil function), rather than panicking on
// a nil function call.
func (c *consulResolver) Close() {
	if c.unwatchFunc == nil {
		return
	}
	if err := c.unwatchFunc(); err != nil {
		logrus.WithError(err).Error("unwatch service in gRPC consul resolver")
	}
}
import (
...
"google.golang.org/grpc"
)
...
// Dial opens a gRPC client connection to the named service. The target uses
// the "consul" scheme, so addresses are resolved by the custom consul resolver
// registered through grpc.WithResolvers, and calls are balanced round_robin.
// Transport security is disabled (grpc.WithInsecure).
//
// serviceName and tag are percent-escaped before being placed in the target
// URL, so characters such as '/', '?', '&' or '#' in them cannot change the
// structure of the target.
//
// NOTE(review): timeout is accepted but not used by this function — confirm
// whether a bounded (blocking) dial was intended.
func Dial(serviceName, tag string, timeout time.Duration) (grpc.ClientConnInterface, error) {
	cfg := consul.Config()
	target := fmt.Sprintf(
		"%s:///%s/%s?tag=%s",
		resolverSchemeConsul,
		cfg.Address,
		url.PathEscape(serviceName),
		url.QueryEscape(tag),
	)
	return grpc.Dial(
		target,
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
		// NOTE(review): newConsulBuilder expects a *consul.Cluster as its first
		// argument; the origin of the `consul` value used here is not visible
		// in this snippet — confirm.
		grpc.WithResolvers(newConsulBuilder(consul, consul.ServiceWatcher())),
		grpc.WithInsecure(),
	)
}
As it turned out, we don't need a custom host resolver for Consul: a ready-made gRPC integration already exists (the github.com/mbobakov/grpc-consul-resolver package).
import (
...
"google.golang.org/grpc"
_ "github.com/mbobakov/grpc-consul-resolver"
)
...
// Dial opens a gRPC client connection to the named service. The target is a
// "consul://" URL, which is resolved by the blank-imported
// github.com/mbobakov/grpc-consul-resolver package; calls are balanced
// round_robin and transport security is disabled (grpc.WithInsecure).
//
// The target has the form
//
//	consul://user:password@host:port/service_name?tag=service_tag
//
// The credentials, service name and tag are percent-escaped, so a password or
// tag containing characters such as '@', ':', '/', '?' or '#' cannot corrupt
// the URL.
//
// NOTE(review): this assumes cfg.User and cfg.Password are strings — confirm
// against the consul config type.
// NOTE(review): timeout is accepted but not used by this function — confirm
// whether a bounded (blocking) dial was intended.
func Dial(serviceName, tag string, timeout time.Duration) (grpc.ClientConnInterface, error) {
	cfg := consul.Config()
	target := fmt.Sprintf(
		"consul://%s@%s/%s?tag=%s",
		url.UserPassword(cfg.User, cfg.Password),
		cfg.Address,
		url.PathEscape(serviceName),
		url.QueryEscape(tag),
	)
	return grpc.Dial(
		target,
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
		grpc.WithInsecure(),
	)
}
consul://user:password@127.0.0.1:8500/service_name?tag=service_tag
.