Skip to content
Snippets Groups Projects
Commit 80182a5d authored by Krasi Georgiev's avatar Krasi Georgiev
Browse files

use poolKey as the pool map key to avoid multi dimensional maps

parent 1ec76d19
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -52,10 +52,18 @@ type Discoverer interface {
Run(ctx context.Context, up chan<- []*config.TargetGroup)
}
 
// type pool struct {
// cancel func()
// tgroups []*config.TargetGroup
// }
type poolKey struct {
set string
provider string
}
// byProvider implements sort.Interface for []poolKey based on the provider field.
// Sorting is needed so that we can have predictable tests.
type byProvider []poolKey
func (a byProvider) Len() int { return len(a) }
func (a byProvider) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byProvider) Less(i, j int) bool { return a[i].provider < a[j].provider }
 
// NewManager is the Discovery Manager constructor
func NewManager(logger log.Logger) *Manager {
Loading
Loading
@@ -63,20 +71,19 @@ func NewManager(logger log.Logger) *Manager {
logger: logger,
actionCh: make(chan func(context.Context)),
syncCh: make(chan map[string][]*config.TargetGroup),
targets: make(map[string]map[string][]*config.TargetGroup),
targets: make(map[poolKey][]*config.TargetGroup),
discoverCancel: []context.CancelFunc{},
}
}
 
// Manager maintains a set of discovery providers and sends each update to a channel used by other packages.
// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
// Targets pool is kept in a map with a format map[targetSetName]map[providerName].
type Manager struct {
logger log.Logger
syncCh chan map[string][]*config.TargetGroup
actionCh chan func(context.Context)
discoverCancel []context.CancelFunc
targets map[string]map[string][]*config.TargetGroup
targets map[poolKey][]*config.TargetGroup
// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
syncCh chan map[string][]*config.TargetGroup
}
 
// Run starts the background processing
Loading
Loading
@@ -104,7 +111,7 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.cancelDiscoverers()
for _, scfg := range cfg.ScrapeConfigs {
for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
m.startProvider(ctx, scfg.JobName, provName, prov)
m.startProvider(ctx, poolKey{set: scfg.JobName, provider: provName}, prov)
}
}
close(err)
Loading
Loading
@@ -113,17 +120,17 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
return <-err
}
 
func (m *Manager) startProvider(ctx context.Context, jobName, provName string, worker Discoverer) {
func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) {
ctx, cancel := context.WithCancel(ctx)
updates := make(chan []*config.TargetGroup)
 
m.discoverCancel = append(m.discoverCancel, cancel)
 
go worker.Run(ctx, updates)
go m.runProvider(ctx, provName, jobName, updates)
go m.runProvider(ctx, poolKey, updates)
}
 
