Skip to content
Snippets Groups Projects
Commit c41f8d09 authored by Nick Thomas's avatar Nick Thomas
Browse files

Introduce an elasticsearch client

parent 00e99e32
No related branches found
No related tags found
1 merge request!1Initial implementation of an elasticsearch indexer in Go
package elastic
import (
"context"
"strings"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/deoxxa/aws_signing_client"
"gopkg.in/olivere/elastic.v5"
)
var (
// TODO: make this configurable / detectable.
// Limiting to 10MiB lets us work on small AWS clusters, but unnecessarily
// increases round trips in larger or non-AWS clusters
MaxBulkSize = 10 * 1024 * 1024
BulkWorkers = 2
)
type Client struct {
client *elastic.Client
bulk *elastic.BulkProcessor
}
func (c *Client) Close() error {
return c.bulk.Close()
}
func NewClient(config *Config) (*Client, error) {
opts := []elastic.ClientOptionFunc{elastic.SetURL(config.URL...)}
// Sniffer should look for HTTPS URLs if at-least-one initial URL is HTTPS
for _, url := range config.URL {
if strings.HasPrefix(url, "https:") {
opts = append(opts, elastic.SetScheme("https"))
break
}
}
if config.AWS {
credentials := credentials.NewStaticCredentials(config.AccessKey, config.SecretKey, "")
signer := v4.NewSigner(credentials)
awsClient, err := aws_signing_client.New(signer, nil, "es", config.Region)
if err != nil {
return nil, err
}
opts = append(opts, elastic.SetHttpClient(awsClient), elastic.SetSniff(false))
}
client, err := elastic.NewClient(opts...)
if err != nil {
return nil, err
}
bulk, err := client.BulkProcessor().
Workers(BulkWorkers).
BulkSize(MaxBulkSize).
Do(context.Background())
if err != nil {
return nil, err
}
return &Client{client: client, bulk: bulk}, nil
}
Loading
Loading
@@ -173,7 +173,7 @@ func (r *Repo) IndexBlobs() error {
return r.EachFileChange(insOrModFile, insOrModFile, delFile)
}
 
func (r *Repo) Index() error {
func (r *Repo) Index(_ interface{}) error {
wg := &sync.WaitGroup{}
wg.Add(2)
 
Loading
Loading
Loading
Loading
@@ -14,11 +14,17 @@ func main() {
log.Fatalf("Usage: %s <project-id> <project-path>", os.Args[0])
}
 
_, err := elastic.ReadConfig(strings.NewReader(os.Getenv("ELASTIC_CONNECTION_INFO")))
esConfig, err := elastic.ReadConfig(strings.NewReader(os.Getenv("ELASTIC_CONNECTION_INFO")))
if err != nil {
log.Fatalln("Couldn't parse ELASTIC_CONNECTION_INFO:", err)
}
 
esClient, err := elastic.NewClient(esConfig)
if err != nil {
log.Fatalln("Failed to connect to elasticsearch:", err)
}
defer esClient.Close()
projectID := os.Args[1]
projectPath := os.Args[2]
fromSHA := os.Getenv("FROM_SHA")
Loading
Loading
@@ -33,5 +39,5 @@ func main() {
log.Printf("Indexing from %s to %s", repo.FromHash, repo.ToHash)
log.Printf("Project ID: %s, Rails env: %s", projectID, railsEnv)
 
repo.Index() // TODO: pass in something to receive the indexed data
repo.Index(esClient)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment