第一原理

让我们创建一个Go kit 服务,我们暂时只用一个main.go文件。

你的事务逻辑

你的服务从你的事务逻辑开始。在Go kit里,我们用一个interface来描述一个服务

// StringService provides operations on strings.
import "context"

type StringService interface {
	Uppercase(string) (string, error)
	Count(string) int
}

这个interface会有一个实现

import (
	"context"
	"errors"
	"strings"
)

type stringService struct{}

func (stringService) Uppercase(s string) (string, error) {
	if s == "" {
		return "", ErrEmpty
	}
	return strings.ToUpper(s), nil
}

func (stringService) Count(s string) int {
	return len(s)
}

// ErrEmpty is returned when input string is empty
var ErrEmpty = errors.New("Empty string")

请求和应答

在Go kit里,主要的通信模式是RPC。因此,我们的interface里的每个方法都会被作为一个远程过程调用。我们为每个方法定义请求和应答结构,独立的捕获所有的输入和输出。

type uppercaseRequest struct {
	S string `json:"s"`
}

type uppercaseResponse struct {
	V   string `json:"v"`
	Err string `json:"err,omitempty"` // errors don't JSON-marshal, so we use a string
}

type countRequest struct {
	S string `json:"s"`
}

type countResponse struct {
	V int `json:"v"`
}

终端 Go kit通过一个叫做endpoint的抽象来提供大部分功能。一个endpint的定义如下:

type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)

它相当于一个独立的RPC,就是说,我们服务接口里一个独立的方法。我们会写简单的适配器来把我们服务里的每个方法转换成一个endpoint。每个适配器携带一个StringService,返回相当于一个方法的endpoint。

import (
	"context"
	"github.com/go-kit/kit/endpoint"
)

func makeUppercaseEndpoint(svc StringService) endpoint.Endpoint {
	return func(_ context.Context, request interface{}) (interface{}, error) {
		req := request.(uppercaseRequest)
		v, err := svc.Uppercase(req.S)
		if err != nil {
			return uppercaseResponse{v, err.Error()}, nil
		}
		return uppercaseResponse{v, ""}, nil
	}
}

func makeCountEndpoint(svc StringService) endpoint.Endpoint {
	return func(_ context.Context, request interface{}) (interface{}, error) {
		req := request.(countRequest)
		v := svc.Count(req.S)
		return countResponse{v}, nil
	}
}

传输

现在我们需要把你的服务暴露给外面的世界,这样它才能被调用。你的组织对于服务直接相互调用可能已经有了一些选择。可能你用Thrit,或者定制化的通过HTTP传输的JSON。Go kit多种向外的传输方式。
这个小服务,我们用通过HTTP传输的JSON。Go kit在transport/http包里提供了一个helper struct。

import (
	"context"
	"encoding/json"
	"log"
	"net/http"

	httptransport "github.com/go-kit/kit/transport/http"
)

func main() {
	svc := stringService{}

	uppercaseHandler := httptransport.NewServer(
		makeUppercaseEndpoint(svc),
		decodeUppercaseRequest,
		encodeResponse,
	)

	countHandler := httptransport.NewServer(
		makeCountEndpoint(svc),
		decodeCountRequest,
		encodeResponse,
	)

	http.Handle("/uppercase", uppercaseHandler)
	http.Handle("/count", countHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}

func decodeUppercaseRequest(_ context.Context, r *http.Request) (interface{}, error) {
	var request uppercaseRequest
	if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
		return nil, err
	}
	return request, nil
}

func decodeCountRequest(_ context.Context, r *http.Request) (interface{}, error) {
	var request countRequest
	if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
		return nil, err
	}
	return request, nil
}

func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
	return json.NewEncoder(w).Encode(response)
}

stringsvc1

到现在为止,整个服务就是stringsvc1

$ go get github.com/go-kit/kit/examples/stringsvc1
$ stringsvc1
$ curl -XPOST -d'{"s":"hello, world"}' localhost:8080/uppercase
{"v":"HELLO, WORLD","err":null}
$ curl -XPOST -d'{"s":"hello, world"}' localhost:8080/count
{"v":12}

中间件

缺少了logging和instrumentation的服务,不能说做好了在生产环境使用的准备。

焦虑的分离

当你增加服务的endpoint数量的时候,将调用图的每一层分离为单独的文件是一个go-kit工程变得更可读。我们的第一个例子stringsvc1在一个main文件里包含了所有这些层。在增加更多复杂性之前,让我们把代码分离到下列文件中,剩下的留在main.go里。

把服务放在service.go文件里,包含下面的function和type。

type StringService
type stringService
var ErrEmpty

把transports放在transport.go里,包含如下的function和type。

