Intended usage of BulkIndexer

What is the intended usage of the `BulkIndexer` struct in the `opensearchutil` package: as a long-lived object, where one calls `Close()` on program shutdown, or as a short-lived object, where one calls `Close()` after each “bulk indexing event”?

My team is using it in the former manner. However, our program is using a large amount of memory. With profiling turned on, we can see that the `run` method calls `bytes.makeSlice`, and those allocations hold onto a large amount of heap memory: about 50% of the overall heap allocations, roughly 500 MB in our test case.

We would appreciate any guidance on this. I’ve included two example code snippets below. The first illustrates our current pattern, and the second shows the pattern we are wondering whether we should be using instead.

Thanks in advance 🙂

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/opensearch-project/opensearch-go"
	"github.com/opensearch-project/opensearch-go/opensearchutil"
)

// main sketches the long-lived pattern. A single BulkIndexer is created at
// startup, used for every batch read from Kafka, and closed exactly once
// when the process is asked to shut down.
func main() {
	client, err := opensearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("creating opensearch client: %s", err)
	}

	// NOTE(review): the profile points at bytes.makeSlice inside the indexer's
	// run method. This suggests each worker keeps a growing byte buffer that is
	// sized roughly by FlushBytes, so resident memory would scale with
	// (number of workers × FlushBytes). If so, lowering FlushBytes and/or
	// setting NumWorkers explicitly should cap it. Confirm against the
	// opensearchutil source for the version in use.
	bulkIndexer, err := opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
		Client:        client,
		FlushBytes:    15e6,
		FlushInterval: time.Second * 30,
		Refresh:       "false",
		Timeout:       time.Second,
		OnError: func(ctx context.Context, err error) {
			log.Println(fmt.Errorf("bulk item indexer failed %w", err))
		},
	})
	if err != nil {
		log.Fatalf("creating bulk indexer: %s", err)
	}
	// Close must run for buffered items to be flushed. Deferred calls only
	// run when main returns normally. They do not run when the loop spins
	// forever or the process dies from an unhandled signal. Hence the
	// signal handling below.
	defer func() {
		if err := bulkIndexer.Close(context.Background()); err != nil {
			log.Printf("closing bulk indexer: %s", err)
		}
	}()

	// Buffered with capacity 1, as os/signal requires, so a signal that
	// arrives while a batch is being processed is not dropped.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)

	for {
		select {
		case <-stop:
			// Returning lets the deferred Close flush and shut down the indexer.
			return
		default:
		}
		// Read batch of messages from Kafka (normally a blocking call).
		// Use bulk indexer to index messages to Opensearch.
		// Repeat
	}
}

versus the alternative pattern:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/opensearch-project/opensearch-go"
	"github.com/opensearch-project/opensearch-go/opensearchutil"
)

// main sketches the short-lived alternative. A fresh BulkIndexer is created
// for every batch and closed at the end of that batch.
func main() {
	client, err := opensearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("creating opensearch client: %s", err)
	}

	// The configuration is identical for every batch. Build it once
	// (including the OnError closure) instead of on every iteration.
	// NewBulkIndexer takes it by value.
	cfg := opensearchutil.BulkIndexerConfig{
		Client:     client,
		FlushBytes: 15e6,
		// Because Close is called after each batch, this interval only
		// matters for batches that take longer than 30s to index.
		FlushInterval: time.Second * 30,
		Refresh:       "false",
		Timeout:       time.Second,
		OnError: func(ctx context.Context, err error) {
			log.Println(fmt.Errorf("bulk item indexer failed %w", err))
		},
	}

	for {
		// NOTE(review): each NewBulkIndexer/Close pair presumably sets up and
		// tears down the indexer's workers and buffers. This trades steady-state
		// memory for per-batch setup cost. Confirm against the opensearchutil
		// source.
		bulkIndexer, err := opensearchutil.NewBulkIndexer(cfg)
		if err != nil {
			// Calling Close on a nil indexer would panic, so stop here.
			log.Fatalf("creating bulk indexer: %s", err)
		}
		// Read batch of messages from Kafka.
		// Use bulk indexer to index messages to Opensearch.
		if err := bulkIndexer.Close(context.Background()); err != nil {
			log.Printf("closing bulk indexer: %s", err)
		}
		// Repeat
	}
}