Skip to content
Snippets Groups Projects

Initial implementation of an elasticsearch indexer in Go

Merged Nick Thomas requested to merge 1-initial-implementation into master
All threads resolved!
8 files
+ 637
15
Compare changes
  • Side-by-side
  • Inline
Files
8
+ 46
4
@@ -2,6 +2,8 @@ package elastic
import (
"context"
"fmt"
"os"
"strings"
"github.com/aws/aws-sdk-go/aws/credentials"
@@ -14,8 +16,9 @@ var (
// TODO: make this configurable / detectable.
// Limiting to 10MiB lets us work on small AWS clusters, but unnecessarily
// increases round trips in larger or non-AWS clusters
MaxBulkSize = 10 * 1024 * 1024
BulkWorkers = 2
MaxBulkSize = 10 * 1024 * 1024
BulkWorkers = 2
timeoutError = fmt.Errorf("Timeout")
)
type Client struct {
@@ -23,8 +26,17 @@ type Client struct {
bulk *elastic.BulkProcessor
}
func (c *Client) Flush() error {
return c.bulk.Flush()
// FromEnv creates an Elasticsearch client from the `ELASTIC_CONNECTION_INFO`
// environment variable
func FromEnv() (*Client, error) {
data := strings.NewReader(os.Getenv("ELASTIC_CONNECTION_INFO"))
config, err := ReadConfig(data)
if err != nil {
return nil, fmt.Errorf("Couldn't parse ELASTIC_CONNECTION_INFO: %s", err)
}
return NewClient(config)
}
func NewClient(config *Config) (*Client, error) {
@@ -68,3 +80,33 @@ func NewClient(config *Config) (*Client, error) {
return &Client{client: client, bulk: bulk}, nil
}
func (c *Client) Flush() error {
return c.bulk.Flush()
}
func (c *Client) CreateIndex(indexName, mapping string) error {
createIndex, err := c.client.CreateIndex(indexName).BodyString(mapping).Do(context.Background())
if err != nil {
return err
}
if !createIndex.Acknowledged {
return timeoutError
}
return nil
}
func (c *Client) DeleteIndex(indexName string) error {
deleteIndex, err := c.client.DeleteIndex(indexName).Do(context.Background())
if err != nil {
return err
}
if !deleteIndex.Acknowledged {
return timeoutError
}
return nil
}
Loading