Ingest data from DynamoDB (installed locally)

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):

Data-Prepper-2.6.0

Describe the issue:

Hi there,
I have a question about ingesting data from DynamoDB using Data Prepper.

We are using DynamoDB and OpenSearch Serverless, and we successfully created OpenSearch Ingestion pipelines (in the AWS Management Console) to sync data from DynamoDB to OpenSearch Serverless collections.

Now we are building our own local development environment, with DynamoDB, OpenSearch, and Data Prepper deployed in Docker containers. I tried to create a pipeline (shown below) to sync data from the local DynamoDB to the local OpenSearch, but I got the error “awsAuthenticationConfig must not be null”.

I am not sure how to configure awsAuthenticationConfig, since this DynamoDB is hosted locally. Does Data Prepper support ingesting data from a local DynamoDB into a local OpenSearch? Or are there any suggestions for local development and testing?

Thanks

Configuration:

cdc-pipeline:
  source:
    dynamodb:
      tables:
        - table_arn: "arn:aws:dynamodb:ddblocal:000000000000:table/TableName"
          stream:
            start_position: "LATEST" # Read latest data from streams (Default)
  sink:
    - opensearch:
        hosts: [ "http://localhost:9200" ]
        username: "*****"
        password: "*****"
        insecure: true
        index: "indexName"
        index_type: custom

Relevant Logs or Screenshots:

org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException: Plugin dynamodb in pipeline cdc-pipeline is configured incorrectly: awsAuthenticationConfig must not be null

I’m trying to do the same thing using DynamoDB and OpenSearch on a LocalStack container.
For awsAuthenticationConfig I used the region localhost, because that is the region in my local table ARN.

  source:
    dynamodb:
      tables:
        - table_arn: "arn:aws:dynamodb:localhost:000000000000:table/tablename"
          # Remove the stream block if only export is needed
          stream:
            start_position: "LATEST"
      aws:
        region: "localhost"
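
For anyone following along: the region value used here is just the fourth colon-separated field of the table ARN (arn:partition:service:region:account-id:resource). A minimal sketch of extracting it, assuming a well-formed ARN; the helper name region_from_arn is hypothetical and not part of Data Prepper:

```python
def region_from_arn(arn: str) -> str:
    """Return the region field of an ARN.

    ARN layout: arn:partition:service:region:account-id:resource
    (region_from_arn is an illustrative helper, not a Data Prepper API.)
    """
    # Split into at most 6 parts so colons inside the resource part are kept intact.
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not a valid ARN: {arn!r}")
    return parts[3]


# The two local table ARNs that appear in this thread.
print(region_from_arn("arn:aws:dynamodb:localhost:000000000000:table/tablename"))
print(region_from_arn("arn:aws:dynamodb:ddblocal:000000000000:table/TableName"))
```

This only shows where the region string comes from; it does not say which value Data Prepper will accept for a local endpoint.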

Then I started to get this error, which happens when you use Data Prepper’s in-memory coordinator. I then tried a DynamoDB-based coordinator, but I wasn’t able to point it at my local DynamoDB, since it requires a role.

I’ve got the same issue. Have you found a working solution, @RobertoMoreno82?

Hi @duartedb, I am not using LocalStack, but I tried adding an aws block with region: “local” to my pipeline config.

cdc-pipeline:
  source:
    dynamodb:
      tables:
        - table_arn: "arn:aws:dynamodb:ddblocal:000000000000:table/TableName"
          stream:
            start_position: "LATEST" # Read latest data from streams (Default)
  
      aws:
        region: "local"

I still have another issue: querySourcePartitionItemsByStatus is currently not supported in In Memory Store.
I am not sure whether Data Prepper supports a local DynamoDB; I could not find any related docs or examples.

pool-18-thread-1] INFO  org.opensearch.dataprepper.plugins.source.dynamodb.leader.ShardManager - Listing shards (DescribeStream call) took 26 milliseconds with 0 shards found
2024-04-24 21:41:11 2024-04-24T09:41:11,109 [pool-18-thread-1] ERROR org.opensearch.dataprepper.plugins.source.dynamodb.leader.LeaderScheduler - Exception occurred in primary scheduling loop
2024-04-24 21:41:11 java.lang.UnsupportedOperationException: querySourcePartitionItemsByStatus is currently not supported in In Memory Store
2024-04-24 21:41:11     at org.opensearch.dataprepper.plugins.sourcecoordinator.inmemory.InMemorySourceCoordinationStore.querySourcePartitionItemsByStatus(InMemorySourceCoordinationStore.java:63) ~[in-memory-source-coordination-store-2.7.0.jar:?]
2024-04-24 21:41:11     at org.opensearch.dataprepper.sourcecoordination.enhanced.EnhancedLeaseBasedSourceCoordinator.queryCompletedPartitions(EnhancedLeaseBasedSourceCoordinator.java:135) ~[data-prepper-core-2.7.0.jar:?]
2024-04-24 21:41:11     at org.opensearch.dataprepper.plugins.source.dynamodb.leader.LeaderScheduler.run(LeaderScheduler.java:113) [dynamodb-source-2.7.0.jar:?]

My table looks like this:

"TableDescription": {
        "AttributeDefinitions": [
            {
                "AttributeName": "ID",
                "AttributeType": "S"
            }
        ],
        "TableName": "MyTable",
        "KeySchema": [
            {
                "AttributeName": "ID",
                "KeyType": "HASH"
            }
        ],
        "TableStatus": "ACTIVE",
        "CreationDateTime": "2024-04-24T21:01:26.302000+12:00",
        "ProvisionedThroughput": {
            "LastIncreaseDateTime": "1970-01-01T13:00:00+13:00",
            "LastDecreaseDateTime": "1970-01-01T13:00:00+13:00",
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "arn:aws:dynamodb:ddblocal:000000000000:table/MyTable",
        "StreamSpecification": {
            "StreamEnabled": true,
            "StreamViewType": "NEW_AND_OLD_IMAGES"
        },
        "LatestStreamLabel": "2024-04-24T09:01:26.302",
        "LatestStreamArn": "arn:aws:dynamodb:ddblocal:000000000000:table/MyTable/stream/2024-04-24T09:01:26.302"
    }
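
As a quick sanity check on a table description like the one above, the stream-related fields can be read out with a few lines of Python. This is only a sketch: the JSON below is a trimmed copy of the posted output wrapped in braces so that it parses, and stream_summary is a hypothetical helper name.

```python
import json

# Trimmed copy of the table description posted above, wrapped in braces so it parses.
TABLE_OUTPUT = """
{
  "TableDescription": {
    "TableName": "MyTable",
    "TableStatus": "ACTIVE",
    "TableArn": "arn:aws:dynamodb:ddblocal:000000000000:table/MyTable",
    "StreamSpecification": {
      "StreamEnabled": true,
      "StreamViewType": "NEW_AND_OLD_IMAGES"
    },
    "LatestStreamArn": "arn:aws:dynamodb:ddblocal:000000000000:table/MyTable/stream/2024-04-24T09:01:26.302"
  }
}
"""


def stream_summary(table_description: dict) -> dict:
    """Collect the fields that show whether the table has a stream to read from."""
    spec = table_description.get("StreamSpecification", {})
    return {
        "table_arn": table_description.get("TableArn"),
        "stream_enabled": bool(spec.get("StreamEnabled", False)),
        "view_type": spec.get("StreamViewType"),
        "stream_arn": table_description.get("LatestStreamArn"),
    }


summary = stream_summary(json.loads(TABLE_OUTPUT)["TableDescription"])
for key, value in summary.items():
    print(f"{key}: {value}")
```

For this table the stream is enabled with NEW_AND_OLD_IMAGES, so the table definition itself does not look like the cause of the In Memory Store error in the log.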