func (m *Manager) runProvider(ctx context.Context, provName, jobName string, updates chan []*config.TargetGroup) {
func (m *Manager) runProvider(ctx context.Context, poolKey poolKey, updates chan []*config.TargetGroup) {
for {
select {
case <-ctx.Done():
Loading
Loading
@@ -134,8 +141,8 @@ func (m *Manager) runProvider(ctx context.Context, provName, jobName string, upd
if !ok {
return
}
m.addGroup(jobName, provName, tgs)
m.syncCh <- m.allGroups(jobName)
m.addGroup(poolKey, tgs)
m.syncCh <- m.allGroups(poolKey)
}
}
}
Loading
Loading
@@ -144,20 +151,16 @@ func (m *Manager) cancelDiscoverers() {
for _, c := range m.discoverCancel {
c()
}
m.targets = make(map[string]map[string][]*config.TargetGroup)
m.targets = make(map[poolKey][]*config.TargetGroup)
m.discoverCancel = nil
}
 
func (m *Manager) addGroup(tsName, provName string, tg []*config.TargetGroup) {
func (m *Manager) addGroup(poolKey poolKey, tg []*config.TargetGroup) {
done := make(chan struct{})
 
m.actionCh <- func(ctx context.Context) {
if m.targets[tsName] == nil {
m.targets[tsName] = make(map[string][]*config.TargetGroup)
}
if tg != nil {
m.targets[tsName][provName] = tg
m.targets[poolKey] = tg
}
close(done)
 
Loading
Loading
@@ -165,31 +168,29 @@ func (m *Manager) addGroup(tsName, provName string, tg []*config.TargetGroup) {
<-done
}
 
func (m *Manager) allGroups(tsName string) map[string][]*config.TargetGroup {
tset := make(chan map[string][]*config.TargetGroup)
func (m *Manager) allGroups(pk poolKey) map[string][]*config.TargetGroup {
tSets := make(chan map[string][]*config.TargetGroup)
 
m.actionCh <- func(ctx context.Context) {
tgAll := []*config.TargetGroup{}
 
// Sorting the providers is needed so that we can have predictable tests.
// Maps cannot be sorted so need to extract the keys to a slice and sort the string slice.
var providerNames []string
for providerName := range m.targets[tsName] {
providerNames = append(providerNames, providerName)
// Sorting by the poolKey is needed so that we can have predictable tests.
var pKeys []poolKey
for pk := range m.targets {
pKeys = append(pKeys, pk)
}
sort.Strings(providerNames)
for _, prov := range providerNames {
for _, tg := range m.targets[tsName][prov] {
sort.Sort(byProvider(pKeys))
tSetsAll := map[string][]*config.TargetGroup{}
for _, pk := range pKeys {
for _, tg := range m.targets[pk] {
if tg.Source != "" { // Don't add empty targets.
tgAll = append(tgAll, tg)
tSetsAll[pk.set] = append(tSetsAll[pk.set], tg)
}
}
}
t := make(map[string][]*config.TargetGroup)
t[tsName] = tgAll
tset <- t
tSets <- tSetsAll
}
return <-tset
return <-tSets
 
}
 
Loading
Loading
Loading
Loading
@@ -590,7 +590,7 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
var totalUpdatesCount int
for tpName, update := range testCase.updates {
provider := newMockDiscoveryProvider(update)
discoveryManager.startProvider(ctx, strconv.Itoa(testIndex), tpName, provider)
discoveryManager.startProvider(ctx, poolKey{set: strconv.Itoa(testIndex), provider: tpName}, provider)
 
if len(update) > 0 {
totalUpdatesCount = totalUpdatesCount + len(update)
Loading
Loading
@@ -627,19 +627,15 @@ func TestDiscoveryManagerSyncCalls(t *testing.T) {
}
 
func TestTargetSetRecreatesTargetGroupsEveryRun(t *testing.T) {
verifyPresence := func(tSets map[string]map[string][]*config.TargetGroup, tSetName string, provName, label string, present bool) {
if _, ok := tSets[tSetName]; !ok {
t.Fatalf("'%s' should be present in TargetSets: %v", tSetName, tSets)
return
}
if _, ok := tSets[tSetName][provName]; !ok {
t.Fatalf("'%s' should be present in Discovery providers: %v", provName, tSets[tSetName])
verifyPresence := func(tSets map[poolKey][]*config.TargetGroup, poolKey poolKey, label string, present bool) {
if _, ok := tSets[poolKey]; !ok {
t.Fatalf("'%s' should be present in Pool keys: %v", poolKey, tSets)
return
}
 
match := false
var mergedTargets string
for _, targetGroup := range tSets[tSetName][provName] {
for _, targetGroup := range tSets[poolKey] {
 
for _, l := range targetGroup.Targets {
mergedTargets = mergedTargets + " " + l.String()
Loading
Loading
@@ -678,8 +674,8 @@ scrape_configs:
discoveryManager.ApplyConfig(cfg)
 
_ = <-discoveryManager.SyncCh()
verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"foo:9090\"}", true)
verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"bar:9090\"}", true)
verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", true)
 
sTwo := `
scrape_configs:
Loading
Loading
@@ -693,8 +689,8 @@ scrape_configs:
discoveryManager.ApplyConfig(cfg)
 
_ = <-discoveryManager.SyncCh()
verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"foo:9090\"}", true)
verifyPresence(discoveryManager.targets, "prometheus", "static/0", "{__address__=\"bar:9090\"}", false)
verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
verifyPresence(discoveryManager.targets, poolKey{set: "prometheus", provider: "static/0"}, "{__address__=\"bar:9090\"}", false)
}
 
type update struct {
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment