Transform Job Missing Data

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):

  • OpenSearch version 2.4.1

Describe the issue:

  • After the transform job completes, we expect each key in the target index to have a transform._doc_count value equal to the number of documents in the source index for that key. However, in some cases transform._doc_count is lower than expected.

  • For example, the key 2024-04-29_CHANNEL1_US_MS01_CT01 has 4 documents in the source index, but its transform._doc_count in the target index is only 3.
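The invariant described above can be written as a small check. This is a minimal sketch that assumes the per-key counts have already been fetched from both indices into plain dictionaries. find_undercounted_keys is a hypothetical helper, not part of OpenSearch.

```python
from typing import Dict, Tuple


def find_undercounted_keys(
    source_counts: Dict[str, int], target_counts: Dict[str, int]
) -> Dict[str, Tuple[int, int]]:
    """Return {key: (expected, actual)} for every key whose
    transform._doc_count is lower than its source document count."""
    mismatches: Dict[str, Tuple[int, int]] = {}
    for key, expected in source_counts.items():
        # A key missing from the target index counts as 0 aggregated documents.
        actual = target_counts.get(key, 0)
        if actual < expected:
            mismatches[key] = (expected, actual)
    return mismatches


source = {"2024-04-29_CHANNEL1_US_MS01_CT01": 4}
target = {"2024-04-29_CHANNEL1_US_MS01_CT01": 3}
print(find_undercounted_keys(source, target))
# {'2024-04-29_CHANNEL1_US_MS01_CT01': (4, 3)}
```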

Configuration:
transform job:

PUT _plugins/_transform/message-count
{
  "transform": {
    "enabled": true,
    "continuous": true,
    "schedule": {
      "interval": {
        "period": 1,
        "unit": "Minutes",
        "start_time": 1602100553000
      }
    },
    "description": "Transform job",
    "source_index": "stats_source",
    "target_index": "stats_target",
    "data_selection_query": {
      "match_all": { }
    },
    "page_size": 1000,
    "groups": [
      {
        "terms": {
          "source_field": "channelID",
          "target_field": "channelID"
        }
      },
      {
        "terms": {
          "source_field": "contentTypeCode",
          "target_field": "contentTypeCode"
        }
      },
      {
        "terms": {
          "source_field": "messageTypeCode",
          "target_field": "messageTypeCode"
        }
      },
      {
        "terms": {
          "source_field": "nationCode",
          "target_field": "nationCode"
        }
      },
      {
        "date_histogram": {
          "calendar_interval": "1d",
          "source_field": "createDate",
          "target_field": "statsDate"
        }
      }
    ]
  }
}
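To check which source documents belong to a given key, the grouping can be reproduced on the client side. This is a minimal sketch under several assumptions: createDate values are epoch milliseconds, the date_histogram group uses the default UTC time zone (the job configures none), and the example key above is built as statsDate_channelID_nationCode_messageTypeCode_contentTypeCode. The helper names and the key format are inferred for illustration only.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def utc_day(create_date_ms: int) -> str:
    """UTC calendar day that a 1d calendar_interval bucket would assign."""
    return (EPOCH + timedelta(milliseconds=create_date_ms)).strftime("%Y-%m-%d")


def bucket_key(doc: dict) -> str:
    """Hypothetical key format inferred from 2024-04-29_CHANNEL1_US_MS01_CT01."""
    return "_".join(
        [
            utc_day(doc["createDate"]),
            doc["channelID"],
            doc["nationCode"],
            doc["messageTypeCode"],
            doc["contentTypeCode"],
        ]
    )


doc = {
    "createDate": 1714435199999,  # 2024-04-29T23:59:59.999Z
    "channelID": "CHANNEL1",
    "nationCode": "US",
    "messageTypeCode": "MS01",
    "contentTypeCode": "CT01",
}
print(bucket_key(doc))  # 2024-04-29_CHANNEL1_US_MS01_CT01
```

Documents created one millisecond apart around midnight UTC land in different keys, which is worth keeping in mind when checking boundary cases.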

stats_source index:

PUT /stats_source-000001
{
  "aliases": {
    "stats_source": {
      "is_write_index": true
    }
  },
  "mappings": {
    "properties": {
      "channelID": {
        "type": "keyword"
      },
      "contentTypeCode": {
        "type": "keyword"
      },
      "createDate": {
        "type": "date",
        "format": "strict_date_optional_time||epoch_millis"
      },
      "messageTypeCode": {
        "type": "keyword"
      },
      "nationCode": {
        "type": "keyword"
      }
    }
  }
}
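Because the stats_source alias marks a write index, producers can index through the alias and let it resolve to the current write index. This is a minimal sketch that builds an NDJSON payload for the _bulk API. bulk_body is a hypothetical helper, and the sample document values are illustrative.

```python
import json
from typing import Iterable


def bulk_body(docs: Iterable[dict], alias: str = "stats_source") -> str:
    """Build an NDJSON _bulk payload that indexes every doc through the alias."""
    lines = []
    for doc in docs:
        # Action line: the alias resolves to whichever index has is_write_index: true.
        lines.append(json.dumps({"index": {"_index": alias}}))
        lines.append(json.dumps(doc))
    # The _bulk API requires the payload to end with a newline.
    return "\n".join(lines) + "\n"


payload = bulk_body([{"channelID": "CHANNEL1", "nationCode": "US",
                      "messageTypeCode": "MS01", "contentTypeCode": "CT01",
                      "createDate": 1714348800000}])
print(payload, end="")
```

The payload is sent with POST /_bulk and the header Content-Type: application/x-ndjson.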

stats_target index:

PUT /stats_target
{
  "settings": {
    "index": {
      "sort.field": "statsDate",
      "sort.order": "asc"
    }
  },
  "mappings": {
    "properties": {
      "channelID": {
        "type": "keyword"
      },
      "contentTypeCode": {
        "type": "keyword"
      },
      "messageTypeCode": {
        "type": "keyword"
      },
      "nationCode": {
        "type": "keyword"
      },
      "statsDate": {
        "type": "date",
        "format": "strict_date_optional_time||epoch_millis"
      }
    }
  }
}

rollover policy:

PUT /_plugins/_ism/policies/active_delete_workflow?pretty 
{
  "policy": {
    "description": "Daily rollover, move to inactive after 7 days, and delete after 14 days",
    "default_state": "active",
    "states": [
      {
        "name": "active",
        "actions": [
          {
            "retry": {
              "count": 3,
              "backoff": "exponential",
              "delay": "1m"
            },
            "rollover": {
              "min_index_age": "1d"
            }
          }
        ],
        "transitions": [
          {
            "state_name": "inactive",
            "conditions": {
              "min_index_age": "7d"
            }
          }
        ]
      },
      {
        "name": "inactive",
        "actions": [
          {
            "alias": {
              "actions": [
                {
                  "remove": {
                    "alias": "stats_source"
                  }
                }
              ]
            }
          }
        ],
        "transitions": [
          {
            "state_name": "delete",
            "conditions": {
              "min_index_age": "14d"
            }
          }
        ]
      },
      {
        "name": "delete",
        "actions": [
          {
            "retry": {
              "count": 3,
              "backoff": "exponential",
              "delay": "1m"
            },
            "delete": { }
          }
        ],
        "transitions": [ ]
      }
    ],
    "ism_template": [
      {
        "index_patterns": [
          "stats_source-*"
        ],
        "priority": 100
      }
    ]
  }
}
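The age thresholds in this policy can be summarized as a simplified function. This is a minimal sketch: policy_state is a hypothetical helper that assumes each transition fires as soon as its min_index_age condition holds, whereas ISM actually evaluates conditions on a periodic job interval. It shows that once an index reaches the inactive state, the stats_source alias is removed from it, so its documents should no longer be visible to a transform whose source_index is stats_source.

```python
from typing import Tuple


def policy_state(index_age_days: float) -> Tuple[str, bool]:
    """Return (ISM state, readable through the stats_source alias).

    Simplified: assumes transitions fire immediately once min_index_age is met.
    """
    if index_age_days >= 14:
        return ("delete", False)  # the delete action removes the index
    if index_age_days >= 7:
        return ("inactive", False)  # the alias action removed stats_source
    return ("active", True)  # rollover after 1d; older indices keep the alias


for age in (0.5, 3, 7, 14):
    print(age, policy_state(age))
```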

Relevant Logs or Screenshots:

  • The issue occurs occasionally in the beta environment, with small deviations, and more often in the production environment. This suggests that the error rate increases with the rate of incoming data.
  • The issue does not appear to stem from the transform's aggregation logic itself: I have tested it with many datasets and the results were correct.
  • There are no errors or warnings related to this issue in the OpenSearch logs.

I suspect an issue in the transform's mechanism for getting and setting checkpoints, which may not handle boundary values correctly when data arrives at a high rate.
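To make the suspected boundary problem concrete, here is a toy model. It is hypothetical and does not reproduce OpenSearch's actual transform implementation. It assumes each incremental pass counts only documents whose sequence number lies in (last checkpoint, new checkpoint] and that are already searchable when the pass runs. Under those assumptions, a document covered by the new checkpoint but not yet searchable is skipped and never revisited.

```python
from typing import Dict, List, Tuple

# Toy event: (seq_no, key, searchable_at_pass_time)
Event = Tuple[int, str, bool]


def incremental_pass(
    events: List[Event], last_ckpt: int, new_ckpt: int, counts: Dict[str, int]
) -> int:
    """Count searchable events with last_ckpt < seq_no <= new_ckpt.
    Returns new_ckpt, which the toy model stores as the next starting point."""
    for seq_no, key, searchable in events:
        if last_ckpt < seq_no <= new_ckpt and searchable:
            counts[key] = counts.get(key, 0) + 1
    return new_ckpt


counts: Dict[str, int] = {}
ckpt = 0
# Pass 1: the checkpoint already covers seq_no 3, but that document is not
# searchable yet, so it is not counted.
pass1 = [(1, "k", True), (2, "k", True), (3, "k", False), (4, "k", True)]
ckpt = incremental_pass(pass1, ckpt, 4, counts)
# Pass 2: seq_no 3 is now searchable, but it lies below the stored checkpoint.
pass2 = [(1, "k", True), (2, "k", True), (3, "k", True), (4, "k", True)]
ckpt = incremental_pass(pass2, ckpt, 4, counts)
print(counts)  # {'k': 3}
```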

Reproduce the error:

  • Step 1: Create indices stats_source and stats_target.
  • Step 2: Configure and enable a continuous transform job to aggregate data every minute.
  • Step 3: Set up a check that compares each key's transform._doc_count in stats_target with the number of matching documents in stats_source.
  • Step 4: Push data to stats_source at a high rate (~3,000 messages per second) and wait for incorrect aggregation results to appear. (Note: it may take a significant amount of time for the error to occur.)
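For the check in step 3, the expected per-key counts can be computed from the source index with a composite aggregation that mirrors the transform's groups. This is a minimal sketch: composite_count_request and counts_from_response are hypothetical helpers, the aggregation name by_key is arbitrary, and the composite aggregation is used only as a convenient way to page through every key.

```python
from typing import Any, Dict, Optional, Tuple

# Terms group fields copied from the transform job's "groups" section.
TERMS_FIELDS = ["channelID", "contentTypeCode", "messageTypeCode", "nationCode"]


def composite_count_request(
    after_key: Optional[Dict[str, Any]] = None, size: int = 1000
) -> Dict[str, Any]:
    """Search body that counts stats_source documents per transform key."""
    sources = [{field: {"terms": {"field": field}}} for field in TERMS_FIELDS]
    # Same daily bucketing as the transform's date_histogram group.
    sources.append(
        {"statsDate": {"date_histogram": {"field": "createDate", "calendar_interval": "1d"}}}
    )
    composite: Dict[str, Any] = {"size": size, "sources": sources}
    if after_key is not None:
        # Resume from the previous page's after_key.
        composite["after"] = after_key
    return {"size": 0, "aggs": {"by_key": {"composite": composite}}}


def counts_from_response(response: Dict[str, Any]) -> Dict[Tuple, int]:
    """Map each composite bucket key (as sorted (field, value) pairs) to its doc_count."""
    buckets = response["aggregations"]["by_key"]["buckets"]
    return {tuple(sorted(b["key"].items())): b["doc_count"] for b in buckets}
```

Each body is sent with GET stats_source/_search, repeating with the returned after_key until a page comes back with no buckets. Because documents keep arriving during the test, the comparison is most meaningful for days that no longer receive new documents and after the transform has run at least once since.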