Commit 1775891c authored by Andrew Newdigate's avatar Andrew Newdigate Committed by Nick Thomas
Browse files

Add distributed tracing with LabKit

parent 254f37c7
......@@ -48,15 +48,15 @@ $(TARGET_SETUP):
 
gitlab-zip-cat: $(TARGET_SETUP) $(shell find cmd/gitlab-zip-cat/ -name '*.go')
$(call message,Building $@)
$(GOBUILD) -o $(BUILD_DIR)/$@ $(PKG)/cmd/$@
$(GOBUILD) -tags "$(BUILD_TAGS)" -o $(BUILD_DIR)/$@ $(PKG)/cmd/$@
 
gitlab-zip-metadata: $(TARGET_SETUP) $(shell find cmd/gitlab-zip-metadata/ -name '*.go')
$(call message,Building $@)
$(GOBUILD) -o $(BUILD_DIR)/$@ $(PKG)/cmd/$@
$(GOBUILD) -tags "$(BUILD_TAGS)" -o $(BUILD_DIR)/$@ $(PKG)/cmd/$@
 
gitlab-workhorse: $(TARGET_SETUP) $(shell find . -name '*.go' | grep -v '^\./_')
$(call message,Building $@)
$(GOBUILD) -o $(BUILD_DIR)/$@ $(PKG)
$(GOBUILD) -tags "$(BUILD_TAGS)" -o $(BUILD_DIR)/$@ $(PKG)
 
.PHONY: install
install: gitlab-workhorse gitlab-zip-cat gitlab-zip-metadata
......@@ -67,13 +67,13 @@ install: gitlab-workhorse gitlab-zip-cat gitlab-zip-metadata
.PHONY: test
test: $(TARGET_SETUP) prepare-tests
$(call message,$@)
@go test $(LOCAL_PACKAGES)
@go test -tags "$(BUILD_TAGS)" $(LOCAL_PACKAGES)
@echo SUCCESS
 
.PHONY: coverage
coverage: $(TARGET_SETUP) prepare-tests
$(call message,$@)
@go test -cover -coverprofile=test.coverage $(LOCAL_PACKAGES)
@go test -tags "$(BUILD_TAGS)" -cover -coverprofile=test.coverage $(LOCAL_PACKAGES)
go tool cover -html=test.coverage -o coverage.html
rm -f test.coverage
 
......
......@@ -204,6 +204,26 @@ the first priority during development.
 
It is OK if a feature is only covered by integration tests.
 