func makeUppercaseEndpoint
func makeCountEndpoint
func decodeUppercaseRequest
func decodeCountRequest
func encodeResponse
type uppercaseRequest
type uppercaseResponse
type countRequest
type countResponse

传输日志

任何需要记录日志的组件都应该把logger当做一个依赖,就行对待一个数据库连接一样。因此,我们在func main里构建我们的logger,然后传给需要它的组件。我们从不使用一个全局的logger。

我们本可以直接把logger传递给stringService的实现,但是有更好的办法。让我们用一个中间件。也叫做装饰器。一个中间件就是一个传入endpoint然后返回endpoint的函数。

type Middleware func(Endpoint) Endpoint

注意,Middleware类型是go-kit提供给你的。

在这中间,它可以做任何事情。下面你可以看到一个可以被实现的基本的日志中间件:

 loggingMiddleware(logger log.Logger) Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		return func(_ context.Context, request interface{}) (interface{}, error) {
			logger.Log("msg", "calling endpoint")
			defer logger.Log("msg", "called endpoint")
			return next(request)
		}
	}
}

使用go-kit log包,并且删除标准库log。你需要从main.go底部删除log.Fatal。

import (
 "github.com/go-kit/kit/log"
)

把它写在每个处理器里。注意,下面的代码块在你学完Application Logging部分之前不会编译,它定义了loggingMiddleware。

logger := log.NewLogfmtLogger(os.Stderr)

svc := stringService{}

var uppercase endpoint.Endpoint
uppercase = makeUppercaseEndpoint(svc)
uppercase = loggingMiddleware(log.With(logger, "method", "uppercase"))(uppercase)

var count endpoint.Endpoint
count = makeCountEndpoint(svc)
count = loggingMiddleware(log.With(logger, "method", "count"))(count)

uppercaseHandler := httptransport.NewServer(
	// ...
	uppercase,
	// ...
)

countHandler := httptransport.NewServer(
	// ...
	count,
	// ...
)

它说明这个技术有更多的用途,而不只是用来记录日志。很多Go kit组件都实现成了endpoint的中间件。

应用日志

但是,如果我们想像传入的参数一样记录我们应用范畴的日志呢?我们可以为我们的服务定义一个中间件,获得一样优雅的组合效果。由于我们的StringService被定义成立一个interface,我们只需要一个包装了已有的StringService的新类型,来负责额外的日志记录。

type loggingMiddleware struct {
	logger log.Logger
	next   StringService
}

func (mw loggingMiddleware) Uppercase(s string) (output string, err error) {
	defer func(begin time.Time) {
		mw.logger.Log(
			"method", "uppercase",
			"input", s,
			"output", output,
			"err", err,
			"took", time.Since(begin),
		)
	}(time.Now())

	output, err = mw.next.Uppercase(s)
	return
}

func (mw loggingMiddleware) Count(s string) (n int) {
	defer func(begin time.Time) {
		mw.logger.Log(
			"method", "count",
			"input", s,
			"n", n,
			"took", time.Since(begin),
		)
	}(time.Now())

	n = mw.next.Count(s)
	return
}

然后,写入

import (
	"os"

	"github.com/go-kit/kit/log"
	httptransport "github.com/go-kit/kit/transport/http"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	var svc StringService
	svc = stringService{}
	svc = loggingMiddleware{logger, svc}

	// ...

	uppercaseHandler := httptransport.NewServer(
		// ...
		makeUppercaseEndpoint(svc),
		// ...
	)

	countHandler := httptransport.NewServer(
		// ...
		makeCountEndpoint(svc),
		// ...
	)
}

用endpoint中间件对付传输范畴的问题,例如熔断和限流。用服务中间件来处理事务领域的问题,例如日志和instrumentation。说道instrumentation。。。

应用instrumentation

在Go kit里,instrumentation表示用package metrics来记录关于你的服务的运行时行为的统计信息。统计job运行的数量,在请求结束后记录时长,跟踪正在运行的操作数量,这些都被认为是instrumentation。

我们可以用和logging相同的中间件模式

type instrumentingMiddleware struct {
	requestCount   metrics.Counter
	requestLatency metrics.Histogram
	countResult    metrics.Histogram
	next           StringService
}

func (mw instrumentingMiddleware) Uppercase(s string) (output string, err error) {
	defer func(begin time.Time) {
		lvs := []string{"method", "uppercase", "error", fmt.Sprint(err != nil)}
		mw.requestCount.With(lvs...).Add(1)
		mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
	}(time.Now())

	output, err = mw.next.Uppercase(s)
	return
}

func (mw instrumentingMiddleware) Count(s string) (n int) {
	defer func(begin time.Time) {
		lvs := []string{"method", "count", "error", "false"}
		mw.requestCount.With(lvs...).Add(1)
		mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
		mw.countResult.Observe(float64(n))
	}(time.Now())

	n = mw.next.Count(s)
	return
}

然后写入我们的服务里

import (
	stdprometheus "github.com/prometheus/client_golang/prometheus"
	kitprometheus "github.com/go-kit/kit/metrics/prometheus"
	"github.com/go-kit/kit/metrics"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	fieldKeys := []string{"method", "error"}
	requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Namespace: "my_group",
		Subsystem: "string_service",
		Name:      "request_count",
		Help:      "Number of requests received.",
	}, fieldKeys)
	requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
		Namespace: "my_group",
		Subsystem: "string_service",
		Name:      "request_latency_microseconds",
		Help:      "Total duration of requests in microseconds.",
	}, fieldKeys)
	countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
		Namespace: "my_group",
		Subsystem: "string_service",
		Name:      "count_result",
		Help:      "The result of each count method.",
	}, []string{}) // no fields here

	var svc StringService
	svc = stringService{}
	svc = loggingMiddleware{logger, svc}
	svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc}

	uppercaseHandler := httptransport.NewServer(
		makeUppercaseEndpoint(svc),
		decodeUppercaseRequest,
		encodeResponse,
	)

	countHandler := httptransport.NewServer(
		makeCountEndpoint(svc),
		decodeCountRequest,
		encodeResponse,
	)

	http.Handle("/uppercase", uppercaseHandler)
	http.Handle("/count", countHandler)
	http.Handle("/metrics", promhttp.Handler())
	logger.Log("msg", "HTTP", "addr", ":8080")
	logger.Log("err", http.ListenAndServe(":8080", nil))
}

stringsvc2

到现在为止,完整的服务是stringsvc2

$ go get github.com/go-kit/kit/examples/stringsvc2
$ stringsvc2
msg=HTTP addr=:8080
$ curl -XPOST -d'{"s":"hello, world"}' localhost:8080/uppercase
{"v":"HELLO, WORLD","err":null}
$ curl -XPOST -d'{"s":"hello, world"}' localhost:8080/count
{"v":12}
method=uppercase input="hello, world" output="HELLO, WORLD" err=null took=2.455µs
method=count input="hello, world" n=12 took=743ns

调用其他服务

一个服务存在于一个真空里是很罕见的。通常,你需要调用其他的服务。这就是Go kit的价值所在。我们提供了传输中间件来解决接下来的很多问题。

假设我们让我们的string服务调用一个不同的string服务来满足Uppercase方法。就是说,把请求代理到另外的服务上。让我们来实现代理中间件叫做ServiceMiddleware,和logging或者instrumenting中间件一样。

// proxymw implements StringService, forwarding Uppercase requests to the
// provided endpoint, and serving all other (i.e. Count) requests via the
// next StringService.
type proxymw struct {
	next      StringService     // Serve most requests via this service...
	uppercase endpoint.Endpoint // ...except Uppercase, which gets served by this endpoint
}

客户端 endpoint

我们有一个我们已经了解的同样的endpont,但是我们将用它来转移一个请求,而不是直接提供服务。这样使用的时候,我们叫它客户端endpoint。我们只需要做一些简单的改动:

func (mw proxymw) Uppercase(s string) (string, error) {
	response, err := mw.uppercase(uppercaseRequest{S: s})
	if err != nil {
		return "", err
	}
	resp := response.(uppercaseResponse)
	if resp.Err != "" {
		return resp.V, errors.New(resp.Err)
	}
	return resp.V, nil
}

现在,为了构建一个这样的代理中间件,我们转换一个代理URL字符串给一个endpoint。如果我们假设使用通过HTTP的JSON,我们可以使用transport/http包里的helper。

import (
	httptransport "github.com/go-kit/kit/transport/http"
)

func proxyingMiddleware(proxyURL string) ServiceMiddleware {
	return func(next StringService) StringService {
		return proxymw{next, makeUppercaseProxy(proxyURL)}
	}
}

func makeUppercaseProxy(proxyURL string) endpoint.Endpoint {
	return httptransport.NewClient(
		"GET",
		mustParseURL(proxyURL),
		encodeUppercaseRequest,
		decodeUppercaseResponse,
	).Endpoint()
}

服务发现和负载均衡

如果我们只有一个远程服务还好。但是真实情况是,我们可能有很多可用的服务实例。我们希望通过一些服务发现机制发现它们。并且把我们的压力分散到他们身上。而且如果这些实例中的任何一个表现异常,我们希望可以处理,不影响我们的服务可用性。

Go kit为不同的服务发现系统提供了转换器,用来获取更新的实例集合,作为独立的endpoint暴露出去。这些转换器叫做subscriber。

