Hi,
Version: opensearchproject/opensearch:latest
I tried out OpenSearch, planning to integrate it with my system for anomaly detection on time-series data. To validate the anomaly detection use case, I injected simple synthetic time-series data and then created a detector. Unfortunately, I am not getting any anomalies, and I would like to know whether this is a bug or whether I am doing something wrong. The Python code I used to inject the data is below:
from datetime import datetime, timedelta
from opensearchpy import OpenSearch, helpers
import random
import json
# Connect to OpenSearch
client = OpenSearch(
    hosts=[{'host': 'localhost', 'port': 9200}],
    http_auth=('admin', 'testForAnomalyEA432!'),
    use_ssl=True,
    verify_certs=False,
    ssl_show_warn=False,
)
index_name = "bulk_sensor_data_latest"
# Create index if it doesn't exist
if not client.indices.exists(index=index_name):
    client.indices.create(index=index_name, body={
        "mappings": {
            "properties": {
                "timestamp": {"type": "date", "format": "epoch_millis"},
                "value": {"type": "float"}
            }
        }
    })
print("Generating 10 hours of bulk data...")
# Configuration
start_time = datetime.utcnow().replace(second=0, microsecond=0)
records_per_minute = 12 # One record every 5 seconds
total_hours = 10
total_minutes = total_hours * 60
total_records = total_minutes * records_per_minute
# Generate anomaly minutes with intervals
anomaly_minutes = []
current_minute = 4 # First anomaly occurs at minute 4
while current_minute < total_minutes:
    anomaly_minutes.append(current_minute)
    # Next anomaly after 10-20 minutes
    gap = random.randint(10, 20)
    current_minute += gap
# For each anomaly minute, pick 5 random positions
anomaly_positions = {}
for minute in anomaly_minutes:
    anomaly_positions[minute] = random.sample(range(records_per_minute), 5)
print(f"Anomaly minutes: {anomaly_minutes}")
print(f"Number of anomaly clusters: {len(anomaly_minutes)}")
# Function to generate data in batches
def generate_data_batch(batch_size=1000):
    data = []
    record_log = []  # To store records for printing
    for i in range(total_records):
        current_minute = i // records_per_minute
        current_minute_record = i % records_per_minute
        current_ts = start_time + timedelta(seconds=i * 5)  # 5-second interval
        epoch_millis = int(current_ts.timestamp() * 1000)
        # Check if this record should have an anomaly
        should_inject_anomaly = (
            current_minute in anomaly_positions and
            current_minute_record in anomaly_positions[current_minute]
        )
        if should_inject_anomaly:
            value = round(random.uniform(5000, 10000), 2)  # Anomaly value
            is_anomaly = True
        else:
            value = round(random.uniform(20, 40), 2)  # Normal value
            is_anomaly = False
        # Format record for console output
        record_log.append({
            "minute": current_minute,
            "position": current_minute_record,
            "timestamp": current_ts.strftime("%H:%M:%S"),
            "value": value,
            "is_anomaly": is_anomaly
        })
        data.append({
            "_index": index_name,
            "_source": {
                "timestamp": epoch_millis,
                "value": value
            }
        })
        # If we've collected enough docs or reached the end, yield the batch along with logs
        if len(data) >= batch_size or i == total_records - 1:
            yield data, record_log
            data = []
            record_log = []
# Bulk insert logic with progress reporting
batch_size = 5000
total_batches = (total_records + batch_size - 1) // batch_size
docs_inserted = 0
anomalies_count = 0
print(f"Total records to generate: {total_records}")
print(f"Total anomalies to inject: {len(anomaly_minutes) * 5}")
print(f"Bulk inserting in batches of {batch_size}...")
# Perform the bulk insert
for batch_num, (batch, record_log) in enumerate(generate_data_batch(batch_size), 1):
    # Count anomalies in this batch
    batch_anomalies = sum(1 for doc in batch if doc["_source"]["value"] > 100)
    anomalies_count += batch_anomalies
    # Print some records from this batch to show anomalies
    print("\nSample data from current batch:")
    print("-" * 60)
    print(f"{'MINUTE':^8}|{'POS':^5}|{'TIMESTAMP':^10}|{'VALUE':^10}|{'ANOMALY':^8}")
    print("-" * 60)
    # Only print records that are anomalies or around anomalies (to avoid too much output)
    anomaly_indexes = [i for i, record in enumerate(record_log) if record["is_anomaly"]]
    records_to_print = set()
    for idx in anomaly_indexes:
        # Add the anomaly and some records before/after it
        for j in range(max(0, idx - 2), min(len(record_log), idx + 3)):
            records_to_print.add(j)
    # If no anomalies in this batch, just print first few records
    if not records_to_print:
        records_to_print = set(range(min(5, len(record_log))))
    # Print the selected records
    for i in sorted(records_to_print):
        record = record_log[i]
        anomaly_marker = "⚠️" if record["is_anomaly"] else ""
        print(f"{record['minute']:^8}|{record['position']:^5}|{record['timestamp']:^10}|{record['value']:^10.2f}|{anomaly_marker:^8}")
    print("-" * 60)
    # Insert the batch
    success, failed = helpers.bulk(client, batch, stats_only=True)
    docs_inserted += success
    # Report progress
    progress = int((batch_num / total_batches) * 100)
    print(f"Progress: {progress}% - Inserted {docs_inserted}/{total_records} records, {anomalies_count} anomalies")
print(f"\nCompleted! {docs_inserted} records inserted with {anomalies_count} anomalies")
print(f"Data spans from {start_time} to {start_time + timedelta(hours=total_hours)}")
# Print a summary of all anomaly clusters
print("\nSummary of Anomaly Clusters:")
print("-" * 40)
for minute in sorted(anomaly_positions.keys()):
    minute_start = start_time + timedelta(minutes=minute)
    positions = sorted(anomaly_positions[minute])
    print(f"Minute {minute} ({minute_start.strftime('%H:%M')}): Positions {positions}")
Then I created the detector with the following configuration:
{
"name": "sensor_avg_detector_live",
"description": "Detector for value anomalies",
"time_field": "timestamp",
"indices": [
"bulk_sensor_data_latest"
],
"feature_attributes": [
{
"feature_name": "value_avg",
"feature_enabled": true,
"aggregation_query": {
"value_avg": {
"avg": {
"field": "value"
}
}
}
}
],
"detection_interval": {
"period": {
"interval": 1,
"unit": "Minutes"
}
},
"window_delay": {
"period": {
"interval": 1,
"unit": "Minutes"
}
},
"shingle_size": 5
}
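For reference, this is roughly how the detector gets created and started (a minimal sketch reusing the Python client from above; detector_config is the JSON shown here loaded into a dict, and the endpoints are the Anomaly Detection plugin's create/start REST APIs):

# Create the detector from the configuration above
resp = client.transport.perform_request(
    "POST", "/_plugins/_anomaly_detection/detectors", body=detector_config
)
detector_id = resp["_id"]
print(f"Created detector {detector_id}")

# Start real-time detection
client.transport.perform_request(
    "POST", f"/_plugins/_anomaly_detection/detectors/{detector_id}/_start"
)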
After starting the detector, it does not recognize the huge spikes in certain minutes as anomalies. Can someone tell me whether this is a bug or whether I am doing something wrong?
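I'm checking for flagged intervals by searching the results index directly (a sketch assuming the default .opendistro-anomaly-results* index pattern and the detector_id returned by the create call), and it comes back empty:

# Look for any results with a non-zero anomaly grade for this detector
results = client.search(index=".opendistro-anomaly-results*", body={
    "size": 5,
    "query": {
        "bool": {
            "filter": [
                {"term": {"detector_id": detector_id}},
                {"range": {"anomaly_grade": {"gt": 0}}}
            ]
        }
    },
    "sort": [{"data_end_time": {"order": "desc"}}]
})
print(results["hits"]["total"])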
Thanks