Commit ec2d0762 authored by Nick Thomas

Implement the 1MiB text-only blob indexing limits

This involves some refactoring to make the error handling clearer
parent 84370c64
1 merge request: !1 Initial implementation of an elasticsearch indexer in Go
@@ -20,8 +20,8 @@ const (
 	// TODO: make this configurable / detectable.
 	// Limiting to 10MiB lets us work on small AWS clusters, but unnecessarily
 	// increases round trips in larger or non-AWS clusters
-	MaxBulkSize = 10 * 1024 * 1024
-	BulkWorkers = 10
+	MaxBulkSize = 10 * 1024 * 1024
+	BulkWorkers = 10
 )
 
 type Client struct {
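For reference, these constants feed the bulk processor wiring elsewhere in the client, which this hunk does not show. A minimal sketch of that setup, assuming a v5-style olivere/elastic API (the exact method set varies by client version, and esClient/ctx are placeholder names, not part of this commit):

	// Hypothetical wiring of the constants into a bulk processor:
	bulk, err := esClient.BulkProcessor().
		Workers(BulkWorkers).  // 10 concurrent flush workers
		BulkSize(MaxBulkSize). // flush once ~10MiB of requests are buffered
		Do(ctx)
	if err != nil {
		return nil, err
	}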
@@ -104,16 +104,11 @@ func (c *Client) ParentID() string {
 	return c.ProjectID
 }
 
-// FIXME(nick): this should reserve some space for encoding
-func (c *Client) SubmissionLimit() int64 {
-	return MaxBulkSize
-}
-
 func (c *Client) Flush() error {
 	return c.bulk.Flush()
 }
 
-func (c *Client) Index(id string, thing interface{}) error {
+func (c *Client) Index(id string, thing interface{}) {
 	req := elastic.NewBulkIndexRequest().
 		Index(c.IndexName).
 		Type("repository").
@@ -122,11 +117,9 @@ func (c *Client) Index(id string, thing interface{}) error {
 		Doc(thing)
 
 	c.bulk.Add(req)
-	return nil
 }
 
-func (c *Client) Remove(id string) error {
+func (c *Client) Remove(id string) {
 	req := elastic.NewBulkDeleteRequest().
 		Index(c.IndexName).
 		Type("repository").
@@ -134,6 +127,4 @@ func (c *Client) Remove(id string) error {
 		Id(id)
 
 	c.bulk.Add(req)
-	return nil
 }
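With these changes, Index and Remove only enqueue requests on the bulk service, so there is nothing useful for them to return per call; any failure surfaces when the batch is flushed. Illustrative usage under the new signatures (the IDs are made up):

	c.Index("project-1_README.md", blob) // buffered, no synchronous error
	c.Remove("project-1_old.txt")        // buffered as well
	if err := c.Flush(); err != nil {
		return err // errors for the whole batch surface here
	}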
 package indexer
 
 import (
+	"bytes"
 	"fmt"
 	"io/ioutil"
 
 	"srcd.works/go-git.v4/plumbing/object"
 )
 
+var (
+	skipTooLargeBlob = fmt.Errorf("Blob should be skipped: Too large")
+	skipBinaryBlob   = fmt.Errorf("Blob should be skipped: binary")
+)
+
+const (
+	binarySearchLimit = 8 * 1024    // 8 KiB, Same as git
+	maxBlobSize       = 1024 * 1024 // 1MiB, same as gitlab-elasticsearch-git
+)
+
+func isSkipBlobErr(err error) bool {
+	switch err {
+	case skipTooLargeBlob:
+		return true
+	case skipBinaryBlob:
+		return true
+	}
+
+	return false
+}
+
 type Blob struct {
-	ID        string `json:"-"`
+	Type      string `json:"type"`
+	ID        string `json:"-"`
 	OID       string `json:"oid"`
 	RepoID    string `json:"rid"`
 	CommitSHA string `json:"commit_sha"`
@@ -35,7 +57,13 @@ func GenerateBlobID(parentID, filename string) string {
 	return fmt.Sprintf("%s_%s", parentID, filename)
 }
 
-func BuildBlob(file *object.File, commitSHA, parentID string) (*Blob, error) {
+func (i *Indexer) BuildBlob(file *object.File, commitSHA string) (*Blob, error) {
+	parentID := i.Submitter.ParentID()
+
+	if file.Blob.Size > maxBlobSize {
+		return nil, skipTooLargeBlob
+	}
+
 	reader, err := file.Blob.Reader()
 	if err != nil {
 		return nil, err
@@ -49,6 +77,11 @@ func BuildBlob(file *object.File, commitSHA, parentID string) (*Blob, error) {
 	if err != nil {
 		return nil, err
 	}
+
+	if DetectBinary(b) {
+		return nil, skipBinaryBlob
+	}
+
 	content := string(b)
 
 	return &Blob{
@@ -68,3 +101,14 @@ func BuildBlob(file *object.File, commitSHA, parentID string) (*Blob, error) {
 func DetectLanguage(data []byte) string {
 	return "Text"
 }
+
+// Check whether the passed-in data contains a NUL byte. Only scan the start of
+// large blobs. This is the same test performed by git to check text/binary
+func DetectBinary(data []byte) bool {
+	searchLimit := binarySearchLimit
+	if len(data) < searchLimit {
+		searchLimit = len(data)
+	}
+
+	return bytes.Contains(data[:searchLimit], []byte{0})
+}
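DetectBinary only scans the first 8KiB of a blob, so a NUL byte past that window goes undetected, which matches git's heuristic. A small table-driven test sketch for it (hypothetical cases, not part of the commit):

	package indexer

	import (
		"bytes"
		"testing"
	)

	func TestDetectBinary(t *testing.T) {
		cases := []struct {
			name string
			data []byte
			want bool
		}{
			{"empty", nil, false},
			{"plain text", []byte("hello, world\n"), false},
			{"NUL byte", []byte{'a', 0, 'b'}, true},
			// a NUL after the 8KiB window is outside the scanned range
			{"NUL beyond limit", append(bytes.Repeat([]byte{'a'}, binarySearchLimit), 0), false},
		}

		for _, c := range cases {
			if got := DetectBinary(c.data); got != c.want {
				t.Errorf("%s: DetectBinary() = %v, want %v", c.name, got, c.want)
			}
		}
	}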
@@ -20,8 +20,9 @@ func GenerateCommitID(parentID, commitSHA string) string {
 	return fmt.Sprintf("%s_%s", parentID, commitSHA)
 }
 
-func BuildCommit(c *object.Commit, parentID string) *Commit {
+func (i *Indexer) BuildCommit(c *object.Commit) *Commit {
 	sha := c.Hash.String()
+	parentID := i.Submitter.ParentID()
 
 	return &Commit{
 		Type: "commit",
 package indexer
 
 import (
+	"fmt"
 	"log"
 
 	"srcd.works/go-git.v4/plumbing/object"
@@ -10,10 +11,9 @@ import (
 
 type Submitter interface {
 	ParentID() string
-	SubmissionLimit() int64
 
-	Index(id string, thing interface{}) error
-	Remove(id string) error
+	Index(id string, thing interface{})
+	Remove(id string)
 
 	Flush() error
 }
@@ -26,30 +26,33 @@ type Indexer struct {
 // FIXME: none of the indexers worry about encoding right now
 
 func (i *Indexer) SubmitCommit(c *object.Commit) error {
-	commit := BuildCommit(c, i.Submitter.ParentID())
-	return i.Submitter.Index(commit.ID, commit)
+	commit := i.BuildCommit(c)
+	i.Submitter.Index(commit.ID, commit)
+	return nil
 }
 
 func (i *Indexer) SubmitBlob(f *object.File, _, toCommit *object.Commit) error {
 	// FIXME(nick): Not sure commitSHA is right, or how it works at all
 
-	if f.Blob.Size > i.Submitter.SubmissionLimit() {
-		log.Printf("Skipping %s: too large: %dMiB", f.Name, f.Blob.Size/(1024*1024))
-		return nil
-	}
-
-	blob, err := BuildBlob(f, toCommit.Hash.String(), i.Submitter.ParentID())
+	blob, err := i.BuildBlob(f, toCommit.Hash.String())
 	if err != nil {
-		return err
+		if isSkipBlobErr(err) {
+			return nil
+		}
+
+		return fmt.Errorf("Blob %s: %s", f.Name, err)
 	}
 
-	return i.Submitter.Index(blob.ID, blob)
+	i.Submitter.Index(blob.ID, blob)
+	return nil
 }
 
 func (i *Indexer) RemoveBlob(file *object.File, _, toCommit *object.Commit) error {
 	blobID := GenerateBlobID(toCommit.Hash.String(), i.Submitter.ParentID())
 
-	return i.Submitter.Remove(blobID)
+	i.Submitter.Remove(blobID)
+	return nil
 }
 
 func (i *Indexer) IndexCommits() error {
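The skip sentinels are compared by identity, which the switch in isSkipBlobErr relies on. On Go 1.13 and later the same check could be written with the standard errors package, and it reduces to the same identity comparison because the sentinels are never wrapped (shown only as an equivalence; this commit predates errors.Is):

	// Equivalent to isSkipBlobErr(err) for these unwrapped sentinels:
	if errors.Is(err, skipTooLargeBlob) || errors.Is(err, skipBinaryBlob) {
		return nil
	}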
@@ -62,14 +65,18 @@ func (i *Indexer) IndexBlobs() error {
 
 func (i *Indexer) Index() error {
 	if err := i.IndexBlobs(); err != nil {
-		log.Print("Error while indexing blobs:", err)
+		log.Print("Error while indexing blobs: ", err)
 		return err
 	}
 
 	if err := i.IndexCommits(); err != nil {
-		log.Print("Error while indexing commits:", err)
+		log.Print("Error while indexing commits: ", err)
 		return err
 	}
 
-	return i.Submitter.Flush()
+	if err := i.Submitter.Flush(); err != nil {
+		log.Print("Error while flushing requests: ", err)
+	}
+
+	return nil
 }
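Narrowing the Submitter interface also keeps test doubles small. A hypothetical in-memory implementation satisfying the post-commit interface (every name below is invented for illustration, not part of the commit):

	type recordingSubmitter struct {
		indexed map[string]interface{}
		removed []string
	}

	func (s *recordingSubmitter) ParentID() string { return "project-1" }

	func (s *recordingSubmitter) Index(id string, thing interface{}) {
		if s.indexed == nil {
			s.indexed = make(map[string]interface{})
		}
		s.indexed[id] = thing
	}

	func (s *recordingSubmitter) Remove(id string) {
		s.removed = append(s.removed, id)
	}

	func (s *recordingSubmitter) Flush() error { return nil }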