type Subscriber interface {
	Endpoints() ([]endpoint.Endpoint, error)
}

在内部,subscriber使用一个工厂方法来把发现实例字符串转换成一个可用的endpoint。

type Factory func(instance string) (endpoint.Endpoint, error)

目前,我们的工厂方法,makeUppercaseProxy,还是直接调用URL。但是在工厂里放一些安全中间件,例如熔断和限流,同样很重要。

var e endpoint.Endpoint
e = makeUppercaseProxy(instance)
e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(maxQPS), int64(maxQPS)))(e)

现在,我们有了一个endpoint集合,我们需要选择一个。负载均衡器包装了subscriber,并且从中选择一个endpoint。Go kit提供了一些基础的负载均衡器,并且,如果有更多需求,自己写一个也很容易。

type Balancer interface {
	Endpoint() (endpoint.Endpoint, error)
}

现在,我们有能力根据一些逻辑选择endpoint。我们可以用来提供独立的,逻辑的,健壮的endpoint给消费者。重试策略包装一个负载均衡器,返回一个可用的endpoint。重试策略将会重试失败的请求,知道到达最大尝试次数或者超时。

func Retry(max int, timeout time.Duration, lb Balancer) endpoint.Endpoint

让我们把最终的代理中间件连起来。简化起见,我们假设用户提供多个用逗号分隔的实例endpoint。

func proxyingMiddleware(instances string, logger log.Logger) ServiceMiddleware {
	// If instances is empty, don't proxy.
	if instances == "" {
		logger.Log("proxy_to", "none")
		return func(next StringService) StringService { return next }
	}

	// Set some parameters for our client.
	var (
		qps         = 100                    // beyond which we will return an error
		maxAttempts = 3                      // per request, before giving up
		maxTime     = 250 * time.Millisecond // wallclock time, before giving up
	)

	// Otherwise, construct an endpoint for each instance in the list, and add
	// it to a fixed set of endpoints. In a real service, rather than doing this
	// by hand, you'd probably use package sd's support for your service
	// discovery system.
	var (
		instanceList = split(instances)
		subscriber   sd.FixedSubscriber
	)
	logger.Log("proxy_to", fmt.Sprint(instanceList))
	for _, instance := range instanceList {
		var e endpoint.Endpoint
		e = makeUppercaseProxy(instance)
		e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
		e = kitratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
		subscriber = append(subscriber, e)
	}

	// Now, build a single, retrying, load-balancing endpoint out of all of
	// those individual endpoints.
	balancer := lb.NewRoundRobin(subscriber)
	retry := lb.Retry(maxAttempts, maxTime, balancer)

	// And finally, return the ServiceMiddleware, implemented by proxymw.
	return func(next StringService) StringService {
		return proxymw{next, retry}
	}
}

stringsvc3

现在为止完整的服务是stringsvc3

$ go get github.com/go-kit/kit/examples/stringsvc3
$ stringsvc3 -listen=:8001 &
listen=:8001 caller=proxying.go:25 proxy_to=none
listen=:8001 caller=main.go:72 msg=HTTP addr=:8001
$ stringsvc3 -listen=:8002 &
listen=:8002 caller=proxying.go:25 proxy_to=none
listen=:8002 caller=main.go:72 msg=HTTP addr=:8002
$ stringsvc3 -listen=:8003 &
listen=:8003 caller=proxying.go:25 proxy_to=none
listen=:8003 caller=main.go:72 msg=HTTP addr=:8003
$ stringsvc3 -listen=:8080 -proxy=localhost:8001,localhost:8002,localhost:8003
listen=:8080 caller=proxying.go:29 proxy_to="[localhost:8001 localhost:8002 localhost:8003]"
listen=:8080 caller=main.go:72 msg=HTTP addr=:8080
$ for s in foo bar baz ; do curl -d"{\"s\":\"$s\"}" localhost:8080/uppercase ; done
{"v":"FOO","err":null}
{"v":"BAR","err":null}
{"v":"BAZ","err":null}
listen=:8001 caller=logging.go:28 method=uppercase input=foo output=FOO err=null took=5.168µs
listen=:8080 caller=logging.go:28 method=uppercase input=foo output=FOO err=null took=4.39012ms
listen=:8002 caller=logging.go:28 method=uppercase input=bar output=BAR err=null took=5.445µs
listen=:8080 caller=logging.go:28 method=uppercase input=bar output=BAR err=null took=2.04831ms
listen=:8003 caller=logging.go:28 method=uppercase input=baz output=BAZ err=null took=3.285µs
listen=:8080 caller=logging.go:28 method=uppercase input=baz output=BAZ err=null took=1.388155ms