Anomaly detection not working with simple sample data

Hi,
Versions : opensearchproject/opensearch:latest

I tried using opensearch. I was planning on integrating this with my system for anomaly detection from a time series data. To check the use case of anomaly detection, I tried injecting a simple time series data and then created a detector. But unfortunately, I am not getting any anomalies. I would like to know if this is a bug or if I am doing anything wrong. The code sample that I used for injecting data in python is given below:

from datetime import datetime, timedelta
from opensearchpy import OpenSearch, helpers
import random
import json

# Connect to OpenSearch
client = OpenSearch(
    hosts=[{'host': 'localhost', 'port': 9200}],
    http_auth=('admin', 'testForAnomalyEA432!'),
    use_ssl=True,
    verify_certs=False,
    ssl_show_warn=False,
)

index_name = "bulk_sensor_data_latest"

# Create index if it doesn't exist
if not client.indices.exists(index=index_name):
    client.indices.create(index=index_name, body={
        "mappings": {
            "properties": {
                "timestamp": {"type": "date", "format": "epoch_millis"},
                "value": {"type": "float"}
            }
        }
    })

print("šŸš€ Generating 10 hours of bulk data...")

# Configuration
start_time = datetime.utcnow().replace(second=0, microsecond=0)
records_per_minute = 12  # One record every 5 seconds
total_hours = 10
total_minutes = total_hours * 60
total_records = total_minutes * records_per_minute

# Generate anomaly minutes with intervals
anomaly_minutes = []
current_minute = 4  # First anomaly occurs at minute 4

while current_minute < total_minutes:
    anomaly_minutes.append(current_minute)
    # Next anomaly after 4-6 minutes
    gap = random.randint(10, 20)
    current_minute += gap

# For each anomaly minute, pick 5 random positions
anomaly_positions = {}
for minute in anomaly_minutes:
    anomaly_positions[minute] = random.sample(range(records_per_minute), 5)

print(f"Anomaly minutes: {anomaly_minutes}")
print(f"Number of anomaly clusters: {len(anomaly_minutes)}")

# Function to generate data in batches
def generate_data_batch(batch_size=1000):
    data = []
    record_log = []  # To store records for printing
    
    for i in range(total_records):
        current_minute = i // records_per_minute
        current_minute_record = i % records_per_minute
        
        current_ts = start_time + timedelta(seconds=i * 5)  # 5 seconds interval
        epoch_millis = int(current_ts.timestamp() * 1000)
        
        # Check if this record should have an anomaly
        should_inject_anomaly = (
            current_minute in anomaly_positions and 
            current_minute_record in anomaly_positions[current_minute]
        )
        
        if should_inject_anomaly:
            value = round(random.uniform(5000, 10000), 2)  # Anomaly value
            is_anomaly = True
        else:
            value = round(random.uniform(20, 40), 2)  # Normal value
            is_anomaly = False
        
        # Format record for console output
        record_log.append({
            "minute": current_minute,
            "position": current_minute_record,
            "timestamp": current_ts.strftime("%H:%M:%S"),
            "value": value,
            "is_anomaly": is_anomaly
        })
        
        data.append({
            "_index": index_name,
            "_source": {
                "timestamp": epoch_millis,
                "value": value
            }
        })
        
        # If we've collected enough docs or reached the end, yield the batch along with logs
        if len(data) >= batch_size or i == total_records - 1:
            yield data, record_log
            data = []
            record_log = []

# Bulk insert logic with progress reporting
batch_size = 5000
total_batches = (total_records + batch_size - 1) // batch_size
docs_inserted = 0
anomalies_count = 0

print(f"Total records to generate: {total_records}")
print(f"Total anomalies to inject: {len(anomaly_minutes) * 5}")
print(f"Bulk inserting in batches of {batch_size}...")

# Perform the bulk insert
for batch_num, (batch, record_log) in enumerate(generate_data_batch(batch_size), 1):
    # Count anomalies in this batch
    batch_anomalies = sum(1 for doc in batch if doc["_source"]["value"] > 100)
    anomalies_count += batch_anomalies
    
    # Print some records from this batch to show anomalies
    print("\nSample data from current batch:")
    print("-" * 60)
    print(f"{'MINUTE':^8}|{'POS':^5}|{'TIMESTAMP':^10}|{'VALUE':^10}|{'ANOMALY':^8}")
    print("-" * 60)
    
    # Only print records that are anomalies or around anomalies (to avoid too much output)
    anomaly_indexes = [i for i, record in enumerate(record_log) if record["is_anomaly"]]
    records_to_print = set()
    
    for idx in anomaly_indexes:
        # Add the anomaly and some records before/after it
        for j in range(max(0, idx-2), min(len(record_log), idx+3)):
            records_to_print.add(j)
    
    # If no anomalies in this batch, just print first few records
    if not records_to_print:
        records_to_print = set(range(min(5, len(record_log))))
    
    # Print the selected records
    for i in sorted(records_to_print):
        record = record_log[i]
        anomaly_marker = "āš ļø" if record["is_anomaly"] else ""
        print(f"{record['minute']:^8}|{record['position']:^5}|{record['timestamp']:^10}|{record['value']:^10.2f}|{anomaly_marker:^8}")
    
    print("-" * 60)
    
    # Insert the batch
    success, failed = helpers.bulk(client, batch, stats_only=True)
    docs_inserted += success
    
    # Report progress
    progress = int((batch_num / total_batches) * 100)
    print(f"Progress: {progress}% - Inserted {docs_inserted}/{total_records} records, {anomalies_count} anomalies")

print(f"\nāœ… Completed! {docs_inserted} records inserted with {anomalies_count} anomalies")
print(f"Data spans from {start_time} to {start_time + timedelta(hours=total_hours)}")

# Print a summary of all anomaly clusters
print("\nSummary of Anomaly Clusters:")
print("-" * 40)
for minute in sorted(anomaly_positions.keys()):
    minute_start = start_time + timedelta(minutes=minute)
    positions = sorted(anomaly_positions[minute])
    print(f"Minute {minute} ({minute_start.strftime('%H:%M')}): Positions {positions}")

Then I have added detector configuration:

{
    "name": "sensor_avg_detector_live",
    "description": "Detector for value anomalies",
    "time_field": "timestamp",
    "indices": [
        "bulk_sensor_data_latest"
    ],
    "feature_attributes": [
        {
            "feature_name": "value_avg",
            "feature_enabled": true,
            "aggregation_query": {
                "value_avg": {
                    "avg": {
                        "field": "value"
                    }
                }
            }
        }
    ],
    "detection_interval": {
        "period": {
            "interval": 1,
            "unit": "Minutes"
        }
    },
    "window_delay": {
        "period": {
            "interval": 1,
            "unit": "Minutes"
        }
    },
    "shingle_size": 5
}

After starting the detector, It is not recognizing the huge spike in certain minutes as an anomaly, can I know if that is a bug or am I doing something wrong?

Thanks