Skip to content
Snippets Groups Projects

Initial implementation of an elasticsearch indexer in Go

Merged Nick Thomas requested to merge 1-initial-implementation into master
All threads resolved!
3 files
+ 76
3
Compare changes
  • Side-by-side
  • Inline
Files
3
elastic/client.go 0 → 100644
+ 67
0
package elastic
import (
"context"
"strings"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/deoxxa/aws_signing_client"
"gopkg.in/olivere/elastic.v5"
)
var (
// TODO: make this configurable / detectable.
// Limiting to 10MiB lets us work on small AWS clusters, but unnecessarily
// increases round trips in larger or non-AWS clusters
MaxBulkSize = 10 * 1024 * 1024
BulkWorkers = 2
)
type Client struct {
client *elastic.Client
bulk *elastic.BulkProcessor
}
func (c *Client) Close() error {
return c.bulk.Close()
}
func NewClient(config *Config) (*Client, error) {
opts := []elastic.ClientOptionFunc{elastic.SetURL(config.URL...)}
// Sniffer should look for HTTPS URLs if at-least-one initial URL is HTTPS
for _, url := range config.URL {
if strings.HasPrefix(url, "https:") {
opts = append(opts, elastic.SetScheme("https"))
break
}
}
if config.AWS {
credentials := credentials.NewStaticCredentials(config.AccessKey, config.SecretKey, "")
signer := v4.NewSigner(credentials)
awsClient, err := aws_signing_client.New(signer, nil, "es", config.Region)
if err != nil {
return nil, err
}
opts = append(opts, elastic.SetHttpClient(awsClient), elastic.SetSniff(false))
}
client, err := elastic.NewClient(opts...)
if err != nil {
return nil, err
}
bulk, err := client.BulkProcessor().
Workers(BulkWorkers).
BulkSize(MaxBulkSize).
Do(context.Background())
if err != nil {
return nil, err
}
return &Client{client: client, bulk: bulk}, nil
}
Loading