Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):
OpenSearch_2_9_R20230928-P3 (latest)
Describe the issue:
I’m using a thread pool and sliced scroll to speed up a search, but it isn’t performing as expected. I’d like to know whether this is normal and what workarounds are possible.

The query matches around 160 million documents, and for each one I fetch a 512x1 vector embedding along with a keyword field that serves as the primary key. The estimated size of the result set is 1.7 TB and I’m fetching at around 0.55 GB/min, which works out to roughly 52 hours for completion. That is not acceptable in my case.

So I’d like to know whether there’s anything I could improve in my code, and where/how I can check whether I’m really using the cluster to its full potential. In particular, which CloudWatch metrics should I look at to tell whether I’m getting the most out of the cluster (is there a specific throughput metric I should watch, and which one)?
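The completion-time estimate above is simple division. Here is a back-of-the-envelope sketch. It assumes decimal units (1 TB = 1000 GB) and a steady transfer rate. The 12-hour target is a hypothetical figure, used only to show the rate it would require.

```python
# Back-of-the-envelope export time estimate.
# Assumes decimal units (1 TB = 1000 GB) and a constant transfer rate.

def hours_to_finish(total_gb: float, gb_per_min: float) -> float:
    """Hours needed to transfer total_gb at a steady gb_per_min."""
    return total_gb / gb_per_min / 60


def required_rate(total_gb: float, target_hours: float) -> float:
    """GB/min needed to transfer total_gb within target_hours."""
    return total_gb / (target_hours * 60)


total_gb = 1.7 * 1000   # estimated result size: 1.7 TB
current_rate = 0.55     # observed rate in GB/min

print(f"{hours_to_finish(total_gb, current_rate):.1f} h at the current rate")
# Hypothetical 12-hour target, only to illustrate the required rate
print(f"{required_rate(total_gb, 12):.2f} GB/min needed for 12 h")
```

This prints `51.5 h at the current rate` and `2.36 GB/min needed for 12 h`. In other words, a 12-hour run would need a little over 4x the current throughput.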
Configuration:
Instance type: i3.xlarge.search
Number of nodes: 10
Client: the script below runs on an EC2 instance of type r5.2xlarge
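To check whether the domain itself is saturated during the export, you can pull domain metrics from CloudWatch for that time window. The sketch below makes several assumptions. It uses the `AWS/ES` namespace with `DomainName` and `ClientId` (AWS account ID) dimensions. Its metric list is hand-picked, so verify the names against what your domain actually publishes. `metric_request` and `fetch_metrics` are hypothetical helper names. `boto3` is imported lazily, so the request builder can be inspected without AWS credentials.

```python
from datetime import datetime, timedelta, timezone

# Assumed metric names; check them against the CloudWatch console for the domain.
METRICS = [
    "CPUUtilization",
    "JVMMemoryPressure",
    "SearchRate",
    "SearchLatency",
    "ThreadpoolSearchQueue",
    "ThreadpoolSearchRejected",
]


def metric_request(domain_name, account_id, metric_name, start, end, period=300):
    """Build keyword arguments for CloudWatch get_metric_statistics."""
    return {
        "Namespace": "AWS/ES",
        "MetricName": metric_name,
        "Dimensions": [
            {"Name": "DomainName", "Value": domain_name},
            {"Name": "ClientId", "Value": account_id},
        ],
        "StartTime": start,
        "EndTime": end,
        "Period": period,  # seconds per datapoint
        "Statistics": ["Average", "Maximum"],
    }


def fetch_metrics(domain_name, account_id, hours=1):
    """Print average/maximum datapoints for each metric over the last `hours`."""
    import boto3  # imported lazily so metric_request() works without boto3

    cloudwatch = boto3.client("cloudwatch")
    end = datetime.now(timezone.utc)
    start = end - timedelta(hours=hours)
    for name in METRICS:
        kwargs = metric_request(domain_name, account_id, name, start, end)
        points = cloudwatch.get_metric_statistics(**kwargs)["Datapoints"]
        for p in sorted(points, key=lambda p: p["Timestamp"]):
            print(name, p["Timestamp"], p["Average"], p["Maximum"])
```

For example, call `fetch_metrics("my-domain", "123456789012")` with your own domain name and account ID in place of these placeholders. If CPU stays low and the search queue stays empty while the export is running, that would suggest the bottleneck is on the client side rather than in the cluster.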
Relevant Logs or Screenshots:
import os
from elasticsearch import Elasticsearch
import logging
from concurrent.futures import ThreadPoolExecutor
import json
from datetime import datetime
# Configure logging
logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S'
)
opensearch_endpoint = 'my endpoint there'
# Connect to your OpenSearch instance
es = Elasticsearch(hosts=opensearch_endpoint, port=80, timeout=120)
index_name = 'sentences'
size = 10000
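# Get shard information for the specific index.
# _cat/shards lists every shard copy (primaries and replicas), so count primaries only.
shard_info = es.cat.shards(index=index_name, format='json')
num_shards = sum(1 for shard in shard_info if shard['prirep'] == 'p')
# 20 slices per primary shard. Using more slices than shards makes the first
# request of each slice more expensive on the cluster.
num_slices = num_shards * 20
max_workers = num_shards
print(f"num slices: {num_slices}")
print(f"num workers: {max_workers}")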
def write_hits(json_file, hits):
    # Write Sentence_ID and Embedding of each hit as one JSON line
    for doc in hits:
        data = {
            "Sentence_ID": doc['_source']['Sentence_ID'],
            "Embedding": doc['_source']['Embedding']  # Extract Embedding data
        }
        json.dump(data, json_file)
        json_file.write('\n')


def process_slice(slice_id, timestamp):
    query = {
        "slice": {
            "id": slice_id,
            "max": num_slices
        },
        "query": {
            "match_all": {}
        },
        # Return only the two fields that are written out
        "_source": ["Sentence_ID", "Embedding"],
        # Index order (_doc) is the cheapest sort for scrolling
        "sort": ["_doc"],
        "size": size
    }
    scroll_time = '5m'
    response = es.search(index=index_name, body=query, scroll=scroll_time)
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']
    processed = 0
    # Save the data to a JSON Lines file with the timestamp in the name
    filename = f"./output/slice_{slice_id}_{timestamp}.json"
    try:
        # Open the file once for the whole slice instead of once per page
        with open(filename, 'w') as json_file:
            # Keep scrolling until a page comes back empty (slice exhausted)
            while hits:
                write_hits(json_file, hits)
                processed += len(hits)
                logging.info(f"Processed {processed} documents in slice {slice_id}")
                response = es.scroll(scroll_id=scroll_id, scroll=scroll_time)
                scroll_id = response['_scroll_id']
                hits = response['hits']['hits']
    finally:
        # Release the scroll context instead of leaving it open until it expires
        es.clear_scroll(scroll_id=scroll_id)
    logging.info(f"Data for slice {slice_id} saved to {filename}")
    # Create a text file to indicate that the slice has been processed
    processed_filename = f"./output/slice_{slice_id}_processed.txt"
    with open(processed_filename, 'w') as file:
        file.write(f"Slice {slice_id} processed successfully.")
    logging.info(f"Processed file for slice {slice_id} created at {processed_filename}")
# Generate a timestamp
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
os.makedirs('./output', exist_ok=True)
# Use a ThreadPoolExecutor to process slices in parallel
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [executor.submit(process_slice, i, timestamp) for i in range(num_slices)]
    # Re-raise any exception from a worker thread instead of dropping it silently
    for future in futures:
        future.result()
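Another way to tell whether the cluster or the client is the limit is to time each scroll iteration in two parts. One part is waiting on `es.search`/`es.scroll`. The other is writing the page locally. `ScrollTimer` below is a hypothetical, thread-safe helper, not part of any client library. Inside `process_slice`, you would wrap the calls in `with timer.measure("wait"):` and `with timer.measure("write"):`. One caveat: the client library decodes the JSON response inside `es.scroll`, so "wait" also includes client-side parsing.

```python
import threading
import time
from collections import defaultdict
from contextlib import contextmanager


class ScrollTimer:
    """Thread-safe accumulator of seconds per phase (summed across threads) and doc count."""

    def __init__(self):
        self._lock = threading.Lock()
        self.seconds = defaultdict(float)
        self.docs = 0

    @contextmanager
    def measure(self, phase):
        # Time the body of the with-block and add it to the given phase
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed = time.perf_counter() - start
            with self._lock:
                self.seconds[phase] += elapsed

    def add_docs(self, n):
        with self._lock:
            self.docs += n

    def report(self):
        # Fraction of the measured time spent in each phase
        with self._lock:
            total = sum(self.seconds.values()) or 1.0
            shares = {phase: s / total for phase, s in self.seconds.items()}
            return {"docs": self.docs, "share_of_time": shares}


timer = ScrollTimer()

# Self-contained demo: sleeps stand in for es.scroll() and write_hits()
with timer.measure("wait"):
    time.sleep(0.02)
with timer.measure("write"):
    time.sleep(0.01)
timer.add_docs(10000)
print(timer.report())
```

Suppose "wait" dominates while CloudWatch shows idle CPU on the data nodes. Then the time may be going into response decoding on the client, which holds Python's GIL. In that case, spreading slices across processes instead of threads may help. This is a hypothesis to test, not a measured result.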