Skip to content
Snippets Groups Projects
Commit 90393cc7 authored by Nick Thomas's avatar Nick Thomas
Browse files

Introduce an explicit indexer

parent 4986d042
No related branches found
No related tags found
1 merge request!1Initial implementation of an elasticsearch indexer in Go
#!/usr/bin/make -f
 
GO_SRC := $(shell find . -iname *.go)
GO_PKGS := $(shell go list ./... | grep -v vendor )
GO_SRC := $(shell find . -iname '*.go')
GO_PKGS := $(shell go list ./... | grep -v vendor)
 
GO15VENDOREXPERIMENT := 1
CGO_ENABLED := 0
Loading
Loading
Loading
Loading
@@ -23,12 +23,15 @@ type Client struct {
bulk *elastic.BulkProcessor
}
 
func (c *Client) Close() error {
return c.bulk.Close()
func (c *Client) Flush() error {
return c.bulk.Flush()
}
 
func NewClient(config *Config) (*Client, error) {
opts := []elastic.ClientOptionFunc{elastic.SetURL(config.URL...)}
opts := []elastic.ClientOptionFunc{
elastic.SetURL(config.URL...),
elastic.SetSniff(false), // For now. Move back to AWS-only later
}
 
// Sniffer should look for HTTPS URLs if at-least-one initial URL is HTTPS
for _, url := range config.URL {
Loading
Loading
@@ -46,7 +49,7 @@ func NewClient(config *Config) (*Client, error) {
return nil, err
}
 
opts = append(opts, elastic.SetHttpClient(awsClient), elastic.SetSniff(false))
opts = append(opts, elastic.SetHttpClient(awsClient))
}
 
client, err := elastic.NewClient(opts...)
Loading
Loading
Loading
Loading
@@ -2,8 +2,6 @@ package git
 
import (
"fmt"
"log"
"sync"
 
"srcd.works/go-git.v4"
"srcd.works/go-git.v4/plumbing"
Loading
Loading
@@ -12,7 +10,7 @@ import (
)
 
var (
EndError = fmt.Errorf("Finished") // not really an error
endError = fmt.Errorf("Finished") // not really an error
)
 
type Repo struct {
Loading
Loading
@@ -88,7 +86,7 @@ func (r *Repo) Diff() (difftree.Changes, error) {
return difftree.DiffTree(fromTree, toTree)
}
 
type FileFunc func(file *object.File)
type FileFunc func(file *object.File) error
 
func (r *Repo) EachFileChange(ins, mod, del FileFunc) error {
changes, err := r.Diff()
Loading
Loading
@@ -105,92 +103,37 @@ func (r *Repo) EachFileChange(ins, mod, del FileFunc) error {
switch change.Action {
case difftree.Insert:
toF.Name = change.To.Name
ins(toF)
return ins(toF)
case difftree.Modify:
toF.Name = change.To.Name
mod(toF)
return mod(toF)
case difftree.Delete:
fromF.Name = change.From.Name
del(fromF)
return del(fromF)
}
}
 
return nil
return nil // TODO: should this be an "unrecognised action" error?
}
 
// EachCommit runs `f` for each commit within `fromSHA`...`toSHA` (i.e., the
// inclusive set).
func (r *Repo) EachCommit(f func(*object.Commit)) error {
func (r *Repo) EachCommit(f func(*object.Commit) error) error {
err := object.WalkCommitHistory(r.ToCommit, func(c *object.Commit) error {
f(c)
if err := f(c); err != nil {
return err
}
 
if r.FromCommit != nil && c.ID() == r.FromCommit.ID() {
return EndError
return endError
}
 
return nil
})
 
if err != nil && err != EndError {
if err != nil && err != endError {
return err
}
 
return nil
}
func (r *Repo) IndexCommits() error {
commit := func(c *object.Commit) {
log.Print("Commit: ", c.ID())
// TODO: Read & json-encode the commit so we do the same disc work as the ruby version
//reader, err := c.Blob.Reader()
//if err != nil {
// os.Exit(1)
//}
//json.NewEncoder(ioutil.Discard).Encode(&Req{&ReaderAsJSONString{reader}})
//reader.Close()
}
return r.EachCommit(commit)
}
func (r *Repo) IndexBlobs() error {
insOrModFile := func(file *object.File) {
log.Print("Write: ", file.Name)
// Read & json-encode the file so we do the same work as the Ruby version
//reader, err := file.Blob.Reader()
//if err != nil {
// os.Exit(1)
//}
//json.NewEncoder(ioutil.Discard).Encode(&Req{&ReaderAsJSONString{reader}})
//reader.Close()
}
delFile := func(file *object.File) {
log.Print("Delete: ", file.Name)
}
return r.EachFileChange(insOrModFile, insOrModFile, delFile)
}
func (r *Repo) Index(_ interface{}) error {
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
if err := r.IndexBlobs(); err != nil {
log.Print("Error while indexing blobs:", err)
}
wg.Done()
}()
go func() {
if err := r.IndexCommits(); err != nil {
log.Print("Error while indexing commits:", err)
}
wg.Done()
}()
wg.Wait()
return nil
}
package indexer
import (
"log"
"sync"
"srcd.works/go-git.v4/plumbing/object"
"gitlab.com/gitlab-org/es-git-go/git"
)
type Submitter interface {
Flush() error
}
type Indexer struct {
Repo *git.Repo
Submitter
}
func (i *Indexer) SubmitCommit(c *object.Commit) error {
log.Print("Commit: ", c.ID())
// TODO: touch Submitter
return nil
}
func (i *Indexer) SubmitFile(file *object.File) error {
log.Print("Write: ", file.Name)
// TODO: touch Submitter
// Read & json-encode the file so we do the same work as the Ruby version
//reader, err := file.Blob.Reader()
//if err != nil {
// os.Exit(1)
//}
//json.NewEncoder(ioutil.Discard).Encode(&Req{&ReaderAsJSONString{reader}})
//reader.Close()
return nil
}
func (i *Indexer) RemoveFile(file *object.File) error {
log.Print("Delete: ", file.Name)
// TODO: touch Submitter
return nil
}
func (i *Indexer) IndexCommits() error {
return i.Repo.EachCommit(i.SubmitCommit)
}
func (i *Indexer) IndexBlobs() error {
return i.Repo.EachFileChange(i.SubmitFile, i.SubmitFile, i.RemoveFile)
}
func (i *Indexer) Index() error {
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
if err := i.IndexBlobs(); err != nil {
log.Print("Error while indexing blobs:", err)
}
wg.Done()
}()
go func() {
if err := i.IndexCommits(); err != nil {
log.Print("Error while indexing commits:", err)
}
wg.Done()
}()
wg.Wait()
return i.Submitter.Flush()
}
Loading
Loading
@@ -7,6 +7,7 @@ import (
 
"gitlab.com/gitlab-org/es-git-go/elastic"
"gitlab.com/gitlab-org/es-git-go/git"
"gitlab.com/gitlab-org/es-git-go/indexer"
)
 
func main() {
Loading
Loading
@@ -38,11 +39,10 @@ func main() {
log.Printf("Indexing from %s to %s", repo.FromHash, repo.ToHash)
log.Printf("Project ID: %s, Rails env: %s", projectID, railsEnv)
 
repo.Index(esClient)
idx := &indexer.Indexer{Submitter: esClient, Repo: repo}
if err := idx.Index(); err != nil {
log.Fatalln("Indexing error: ", err)
 
// Flushes any queued requests. Errors mean we've not indexed everything, so
// need to return an error code
if err := esClient.Close(); err != nil {
log.Fatalln("Elasticsearch error: ", err)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment