Skip to content
Snippets Groups Projects
Commit 5d02dc60 authored by Nick Thomas's avatar Nick Thomas
Browse files

First attempt at actually indexing blobs and commits

parent 60ca8cb8
No related branches found
No related tags found
1 merge request: !1 "Initial implementation of an elasticsearch indexer in Go"
Loading
Loading
@@ -22,13 +22,15 @@ var (
)
 
// Client bundles an Elasticsearch connection with the index and project that
// every document submitted through it is written against.
type Client struct {
	// IndexName is the index used by Index, Remove, CreateIndex and DeleteIndex.
	IndexName string
	// ProjectID is set as the parent of every indexed/removed document and is
	// the value returned by ParentID.
	ProjectID string

	client *elastic.Client
	bulk   *elastic.BulkProcessor
}
 
// FromEnv creates an Elasticsearch client from the `ELASTIC_CONNECTION_INFO`
// environment variable
func FromEnv() (*Client, error) {
func FromEnv(projectID string) (*Client, error) {
data := strings.NewReader(os.Getenv("ELASTIC_CONNECTION_INFO"))
 
config, err := ReadConfig(data)
Loading
Loading
@@ -36,6 +38,15 @@ func FromEnv() (*Client, error) {
return nil, fmt.Errorf("Couldn't parse ELASTIC_CONNECTION_INFO: %s", err)
}
 
railsEnv := os.Getenv("RAILS_ENV")
indexName := "gitlab"
if railsEnv != "" {
indexName = indexName + "-" + railsEnv
}
config.IndexName = indexName
config.ProjectID = projectID
return NewClient(config)
}
 
Loading
Loading
@@ -78,21 +89,43 @@ func NewClient(config *Config) (*Client, error) {
return nil, err
}
 
return &Client{client: client, bulk: bulk}, nil
return &Client{
IndexName: config.IndexName,
ProjectID: config.ProjectID,
client: client,
bulk: bulk,
}, nil
}
// ParentID returns the ProjectID this client was configured with. Index and
// Remove attach the same value as the parent of each bulk request.
func (c *Client) ParentID() string {
return c.ProjectID
}
 
// Flush delegates to the underlying bulk processor's Flush and returns its
// error unchanged.
// NOTE(review): presumably this is what actually sends the requests queued by
// Index and Remove — confirm against the elastic.BulkProcessor documentation.
func (c *Client) Flush() error {
return c.bulk.Flush()
}
 
func (c *Client) IndexCommit(indexName string, commit interface{}) error {
return fmt.Errorf("TODO")
}
func (c *Client) Index(id string, thing interface{}) error {
req := elastic.NewBulkIndexRequest().
Index(c.IndexName).
Type("repository").
Parent(c.ProjectID).
Id(id).
Doc(thing)
c.bulk.Add(req)
 
func (c *Client) IndexBlob(indexName string, blob interface{}) error {
return fmt.Errorf("TODO")
return nil
}
 
func (c *Client) RemoveBlob(indexName string, blob interface{}) error {
return fmt.Errorf("TODO")
func (c *Client) Remove(id string) error {
req := elastic.NewBulkDeleteRequest().
Index(c.IndexName).
Type("repository").
Parent(c.ProjectID).
Id(id)
c.bulk.Add(req)
return nil
}
Loading
Loading
@@ -6,6 +6,8 @@ import (
)
 
type Config struct {
IndexName string `json:"-"`
ProjectID string `json:"-"`
URL []string `json:"url"`
AWS bool `json:"aws"`
Region string `json:"aws_region"`
Loading
Loading
Loading
Loading
@@ -520,8 +520,8 @@ var indexMapping = `
`
 
// CreateIndex creates an index matching that created by gitlab-elasticsearch-git v1.1.1
func (c *Client) CreateIndex(indexName string) error {
createIndex, err := c.client.CreateIndex(indexName).BodyString(indexMapping).Do(context.Background())
func (c *Client) CreateIndex() error {
createIndex, err := c.client.CreateIndex(c.IndexName).BodyString(indexMapping).Do(context.Background())
if err != nil {
return err
}
Loading
Loading
@@ -533,8 +533,8 @@ func (c *Client) CreateIndex(indexName string) error {
return nil
}
 
func (c *Client) DeleteIndex(indexName string) error {
deleteIndex, err := c.client.DeleteIndex(indexName).Do(context.Background())
func (c *Client) DeleteIndex() error {
deleteIndex, err := c.client.DeleteIndex(c.IndexName).Do(context.Background())
if err != nil {
return err
}
Loading
Loading
Loading
Loading
@@ -8,6 +8,8 @@ import (
)
 
type Blob struct {
ID string `json:"-"`
Type string `json:"type"`
OID string `json:"oid"`
RepoID string `json:"rid"`
CommitSHA string `json:"commit_sha"`
Loading
Loading
@@ -29,11 +31,11 @@ type Blob struct {
Language string `json:"language"`
}
 
// GenerateBlobID returns the document ID for a blob: parentID and filename
// joined by an underscore ("<parentID>_<filename>").
func GenerateBlobID(parentID, filename string) string {
	return fmt.Sprintf("%s_%s", parentID, filename)
}
 
func BuildBlob(file *object.File, commitSHA, repoID string) (*Blob, error) {
func BuildBlob(file *object.File, commitSHA, parentID string) (*Blob, error) {
reader, err := file.Blob.Reader()
if err != nil {
return nil, err
Loading
Loading
@@ -50,8 +52,10 @@ func BuildBlob(file *object.File, commitSHA, repoID string) (*Blob, error) {
content := string(b)
 
return &Blob{
Type: "blob",
ID: GenerateBlobID(parentID, file.Name),
OID: file.Blob.Hash.String(),
RepoID: repoID,
RepoID: parentID,
CommitSHA: commitSHA,
Content: content,
Path: file.Name,
Loading
Loading
Loading
Loading
@@ -7,22 +7,28 @@ import (
)
 
// Commit is the JSON document submitted for a single git commit.
type Commit struct {
	Type   string  `json:"type"`
	Author *Person `json:"author"`
	// NOTE(review): the JSON key is spelled "commiter" (single "t"); confirm
	// this matches the field name in the index mapping before changing it.
	Committer *Person `json:"commiter"`
	ID        string  `json:"id"` // ${RepoID}_${SHA}
	RepoID    string  `json:"rid"`
	Message   string  `json:"message"`
	SHA       string  `json:"sha"`
}
 
func BuildCommit(c *object.Commit, repoID string) *Commit {
// GenerateCommitID returns the document ID for a commit: parentID and
// commitSHA joined by an underscore ("<parentID>_<commitSHA>").
func GenerateCommitID(parentID, commitSHA string) string {
	const idFormat = "%s_%s"

	id := fmt.Sprintf(idFormat, parentID, commitSHA)
	return id
}
func BuildCommit(c *object.Commit, parentID string) *Commit {
sha := c.Hash.String()
 
return &Commit{
Type: "commit",
Author: BuildPerson(c.Author),
Committer: BuildPerson(c.Committer),
ID: fmt.Sprintf("%s_%s", repoID, sha),
RepoID: repoID,
ID: GenerateCommitID(parentID, sha),
RepoID: parentID,
Message: c.Message,
SHA: sha,
}
Loading
Loading
Loading
Loading
@@ -10,46 +10,47 @@ import (
)
 
// Submitter is the sink the Indexer writes documents to.
type Submitter interface {
	// ParentID returns the ID used as the parent/repository ID of every
	// document built by the Indexer.
	ParentID() string
	// Index submits thing under the document ID id.
	Index(id string, thing interface{}) error
	// Remove submits a delete for the document ID id.
	Remove(id string) error

	Flush() error
}
 
// Indexer submits commits and blobs from Repo to the embedded Submitter.
// The index name and project ID are no longer held here; the project ID is
// obtained through Submitter.ParentID.
type Indexer struct {
	Repo *git.Repo
	Submitter
}
 
// FIXME: none of the indexers worry about encoding right now
 
func (i *Indexer) SubmitCommit(c *object.Commit) error {
commit := BuildCommit(c, i.ProjectID)
log.Print("Commit: ", c.Hash.String())
 
return i.Submitter.IndexCommit(i.IndexName, commit)
commit := BuildCommit(c, i.Submitter.ParentID())
return i.Submitter.Index(commit.ID, commit)
}
 
func (i *Indexer) SubmitBlob(f *object.File, _, toCommit *object.Commit) error {
log.Print("Write: ", f.Name)
 
// TODO(nick): Existing code doesn't index blobs > 10MiB in size
// FIXME(nick): Not sure commitSHA is right, or how it works at all
 
blob, err := BuildBlob(f, i.ProjectID, toCommit.Hash.String())
blob, err := BuildBlob(f, toCommit.Hash.String(), i.Submitter.ParentID())
if err != nil {
return err
}
 
return i.Submitter.IndexBlob(i.IndexName, blob)
return i.Submitter.Index(blob.ID, blob)
}
 
func (i *Indexer) RemoveBlob(file *object.File, _, toCommit *object.Commit) error {
log.Print("Delete: ", file.Name)
blob := &Blob{RepoID: i.ProjectID, CommitSHA: toCommit.Hash.String()}
blobID := GenerateBlobID(toCommit.Hash.String(), i.Submitter.ParentID())
 
return i.Submitter.RemoveBlob(i.IndexName, blob) // should be the blob id
return i.Submitter.Remove(blobID)
}
 
func (i *Indexer) IndexCommits() error {
Loading
Loading
Loading
Loading
@@ -16,6 +16,10 @@ var (
binary = flag.String("binary", "./bin/es-git-go", "Path to `es-git-go` binary for integration tests")
)
 
// projectID is the fixed project ID that buildIndex passes to elastic.FromEnv.
const (
projectID = "667"
)
func checkDeps(t *testing.T) {
if os.Getenv("ELASTIC_CONNECTION_INFO") == "" {
t.Log("ELASTIC_CONNECTION_INFO not set")
Loading
Loading
@@ -35,15 +39,15 @@ func checkDeps(t *testing.T) {
 
func buildIndex(t *testing.T) (string, func()) {
railsEnv := fmt.Sprintf("test-%d", time.Now().Unix())
indexName := fmt.Sprintf("gitlab-" + railsEnv)
os.Setenv("RAILS_ENV", railsEnv)
 
client, err := elastic.FromEnv()
client, err := elastic.FromEnv(projectID)
assert.NoError(t, err)
 
assert.NoError(t, client.CreateIndex(indexName))
assert.NoError(t, client.CreateIndex())
 
return railsEnv, func() {
client.DeleteIndex(indexName)
client.DeleteIndex()
}
}
 
Loading
Loading
Loading
Loading
@@ -18,33 +18,25 @@ func main() {
projectPath := os.Args[2]
fromSHA := os.Getenv("FROM_SHA")
toSHA := os.Getenv("TO_SHA")
railsEnv := os.Getenv("RAILS_ENV")
indexName := "gitlab"
if railsEnv != "" {
indexName = indexName + "-" + railsEnv
}
 
repo, err := git.NewRepo(projectPath, fromSHA, toSHA)
if err != nil {
log.Fatalf("Failed to open %s: %s", projectPath, err)
}
 
log.Printf("Indexing from %s to %s", repo.FromHash, repo.ToHash)
log.Printf("Project ID: %s, Rails env: %s, index: %s", projectID, railsEnv, indexName)
esClient, err := elastic.FromEnv()
esClient, err := elastic.FromEnv(projectID)
if err != nil {
log.Fatalln(err)
}
 
idx := &indexer.Indexer{
IndexName: indexName,
ProjectID: projectID,
Submitter: esClient,
Repo: repo,
}
 
log.Printf("Indexing from %s to %s", repo.FromHash, repo.ToHash)
log.Printf("Index: %s, Project ID: %s", esClient.IndexName, esClient.ParentID())
if err := idx.Index(); err != nil {
log.Fatalln("Indexing error: ", err)
 
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment