Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):
- OpenSearch version 2.4.1
Describe the issue:
-
After the Transform Job has been completed with aggregated data, we expect each key in the target index to have its transform._doc_count value equal to the number of documents in the source index corresponding to that key. However, there are instances where the transform._doc_count value is less than expected.
-
For example, the key 2024-04-29_CHANNEL1_US_MS01_CT01 has 4 documents in the source index, but its transform._doc_count in the target index is only 3.
Configuration:
transform job:
PUT _plugins/_transform/message-count
{
"transform": {
"enabled": true,
"continuous": true,
"schedule": {
"interval": {
"period": 1,
"unit": "Minutes",
"start_time": 1602100553000
}
},
"description": "Transform job",
"source_index": "stats_source",
"target_index": "stats_target",
"data_selection_query": {
"match_all": { }
},
"page_size": 1000,
"groups": [
{
"terms": {
"source_field": "channelID",
"target_field": "channelID"
}
},
{
"terms": {
"source_field": "contentTypeCode",
"target_field": "contentTypeCode"
}
},
{
"terms": {
"source_field": "messageTypeCode",
"target_field": "messageTypeCode"
}
},
{
"terms": {
"source_field": "nationCode",
"target_field": "nationCode"
}
},
{
"date_histogram": {
"calendar_interval": "1d",
"source_field": "createDate",
"target_field": "statsDate"
}
}
]
}
}
stats_source index:
PUT /stats_source-000001
{
"aliases": {
"stats_source": {
"is_write_index": true
}
},
"mappings": {
"properties": {
"channelID": {
"type": "keyword"
},
"contentTypeCode": {
"type": "keyword"
},
"createDate": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"messageTypeCode": {
"type": "keyword"
},
"nationCode": {
"type": "keyword"
}
}
}
}
stats_target index:
PUT /stats_target
{
"settings": {
"index": {
"sort.field": "statsDate",
"sort.order": "asc"
}
},
"mappings": {
"properties": {
"channelID": {
"type": "keyword"
},
"contentTypeCode": {
"type": "keyword"
},
"messageTypeCode": {
"type": "keyword"
},
"nationCode": {
"type": "keyword"
},
"statsDate": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
}
}
}
}
rollover policy:
PUT /_plugins/_ism/policies/active_delete_workflow?pretty
{
"policy": {
"description": "Daily rollover, move to inactive after 7 days, and delete after 14 days",
"default_state": "active",
"states": [
{
"name": "active",
"actions": [
{
"retry": {
"count": 3,
"backoff": "exponential",
"delay": "1m"
},
"rollover": {
"min_index_age": "1d"
}
}
],
"transitions": [
{
"state_name": "inactive",
"conditions": {
"min_index_age": "7d"
}
}
]
},
{
"name": "inactive",
"actions": [
{
"alias": {
"actions": [
{
"remove": {
"alias": "stats_source"
}
}
]
}
}
],
"transitions": [
{
"state_name": "delete",
"conditions": {
"min_index_age": "14d"
}
}
]
},
{
"name": "delete",
"actions": [
{
"retry": {
"count": 3,
"backoff": "exponential",
"delay": "1m"
},
"delete": { }
}
],
"transitions": [ ]
}
],
"ism_template": [
{
"index_patterns": [
"stats_source-*"
],
"priority": 100
}
]
}
}
Relevant Logs or Screenshots:
- This issue sometimes occurs in the beta environment with a small deviation but is more frequent in the production environment. Consequently, the error ratio increases with the frequency of incoming data
- The issue does not stem from the transform logic, as I have experimented with many datasets and the results are still correct
- There are no errors or warnings related to this issue in the OpenSearch log
I think there are some issues with the OpenSearch’s get and set checkpoints mechanism, which may not handle boundary values well when dealing with high incoming data frequency.
Reproduce the error:
- Step 1: Create indices stats_source and stats_target.
- Step 2: Configure and enable a continuous transform job to aggregate data every minute.
- Step 3: Set up a function to detect incorrect aggregation results.
- Step 4: Push data to stats_source at a high frequency (~3000 messages per second) and wait to observe incorrect aggregation results (Note: It may take a significant amount of time for the error to occur)