Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: gitlab-org/build/omnibus-mirror/prometheus

Commits on Source (18)
@@ -43,6 +43,7 @@ import (
"github.com/prometheus/common/promlog"
promlogflag "github.com/prometheus/common/promlog/flag"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval"
@@ -230,27 +231,29 @@ func main() {
 
cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
var (
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
targetManager = retrieval.NewTargetManager(fanoutStorage, log.With(logger, "component", "target manager"))
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background())
ctxWeb, cancelWeb = context.WithCancel(context.Background())
ctxRule = context.Background()
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager"))
scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine),
NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()),
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
})
)
 
ruleManager := rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine),
NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()),
Context: ctx,
ExternalURL: cfg.web.ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
})
cfg.web.Context = ctx
cfg.web.Context = ctxWeb
cfg.web.TSDB = localStorage.Get
cfg.web.Storage = fanoutStorage
cfg.web.QueryEngine = queryEngine
cfg.web.TargetManager = targetManager
cfg.web.ScrapeManager = scrapeManager
cfg.web.RuleManager = ruleManager
cfg.web.Notifier = notifier
 
@@ -268,6 +271,7 @@ func main() {
cfg.web.Flags[f.Name] = f.Value.String()
}
 
// Depends on cfg.web.ScrapeManager, so it needs to come after cfg.web.ScrapeManager = scrapeManager.
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
 
// Monitor outgoing connections on default transport with conntrack.
@@ -277,9 +281,10 @@ func main() {
 
reloaders := []func(cfg *config.Config) error{
remoteStorage.ApplyConfig,
targetManager.ApplyConfig,
webHandler.ApplyConfig,
notifier.ApplyConfig,
discoveryManager.ApplyConfig,
scrapeManager.ApplyConfig,
func(cfg *config.Config) error {
// Get all rule files matching the configuration paths.
var files []string
@@ -326,6 +331,35 @@ func main() {
},
)
}
{
ctxDiscovery, cancelDiscovery := context.WithCancel(context.Background())
g.Add(
func() error {
err := discoveryManager.Run(ctxDiscovery)
level.Info(logger).Log("msg", "Discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping discovery manager...")
cancelDiscovery()
},
)
}
{
g.Add(
func() error {
err := scrapeManager.Run(discoveryManager.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)
}
{
// Make sure that the SIGHUP handler is registered with a redirect to the channel before the potentially
// long and synchronous TSDB init.
@@ -426,7 +460,7 @@ func main() {
{
g.Add(
func() error {
if err := webHandler.Run(ctx); err != nil {
if err := webHandler.Run(ctxWeb); err != nil {
return fmt.Errorf("Error starting web server: %s", err)
}
return nil
@@ -435,7 +469,7 @@ func main() {
// Keep this interrupt before the ruleManager.Stop().
// Shutting down the query engine before the rule manager will cause pending queries
// to be canceled and ensures a quick shutdown of the rule manager.
cancelCtx()
cancelWeb()
},
)
}
@@ -467,21 +501,6 @@ func main() {
},
)
}
{
// TODO(krasi) refactor targetManager.Run() to be blocking to avoid using an extra blocking channel.
cancel := make(chan struct{})
g.Add(
func() error {
targetManager.Run()
<-cancel
return nil
},
func(err error) {
targetManager.Stop()
close(cancel)
},
)
}
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
}
@@ -14,11 +14,101 @@
package main
 
import (
"flag"
"fmt"
"net/http"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
 
"github.com/prometheus/prometheus/util/testutil"
)
 
var promPath string
var promConfig = filepath.Join("..", "..", "documentation", "examples", "prometheus.yml")
var promData = filepath.Join(os.TempDir(), "data")
func TestMain(m *testing.M) {
flag.Parse()
if testing.Short() {
os.Exit(m.Run())
}
var err error
promPath, err = os.Getwd()
if err != nil {
fmt.Printf("can't get current dir :%s \n", err)
os.Exit(1)
}
promPath = filepath.Join(promPath, "prometheus")
build := exec.Command("go", "build", "-o", promPath)
output, err := build.CombinedOutput()
if err != nil {
fmt.Printf("compilation error :%s \n", output)
os.Exit(1)
}
exitCode := m.Run()
os.Remove(promPath)
os.RemoveAll(promData)
os.Exit(exitCode)
}
// As soon as prometheus starts responding to HTTP requests it should be able to accept Interrupt signals for a graceful shutdown.
func TestStartupInterrupt(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
prom := exec.Command(promPath, "--config.file="+promConfig, "--storage.tsdb.path="+promData)
err := prom.Start()
if err != nil {
t.Errorf("execution error: %v", err)
return
}
done := make(chan error)
go func() {
done <- prom.Wait()
}()
var startedOk bool
var stoppedOk bool
var stoppedErr error
Loop:
for x := 0; x < 10; x++ {
// err == nil means prometheus has started, so we can send the interrupt signal and wait for the graceful shutdown.
if _, err := http.Get("http://localhost:9090/graph"); err == nil {
startedOk = true
prom.Process.Signal(os.Interrupt)
select {
case stoppedErr = <-done:
stoppedOk = true
break Loop
case <-time.After(10 * time.Second):
}
break Loop
}
time.Sleep(500 * time.Millisecond)
}
if !startedOk {
t.Errorf("prometheus didn't start in the specified timeout")
return
}
if err := prom.Process.Kill(); err == nil && !stoppedOk {
t.Errorf("prometheus didn't shutdown gracefully after sending the Interrupt signal")
} else if stoppedErr != nil {
t.Errorf("prometheus exited with an error:%v", stoppedErr)
}
}
func TestComputeExternalURL(t *testing.T) {
tests := []struct {
input string
@@ -721,7 +721,7 @@ func (a *BasicAuth) UnmarshalYAML(unmarshal func(interface{}) error) error {
return checkOverflow(a.XXX, "basic_auth")
}
 
// TargetGroup is a set of targets with a common label set.
// TargetGroup is a set of targets with a common label set (production, test, staging, etc.).
type TargetGroup struct {
// Targets is a list of targets identified by a label set. Each target is
// uniquely identifiable in the group by its address label.
@@ -133,9 +133,9 @@ the Prometheus server will be able to see them.
 
A Service Discovery (SD) mechanism has to discover targets and provide them to Prometheus. We expect similar targets to be grouped together, in the form of a [`TargetGroup`](https://godoc.org/github.com/prometheus/prometheus/config#TargetGroup). The SD mechanism sends the targets down to Prometheus as a list of `TargetGroups`.
 
An SD mechanism has to implement the `TargetProvider` Interface:
An SD mechanism has to implement the `Discoverer` Interface:
```go
type TargetProvider interface {
type Discoverer interface {
Run(ctx context.Context, up chan<- []*config.TargetGroup)
}
```
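For illustration, a minimal sketch of such an implementation, assuming a hypothetical `exampleDiscoverer` that announces a single static target and then waits for cancellation (the type, its field, and the label values are illustrative, not part of the repository):

```go
import (
	"context"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
)

// exampleDiscoverer is a hypothetical Discoverer that sends one fixed
// TargetGroup as its initial full set and then produces no further updates.
type exampleDiscoverer struct {
	address string
}

func (d *exampleDiscoverer) Run(ctx context.Context, up chan<- []*config.TargetGroup) {
	tg := &config.TargetGroup{
		Source:  "example",
		Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(d.address)}},
	}
	// Send the initial, full set of discoverable target groups.
	select {
	case up <- []*config.TargetGroup{tg}:
	case <-ctx.Done():
		return
	}
	// Return when the context is canceled; do not close the update channel.
	<-ctx.Done()
}
```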
This diff is collapsed.
@@ -16,12 +16,13 @@ package discovery
import (
"context"
"fmt"
"sync"
"time"
"sort"
 
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/azure"
"github.com/prometheus/prometheus/discovery/consul"
"github.com/prometheus/prometheus/discovery/dns"
@@ -35,93 +36,234 @@ import (
"github.com/prometheus/prometheus/discovery/zookeeper"
)
 
// A TargetProvider provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a target provider
// detects a potential change, it sends the TargetGroup through its provided channel.
// Discoverer provides information about target groups. It maintains a set
// of sources from which TargetGroups can originate. Whenever a discovery provider
// detects a potential change, it sends the TargetGroup through its channel.
//
// The TargetProvider does not have to guarantee that an actual change happened.
// Discoverer does not know if an actual change happened.
// It does guarantee that it sends the new TargetGroup whenever a change happens.
//
// TargetProviders should initially send a full set of all discoverable TargetGroups.
type TargetProvider interface {
// Run hands a channel to the target provider through which it can send
// Discoverers should initially send a full set of all discoverable TargetGroups.
type Discoverer interface {
// Run hands a channel to the discovery provider (consul, dns, etc.) through which it can send
// updated target groups.
// Must return if the context gets canceled. It should not close the update
// channel on returning.
Run(ctx context.Context, up chan<- []*config.TargetGroup)
}
 
// ProvidersFromConfig returns all TargetProviders configured in cfg.
func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) map[string]TargetProvider {
providers := map[string]TargetProvider{}
type poolKey struct {
setName string
provider string
}
// byProvider implements sort.Interface for []poolKey based on the provider field.
// Sorting is needed so that we can have predictable tests.
type byProvider []poolKey
func (a byProvider) Len() int { return len(a) }
func (a byProvider) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byProvider) Less(i, j int) bool { return a[i].provider < a[j].provider }
// NewManager is the Discovery Manager constructor
func NewManager(logger log.Logger) *Manager {
return &Manager{
logger: logger,
actionCh: make(chan func(context.Context)),
syncCh: make(chan map[string][]*config.TargetGroup),
targets: make(map[poolKey][]*config.TargetGroup),
discoverCancel: []context.CancelFunc{},
}
}
// Manager maintains a set of discovery providers and sends each update to a channel used by other packages.
type Manager struct {
logger log.Logger
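// actionCh serializes all access to the manager's internal state: ApplyConfig, addGroup and allGroups send closures that Run executes one at a time.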
actionCh chan func(context.Context)
discoverCancel []context.CancelFunc
targets map[poolKey][]*config.TargetGroup
// The sync channel sends the updates as a map[targetSetName] where targetSetName is the job value from the scrape config.
syncCh chan map[string][]*config.TargetGroup
}
// Run starts the background processing
func (m *Manager) Run(ctx context.Context) error {
for {
select {
case f := <-m.actionCh:
f(ctx)
case <-ctx.Done():
m.cancelDiscoverers()
return ctx.Err()
}
}
}
// SyncCh returns a read-only channel used by all Discoverers to send target updates.
func (m *Manager) SyncCh() <-chan map[string][]*config.TargetGroup {
return m.syncCh
}
// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
err := make(chan error)
m.actionCh <- func(ctx context.Context) {
m.cancelDiscoverers()
for _, scfg := range cfg.ScrapeConfigs {
for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
m.startProvider(ctx, poolKey{setName: scfg.JobName, provider: provName}, prov)
}
}
close(err)
}
return <-err
}
func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) {
ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*config.TargetGroup)
m.discoverCancel = append(m.discoverCancel, cancel)
go worker.Run(ctx, updates)
go m.runProvider(ctx, poolKey, updates)
}
func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*config.TargetGroup) {
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
m.addGroup(poolKey, tgs)
m.syncCh <- m.allGroups(poolKey)
}
}
}
func (m *Manager) cancelDiscoverers() {
for _, c := range m.discoverCancel {
c()
}
m.targets = make(map[poolKey][]*config.TargetGroup)
m.discoverCancel = nil
}
func (m *Manager) addGroup(poolKey poolKey, tg []*config.TargetGroup) {
done := make(chan struct{})
m.actionCh <- func(ctx context.Context) {
if tg != nil {
m.targets[poolKey] = tg
}
close(done)
}
<-done
}
func (m *Manager) allGroups(pk poolKey) map[string][]*config.TargetGroup {
tSets := make(chan map[string][]*config.TargetGroup)
m.actionCh <- func(ctx context.Context) {
// Sorting by the poolKey is needed so that we can have predictable tests.
var pKeys []poolKey
for pk := range m.targets {
pKeys = append(pKeys, pk)
}
sort.Sort(byProvider(pKeys))
tSetsAll := map[string][]*config.TargetGroup{}
for _, pk := range pKeys {
for _, tg := range m.targets[pk] {
if tg.Source != "" { // Don't add empty targets.
tSetsAll[pk.setName] = append(tSetsAll[pk.setName], tg)
}
}
}
tSets <- tSetsAll
}
return <-tSets
}
func (m *Manager) providersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]Discoverer {
providers := map[string]Discoverer{}
 
app := func(mech string, i int, tp TargetProvider) {
app := func(mech string, i int, tp Discoverer) {
providers[fmt.Sprintf("%s/%d", mech, i)] = tp
}
 
for i, c := range cfg.DNSSDConfigs {
app("dns", i, dns.NewDiscovery(c, log.With(logger, "discovery", "dns")))
app("dns", i, dns.NewDiscovery(c, log.With(m.logger, "discovery", "dns")))
}
for i, c := range cfg.FileSDConfigs {
app("file", i, file.NewDiscovery(c, log.With(logger, "discovery", "file")))
app("file", i, file.NewDiscovery(c, log.With(m.logger, "discovery", "file")))
}
for i, c := range cfg.ConsulSDConfigs {
k, err := consul.NewDiscovery(c, log.With(logger, "discovery", "consul"))
k, err := consul.NewDiscovery(c, log.With(m.logger, "discovery", "consul"))
if err != nil {
level.Error(logger).Log("msg", "Cannot create Consul discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot create Consul discovery", "err", err)
continue
}
app("consul", i, k)
}
for i, c := range cfg.MarathonSDConfigs {
m, err := marathon.NewDiscovery(c, log.With(logger, "discovery", "marathon"))
t, err := marathon.NewDiscovery(c, log.With(m.logger, "discovery", "marathon"))
if err != nil {
level.Error(logger).Log("msg", "Cannot create Marathon discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot create Marathon discovery", "err", err)
continue
}
app("marathon", i, m)
app("marathon", i, t)
}
for i, c := range cfg.KubernetesSDConfigs {
k, err := kubernetes.New(log.With(logger, "discovery", "k8s"), c)
k, err := kubernetes.New(log.With(m.logger, "discovery", "k8s"), c)
if err != nil {
level.Error(logger).Log("msg", "Cannot create Kubernetes discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot create Kubernetes discovery", "err", err)
continue
}
app("kubernetes", i, k)
}
for i, c := range cfg.ServersetSDConfigs {
app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(logger, "discovery", "zookeeper")))
app("serverset", i, zookeeper.NewServersetDiscovery(c, log.With(m.logger, "discovery", "zookeeper")))
}
for i, c := range cfg.NerveSDConfigs {
app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(logger, "discovery", "nerve")))
app("nerve", i, zookeeper.NewNerveDiscovery(c, log.With(m.logger, "discovery", "nerve")))
}
for i, c := range cfg.EC2SDConfigs {
app("ec2", i, ec2.NewDiscovery(c, log.With(logger, "discovery", "ec2")))
app("ec2", i, ec2.NewDiscovery(c, log.With(m.logger, "discovery", "ec2")))
}
for i, c := range cfg.OpenstackSDConfigs {
openstackd, err := openstack.NewDiscovery(c, log.With(logger, "discovery", "openstack"))
openstackd, err := openstack.NewDiscovery(c, log.With(m.logger, "discovery", "openstack"))
if err != nil {
level.Error(logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot initialize OpenStack discovery", "err", err)
continue
}
app("openstack", i, openstackd)
}
 
for i, c := range cfg.GCESDConfigs {
gced, err := gce.NewDiscovery(c, log.With(logger, "discovery", "gce"))
gced, err := gce.NewDiscovery(c, log.With(m.logger, "discovery", "gce"))
if err != nil {
level.Error(logger).Log("msg", "Cannot initialize GCE discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot initialize GCE discovery", "err", err)
continue
}
app("gce", i, gced)
}
for i, c := range cfg.AzureSDConfigs {
app("azure", i, azure.NewDiscovery(c, log.With(logger, "discovery", "azure")))
app("azure", i, azure.NewDiscovery(c, log.With(m.logger, "discovery", "azure")))
}
for i, c := range cfg.TritonSDConfigs {
t, err := triton.New(log.With(logger, "discovery", "trition"), c)
t, err := triton.New(log.With(m.logger, "discovery", "trition"), c)
if err != nil {
level.Error(logger).Log("msg", "Cannot create Triton discovery", "err", err)
level.Error(m.logger).Log("msg", "Cannot create Triton discovery", "err", err)
continue
}
app("triton", i, t)
@@ -147,7 +289,7 @@ func NewStaticProvider(groups []*config.TargetGroup) *StaticProvider {
return &StaticProvider{groups}
}
 
// Run implements the TargetProvider interface.
// Run implements the Discoverer interface.
func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// We still have to consider that the consumer exits right away in which case
// the context will be canceled.
@@ -157,163 +299,3 @@ func (sd *StaticProvider) Run(ctx context.Context, ch chan<- []*config.TargetGro
}
close(ch)
}
// TargetSet handles multiple TargetProviders and sends a full overview of their
// discovered TargetGroups to a Syncer.
type TargetSet struct {
mtx sync.RWMutex
// Sets of targets by a source string that is unique across target providers.
tgroups map[string]*config.TargetGroup
syncer Syncer
syncCh chan struct{}
providerCh chan map[string]TargetProvider
cancelProviders func()
}
// Syncer receives updates complete sets of TargetGroups.
type Syncer interface {
Sync([]*config.TargetGroup)
}
// NewTargetSet returns a new target sending TargetGroups to the Syncer.
func NewTargetSet(s Syncer) *TargetSet {
return &TargetSet{
syncCh: make(chan struct{}, 1),
providerCh: make(chan map[string]TargetProvider),
syncer: s,
}
}
// Run starts the processing of target providers and their updates.
// It blocks until the context gets canceled.
func (ts *TargetSet) Run(ctx context.Context) {
Loop:
for {
// Throttle syncing to once per five seconds.
select {
case <-ctx.Done():
break Loop
case p := <-ts.providerCh:
ts.updateProviders(ctx, p)
case <-time.After(5 * time.Second):
}
select {
case <-ctx.Done():
break Loop
case <-ts.syncCh:
ts.sync()
case p := <-ts.providerCh:
ts.updateProviders(ctx, p)
}
}
}
func (ts *TargetSet) sync() {
ts.mtx.RLock()
var all []*config.TargetGroup
for _, tg := range ts.tgroups {
all = append(all, tg)
}
ts.mtx.RUnlock()
ts.syncer.Sync(all)
}
// UpdateProviders sets new target providers for the target set.
func (ts *TargetSet) UpdateProviders(p map[string]TargetProvider) {
ts.providerCh <- p
}
func (ts *TargetSet) updateProviders(ctx context.Context, providers map[string]TargetProvider) {
// Stop all previous target providers of the target set.
if ts.cancelProviders != nil {
ts.cancelProviders()
}
ctx, ts.cancelProviders = context.WithCancel(ctx)
var wg sync.WaitGroup
// (Re-)create a fresh tgroups map to not keep stale targets around. We
// will retrieve all targets below anyway, so cleaning up everything is
// safe and doesn't inflict any additional cost.
ts.mtx.Lock()
ts.tgroups = map[string]*config.TargetGroup{}
ts.mtx.Unlock()
for name, prov := range providers {
wg.Add(1)
updates := make(chan []*config.TargetGroup)
go prov.Run(ctx, updates)
go func(name string, prov TargetProvider) {
select {
case <-ctx.Done():
case initial, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
break
}
// First set of all targets the provider knows.
for _, tgroup := range initial {
ts.setTargetGroup(name, tgroup)
}
case <-time.After(5 * time.Second):
// Initial set didn't arrive. Act as if it was empty
// and wait for updates later on.
}
wg.Done()
// Start listening for further updates.
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
// Handle the case that a target provider exits and closes the channel
// before the context is done.
if !ok {
return
}
for _, tg := range tgs {
ts.update(name, tg)
}
}
}
}(name, prov)
}
// We wait for a full initial set of target groups before releasing the mutex
// to ensure the initial sync is complete and there are no races with subsequent updates.
wg.Wait()
// Just signal that there are initial sets to sync now. Actual syncing must only
// happen in the runScraping loop.
select {
case ts.syncCh <- struct{}{}:
default:
}
}
// update handles a target group update from a target provider identified by the name.
func (ts *TargetSet) update(name string, tgroup *config.TargetGroup) {
ts.setTargetGroup(name, tgroup)
select {
case ts.syncCh <- struct{}{}:
default:
}
}
func (ts *TargetSet) setTargetGroup(name string, tg *config.TargetGroup) {
ts.mtx.Lock()
defer ts.mtx.Unlock()
if tg == nil {
return
}
ts.tgroups[name+"/"+tg.Source] = tg
}
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package discovery
import (
"context"
"fmt"
"reflect"
"strconv"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
yaml "gopkg.in/yaml.v2"
)
// TestDiscoveryManagerSyncCalls checks that the target updates are received in the expected order.
func TestDiscoveryManagerSyncCalls(t *testing.T) {
// The order in which the updates are sent is determined by the interval passed to the mock discovery adapter.
// Final targets array is ordered alphabetically by the name of the discoverer.
// For example, discoverer "A" with targets "t2,t3" and discoverer "B" with targets "t1,t2" will result in "t2,t3,t1,t2" after the merge.
testCases := []struct {
title string
updates map[string][]update
expectedTargets [][]*config.TargetGroup
}{
{
title: "Single TP no updates",
updates: map[string][]update{
"tp1": {},
},
expectedTargets: nil,
},
{
title: "Multips TPs no updates",
updates: map[string][]update{
"tp1": {},
"tp2": {},
"tp3": {},
},
expectedTargets: nil,
},
{
title: "Single TP empty initials",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{},
interval: 5,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{},
},
},
{
title: "Multiple TPs empty initials",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{},
interval: 5,
},
},
"tp2": {
{
targetGroups: []config.TargetGroup{},
interval: 200,
},
},
"tp3": {
{
targetGroups: []config.TargetGroup{},
interval: 100,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{},
{},
{},
},
},
{
title: "Single TP initials only",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
}},
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
},
},
{
title: "Multiple TPs initials only",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
},
},
"tp2": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
}, {
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
},
},
},
{
title: "Single TP initials followed by empty updates",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 0,
},
{
targetGroups: []config.TargetGroup{},
interval: 10,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{},
},
},
{
title: "Single TP initials and new groups",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 0,
},
{
targetGroups: []config.TargetGroup{
{
Source: "update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
},
},
{
title: "Multiple TPs initials and new groups",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 10,
},
{
targetGroups: []config.TargetGroup{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 500,
},
},
"tp2": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
interval: 100,
},
{
targetGroups: []config.TargetGroup{
{
Source: "tp2-update1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
interval: 10,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
{
Source: "tp2-update1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2-update1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
},
},
{
title: "One tp initials arrive after other tp updates.",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 10,
},
{
targetGroups: []config.TargetGroup{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 150,
},
},
"tp2": {
{
targetGroups: []config.TargetGroup{
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
interval: 200,
},
{
targetGroups: []config.TargetGroup{
{
Source: "tp2-update1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
interval: 100,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "tp1-initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "tp1-initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2-initial1",
Targets: []model.LabelSet{{"__instance__": "5"}},
},
{
Source: "tp2-initial2",
Targets: []model.LabelSet{{"__instance__": "6"}},
},
},
{
{
Source: "tp1-update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "tp1-update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
{
Source: "tp2-update1",
Targets: []model.LabelSet{{"__instance__": "7"}},
},
{
Source: "tp2-update2",
Targets: []model.LabelSet{{"__instance__": "8"}},
},
},
},
},
{
title: "Single TP Single provider empty update in between",
updates: map[string][]update{
"tp1": {
{
targetGroups: []config.TargetGroup{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
interval: 30,
},
{
targetGroups: []config.TargetGroup{},
interval: 10,
},
{
targetGroups: []config.TargetGroup{
{
Source: "update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
interval: 300,
},
},
},
expectedTargets: [][]*config.TargetGroup{
{
{
Source: "initial1",
Targets: []model.LabelSet{{"__instance__": "1"}},
},
{
Source: "initial2",
Targets: []model.LabelSet{{"__instance__": "2"}},
},
},
{},
{
{
Source: "update1",
Targets: []model.LabelSet{{"__instance__": "3"}},
},
{
Source: "update2",
Targets: []model.LabelSet{{"__instance__": "4"}},
},
},
},
},
}
for testIndex, testCase := range testCases {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(nil)
go discoveryManager.Run(ctx)
var totalUpdatesCount int
for tpName, update := range testCase.updates {
provider := newMockDiscoveryProvider(update)
discoveryManager.startProvider(ctx, poolKey{setName: strconv.Itoa(testIndex), provider: tpName}, provider)
if len(update) > 0 {
totalUpdatesCount = totalUpdatesCount + len(update)
}
}
Loop:
for x := 0; x < totalUpdatesCount; x++ {
select {
case <-time.After(10 * time.Second):
t.Errorf("%v. %q: no update arrived within the timeout limit", x, testCase.title)
break Loop
case tsetMap := <-discoveryManager.SyncCh():
for _, received := range tsetMap {
if !reflect.DeepEqual(received, testCase.expectedTargets[x]) {
var receivedFormated string
for _, receivedTargets := range received {
receivedFormated = receivedFormated + receivedTargets.Source + ":" + fmt.Sprint(receivedTargets.Targets)
}
var expectedFormated string
for _, expectedTargets := range testCase.expectedTargets[x] {
expectedFormated = expectedFormated + expectedTargets.Source + ":" + fmt.Sprint(expectedTargets.Targets)
}
t.Errorf("%v. %v: \ntargets mismatch \nreceived: %v \nexpected: %v",
x, testCase.title,
receivedFormated,
expectedFormated)
}
}
}
}
}
}
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
verifyPresence := func(tSets map[poolKey][]*config.TargetGroup, poolKey poolKey, label string, present bool) {
if _, ok := tSets[poolKey]; !ok {
t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets)
return
}
match := false
var mergedTargets string
for _, targetGroup := range tSets[poolKey] {
for _, l := range targetGroup.Targets {
mergedTargets = mergedTargets + " " + l.String()
if l.String() == label {
match = true
}
}
}
if match != present {
msg := ""
if !present {
msg = "not"
}
t.Fatalf("'%s' should %s be present in Targets labels: %v", label, msg, mergedTargets)
}
}
cfg := &config.Config{}
sOne := `
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ["foo:9090"]
- targets: ["bar:9090"]
`
if err := yaml.Unmarshal([]byte(sOne), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(nil)
go discoveryManager.Run(ctx)
discoveryManager.ApplyConfig(cfg)
_ = <-discoveryManager.SyncCh()
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true)
sTwo := `
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ["foo:9090"]
`
if err := yaml.Unmarshal([]byte(sTwo), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
discoveryManager.ApplyConfig(cfg)
_ = <-discoveryManager.SyncCh()
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false)
}
type update struct {
targetGroups []config.TargetGroup
interval time.Duration
}
type mockdiscoveryProvider struct {
updates []update
up chan<- []*config.TargetGroup
}
func newMockDiscoveryProvider(updates []update) mockdiscoveryProvider {
tp := mockdiscoveryProvider{
updates: updates,
}
return tp
}
func (tp mockdiscoveryProvider) Run(ctx context.Context, up chan<- []*config.TargetGroup) {
tp.up = up
tp.sendUpdates()
}
func (tp mockdiscoveryProvider) sendUpdates() {
for _, update := range tp.updates {
time.Sleep(update.interval * time.Millisecond)
tgs := make([]*config.TargetGroup, len(update.targetGroups))
for i := range update.targetGroups {
tgs[i] = &update.targetGroups[i]
}
tp.up <- tgs
}
}
@@ -5,11 +5,11 @@ sort_rank: 4
 
# Template examples
 
Prometheus supports templating in the summary and description fields of
alerts, as well as in served console pages. Templates have the ability to run
queries against the local database, iterate over data, use conditionals, format
data, etc. The Prometheus templating language is based on the
[Go templating](http://golang.org/pkg/text/template/) system.
Prometheus supports templating in the annotations and labels of alerts,
as well as in served console pages. Templates have the ability to run
queries against the local database, iterate over data, use conditionals,
format data, etc. The Prometheus templating language is based on the [Go
templating](http://golang.org/pkg/text/template/) system.
 
## Simple alert field templates
 
@@ -19,7 +19,6 @@ expr: up == 0
for: 5m
labels:
- severity: page
annotations:
- summary: "Instance {{$labels.instance}} down"
- description: "{{$labels.instance}} of job {{$labels.job}} has been down for more than 5 minutes."
@@ -5,11 +5,11 @@ sort_rank: 5
 
# Template reference
 
Prometheus supports templating in the summary and description fields of
alerts, as well as in served console pages. Templates have the ability to run
queries against the local database, iterate over data, use conditionals, format
data, etc. The Prometheus templating language is based on the
[Go templating](http://golang.org/pkg/text/template/) system.
Prometheus supports templating in the annotations and labels of alerts,
as well as in served console pages. Templates have the ability to run
queries against the local database, iterate over data, use conditionals,
format data, etc. The Prometheus templating language is based on the [Go
templating](http://golang.org/pkg/text/template/) system.
 
## Data Structures
 
@@ -35,7 +35,6 @@ import (
"golang.org/x/net/context/ctxhttp"
 
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/util/httputil"
@@ -248,7 +247,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
 
amSets := []*alertmanagerSet{}
ctx, cancel := context.WithCancel(n.ctx)
 
for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs {
ams, err := newAlertmanagerSet(cfg, n.logger)
@@ -261,17 +259,6 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
amSets = append(amSets, ams)
}
 
// After all sets were created successfully, start them and cancel the
// old ones.
for _, ams := range amSets {
go ams.ts.Run(ctx)
ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig, n.logger))
}
if n.cancelDiscovery != nil {
n.cancelDiscovery()
}
n.cancelDiscovery = cancel
n.alertmanagers = amSets
 
return nil
@@ -504,7 +491,6 @@ func (a alertmanagerLabels) url() *url.URL {
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
// discovery definitions that have a common configuration on how alerts should be sent.
type alertmanagerSet struct {
ts *discovery.TargetSet
cfg *config.AlertmanagerConfig
client *http.Client
 
@@ -525,8 +511,6 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*ale
cfg: cfg,
logger: logger,
}
s.ts = discovery.NewTargetSet(s)
return s, nil
}
 
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"fmt"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage"
)
// Appendable returns an Appender.
type Appendable interface {
Appender() (storage.Appender, error)
}
// NewScrapeManager is the ScrapeManager constructor
func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager {
return &ScrapeManager{
append: app,
logger: logger,
actionCh: make(chan func()),
scrapeConfigs: make(map[string]*config.ScrapeConfig),
scrapePools: make(map[string]*scrapePool),
graceShut: make(chan struct{}),
}
}
// ScrapeManager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type ScrapeManager struct {
logger log.Logger
append Appendable
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
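// actionCh serializes access to the manager's state: ApplyConfig, TargetMap and Targets send closures that the Run loop executes one at a time.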
actionCh chan func()
graceShut chan struct{}
}
// Run starts background processing to handle target updates and reload the scraping loops.
func (m *ScrapeManager) Run(tsets <-chan map[string][]*config.TargetGroup) error {
level.Info(m.logger).Log("msg", "Starting scrape manager...")
for {
select {
case f := <-m.actionCh:
f()
case ts := <-tsets:
if err := m.reload(ts); err != nil {
level.Error(m.logger).Log("msg", "error reloading the scrape manager", "err", err)
}
case <-m.graceShut:
return nil
}
}
}
// Stop cancels all running scrape pools and blocks until all have exited.
func (m *ScrapeManager) Stop() {
for _, sp := range m.scrapePools {
sp.stop()
}
close(m.graceShut)
}
// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
done := make(chan struct{})
m.actionCh <- func() {
for _, scfg := range cfg.ScrapeConfigs {
m.scrapeConfigs[scfg.JobName] = scfg
}
close(done)
}
<-done
return nil
}
// TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
func (m *ScrapeManager) TargetMap() map[string][]*Target {
targetsMap := make(chan map[string][]*Target)
m.actionCh <- func() {
targets := make(map[string][]*Target)
for jobName, sp := range m.scrapePools {
sp.mtx.RLock()
for _, t := range sp.targets {
targets[jobName] = append(targets[jobName], t)
}
targets[jobName] = append(targets[jobName], sp.droppedTargets...)
sp.mtx.RUnlock()
}
targetsMap <- targets
}
return <-targetsMap
}
// Targets returns the targets currently being scraped.
func (m *ScrapeManager) Targets() []*Target {
targets := make(chan []*Target)
m.actionCh <- func() {
var t []*Target
for _, p := range m.scrapePools {
p.mtx.RLock()
for _, tt := range p.targets {
t = append(t, tt)
}
p.mtx.RUnlock()
}
targets <- t
}
return <-targets
}
func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error {
for tsetName, tgroup := range t {
scrapeConfig, ok := m.scrapeConfigs[tsetName]
if !ok {
return fmt.Errorf("target set '%v' doesn't have valid config", tsetName)
}
// Scrape pool doesn't exist so start a new one.
existing, ok := m.scrapePools[tsetName]
if !ok {
sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
m.scrapePools[tsetName] = sp
sp.Sync(tgroup)
} else {
existing.Sync(tgroup)
}
// Cleanup - check the config and cancel the scrape loops for jobs that no longer exist in the scrape config.
jobs := make(map[string]struct{})
for k := range m.scrapeConfigs {
jobs[k] = struct{}{}
}
for name, sp := range m.scrapePools {
if _, ok := jobs[name]; !ok {
sp.stop()
delete(m.scrapePools, name)
}
}
}
return nil
}
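For orientation, here is a condensed sketch of how the two new managers are wired together, mirroring the cmd/prometheus/main.go hunks above. The `runManagers` helper is hypothetical; main.go actually registers both managers as actors in its run group, and `logger` and `fanoutStorage` correspond to the variables set up earlier in main().

```go
import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/retrieval"
	"github.com/prometheus/prometheus/storage"
)

// runManagers is a hypothetical condensed sketch: the discovery manager pushes
// map[jobName][]*config.TargetGroup updates on its sync channel and the scrape
// manager consumes them.
func runManagers(logger log.Logger, fanoutStorage storage.Storage) error {
	discoveryManager := discovery.NewManager(log.With(logger, "component", "discovery manager"))
	scrapeManager := retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		// Run blocks until the context is canceled and drives all Discoverers.
		if err := discoveryManager.Run(ctx); err != nil {
			level.Error(logger).Log("msg", "discovery manager failed", "err", err)
		}
	}()

	// Run blocks, reloading scrape pools whenever a new target-group map
	// arrives on the discovery manager's sync channel.
	return scrapeManager.Run(discoveryManager.SyncCh())
}
```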
@@ -117,7 +117,6 @@ func init() {
type scrapePool struct {
appendable Appendable
logger log.Logger
ctx context.Context
 
mtx sync.RWMutex
config *config.ScrapeConfig
@@ -127,6 +126,7 @@ type scrapePool struct {
targets map[uint64]*Target
droppedTargets []*Target
loops map[uint64]loop
cancel context.CancelFunc
 
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(*Target, scraper) loop
@@ -136,7 +136,7 @@ const maxAheadTime = 10 * time.Minute
 
type labelsMutator func(labels.Labels) labels.Labels
 
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
if logger == nil {
logger = log.NewNopLogger()
}
@@ -149,17 +149,20 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
 
buffers := pool.NewBytesPool(163, 100e6, 3)
 
ctx, cancel := context.WithCancel(context.Background())
sp := &scrapePool{
cancel: cancel,
appendable: app,
config: cfg,
ctx: ctx,
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
logger: logger,
}
sp.newLoop = func(t *Target, s scraper) loop {
return newScrapeLoop(sp.ctx, s,
return newScrapeLoop(
ctx,
s,
log.With(logger, "target", t),
buffers,
func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) },
@@ -173,6 +176,7 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
 
// stop terminates all scrape loops and returns after they all terminated.
func (sp *scrapePool) stop() {
sp.cancel()
var wg sync.WaitGroup
 
sp.mtx.Lock()
@@ -189,7 +193,6 @@ func (sp *scrapePool) stop() {
delete(sp.loops, fp)
delete(sp.targets, fp)
}
wg.Wait()
}
 
@@ -582,8 +585,7 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
}
}
 
func newScrapeLoop(
ctx context.Context,
func newScrapeLoop(ctx context.Context,
sc scraper,
l log.Logger,
buffers *pool.BytesPool,
@@ -605,8 +607,8 @@ func newScrapeLoop(
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
ctx: ctx,
l: l,
ctx: ctx,
}
sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)
 
@@ -44,7 +44,7 @@ func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppendable{}
cfg = &config.ScrapeConfig{}
sp = newScrapePool(context.Background(), cfg, app, nil)
sp = newScrapePool(cfg, app, nil)
)
 
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@@ -75,6 +75,7 @@ func TestScrapePoolStop(t *testing.T) {
sp := &scrapePool{
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
cancel: func() {},
}
var mtx sync.Mutex
stopped := map[uint64]bool{}
@@ -231,7 +232,7 @@ func TestScrapePoolReload(t *testing.T) {
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app, nil)
sp := newScrapePool(cfg, app, nil)
 
wrapped := sp.appender()
 
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"context"
"sync"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/storage"
)
// TargetManager maintains a set of targets, starts and stops their scraping and
// creates the new targets based on the target groups it receives from various
// target providers.
type TargetManager struct {
append Appendable
scrapeConfigs []*config.ScrapeConfig
mtx sync.RWMutex
ctx context.Context
cancel func()
wg sync.WaitGroup
// Set of unique targets by scrape configuration.
targetSets map[string]*targetSet
logger log.Logger
starting chan struct{}
}
type targetSet struct {
ctx context.Context
cancel func()
ts *discovery.TargetSet
sp *scrapePool
}
// Appendable returns an Appender.
type Appendable interface {
Appender() (storage.Appender, error)
}
// NewTargetManager creates a new TargetManager.
func NewTargetManager(app Appendable, logger log.Logger) *TargetManager {
return &TargetManager{
append: app,
targetSets: map[string]*targetSet{},
logger: logger,
starting: make(chan struct{}),
}
}
// Run starts background processing to handle target updates.
func (tm *TargetManager) Run() {
level.Info(tm.logger).Log("msg", "Starting target manager...")
tm.mtx.Lock()
tm.ctx, tm.cancel = context.WithCancel(context.Background())
tm.reload()
tm.mtx.Unlock()
close(tm.starting)
tm.wg.Wait()
}
// Stop all background processing.
func (tm *TargetManager) Stop() {
<-tm.starting
level.Info(tm.logger).Log("msg", "Stopping target manager...")
tm.mtx.Lock()
// Cancel the base context, this will cause all target providers to shut down
// and all in-flight scrapes to abort immediately.
// Started inserts will be finished before terminating.
tm.cancel()
tm.mtx.Unlock()
// Wait for all scrape inserts to complete.
tm.wg.Wait()
level.Info(tm.logger).Log("msg", "Target manager stopped")
}
func (tm *TargetManager) reload() {
jobs := map[string]struct{}{}
// Start new target sets and update existing ones.
for _, scfg := range tm.scrapeConfigs {
jobs[scfg.JobName] = struct{}{}
ts, ok := tm.targetSets[scfg.JobName]
if !ok {
ctx, cancel := context.WithCancel(tm.ctx)
ts = &targetSet{
ctx: ctx,
cancel: cancel,
sp: newScrapePool(ctx, scfg, tm.append, log.With(tm.logger, "scrape_pool", scfg.JobName)),
}
ts.ts = discovery.NewTargetSet(ts.sp)
tm.targetSets[scfg.JobName] = ts
tm.wg.Add(1)
go func(ts *targetSet) {
// Run target set, which blocks until its context is canceled.
// Gracefully shut down pending scrapes in the scrape pool afterwards.
ts.ts.Run(ctx)
ts.sp.stop()
tm.wg.Done()
}(ts)
} else {
ts.sp.reload(scfg)
}
ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger))
}
// Remove old target sets. Waiting for scrape pools to complete pending
// scrape inserts is already guaranteed by the goroutine that started the target set.
for name, ts := range tm.targetSets {
if _, ok := jobs[name]; !ok {
ts.cancel()
delete(tm.targetSets, name)
}
}
}
// TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
func (tm *TargetManager) TargetMap() map[string][]*Target {
tm.mtx.RLock()
defer tm.mtx.RUnlock()
targetsMap := make(map[string][]*Target)
for jobName, ps := range tm.targetSets {
ps.sp.mtx.RLock()
for _, t := range ps.sp.targets {
targetsMap[jobName] = append(targetsMap[jobName], t)
}
targetsMap[jobName] = append(targetsMap[jobName], ps.sp.droppedTargets...)
ps.sp.mtx.RUnlock()
}
return targetsMap
}
// Targets returns the targets currently being scraped.
func (tm *TargetManager) Targets() []*Target {
tm.mtx.RLock()
defer tm.mtx.RUnlock()
targets := []*Target{}
for _, ps := range tm.targetSets {
ps.sp.mtx.RLock()
for _, t := range ps.sp.targets {
targets = append(targets, t)
}
ps.sp.mtx.RUnlock()
}
return targets
}
// ApplyConfig resets the manager's target providers and job configurations as defined
// by the new cfg. The state of targets that are valid in the new configuration remains unchanged.
func (tm *TargetManager) ApplyConfig(cfg *config.Config) error {
tm.mtx.Lock()
defer tm.mtx.Unlock()
tm.scrapeConfigs = cfg.ScrapeConfigs
if tm.ctx != nil {
tm.reload()
}
return nil
}
@@ -71,7 +71,7 @@ var localhostRepresentations = []string{"127.0.0.1", "localhost"}
type Handler struct {
logger log.Logger
 
targetManager *retrieval.TargetManager
scrapeManager *retrieval.ScrapeManager
ruleManager *rules.Manager
queryEngine *promql.Engine
context context.Context
@@ -125,7 +125,7 @@ type Options struct {
TSDB func() *tsdb.DB
Storage storage.Storage
QueryEngine *promql.Engine
TargetManager *retrieval.TargetManager
ScrapeManager *retrieval.ScrapeManager
RuleManager *rules.Manager
Notifier *notifier.Notifier
Version *PrometheusVersion
@@ -169,7 +169,7 @@ func New(logger log.Logger, o *Options) *Handler {
flagsMap: o.Flags,
 
context: o.Context,
targetManager: o.TargetManager,
scrapeManager: o.ScrapeManager,
ruleManager: o.RuleManager,
queryEngine: o.QueryEngine,
tsdb: o.TSDB,
@@ -181,7 +181,7 @@ func New(logger log.Logger, o *Options) *Handler {
ready: 0,
}
 
h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.targetManager, h.notifier,
h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, h.scrapeManager, h.notifier,
func() config.Config {
h.mtx.RLock()
defer h.mtx.RUnlock()
@@ -405,7 +405,7 @@ func (h *Handler) Run(ctx context.Context) error {
h.options.QueryEngine,
h.options.Storage.Querier,
func() []*retrieval.Target {
return h.options.TargetManager.Targets()
return h.options.ScrapeManager.Targets()
},
func() []*url.URL {
return h.options.Notifier.Alertmanagers()
@@ -587,7 +587,7 @@ func (h *Handler) rules(w http.ResponseWriter, r *http.Request) {
 
func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
var index []string
targets := h.targetManager.TargetMap()
targets := h.scrapeManager.TargetMap()
for job := range targets {
index = append(index, job)
}
@@ -605,7 +605,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
func (h *Handler) targets(w http.ResponseWriter, r *http.Request) {
// Bucket targets by job label
tps := map[string][]*retrieval.Target{}
for _, t := range h.targetManager.Targets() {
for _, t := range h.scrapeManager.Targets() {
job := t.Labels().Get(model.JobLabel)
tps[job] = append(tps[job], t)
}
@@ -96,7 +96,7 @@ func TestReadyAndHealthy(t *testing.T) {
Context: nil,
Storage: &tsdb.ReadyStorage{},
QueryEngine: nil,
TargetManager: nil,
ScrapeManager: nil,
RuleManager: nil,
Notifier: nil,
RoutePrefix: "/",
@@ -187,7 +187,7 @@ func TestRoutePrefix(t *testing.T) {
Context: nil,
Storage: &tsdb.ReadyStorage{},
QueryEngine: nil,
TargetManager: nil,
ScrapeManager: nil,
RuleManager: nil,
Notifier: nil,
RoutePrefix: "/prometheus",