## Distributed Tracing
Workhorse supports distributed tracing through [LabKit](https://gitlab.com/gitlab-org/labkit/) using [OpenTracing APIs](https://opentracing.io).
By default, no tracing implementation is linked into the binary, but different OpenTracing providers can be linked in using [build tags](https://golang.org/pkg/go/build/#hdr-Build_Constraints)/[build constraints](https://golang.org/pkg/go/build/#hdr-Build_Constraints). This can be done by setting the `BUILD_TAGS` make variable.
For more details of the supported providers, see LabKit, but as an example, for Jaeger tracing support, include the tags: `BUILD_TAGS="tracer_static tracer_static_jaeger"`.
```shell
make BUILD_TAGS="tracer_static tracer_static_jaeger"
```
Once Workhorse is compiled with an opentracing provider, the tracing configuration is configured via the `GITLAB_TRACING` environment variable.
For example:
```shell
GITLAB_TRACING=opentracing://jaeger ./gitlab-workhorse
```
## License
 
This code is distributed under the MIT license, see the LICENSE file.
......
......@@ -11,6 +11,7 @@ import (
"google.golang.org/grpc"
 
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
)
 
type Server struct {
......@@ -113,6 +114,7 @@ func newConnection(server Server) (*grpc.ClientConn, error) {
grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(server.Token)),
grpc.WithStreamInterceptor(
grpc_middleware.ChainStreamClient(
grpctracing.StreamClientTracingInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpccorrelation.StreamClientCorrelationInterceptor(),
),
......@@ -120,6 +122,7 @@ func newConnection(server Server) (*grpc.ClientConn, error) {
 
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpctracing.UnaryClientTracingInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpccorrelation.UnaryClientCorrelationInterceptor(),
),
......
......@@ -10,6 +10,7 @@ import (
"time"
 
"gitlab.com/gitlab-org/labkit/correlation"
"gitlab.com/gitlab-org/labkit/tracing"
 
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
......@@ -18,7 +19,7 @@ import (
// that are more restrictive than for http.DefaultTransport,
// they define shorter TLS Handshake, and more agressive connection closing
// to prevent the connection hanging and reduce FD usage
var httpTransport = correlation.NewInstrumentedRoundTripper(&http.Transport{
var httpTransport = tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
......@@ -29,7 +30,7 @@ var httpTransport = correlation.NewInstrumentedRoundTripper(&http.Transport{
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
})
}))
 
var httpClient = &http.Client{
Transport: httpTransport,
......
......@@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
 
"gitlab.com/gitlab-org/labkit/correlation"
"gitlab.com/gitlab-org/labkit/tracing"
 
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/log"
......@@ -38,7 +39,7 @@ var rangeHeaderKeys = []string{
// that are more restrictive than for http.DefaultTransport,
// they define shorter TLS Handshake, and more agressive connection closing
// to prevent the connection hanging and reduce FD usage
var httpTransport = correlation.NewInstrumentedRoundTripper(&http.Transport{
var httpTransport = tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
......@@ -49,7 +50,7 @@ var httpTransport = correlation.NewInstrumentedRoundTripper(&http.Transport{
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
})
}))
 
var httpClient = &http.Client{
Transport: httpTransport,
......
......@@ -9,6 +9,7 @@ import (
"time"
 
"gitlab.com/gitlab-org/labkit/correlation"
"gitlab.com/gitlab-org/labkit/tracing"
 
"gitlab.com/gitlab-org/gitlab-workhorse/internal/badgateway"
)
......@@ -47,7 +48,11 @@ func NewBackendRoundTripper(backend *url.URL, socket string, proxyHeadersTimeout
panic("backend is nil and socket is empty")
}
 
return correlation.NewInstrumentedRoundTripper(badgateway.NewRoundTripper(developmentMode, transport))
return tracing.NewRoundTripper(
correlation.NewInstrumentedRoundTripper(
badgateway.NewRoundTripper(developmentMode, transport),
),
)
}
 
// NewTestBackendRoundTripper sets up a RoundTripper for testing purposes
......
......@@ -7,6 +7,8 @@ import (
 
"github.com/gorilla/websocket"
 
"gitlab.com/gitlab-org/labkit/tracing"
apipkg "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/artifacts"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/builds"
......@@ -78,6 +80,10 @@ func route(method, regexpStr string, handler http.Handler, opts ...func(*routeOp
 
handler = denyWebsocket(handler) // Disallow websockets
handler = instrumentRoute(handler, method, regexpStr) // Add prometheus metrics
if options.tracing {
// Add distributed tracing
handler = tracing.Handler(handler)
}
 
return routeEntry{
method: method,
......
......@@ -14,6 +14,7 @@ import (
"github.com/jfbus/httprs"
 
"gitlab.com/gitlab-org/labkit/correlation"
"gitlab.com/gitlab-org/labkit/tracing"
 
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
......@@ -25,7 +26,7 @@ var ErrNotAZip = errors.New("not a zip")
var ErrArchiveNotFound = errors.New("archive not found")
 
var httpClient = &http.Client{
Transport: correlation.NewInstrumentedRoundTripper(&http.Transport{
Transport: tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
......@@ -35,7 +36,7 @@ var httpClient = &http.Client{
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
}),
})),
}
 
// OpenArchive will open a zip.Reader from a local path or a remote object store URL
......
......@@ -26,6 +26,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
 
"gitlab.com/gitlab-org/labkit/correlation"
"gitlab.com/gitlab-org/labkit/tracing"
 
"gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/log"
......@@ -81,6 +82,8 @@ func main() {
startLogging(logConfig)
logger := log.NoContext()
 
tracing.Initialize(tracing.WithServiceName("gitlab-workhorse"))
backendURL, err := parseAuthBackend(*authBackend)
if err != nil {
logger.WithError(err).Fatal("invalid authBackend")
......
# grpc_opentracing
`import "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"`
* [Overview](#pkg-overview)
* [Imported Packages](#pkg-imports)
* [Index](#pkg-index)
## <a name="pkg-overview">Overview</a>
`grpc_opentracing` adds OpenTracing
### OpenTracing Interceptors
These are both client-side and server-side interceptors for OpenTracing. They are a provider-agnostic, with backends
such as Zipkin, or Google Stackdriver Trace.
For a service that sends out requests and receives requests, you *need* to use both, otherwise downstream requests will
not have the appropriate requests propagated.
All server-side spans are tagged with grpc_ctxtags information.
For more information see:
<a href="http://opentracing.io/documentation/">http://opentracing.io/documentation/</a>
<a href="https://github.com/opentracing/specification/blob/master/semantic_conventions.md">https://github.com/opentracing/specification/blob/master/semantic_conventions.md</a>
## <a name="pkg-imports">Imported Packages</a>
- [github.com/grpc-ecosystem/go-grpc-middleware](./../..)
- [github.com/grpc-ecosystem/go-grpc-middleware/tags](./../../tags)
- [github.com/grpc-ecosystem/go-grpc-middleware/util/metautils](./../../util/metautils)
- [github.com/opentracing/opentracing-go](https://godoc.org/github.com/opentracing/opentracing-go)
- [github.com/opentracing/opentracing-go/ext](https://godoc.org/github.com/opentracing/opentracing-go/ext)
- [github.com/opentracing/opentracing-go/log](https://godoc.org/github.com/opentracing/opentracing-go/log)
- [golang.org/x/net/context](https://godoc.org/golang.org/x/net/context)
- [google.golang.org/grpc](https://godoc.org/google.golang.org/grpc)
- [google.golang.org/grpc/grpclog](https://godoc.org/google.golang.org/grpc/grpclog)
- [google.golang.org/grpc/metadata](https://godoc.org/google.golang.org/grpc/metadata)
## <a name="pkg-index">Index</a>
* [Constants](#pkg-constants)
* [func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context](#ClientAddContextTags)
* [func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor](#StreamClientInterceptor)
* [func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor](#StreamServerInterceptor)
* [func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor](#UnaryClientInterceptor)
* [func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor](#UnaryServerInterceptor)
* [type FilterFunc](#FilterFunc)
* [type Option](#Option)
* [func WithFilterFunc(f FilterFunc) Option](#WithFilterFunc)
* [func WithTracer(tracer opentracing.Tracer) Option](#WithTracer)
#### <a name="pkg-files">Package files</a>
[client_interceptors.go](./client_interceptors.go) [doc.go](./doc.go) [id_extract.go](./id_extract.go) [metadata.go](./metadata.go) [options.go](./options.go) [server_interceptors.go](./server_interceptors.go)
## <a name="pkg-constants">Constants</a>
``` go
const (
TagTraceId = "trace.traceid"
TagSpanId = "trace.spanid"
)
```
## <a name="ClientAddContextTags">func</a> [ClientAddContextTags](./client_interceptors.go#L105)
``` go
func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context
```
ClientAddContextTags returns a context with specified opentracing tags, which
are used by UnaryClientInterceptor/StreamClientInterceptor when creating a
new span.
## <a name="StreamClientInterceptor">func</a> [StreamClientInterceptor](./client_interceptors.go#L35)
``` go
func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor
```
StreamClientInterceptor returns a new streaming client interceptor for OpenTracing.
## <a name="StreamServerInterceptor">func</a> [StreamServerInterceptor](./server_interceptors.go#L37)
``` go
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor
```
StreamServerInterceptor returns a new streaming server interceptor for OpenTracing.
## <a name="UnaryClientInterceptor">func</a> [UnaryClientInterceptor](./client_interceptors.go#L21)
``` go
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor
```
UnaryClientInterceptor returns a new unary client interceptor for OpenTracing.
## <a name="UnaryServerInterceptor">func</a> [UnaryServerInterceptor](./server_interceptors.go#L23)
``` go
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor
```
UnaryServerInterceptor returns a new unary server interceptor for OpenTracing.
## <a name="FilterFunc">type</a> [FilterFunc](./options.go#L22)
``` go
type FilterFunc func(ctx context.Context, fullMethodName string) bool
```
FilterFunc allows users to provide a function that filters out certain methods from being traced.
If it returns false, the given request will not be traced.
## <a name="Option">type</a> [Option](./options.go#L41)
``` go
type Option func(*options)
```
### <a name="WithFilterFunc">func</a> [WithFilterFunc](./options.go#L44)
``` go
func WithFilterFunc(f FilterFunc) Option
```
WithFilterFunc customizes the function used for deciding whether a given call is traced or not.
### <a name="WithTracer">func</a> [WithTracer](./options.go#L51)
``` go
func WithTracer(tracer opentracing.Tracer) Option
```
WithTracer sets a custom tracer to be used for this middleware, otherwise the opentracing.GlobalTracer is used.
- - -
Generated by [godoc2ghmd](https://github.com/GandalfUK/godoc2ghmd)
\ No newline at end of file
# grpc_opentracing
`import "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"`
* [Overview](#pkg-overview)
* [Imported Packages](#pkg-imports)
* [Index](#pkg-index)
## <a name="pkg-overview">Overview</a>
`grpc_opentracing` adds OpenTracing
### OpenTracing Interceptors
These are both client-side and server-side interceptors for OpenTracing. They are a provider-agnostic, with backends
such as Zipkin, or Google Stackdriver Trace.
For a service that sends out requests and receives requests, you *need* to use both, otherwise downstream requests will
not have the appropriate requests propagated.
All server-side spans are tagged with grpc_ctxtags information.
For more information see:
<a href="http://opentracing.io/documentation/">http://opentracing.io/documentation/</a>
<a href="https://github.com/opentracing/specification/blob/master/semantic_conventions.md">https://github.com/opentracing/specification/blob/master/semantic_conventions.md</a>
## <a name="pkg-imports">Imported Packages</a>
- [github.com/grpc-ecosystem/go-grpc-middleware](./../..)
- [github.com/grpc-ecosystem/go-grpc-middleware/tags](./../../tags)
- [github.com/grpc-ecosystem/go-grpc-middleware/util/metautils](./../../util/metautils)
- [github.com/opentracing/opentracing-go](https://godoc.org/github.com/opentracing/opentracing-go)
- [github.com/opentracing/opentracing-go/ext](https://godoc.org/github.com/opentracing/opentracing-go/ext)
- [github.com/opentracing/opentracing-go/log](https://godoc.org/github.com/opentracing/opentracing-go/log)
- [golang.org/x/net/context](https://godoc.org/golang.org/x/net/context)
- [google.golang.org/grpc](https://godoc.org/google.golang.org/grpc)
- [google.golang.org/grpc/grpclog](https://godoc.org/google.golang.org/grpc/grpclog)
- [google.golang.org/grpc/metadata](https://godoc.org/google.golang.org/grpc/metadata)
## <a name="pkg-index">Index</a>
* [Constants](#pkg-constants)
* [func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context](#ClientAddContextTags)
* [func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor](#StreamClientInterceptor)
* [func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor](#StreamServerInterceptor)
* [func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor](#UnaryClientInterceptor)
* [func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor](#UnaryServerInterceptor)
* [type FilterFunc](#FilterFunc)
* [type Option](#Option)
* [func WithFilterFunc(f FilterFunc) Option](#WithFilterFunc)
* [func WithTracer(tracer opentracing.Tracer) Option](#WithTracer)
#### <a name="pkg-files">Package files</a>
[client_interceptors.go](./client_interceptors.go) [doc.go](./doc.go) [id_extract.go](./id_extract.go) [metadata.go](./metadata.go) [options.go](./options.go) [server_interceptors.go](./server_interceptors.go)
## <a name="pkg-constants">Constants</a>
``` go
const (
TagTraceId = "trace.traceid"
TagSpanId = "trace.spanid"
)
```
## <a name="ClientAddContextTags">func</a> [ClientAddContextTags](./client_interceptors.go#L105)
``` go
func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context
```
ClientAddContextTags returns a context with specified opentracing tags, which
are used by UnaryClientInterceptor/StreamClientInterceptor when creating a
new span.
## <a name="StreamClientInterceptor">func</a> [StreamClientInterceptor](./client_interceptors.go#L35)
``` go
func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor
```
StreamClientInterceptor returns a new streaming client interceptor for OpenTracing.
## <a name="StreamServerInterceptor">func</a> [StreamServerInterceptor](./server_interceptors.go#L37)
``` go
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor
```
StreamServerInterceptor returns a new streaming server interceptor for OpenTracing.
## <a name="UnaryClientInterceptor">func</a> [UnaryClientInterceptor](./client_interceptors.go#L21)
``` go
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor
```
UnaryClientInterceptor returns a new unary client interceptor for OpenTracing.
## <a name="UnaryServerInterceptor">func</a> [UnaryServerInterceptor](./server_interceptors.go#L23)
``` go
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor
```
UnaryServerInterceptor returns a new unary server interceptor for OpenTracing.
## <a name="FilterFunc">type</a> [FilterFunc](./options.go#L22)
``` go
type FilterFunc func(ctx context.Context, fullMethodName string) bool
```
FilterFunc allows users to provide a function that filters out certain methods from being traced.
If it returns false, the given request will not be traced.
## <a name="Option">type</a> [Option](./options.go#L41)
``` go
type Option func(*options)
```
### <a name="WithFilterFunc">func</a> [WithFilterFunc](./options.go#L44)
``` go
func WithFilterFunc(f FilterFunc) Option
```
WithFilterFunc customizes the function used for deciding whether a given call is traced or not.
### <a name="WithTracer">func</a> [WithTracer](./options.go#L51)
``` go
func WithTracer(tracer opentracing.Tracer) Option
```
WithTracer sets a custom tracer to be used for this middleware, otherwise the opentracing.GlobalTracer is used.
- - -
Generated by [godoc2ghmd](https://github.com/GandalfUK/godoc2ghmd)
\ No newline at end of file
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package grpc_opentracing
import (
"io"
"sync"
"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
)
// UnaryClientInterceptor returns a new unary client interceptor for OpenTracing.
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
o := evaluateOptions(opts)
return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) {
return invoker(parentCtx, method, req, reply, cc, opts...)
}
newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method)
err := invoker(newCtx, method, req, reply, cc, opts...)
finishClientSpan(clientSpan, err)
return err
}
}
// StreamClientInterceptor returns a new streaming client interceptor for OpenTracing.
func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
o := evaluateOptions(opts)
return func(parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) {
return streamer(parentCtx, desc, cc, method, opts...)
}
newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method)
clientStream, err := streamer(newCtx, desc, cc, method, opts...)
if err != nil {
finishClientSpan(clientSpan, err)
return nil, err
}
return &tracedClientStream{ClientStream: clientStream, clientSpan: clientSpan}, nil
}
}
// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
// a new ClientStream according to the retry policy.
type tracedClientStream struct {
grpc.ClientStream
mu sync.Mutex
alreadyFinished bool
clientSpan opentracing.Span
}
func (s *tracedClientStream) Header() (metadata.MD, error) {
h, err := s.ClientStream.Header()
if err != nil {
s.finishClientSpan(err)
}
return h, err
}
func (s *tracedClientStream) SendMsg(m interface{}) error {
err := s.ClientStream.SendMsg(m)
if err != nil {
s.finishClientSpan(err)
}
return err
}
func (s *tracedClientStream) CloseSend() error {
err := s.ClientStream.CloseSend()
if err != nil {
s.finishClientSpan(err)
}
return err
}
func (s *tracedClientStream) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m)
if err != nil {
s.finishClientSpan(err)
}
return err
}
func (s *tracedClientStream) finishClientSpan(err error) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.alreadyFinished {
finishClientSpan(s.clientSpan, err)
s.alreadyFinished = true
}
}
// ClientAddContextTags returns a context with specified opentracing tags, which
// are used by UnaryClientInterceptor/StreamClientInterceptor when creating a
// new span.
func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context {
return context.WithValue(ctx, clientSpanTagKey{}, tags)
}
type clientSpanTagKey struct{}
func newClientSpanFromContext(ctx context.Context, tracer opentracing.Tracer, fullMethodName string) (context.Context, opentracing.Span) {
var parentSpanCtx opentracing.SpanContext
if parent := opentracing.SpanFromContext(ctx); parent != nil {
parentSpanCtx = parent.Context()
}
opts := []opentracing.StartSpanOption{
opentracing.ChildOf(parentSpanCtx),
ext.SpanKindRPCClient,
grpcTag,
}
if tagx := ctx.Value(clientSpanTagKey{}); tagx != nil {
if opt, ok := tagx.(opentracing.StartSpanOption); ok {
opts = append(opts, opt)
}