Unverified commit 2881d73e authored by Fabian Reinartz, committed by GitHub

Merge pull request #3362 from krasi-georgiev/discovery-refactoring

Decouple the discovery and refactor the retrieval package
parents c3f92387 587dec9e
@@ -43,6 +43,7 @@ import (
"github.com/prometheus/common/promlog"
promlogflag "github.com/prometheus/common/promlog/flag"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval"
@@ -230,27 +231,29 @@ func main() {
 
cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
var (
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
targetManager = retrieval.NewTargetManager(fanoutStorage, log.With(logger, "component", "target manager"))
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background())
ctxWeb, cancelWeb = context.WithCancel(context.Background())
ctxRule = context.Background()
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager"))
scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine),
NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
})
)
 
ruleManager := rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine),
NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()),
Context: ctx,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
})
cfg.web.Context = ctx
cfg.web.Context = ctxWeb
cfg.web.TSDB = localStorage.Get
cfg.web.Storage = fanoutStorage
cfg.web.QueryEngine = queryEngine
cfg.web.TargetManager = targetManager
cfg.web.ScrapeManager = scrapeManager
cfg.web.RuleManager = ruleManager
cfg.web.Notifier = notifier
 
@@ -268,6 +271,7 @@ func main() {
cfg.web.Flags[f.Name] = f.Value.String()
}
 
// Depends on cfg.web.ScrapeManager, so it needs to run after cfg.web.ScrapeManager is set above.
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
 
// Monitor outgoing connections on default transport with conntrack.
@@ -277,9 +281,10 @@ func main() {
 
reloaders := []func(cfg *config.Config) error{
remoteStorage.ApplyConfig,
targetManager.ApplyConfig,
webHandler.ApplyConfig,
notifier.ApplyConfig,
discoveryManager.ApplyConfig,
scrapeManager.ApplyConfig,
func(cfg *config.Config) error {
// Get all rule files matching the configuration paths.
var files []string
@@ -326,6 +331,35 @@ func main() {
},
)
}
{
ctxDiscovery, cancelDiscovery := context.WithCancel(context.Background())
g.Add(
func() error {
err := discoveryManager.Run(ctxDiscovery)
level.Info(logger).Log("msg", "Discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping discovery manager...")
cancelDiscovery()
},
)
}
{
g.Add(
func() error {
err := scrapeManager.Run(discoveryManager.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)
}
{
// Make sure that the SIGHUP handler is registered with a redirect to the channel before the potentially
// long and synchronous tsdb init.
@@ -426,7 +460,7 @@ func main() {
{
g.Add(
func() error {
if err := webHandler.Run(ctx); err != nil {
if err := webHandler.Run(ctxWeb); err != nil {
return fmt.Errorf("Error starting web server: %s", err)
}
return nil
@@ -435,7 +469,7 @@ func main() {
// Keep this interrupt before the ruleManager.Stop().
// Shutting down the query engine before the rule manager will cause pending queries
// to be canceled, which ensures a quick shutdown of the rule manager.
cancelCtx()
cancelWeb()
},
)
}
@@ -467,21 +501,6 @@ func main() {
},
)
}
{
// TODO(krasi) refactor targetManager.Run() to be blocking to avoid using an extra blocking channel.
cancel := make(chan struct{})
g.Add(
func() error {
targetManager.Run()
<-cancel
return nil
},
func(err error) {
targetManager.Stop()
close(cancel)
},
)
}
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
}
@@ -721,7 +721,7 @@ func (a *BasicAuth) UnmarshalYAML(unmarshal func(interface{}) error) error {
return checkOverflow(a.XXX, "basic_auth")
}
 
// TargetGroup is a set of targets with a common label set.
// TargetGroup is a set of targets with a common label set (e.g. production, test, staging).
type TargetGroup struct {
// Targets is a list of targets identified by a label set. Each target is
// uniquely identifiable in the group by its address label.
@@ -133,9 +133,9 @@ the Prometheus server will be able to see them.
 
A Service Discovery (SD) mechanism has to discover targets and provide them to Prometheus. We expect similar targets to be grouped together, in the form of a [`TargetGroup`](https://godoc.org/github.com/prometheus/prometheus/config#TargetGroup). The SD mechanism sends the targets down to Prometheus as a list of `TargetGroups`.
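
As an illustration, a single target group could look like the following Go literal (a hypothetical example; it assumes the Prometheus `config` and `model` packages are imported, and the addresses and labels are made up):

```go
// Two targets that share an "env" label and are reported under one source.
tg := &config.TargetGroup{
	Targets: []model.LabelSet{
		{model.AddressLabel: "10.0.0.1:9100"},
		{model.AddressLabel: "10.0.0.2:9100"},
	},
	Labels: model.LabelSet{"env": "production"},
	Source: "example/0",
}
```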
 
An SD mechanism has to implement the `TargetProvider` Interface:
An SD mechanism has to implement the `Discoverer` interface:
```go
type TargetProvider interface {
type Discoverer interface {
Run(ctx context.Context, up chan<- []*config.TargetGroup)
}
```
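
As a minimal sketch (not part of this change, and using a made-up type name), a `Discoverer` that serves a fixed set of target groups could look like this:

```go
// staticDiscoverer sends one fixed list of target groups and then waits
// for the context to be canceled.
type staticDiscoverer struct {
	groups []*config.TargetGroup
}

func (d *staticDiscoverer) Run(ctx context.Context, up chan<- []*config.TargetGroup) {
	// Send the initial (and only) set of target groups, unless canceled first.
	select {
	case up <- d.groups:
	case <-ctx.Done():
		return
	}
	// Block until the context is canceled. The update channel is deliberately
	// not closed here, as required by the interface contract.
	<-ctx.Done()
}
```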
@@ -16,12 +16,13 @@ package discovery
import (
"context"
"fmt"
"sync"
"time"
"sort"
 
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/azure"
"github.com/prometheus/prometheus/discovery/consul"
"github.com/prometheus/prometheus/discovery/dns"
@@ -35,93 +36,234 @@ import (
"github.com/prometheus/prometheus/discovery/zookeeper"
)
 
// A TargetProvider provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a target provider
// detects a potential change, it sends the TargetGroup through its provided channel.
// Discoverer provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a discovery provider
// detects a potential change, it sends the TargetGroup through its channel.
//
// The TargetProvider does not have to guarantee that an actual change happened.
// Discoverer does not know if an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens.
//
// TargetProviders should initially send a full set of all discoverable TargetGroups.
type TargetProvider interface {
// Run hands a channel to the target provider through which it can send
// Discoverers should initially send a full set of all discoverable TargetGroups.
type Discoverer interface {
// Run hands a channel to the discovery provider (Consul, DNS, etc.) through which it can send
// updated target groups.
// Must return when the context gets canceled. It should not close the update
// channel on returning.
Run(ctx context.Context, up chan<- []*config.TargetGroup)
}
 
// ProvidersFromConfig returns all TargetProviders configured in cfg.
func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) map[string]TargetProvider {
providers := map[string]TargetProvider{}
type poolKey struct {
setName string
provider string
}

// byProvider implements sort.Interface for []poolKey based on the provider field.
// Sorting is needed so that we can have predictable tests.
type byProvider []poolKey
func (a byProvider) Len() int { return len(a) }
func (a byProvider) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byProvider) Less(i, j int) bool { return a[i].provider < a[j].provider }

// NewManager is the Discovery Manager constructor.
func NewManager(logger log.Logger) *Manager {
return &Manager{
logger: logger,
actionCh: make(chan func(context.Context)),
syncCh: make(chan map[string][]*config.TargetGroup),
targets: make(map[poolKey][]*config.TargetGroup),
discoverCancel: []context.CancelFunc{},
}
}

// Manager maintains a set of discovery providers and sends each update to a channel used by other packages.
type Manager struct {
logger log.Logger
// actionCh serializes access to the Manager's internal state: all state
// changes are executed as closures on the Run goroutine.
actionCh chan func(context.Context)
discoverCancel []context.CancelFunc
// targets holds the most recent target groups received from each provider.
targets map[poolKey][]*config.TargetGroup
// The sync channel sends the updates as a map[targetSetName] where targetSetName is the job value from the scrape config.
syncCh chan map[string][]*config.TargetGroup
}

// Run starts the background processing.
func (m *Manager) Run(ctx context.Context) error {
for {
select {
case f := <-m.actionCh:
f(ctx)
case <-ctx.Done():
m.cancelDiscoverers()
return ctx.Err()
}
}
}

// SyncCh returns a read-only channel used by all Discoverers to send target updates.
func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup {
return m.syncCh
}

// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
err := make(chan error)
m.actionCh <- func(ctx context.Context) {
m.cancelDiscoverers()
for _, scfg := range cfg.ScrapeConfigs {
for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
m.startProvider(ctx, poolKey{setName: scfg.JobName, provider: provName}, prov)
}
}
close(err)
}
return <-err
}

func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) {
ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*config.TargetGroup)
m.discoverCancel = append(m.discoverCancel, cancel)
go worker.Run(ctx, updates)
go m.runProvider(ctx, poolKey, updates)
}

func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*config.TargetGroup) {
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
m.addGroup(poolKey, tgs)
m.syncCh <- m.allGroups(poolKey)
}
}
}

func (m *Manager) cancelDiscoverers() {
for _, c := range m.discoverCancel {
c()
}
m.targets = make(map[poolKey][]*config.TargetGroup)
m.discoverCancel = nil
}

func (m *Manager) addGroup(poolKey poolKey, tg []*config.TargetGroup) {
done := make(chan struct{})
m.actionCh <- func(ctx context.Context) {
if tg != nil {
m.targets[poolKey] = tg
}
close(done)
}
<-done
}

func (m *Manager) allGroups(pk poolKey) map[string][]*config.TargetGroup {
tSets := make(chan map[string][]*config.TargetGroup)
m.actionCh <- func(ctx context.Context) {
// Sorting by the poolKey is needed so that we can have predictable tests.
var pKeys []poolKey
for pk := range m.targets {
pKeys = append(pKeys, pk)
}
sort.Sort(byProvider(pKeys))
tSetsAll := map[string][]*config.TargetGroup{}
for _, pk := range pKeys {
for _, tg := range m.targets[pk] {
if tg.Source != "" { // Don't add empty targets.
tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg)
}
}
}
tSets <- tSetsAll
}
return <-tSets
}

func (m *Manager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]Discoverer {
providers := map[string]Discoverer{}
 
app := func(mech string, i int, tp TargetProvider) {
app := func(mech string, i int, tp Discoverer) {
providers[fmt.Sprintf("%s/%d", mech, i)] = tp
}
 
for i, c := range cfg.DNSSDConfigs {
app("dns", i, dns.NewDiscovery(c, log.With(logger, "discovery", "dns")))
app("dns", i, dns.NewDiscovery(c, log.With(m.logger, "discovery", "dns")))
}
for i, c := range cfg.FileSDConfigs {
app("file", i, file.NewDiscovery(c, log.With(logger, "discovery", "file")))
app("file", i, file.NewDiscovery(c, log.With(m.logger, "discovery", "file")))
}
for i, c := range cfg.ConsulSDConfigs {
k, err := consul.NewDiscovery(c, log.With(logger, "discovery", "consul"))
k, err := consul.NewDiscovery(c, log.With(m.logger, "discovery", "consul"))
if err != nil {
level.Error(logger).Log("msg", "Cannot create Consul discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot create Consul discovery", "err", err)
continue
}
app("consul", i, k)
}
for i, c := range cfg.MarathonSDConfigs {
m, err := marathon.NewDiscovery(c, log.With(logger, "discovery", "marathon"))
t, err := marathon.NewDiscovery(c, log.With(m.logger, "discovery", "marathon"))
if err != nil {
level.Error(logger).Log("msg", "Cannot create Marathon discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot create Marathon discovery", "err", err)
continue
}
app("marathon", i, m)
app("marathon", i, t)
}
for i, c := range cfg.KubernetesSDConfigs {
k, err := kubernetes.New(log.With(logger, "discovery", "k8s"), c)
k, err := kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
if err != nil {
level.Error(logger).Log("msg", "Cannot create Kubernetes discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot create Kubernetes discovery", "err", err)
continue
}
app("kubernetes", i, k)
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(logger, "discovery", "zookeeper")))
app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(m.logger, "discovery", "zookeeper")))
}
for i, c := range cfg.NerveSDConfigs {
app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(logger, "discovery", "nerve")))
app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(m.logger, "discovery", "nerve")))
}
for i, c := range cfg.EC2SDConfigs {
app("ec2", i, ec2.NewDiscovery(c, log.With(logger, "discovery", "ec2")))
app("ec2", i, ec2.NewDiscovery(c, log.With(m.logger, "discovery", "ec2")))
}
for i, c := range cfg.OpenstackSDConfigs {
openstackd, err := openstack.NewDiscovery(c, log.With(logger, "discovery", "openstack"))
openstackd, err := openstack.NewDiscovery(c, log.With(m.logger, "discovery", "openstack"))
if err != nil {
level.Error(logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err)
continue
}
app("openstack", i, openstackd)
}
 
for i, c := range cfg.GCESDConfigs {
gced, err := gce.NewDiscovery(c, log.With(logger, "discovery", "gce"))
gced, err := gce.NewDiscovery(c, log.With(m.logger, "discovery", "gce"))
if err != nil {
level.Error(logger).Log("msg", "Cannot initialize GCE discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot initialize GCE discovery", "err", err)
continue
}
app("gce", i, gced)
}
for i, c := range cfg.AzureSDConfigs {
app("azure", i, azure.NewDiscovery(c, log.With(logger, "discovery", "azure")))
app("azure", i, azure.NewDiscovery(c, log.With(m.logger, "discovery", "azure")))
}
for i, c := range cfg.TritonSDConfigs {
t, err := triton.New(log.With(logger, "discovery", "trition"), c)
t, err := triton.New(log.With(m.logger, "discovery", "trition"), c)
if err != nil {
level.Error(logger).Log("msg", "Cannot create Triton discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot create Triton discovery", "err", err)
continue
}
app("triton", i, t)
@@ -147,7 +289,7 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
return &StaticProvider{groups}
}
 
// Run implements the TargetProvider interface.
// Run implements the Discoverer interface.
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// We still have to consider that the consumer exits right away in which case
// the context will be canceled.
@@ -157,163 +299,3 @@ func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGro
}
close(ch)
}
// TargetSet handles multiple TargetProviders and sends a full overview of their
// discovered TargetGroups to a Syncer.
type TargetSet struct {
mtx sync.RWMutex
// Sets of targets by a source string that is unique across target providers.
tgroups map[string]*config.TargetGroup
syncer Syncer
syncCh chan struct{}
providerCh chan map[string]TargetProvider
cancelProviders func()
}
// Syncer receives updates complete sets of TargetGroups.
type Syncer interface {
Sync([]*config.TargetGroup)
}
// NewTargetSet returns a new target sending TargetGroups to the Syncer.
func NewTargetSet(s Syncer) *TargetSet {
return &TargetSet{
syncCh: make(chan struct{}, 1),
providerCh: make(chan map[string]TargetProvider),
syncer: s,
}
}
// Run starts the processing of target providers and their updates.
// It blocks until the context gets canceled.
func (ts *TargetSet) Run(ctx context.Context) {
Loop:
for {
// Throttle syncing to once per five seconds.
select {
case <-ctx.Done():
break Loop
case p := <-ts.providerCh:
ts.updateProviders(ctx, p)
case <-time.After(5 * time.Second):
}
select {
case <-ctx.Done():
break Loop
case <-ts.syncCh:
ts.sync()
case p := <-ts.providerCh:
ts.updateProviders(ctx, p)
}
}
}
func (ts *TargetSet) sync() {
ts.mtx.RLock()
var all []*config.TargetGroup
for _, tg := range ts.tgroups {
all = append(all, tg)
}
ts.mtx.RUnlock()
ts.syncer.Sync(all)
}
// UpdateProviders sets new target providers for the target set.
func (ts *TargetSet) UpdateProviders(p map[string]TargetProvider) {
ts.providerCh <- p
}
func (ts *TargetSet) updateProviders(ctx context.Context, providers map[string]TargetProvider) {
// Stop all previous target providers of the target set.
if ts.cancelProviders != nil {
ts.cancelProviders()
}
ctx, ts.cancelProviders = context.WithCancel(ctx)
var wg sync.WaitGroup
// (Re-)create a fresh tgroups map to not keep stale targets around. We
// will retrieve all targets below anyway, so cleaning up everything is
// safe and doesn't inflict any additional cost.
ts.mtx.Lock()
ts.tgroups = map[string]*config.TargetGroup{}
ts.mtx.Unlock()
for name, prov := range providers {
wg.Add(1)
updates := make(chan []*config.TargetGroup)
go prov.Run(ctx, updates)
go func(name string, prov TargetProvider) {
select {
case <-ctx.Done():
case initial, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
break
}
// First set of all targets the provider knows.
for _, tgroup := range initial {
ts.setTargetGroup(name, tgroup)
}
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
wg.Done()
// Start listening for further updates.
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
for _, tg := range tgs {
ts.update(name, tg)
}
}
}
}(name, prov)
}
// We wait for a full initial set of target groups before releasing the mutex
// to ensure the initial sync is complete and there are no races with subsequent updates.
wg.Wait()
// Just signal that there are initial sets to sync now. Actual syncing must only
// happen in the runScraping loop.
select {
case ts.syncCh <- struct{}{}:
default:
}
}
// update handles a target group update from a target provider identified by the name.
func (ts *TargetSet) update(name string, tgroup *config.TargetGroup) {
ts.setTargetGroup(name, tgroup)
select {
case ts.syncCh <- struct{}{}:
default:
}
}
func (ts *TargetSet) setTargetGroup(name string, tg *config.TargetGroup) {
ts.mtx.Lock()
defer ts.mtx.Unlock()
if tg == nil {
return
}
ts.tgroups[name+"/"+tg.Source] = tg
}
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package discovery

import (
"context"
"fmt"
"reflect"
"strconv"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
yaml "gopkg.in/yaml.v2"
)

// TestDiscoveryManagerSyncCalls checks that the target updates are received in the expected order.
func TestDiscoveryManagerSyncCalls(t *testing.T) {
// The order in which the updates are sent is determined by the interval passed to the mock discovery adapter.
// Final targets array is ordered alphabetically by the name of the discoverer.
// For example discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge.
testCases := []struct {
title string
updates map[string][]update
expectedTargets [][]*config.TargetGroup
}{
{
title: "Single TP no updates",
updates: map[string][]update{
"tp1": {},
},
expectedTargets: nil,
},
{
title: "Multips TPs no updates",
updates: map[string][]update{
"tp1": {},
"tp2": {},
"tp3": {},
},
expectedTargets: nil,
},
{
title: "Single TP empty initials",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{},
interval: 5,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{},
},
},
{
title: "Multiple TPs empty initials",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{},
interval: 5,
},
},
"tp2": {
{
targetGroups: []config.TargetGroup{},
interval: 200,
},
},
"tp3": {
{
targetGroups: []config.TargetGroup{},
interval: 100,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{},
{},
{},
},
},
{
title: "Single TP initials only",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
}},
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
},
},
{
title: "Multiple TPs initials only",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
},
},
"tp2": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
}, {
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
},
},
},
{
title: "Single TP initials followed by empty updates",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 0,
},
{
targetGroups: []config.TargetGroup{},
interval: 10,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{},
},
},
{
title: "Single TP initials and new groups",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 0,
},
{
targetGroups: []config.TargetGroup{
{
Source: "update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
},
},
{
title: "Multiple TPs initials and new groups",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 10,
},
{
targetGroups: []config.TargetGroup{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 500,
},
},
"tp2": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
interval: 100,
},
{
targetGroups: []config.TargetGroup{
{
Source: "tp2-update1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2-update1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2-update1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
},
},
{
title: "One tp initials arrive after other tp updates.",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 10,
},
{
targetGroups: []config.TargetGroup{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 150,
},
},
"tp2": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
interval: 200,
},
{
targetGroups: []config.TargetGroup{
{
Source: "tp2-update1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
interval: 100,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2-update1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
},
},
{
title: "Single TP Single provider empty update in between",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 30,
},
{
targetGroups: []config.TargetGroup{},
interval: 10,
},
{
targetGroups: []config.TargetGroup{
{
Source: "update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 300,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{},
{
{
Source: "update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
},
},
}
for testIndex, testCase := range testCases {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(nil)
go discoveryManager.Run(ctx)
var totalUpdatesCount int
for tpName, update := range testCase.updates {
provider := newMockDiscoveryProvider(update)
discoveryManager.startProvider(ctx, poolKey{setName: strconv.Itoa(testIndex), provider: tpName}, provider)
if len(update) > 0 {
totalUpdatesCount = totalUpdatesCount + len(update)
}
}
Loop:
for x := 0; x < totalUpdatesCount; x++ {
select {
case <-time.After(10 * time.Second):
t.Errorf("%v. %q: no update arrived within the timeout limit", x, testCase.title)
break Loop
case tsetMap := <-discoveryManager.SyncCh():
for _, received := range tsetMap {
if !reflect.DeepEqual(received, testCase.expectedTargets[x]) {
var receivedFormated string
for _, receivedTargets := range received {
receivedFormated = receivedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets)
}
var expectedFormated string
for _, expectedTargets := range testCase.expectedTargets[x] {
expectedFormated = expectedFormated + expectedTargets.Source + ":" + fmt.Sprint(expectedTargets.Targets)
}
t.Errorf("%v. %v: \ntargets mismatch \nreceived: %v \nexpected: %v",
x, testCase.title,
receivedFormated,
expectedFormated)
}
}
}
}
}
}

func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
verifyPresence := func(tSets map[poolKey][]*config.TargetGroup, poolKey poolKey, label string, present bool) {
if _, ok := tSets[poolKey]; !ok {
t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets)
return
}
match := false
var mergedTargets string
for _, targetGroup := range tSets[poolKey] {
for _, l := range targetGroup.Targets {
mergedTargets = mergedTargets + " " + l.String()
if l.String() == label {
match = true
}
}
}
if match != present {
msg := ""
if !present {
msg = "not"
}
t.Fatalf("'%s' should %s be present in Targets labels: %v", label, msg, mergedTargets)
}
}
cfg := &config.Config{}
sOne := `
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ["foo:9090"]
- targets: ["bar:9090"]
`
if err := yaml.Unmarshal([]byte(sOne), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(nil)
go discoveryManager.Run(ctx)
discoveryManager.ApplyConfig(cfg)
<-discoveryManager.SyncCh()
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true)
sTwo := `
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ["foo:9090"]
`
if err := yaml.Unmarshal([]byte(sTwo), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
discoveryManager.ApplyConfig(cfg)
<-discoveryManager.SyncCh()
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false)
}

type update struct {
targetGroups []config.TargetGroup
interval time.Duration
}

type mockdiscoveryProvider struct {
updates []update
up chan<- []*config.TargetGroup
}

func newMockDiscoveryProvider(updates []update) mockdiscoveryProvider {
tp := mockdiscoveryProvider{
updates: updates,
}
return tp
}

func (tp mockdiscoveryProvider) Run(ctx context.Context, up chan<- []*config.TargetGroup) {
tp.up = up
tp.sendUpdates()
}

func (tp mockdiscoveryProvider) sendUpdates() {
for _, update := range tp.updates {
time.Sleep(update.interval * time.Millisecond)
tgs := make([]*config.TargetGroup, len(update.targetGroups))
for i := range update.targetGroups {
tgs[i] = &update.targetGroups[i]
}
tp.up <- tgs
}
}
@@ -35,7 +35,6 @@ import (
"golang.org/x/net/context/ctxhttp"
 
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/util/httputil"
@@ -248,7 +247,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
 
amSets := []*alertmanagerSet{}
ctx, cancel := context.WithCancel(n.ctx)
 
for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs {
ams, err := newAlertmanagerSet(cfg, n.logger)
@@ -261,17 +259,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
amSets = append(amSets, ams)
}
 
// After all sets were created successfully, start them and cancel the
// old ones.
for _, ams := range amSets {
go ams.ts.Run(ctx)
ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig, n.logger))
}
if n.cancelDiscovery != nil {
n.cancelDiscovery()
}
n.cancelDiscovery = cancel
n.alertmanagers = amSets
 
return nil
@@ -504,7 +491,6 @@ func (a alertmanagerLabels) url() *url.URL {
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
// discovery definitions that have a common configuration on how alerts should be sent.
type alertmanagerSet struct {
ts *discovery.TargetSet
cfg *config.AlertmanagerConfig
client *http.Client
 
@@ -525,8 +511,6 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*ale
cfg: cfg,
logger: logger,
}
s.ts = discovery.NewTargetSet(s)
return s, nil
}
 
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retrieval

import (
"fmt"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
)

// Appendable returns an Appender.
type Appendable interface {
Appender() (storage.Appender, error)
}

// NewScrapeManager is the ScrapeManager constructor.
func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager {
return &ScrapeManager{
append: app,
logger: logger,
actionCh: make(chan func()),
scrapeConfigs: make(map[string]*config.ScrapeConfig),
scrapePools: make(map[string]*scrapePool),
graceShut: make(chan struct{}),
}
}

// ScrapeManager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type ScrapeManager struct {
logger log.Logger
append Appendable
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
actionCh chan func()
graceShut chan struct{}
}

// Run starts background processing to handle target updates and reload the scraping loops.
func (m *ScrapeManager) Run(tsets <-chan map[string][]*config.TargetGroup) error {
level.Info(m.logger).Log("msg", "Starting scrape manager...")
for {
select {
case f := <-m.actionCh:
f()
case ts := <-tsets:
if err := m.reload(ts); err != nil {
level.Error(m.logger).Log("msg", "error reloading the scrape manager", "err", err)
}
case <-m.graceShut:
return nil
}
}
}

// Stop cancels all running scrape pools and blocks until all have exited.
func (m *ScrapeManager) Stop() {
for _, sp := range m.scrapePools {
sp.stop()
}
close(m.graceShut)
}

// ApplyConfig updates the manager's scrape job configurations as defined by the new cfg.
func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
done := make(chan struct{})
m.actionCh <- func() {
for _, scfg := range cfg.ScrapeConfigs {
m.scrapeConfigs[scfg.JobName] = scfg
}
close(done)
}
<-done
return nil
}

// TargetMap returns a map of active and dropped targets keyed by the scrape config job name.
func (m *ScrapeManager) TargetMap() map[string][]*Target {
targetsMap := make(chan map[string][]*Target)
m.actionCh <- func() {
targets := make(map[string][]*Target)
for jobName, sp := range m.scrapePools {
sp.mtx.RLock()
for _, t := range sp.targets {
targets[jobName] = append(targets[jobName], t)
}
targets[jobName] = append(targets[jobName], sp.droppedTargets...)
sp.mtx.RUnlock()
}
targetsMap <- targets
}
return <-targetsMap
}

// Targets returns the targets currently being scraped.
func (m *ScrapeManager) Targets() []*Target {
targets := make(chan []*Target)
m.actionCh <- func() {
var t []*Target
for _, p := range m.scrapePools {
p.mtx.RLock()
for _, tt := range p.targets {
t = append(t, tt)
}
p.mtx.RUnlock()
}
targets <- t
}
return <-targets
}

func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error {
for tsetName, tgroup := range t {
scrapeConfig, ok := m.scrapeConfigs[tsetName]
if !ok {
return fmt.Errorf("target set '%v' doesn't have valid config", tsetName)
}
// Scrape pool doesn't exist so start a new one.
existing, ok := m.scrapePools[tsetName]
if !ok {
sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
m.scrapePools[tsetName] = sp
sp.Sync(tgroup)
} else {
existing.Sync(tgroup)
}
// Cleanup - stop and remove scrape pools for jobs that no longer exist in the scrape config.
jobs := make(map[string]struct{})
for k := range m.scrapeConfigs {
jobs[k] = struct{}{}
}
for name, sp := range m.scrapePools {
if _, ok := jobs[name]; !ok {
sp.stop()
delete(m.scrapePools, name)
}
}
}
return nil
}
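
// Example (hypothetical sketch) of how the scrape manager is wired to the
// discovery manager, mirroring the changes in cmd/prometheus/main.go above:
//
//	discoveryManager := discovery.NewManager(logger)
//	scrapeManager := retrieval.NewScrapeManager(logger, fanoutStorage)
//	go discoveryManager.Run(ctx)
//	go scrapeManager.Run(discoveryManager.SyncCh())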
File moved
@@ -117,7 +117,6 @@ func init() {
type scrapePool struct {
appendable Appendable
logger log.Logger
ctx context.Context
 
mtx sync.RWMutex
config *config.ScrapeConfig
@@ -127,6 +126,7 @@ type scrapePool struct {
targets map[uint64]*Target
droppedTargets []*Target
loops map[uint64]loop
cancel context.CancelFunc
 
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(*Target, scraper) loop
@@ -136,7 +136,7 @@ const maxAheadTime = 10 * time.Minute
 
type labelsMutator func(labels.Labels) labels.Labels
 
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
if logger == nil {
logger = log.NewNopLogger()
}
@@ -149,17 +149,20 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
 
buffers := pool.NewBytesPool(163, 100e6, 3)
 
ctx, cancel := context.WithCancel(context.Background())
sp := &scrapePool{
cancel: cancel,
appendable: app,
config: cfg,
ctx: ctx,
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
logger: logger,
}
sp.newLoop = func(t *Target, s scraper) loop {
return newScrapeLoop(sp.ctx, s,
return newScrapeLoop(
ctx,
s,
log.With(logger, "target", t),
buffers,
func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) },
@@ -173,6 +176,7 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
 
// stop terminates all scrape loops and returns after they all terminated.
func (sp *scrapePool) stop() {
sp.cancel()
var wg sync.WaitGroup
 
sp.mtx.Lock()
@@ -189,7 +193,6 @@ func (sp *scrapePool) stop() {
delete(sp.loops, fp)
delete(sp.targets, fp)
}
wg.Wait()
}
 
@@ -582,8 +585,7 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
}
}
 
func newScrapeLoop(
ctx context.Context,
func newScrapeLoop(ctx context.Context,
sc scraper,
l log.Logger,
buffers *pool.BytesPool,
@@ -605,8 +607,8 @@ func newScrapeLoop(
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
ctx: ctx,
l: l,
ctx: ctx,
}
sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)
 
@@ -44,7 +44,7 @@ func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppendable{}
cfg = &config.ScrapeConfig{}
sp = newScrapePool(context.Background(), cfg, app, nil)
sp = newScrapePool(cfg, app, nil)
)
 
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@@ -75,6 +75,7 @@ func TestScrapePoolStop(t *testing.T) {
sp := &scrapePool{
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
cancel: func() {},
}
var mtx sync.Mutex
stopped := map[uint64]bool{}
@@ -231,7 +232,7 @@ func TestScrapePoolReload(t *testing.T) {
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app, nil)
sp := newScrapePool(cfg, app, nil)
 
wrapped := sp.appender()
 
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"context"
"sync"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/storage"
)
// TargetManager maintains a set of targets, starts and stops their scraping and
// creates the new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
append Appendable
scrapeConfigs []*config.ScrapeConfig
mtx sync.RWMutex
ctx context.Context
cancel func()
wg sync.WaitGroup
// Set of unique targets by scrape configuration.
targetSets map[string]*targetSet
logger log.Logger
starting chan struct{}
}
type targetSet struct {
ctx context.Context
cancel func()
ts *discovery.TargetSet
sp *scrapePool
}
// Appendable returns an Appender.
type Appendable interface {
Appender() (storage.Appender, error)
}
// NewTargetManager creates a new TargetManager.
func NewTargetManager(app Appendable, logger log.Logger) *TargetManager {
return &TargetManager{
append: app,
targetSets: map[string]*targetSet{},
logger: logger,
starting: make(chan struct{}),
}
}
// Run starts background processing to handle target updates.
func (tm *TargetManager) Run() {
level.Info(tm.logger).Log("msg", "Starting target manager...")
tm.mtx.Lock()
tm.ctx, tm.cancel = context.WithCancel(context.Background())
tm.reload()
tm.mtx.Unlock()
close(tm.starting)
tm.wg.Wait()
}
// Stop all background processing.
func (tm *TargetManager) Stop() {
<-tm.starting
level.Info(tm.logger).Log("msg", "Stopping target manager...")
tm.mtx.Lock()
// Cancel the base context, this will cause all target providers to shut down
// and all in-flight scrapes to abort immediately.
// Started inserts will be finished before terminating.
tm.cancel()
tm.mtx.Unlock()
// Wait for all scrape inserts to complete.
tm.wg.Wait()
level.Info(tm.logger).Log("msg", "Target manager stopped")
}
func (tm *TargetManager) reload() {
jobs := map[string]struct{}{}
// Start new target sets and update existing ones.
for _, scfg := range tm.scrapeConfigs {
jobs[scfg.JobName] = struct{}{}
ts, ok := tm.targetSets[scfg.JobName]
if !ok {
ctx, cancel := context.WithCancel(tm.ctx)
ts = &targetSet{
ctx: ctx,
cancel: cancel,
sp: newScrapePool(ctx, scfg, tm.append, log.With(tm.logger, "scrape_pool", scfg.JobName)),
}
ts.ts = discovery.NewTargetSet(ts.sp)
tm.targetSets[scfg.JobName] = ts
tm.wg.Add(1)
go func(ts *targetSet) {
// Run target set, which blocks until its context is canceled.
// Gracefully shut down pending scrapes in the scrape pool afterwards.
ts.ts.Run(ctx)
ts.sp.stop()
tm.wg.Done()
}(ts)
} else {
ts.sp.reload(scfg)
}
ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger))
}
// Remove old target sets. Waiting for scrape pools to complete pending
// scrape inserts is already guaranteed by the goroutine that started the target set.
for name, ts := range tm.targetSets {
if _, ok := jobs[name]; !ok {
ts.cancel()
delete(tm.targetSets, name)
}
}
}
// TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
func (tm *TargetManager) TargetMap() map[string][]*Target {
tm.mtx.RLock()
defer tm.mtx.RUnlock()
targetsMap := make(map[string][]*Target)
for jobName, ps := range tm.targetSets {
ps.sp.mtx.RLock()
for _, t := range ps.sp.targets {
targetsMap[jobName] = append(targetsMap[jobName], t)
}
targetsMap[jobName] = append(targetsMap[jobName], ps.sp.droppedTargets...)
ps.sp.mtx.RUnlock()
}
return targetsMap
}
// Targets returns the targets currently being scraped.
func (tm *TargetManager) Targets() []*Target {
tm.mtx.RLock()
defer tm.mtx.RUnlock()
targets := []*Target{}
for _, ps := range tm.targetSets {
ps.sp.mtx.RLock()
for _, t := range ps.sp.targets {
targets = append(targets, t)
}
ps.sp.mtx.RUnlock()
}
return targets
}
// ApplyConfig resets the manager's target providers and job configurations as defined
// by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
tm.mtx.Lock()
defer tm.mtx.Unlock()
tm.scrapeConfigs = cfg.ScrapeConfigs
if tm.ctx != nil {
tm.reload()
}
return nil
}
@@ -71,7 +71,7 @@ var localhostRepresentations = []string{"127.0.0.1", "localhost"}
type Handler struct {
logger log.Logger
 
targetManager *retrieval.TargetManager
scrapeManager *retrieval.ScrapeManager
ruleManager *rules.Manager
queryEngine *promql.Engine
context context.Context
@@ -125,7 +125,7 @@ type Options struct {
TSDB func() *tsdb.DB
Storage storage.Storage
QueryEngine *promql.Engine
TargetManager *retrieval.TargetManager
ScrapeManager *retrieval.ScrapeManager
RuleManager *rules.Manager
Notifier *notifier.Notifier
Version *PrometheusVersion
@@ -169,7 +169,7 @@ func New(logger log.Logger, o *Options) *Handler {
flagsMap: o.Flags,
 
context: o.Context,
targetManager: o.TargetManager,
scrapeManager: o.ScrapeManager,
ruleManager: o.RuleManager,
queryEngine: o.QueryEngine,
tsdb: o.TSDB,
@@ -181,7 +181,7 @@ func New(logger log.Logger, o *Options) *Handler {
ready: 0,
}
 
h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.targetManager, h.notifier,
h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.scrapeManager, h.notifier,
func() config.Config {
h.mtx.RLock()
defer h.mtx.RUnlock()
@@ -405,7 +405,7 @@ func (h *Handler) Run(ctx context.Context) error {
h.options.QueryEngine,
h.options.Storage.Querier,
func() []*retrieval.Target {
return h.options.TargetManager.Targets()
return h.options.ScrapeManager.Targets()
},
func() []*url.URL {
return h.options.Notifier.Alertmanagers()
@@ -587,7 +587,7 @@ func (h *Handler) rules(w http.ResponseWriter, r *http.Request) {
 
func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
var index []string
targets := h.targetManager.TargetMap()
targets := h.scrapeManager.TargetMap()
for job := range targets {
index = append(index, job)
}
@@ -605,7 +605,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
func (h *Handler) targets(w http.ResponseWriter, r *http.Request) {
// Bucket targets by job label
tps := map[string][]*retrieval.Target{}
for _, t := range h.targetManager.Targets() {
for _, t := range h.scrapeManager.Targets() {
job := t.Labels().Get(model.JobLabel)
tps[job] = append(tps[job], t)
}
@@ -96,7 +96,7 @@ func TestReadyAndHealthy(t *testing.T) {
Context: nil,
Storage: &tsdb.ReadyStorage{},
QueryEngine: nil,
TargetManager: nil,
ScrapeManager: nil,
RuleManager: nil,
Notifier: nil,
RoutePrefix: "/",
@@ -187,7 +187,7 @@ func TestRoutePrefix(t *testing.T) {
Context: nil,
Storage: &tsdb.ReadyStorage{},
QueryEngine: nil,
TargetManager: nil,
ScrapeManager: nil,
RuleManager: nil,
Notifier: nil,
RoutePrefix: "/prometheus",