服务发现实战之一 consul服务发现构建 Discover Consul in Action
准备
- ubuntu v20.04
- git
- go v1.16
项目结构
.
├── config
├── discover
├── endpoint
├── go.mod
├── go.sum
├── main.go
├── service
└── transport
- transport层 项目提供的服务方式
- endpoint层 用于接收请求并返回响应
- service层 业务代码实现层
- discover 服务发现实现
什么是consul?
Consul 是一种服务网格解决方案,提供具有服务发现、配置和分段功能的全功能控制平面。这些功能中的每一个都可以根据需要单独使用,也可以一起使用以构建完整的服务网格。 Consul 需要一个数据平面并支持代理和本地集成模型。 Consul 附带一个简单的内置代理,因此一切都可以开箱即用,而且还支持 第三方方代理集成,例如 Envoy。
Consul 的主要特点是:
- 服务发现:Consul 的客户端可以注册一个服务,例如 api 或 mysql,其他客户端可以使用 Consul 来发现给定服务的提供者。使用 DNS 或 HTTP,应用程序可以轻松找到它们所依赖的服务。
- 健康检查:Consul 客户端可以提供任意数量的健康检查,要么与给定的服务相关(“网络服务器是否返回 200 OK”),要么与本地节点(“内存利用率是否低于 90%”)相关联。操作员可以使用此信息来监控集群健康状况,并且服务发现组件可以使用它来将流量路由到不健康的主机之外。
- KV 存储:应用程序可以将 Consul 的分层键/值存储用于多种目的,包括动态配置、功能标记、协调、领导选举等。简单的 HTTP API 使其易于使用。
- 安全的服务通信:Consul 可以为服务生成和分发 TLS 证书,以建立相互的 TLS 连接。意图可用于定义允许哪些服务进行通信。可以通过实时更改意图轻松管理服务分段,而不是使用复杂的网络拓扑和静态防火墙规则。
- 多数据中心:Consul 支持开箱即用的多个数据中心。这意味着 Consul 的用户不必担心构建额外的抽象层以扩展到多个区域。
consul的安装和启动
安装方式
- 二进制包下载
- 源码编译
源码编译
# 将 Consul 存储库从 GitHub 克隆到您的 GOPATH
$ mkdir -p $GOPATH/src/github.com/hashicorp && cd !$
$ git clone https://github.com/hashicorp/consul.git
$ cd consul
# 引导项目。这将下载和编译编译 Consul 所需的库和工具
make tools
# 为您当前的系统构建 Consul 并将二进制文件放入 ./bin/ (相对于 git checkout)。 make dev 目标只是为本地构建环境(没有交叉编译的目标)构建 consul 的快捷方式。
make dev
# 验证安装
consul -v
启动consul
# 启动consul, -dev表示以开发模式启动,该模式下会快速部署一个单节点的consul服务,部署好的节点既是server也是Leader,开发模式启动的consul不会持久化任何数据,数据仅存在内存中。在生产环境中建议使用-server模式启动
consul agent -dev -data-dir=/tmp/consul
服务注册与发现接口
定义与consul交互的discovery_client接口,/discover/discover_client.go
package discover
import "log"
type DiscoveryClient interface {
/**
* 服务注册接口
* @param serviceName 服务名
* @param instanceId 服务实例Id
* @param instancePort 服务实例端口
* @param healthCheckUrl 健康检查地址
* @param instanceHost 服务实例地址
* @param meta 服务实例元数据
*/
Register(serviceName, instanceId, healthCheckUrl string, instanceHost string, instancePort int, meta map[string]string, logger *log.Logger) bool
/**
* 服务注销接口
* @param instanceId 服务实例Id
*/
DeRegister(instanceId string, logger *log.Logger) bool
/**
* 发现服务实例接口
* @param serviceName 服务名
*/
DiscoverServices(serviceName string, logger *log.Logger) []interface{}
}
利用go-kit实现服务注册与发现接口,/discover/kit_discover_client.go, 先执行go get -u github.com/go-kit/kit/sd/consul github.com/hashicorp/consul/api github.com/hashicorp/consul/api/watch
package discover
import (
"log"
"strconv"
"sync"
"github.com/go-kit/kit/sd/consul"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
)
type KitDiscoverClient struct {
Addr string // Consul Host Address
Port int // Consul port
client consul.Client
config *api.Config // 连接consul的配置
mutex sync.Mutex
instancesMap sync.Map // 服务实例缓存字段
}
func NewKitDiscoverClient(consulAddress string, consulPort int) (DiscoveryClient, error) {
// 通过consul host和consul port创建一个consul.Client
consulConfig := api.DefaultConfig()
consulConfig.Address = consulAddress + ":" + strconv.Itoa(consulPort)
apiClient, err := api.NewClient(consulConfig)
if err != nil {
return nil, err
}
client := consul.NewClient(apiClient)
return &KitDiscoverClient{
Addr: consulAddress,
Port: consulPort,
config: consulConfig,
client: client,
}, err
}
func (k *KitDiscoverClient) Register(
serviceName, instanceId, healthCheckUrl string,
instanceAddress string,
instancePort int,
meta map[string]string,
logger *log.Logger) bool {
// 构建服务实例元数据
serviceRegisteration := &api.AgentServiceRegistration{
ID: instanceId,
Name: serviceName,
Address: instanceAddress,
Port: instancePort,
Meta: meta,
Check: &api.AgentServiceCheck{
DeregisterCriticalServiceAfter: "30s",
HTTP: "http://" + instanceAddress + ":" + strconv.Itoa(instancePort) + healthCheckUrl,
Interval: "15s",
},
}
// 发送服务注册到consul 中
err := k.client.Register(serviceRegisteration)
if err != nil {
log.Println("Register Service Error!")
return false
}
log.Println("Register Service Success!")
return true
}
func (k *KitDiscoverClient) DeRegister(instanceId string, logger *log.Logger) bool {
// 构建包含服务实例 ID 的元数据结构体
serviceRegisteration := &api.AgentServiceRegistration{
ID: instanceId,
}
// 发送服务注销请求
err := k.client.Deregister(serviceRegisteration)
if err != nil {
logger.Println("DeRegister Service Error!")
return false
}
log.Println("Register Service Success!")
return true
}
func (k *KitDiscoverClient) DiscoverServices(serviceName string, logger *log.Logger) []interface{} {
// 该服务已监控并缓存
instanceList, ok := k.instancesMap.Load(serviceName)
if ok {
return instanceList.([]interface{})
}
// 申请锁
k.mutex.Lock()
defer k.mutex.Unlock()
// 再次检查是否监控
instanceList, ok = k.instancesMap.Load(serviceName)
if ok {
return instanceList.([]interface{})
} else {
// 注册监控
go func() {
// 使用consul服务实例监控来监控某个服务名的服务实例列表变化
params := make(map[string]interface{})
params["type"] = "service"
params["service"] = serviceName
plan, _ := watch.Parse(params)
plan.Handler = func(u uint64, i interface{}) {
if i == nil {
return
}
v, ok := i.([]*api.ServiceEntry)
if !ok {
return // 数据异常,忽略
}
// 没有服务实例在线
if len(v) == 0 {
k.instancesMap.Store(serviceName, []interface{}{})
}
var healthServices []interface{}
for _, service := range v {
if service.Checks.AggregatedStatus() == api.HealthPassing {
healthServices = append(healthServices, service.Service)
}
}
k.instancesMap.Store(serviceName, healthServices)
}
defer plan.Stop()
plan.Run(k.config.Address)
}()
}
// 根据服务名请求服务实例列表
entries, _, err := k.client.Service(serviceName, "", false, nil)
if err != nil {
k.instancesMap.Store(serviceName, []interface{}{})
logger.Println("Discover Service Error!")
return nil
}
instances := make([]interface{}, len(entries))
for i := 0; i < len(instances); i++ {
instances[i] = entries[i].Service
}
k.instancesMap.Store(serviceName, instances)
return instances
}
配置信息代码实现
定义日志信息,/config/config.go
先执行go get -u github.com/go-kit/kit/log
package config
import (
kitlog "github.com/go-kit/kit/log"
"log"
"os"
)
var Logger *log.Logger
var KitLogger kitlog.Logger
func init() {
Logger = log.New(os.Stderr, "", log.LstdFlags)
KitLogger = kitlog.NewLogfmtLogger(os.Stderr)
KitLogger = kitlog.With(KitLogger, "ts", kitlog.DefaultTimestampUTC)
KitLogger = kitlog.With(KitLogger, "caller", kitlog.DefaultCaller)
}
代码实现
定义项目提供的服务接口,/service/service.go
package service
import (
"context"
"errors"
"github.com/realjf/consul-in-action/config"
"github.com/realjf/consul-in-action/discover"
)
type Service interface {
// 健康检查接口
HealthCheck() bool
// 打招呼接口
SayHello() string
// 服务发现接口
DiscoveryService(ctx context.Context, serviceName string) ([]interface{}, error)
}
接口实现
var ErrNotDiscoveryService = errors.New("discovery service not found")
type discoveryService struct {
discoveryClient discover.DiscoveryClient
}
func NewDiscoveryService(discoveryClient discover.DiscoveryClient) DiscoveryService {
return &discoveryService{
discoveryClient: discoveryClient,
}
}
func (s *discoveryService) SayHello() string {
return "Hello World"
}
// 从consul中根据服务名获取对应的服务实例信息列表并返回
func (s *discoveryService) DiscoveryService(ctx context.Context, serviceName string) ([]interface{}, error) {
ins := s.discoveryClient.DiscoverServices(serviceName, config.Logger)
if ins == nil || len(ins) == 0 {
return nil, ErrNotDiscoveryService
}
return ins, nil
}
func (s *discoveryService) HealthCheck() bool {
return true
}
endpoint层 将请求转化为服务接口可以处理的参数,并将结果封装为response返回给transport层。 首先执行go get -u github.com/go-kit/kit/endpoint,然后实现 endpoint层代码/endpoint/endpoints.go,
package endpoint
import (
"context"
"github.com/go-kit/kit/endpoint"
"github.com/realjf/consul-in-action/service"
)
// 定义服务发现endpoint结构
type DiscoveryEndpoints struct {
SayHelloEndpoint endpoint.Endpoint
DiscoveryEndpoint endpoint.Endpoint
HealthCheckEndpoint endpoint.Endpoint
}
// 然后实现sayHello请求:包括请求结构体、响应结构体和创建方法
type SayHelloRequest struct {
}
type SayHelloResponse struct {
Message string `json"message"`
}
func NewSayHelloEndpoint(svc service.DiscoveryService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
message := svc.SayHello()
return SayHelloResponse{
Message: message,
}, nil
}
}
// 实现服务发现请求:请求结构体、响应结构体和创建方法
type DiscoveryRequest struct {
ServiceName string
}
type DiscoveryResponse struct {
Instances []interface{} `json:"instances"`
Error string `json:"error"`
}
func NewDiscoveryEndpoint(svc service.DiscoveryService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(DiscoveryRequest)
instances, err := svc.DiscoveryService(ctx, req.ServiceName)
var errString = ""
if err != nil {
errString = err.Error()
}
return &DiscoveryResponse{
Instances: instances,
Error: errString,
}, nil
}
}
// 实现健康检查请求:请求结构体、响应结构体和创建方法
type HealthCheckRequest struct {
}
type HealthCheckResponse struct {
Status bool `json:"status"`
}
func NewHealthCheckEndpoint(svc service.DiscoveryService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
status := svc.HealthCheck()
return HealthCheckResponse{
Status: status,
}, nil
}
}
transport层 对外暴露http服务,将endpoint包中的Endpoint与对应http路径进行绑定。 先执行go get -u github.com/gorilla/mux github.com/go-kit/kit/transport/http github.com/go-kit/kit/transport github.com/go-kit/kit/log, 然后代码实现/transport/http.go
package transport
import (
"context"
"encoding/json"
"errors"
"net/http"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
kithttp "github.com/go-kit/kit/transport/http"
"github.com/gorilla/mux"
"github.com/realjf/consul-in-action/endpoint"
)
var (
ErrBadRequest = errors.New("invalid parameters")
)
func errorEncoder(ctx context.Context, err error, w http.ResponseWriter) {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, `{"error":"%s"}\n`, err.Error()) // or whatever
}
func NewHttpHandler(ctx context.Context, endpoints endpoint.DiscoveryEndpoints, logger log.Logger) http.Handler {
r := mux.NewRouter()
options := []kithttp.ServerOption{
kithttp.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
kithttp.ServerErrorEncoder(errorEncoder),
}
r.Methods("GET").Path("/say-hello").Handler(kithttp.NewServer(
endpoints.SayHelloEndpoint,
decodeSayHelloRequest,
encodeJsonResponse,
options...,
))
r.Methods("GET").Path("/discovery").Handler(kithttp.NewServer(
endpoints.DiscoveryEndpoint,
decodeDiscoveryRequest,
encodeJsonResponse,
options...,
))
r.Methods("GET").Path("/health").Handler(kithttp.NewServer(
endpoints.HealthCheckEndpoint,
decodeHealthCheckRequest,
encodeJsonResponse,
options...,
))
return r
}
// decodeXXXRequest将http请求转化为endpoint可处理的request请求体
func decodeSayHelloRequest(_ context.Context, r *http.Request) (interface{}, error) {
return endpoint.SayHelloRequest{}, nil
}
func decodeDiscoveryRequest(_ context.Context, r *http.Request) (interface{}, error) {
serviceName := r.URL.Query().Get("serviceName")
if serviceName == "" {
return nil, ErrBadRequest
}
return endpoint.DiscoveryRequest{
ServiceName: serviceName,
}, nil
}
func decodeHealthCheckRequest(_ context.Context, r *http.Request) (interface{}, error) {
return endpoint.HealthCheckRequest{}, nil
}
// encodeJsonResponse将返回的response转化为json格式
func encodeJsonResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
w.Header().Set("Content-Type", "application/json;charset=utf-8")
return json.NewEncoder(w).Encode(response)
}
最后实现main函数,先执行go get -u github.com/satori/go.uuid
package main
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"github.com/realjf/consul-in-action/config"
"github.com/realjf/consul-in-action/discover"
"github.com/realjf/consul-in-action/endpoint"
"github.com/realjf/consul-in-action/service"
"github.com/realjf/consul-in-action/transport"
uuid "github.com/satori/go.uuid"
)
func main() {
var (
// 服务地址和服务名
servicePort = flag.Int("service.port", 12000, "service port")
serviceHost = flag.String("service.host", "127.0.0.1", "service host")
serviceName = flag.String("service.name", "SayHello", "service name")
// consul 地址
consulPort = flag.Int("consul.port", 8500, "consul port")
consulAddress = flag.String("consul.host", "127.0.0.1", "consul host address")
)
flag.Parse()
ctx := context.Background()
errChan := make(chan error)
// 声明服务发现客户端
var discoveryClient discover.DiscoveryClient
discoveryClient, err := discover.NewKitDiscoverClient(*consulAddress, *consulPort)
// 获取服务发现客户端失败,直接关闭服务
if err != nil {
config.Logger.Println("Get Consul Client failed")
os.Exit(-1)
}
// 声明并初始化 Service
var svc = service.NewDiscoveryService(discoveryClient)
// 创建Endpoint
sayHelloEndpoint := endpoint.NewSayHelloEndpoint(svc)
discoveryEndpoint := endpoint.NewDiscoveryEndpoint(svc)
healthCheckEndpoint := endpoint.NewHealthCheckEndpoint(svc)
endpoints := endpoint.DiscoveryEndpoints{
SayHelloEndpoint: sayHelloEndpoint,
DiscoveryEndpoint: discoveryEndpoint,
HealthCheckEndpoint: healthCheckEndpoint,
}
// 创建http.Handler
r := transport.NewHttpHandler(ctx, endpoints, config.KitLogger)
// 定义服务实例ID
instanceId := *serviceName + "-" + uuid.NewV4().String()
// 启动http server
go func() {
config.Logger.Println("Http Server start at port:" + strconv.Itoa(*servicePort))
// 启动前执行注册
if !discoveryClient.Register(*serviceName, instanceId, "/health", *serviceHost, *servicePort, nil, config.Logger) {
config.Logger.Printf("string-service for service %s failed.", serviceName)
// 注册失败,服务启动失败
os.Exit(-1)
}
handler := r
errChan <- http.ListenAndServe(":"+strconv.Itoa(*servicePort), handler)
}()
go func() {
// 监控系统信号,等待ctrl + c 系统信号通知服务关闭
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errChan <- fmt.Errorf("%s", <-c)
}()
error := <-errChan
// 服务退出取消注册
discoveryClient.DeRegister(instanceId, config.Logger)
config.Logger.Println(error)
}
之后启动consul,然后,运行服务go run main.go,打开浏览器:http://localhost:8500查看consul服务信息, 服务发现地址:http://localhost:12000/discovery?serviceName=SayHello