Hey guys,
After reading countless pages of AWS and Data Prepper documentation over the past couple of days, I've come to the conclusion that I don't understand how authentication to the SQS queue for log ingestion from S3 works.
As far as I understand, I have to create an IAM Role that has access to the SQS queue that I created, and I call that queue via the “sts_role_arn” parameter in the pipeline.yaml
But the IAM Role needs a principal that dictates who can assume this role. So somehow Data Prepper has to authenticate to the IAM Role if I’m correct.
I found this GitHub issue where an access_key_id and a secret_key_id are mentioned, which at least could be used to authenticate to an IAM Account, but it seems that these parameters don’t exist anymore or never existed in the first place?
## Use-Case
Many users have external systems which write their logs to Amazon S3. These users want to use OpenSearch to analyze these logs. Data Prepper is an ingestion tool which can aid teams in extracting these logs from S3 and sending them to OpenSearch or elsewhere.
This proposal is to receive events from S3 notifications, read the object from S3, and create log lines for these.
## Basic Configuration
This plugin will be a single source plugin which:
* Polls a configured SQS standard queue which should hold S3 Event messages.
* Reads S3 objects which the message indicates as created.
* Uses a configured codec to parse the S3 object into Log Events.
* Writes the Log Events into the Data Prepper buffer.
The following example shows what a basic configuration would look like.
```
source:
  s3:
    notification_type: sqs
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
    codec:
      single-line:
processor:
  grok:
    match:
      message: [ "%{COMMONAPACHELOG}" ]
```
## Detailed Process
The S3 Source will start a new thread for reading from S3. (The number of threads can be configured).
This thread will perform the following steps repeatedly until shutdown:
1. Use the SQS `ReceiveMessage` API to receive messages from SQS.
2. For each Message from SQS, it will:
    a. Parse the Message as an S3Event.
    b. Download the S3 Object which the S3Event indicates was created.
    c. Decompress the object if configured to do so.
    d. Parse the decompressed file using the configured `codec` into a list of `Log` `Event` objects.
    e. Write the `Log` objects into the Data Prepper buffer.
3. Perform a `DeleteMessageBatch` with all of the messages which were successfully processed.
4. Repeat
### Error Handling
The S3 Source will suppress exceptions which occur during processing. Any Message which is not processed correctly will not be included in the `DeleteMessageBatch` request, so the message will appear in the SQS queue again. Data Prepper expects that the SQS queue is correctly configured with a DLQ or `MessageRetentionPeriod` to prevent the queue from filling up with invalid messages.
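For reference, the DLQ itself is configured on the SQS queue, not in Data Prepper. As a rough sketch (the DLQ ARN and retry count below are placeholders), the source queue's redrive policy could look like:
```
{
  "deadLetterTargetArn": "arn:aws:sqs:us-east-2:123456789012:MyS3EventQueue-dlq",
  "maxReceiveCount": "5"
}
```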
## Codecs
The S3 Source will use configurable codecs to support multiple data formats in the S3 objects. Initially, two codecs are planned:
1. `single-line` - This is used for logs which should be separated by a newline.
2. `json` - A codec for parsing JSON logs
### Single Line
The `single-line` codec has no configuration items.
Below is an example S3 object.
```
POST /search
POST /index
PUT /document/12345
```
With `single-line`, the S3 source will produce 3 Events, each with the following structure.
```
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.log",
  "message" : "POST /search"
}
```
```
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.log",
  "message" : "POST /index"
}
```
```
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.log",
  "message" : "PUT /document/12345"
}
```
### JSON
The `json` codec supports reading a JSON file and will create Events for each JSON object in an array. This S3 plugin starts with the expectation that the incoming JSON is formed as a large JSON array of JSON objects, where each object in that array is an Event. Thus, this codec will find the first JSON array in the input and output the objects within that array as Events.
Future iterations of this plugin could allow for more customization. One possibility is to use JSON Pointer. However, the first iteration should meet many use-cases and allow for streaming the JSON to support parsing large JSON objects.
Below is an example configuration. This configures the S3 Source to parse objects using the `json` codec.
```
s3:
  codec:
    json:
```
Given the following S3 Object:
```
{
"http_requests" : [
{ "status" : 200, "path" : "/search", "method" : "POST" },
{ "status" : 200, "path" : "/index", "method" : "POST" },
{ "status" : 200, "path" : "/document/12345", "method" : "PUT" }
]
}
```
The S3 source will output 3 Log events:
```
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.json",
  "message" : { "status" : 200, "path" : "/search", "method" : "POST" }
}
```
```
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.json",
  "message" : { "status" : 200, "path" : "/index", "method" : "POST" }
}
```
```
{
  "bucket" : "my-bucket",
  "key" : "application1/instance200/2022-05-11.json",
  "message" : { "status" : 200, "path" : "/document/12345", "method" : "PUT" }
}
```
### Compression
The S3 Source will support three configurations for compression.
1. `none` - The object will be treated as uncompressed.
2. `gzip` - The object will be decompressed using the gzip decompression algorithm.
3. `automatic` - The S3 Source will examine the object key to guess whether it is compressed. If the key ends with `.gz`, the S3 Source will attempt to decompress it using gzip. Future iterations can support other heuristics to determine if the file is compressed.
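As a small illustrative snippet (not taken from the proposal itself), enabling automatic detection together with the `single-line` codec might look like:
```
s3:
  compression: automatic
  codec:
    single-line:
```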
## Full Configuration Options
| Option | Type | Required | Description |
| ------------- | ------------- | ------------- | ------------- |
| notification_type | Enum: `sqs` | Yes | Only SQS is supported. SNS may be a future option |
| compression | Enum: `none`, `gzip`, `automatic` | No | Default is `none` |
| codec | Codec | Yes | See Codecs section above. |
| sqs.queue_url | String - URL | Yes | The queue URL of the SQS queue. |
| sqs.maximum_messages | Integer | No | Directly related to SQS input. Default is 10. |
| sqs.visibility_timeout | Duration | No | Directly related to SQS input. Default is TBD. |
| sqs.wait_time | Duration | No | Directly related to SQS input. Default is TBD. |
| sqs.poll_delay | Duration | No | An optional delay between iterations of the process. Default is 0 seconds. |
| sqs.thread_count | Integer | No | Number of threads polling S3. Default is 1. |
| region | String | Yes | The AWS Region. TBD. |
| sts_role_arn | String | No | Role used for accessing S3 and SQS |
| access_key_id | String | No | Static access to S3 and SQS |
| secret_key_id | String | No | Static access to S3 and SQS |
| buckets | String List | No | If provided, only read objects from the buckets provided in the list. |
| account_ids | String List | No | If provided, only read objects from the buckets owned by an accountId in this list. |
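Putting the options above together, a fuller source configuration might look like the sketch below. All values are placeholders, and the released plugin could lay these options out differently than this proposal:
```
source:
  s3:
    notification_type: sqs
    compression: automatic
    codec:
      single-line:
    sqs:
      queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
      maximum_messages: 10
      visibility_timeout: "30s"
      wait_time: "20s"
      poll_delay: "0s"
      thread_count: 1
    region: "us-east-2"
    sts_role_arn: "arn:aws:iam::123456789012:role/DataPrepperS3AccessRole"
```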
## S3 Events
The S3 Source will parse all SQS Messages according to the [S3 Event message structure](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html).
The S3 Source will also filter out any event types which are not `s3:ObjectCreated:*`. These events will be silently ignored. That is, the S3 Source will remove them from the SQS queue and will not create any Events for them.
Additionally, this source will have optional `buckets` and `account_ids` lists. If supplied by the pipeline author, Data Prepper will only read objects for S3 events which match those lists. For the `buckets` list, only S3 buckets in the list are used. For the `account_ids` list, only buckets owned by accounts with matching IDs are used. If neither list is provided, Data Prepper will read from any bucket which is owned by the account ID of the SQS queue.
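For orientation, an abbreviated S3 event notification body (following the message structure linked above) looks roughly like this; the bucket and key are just examples:
```
{
  "Records": [
    {
      "eventSource": "aws:s3",
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "my-bucket" },
        "object": { "key": "application1/instance200/2022-05-11.log" }
      }
    }
  ]
}
```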
## AWS Permissions Needed
The S3 Source will require the following permissions:
| Action | Resource |
| ------------- | ------------- |
| `s3:GetObject` | The S3 bucket and key path for any object needed |
| `sqs:ReceiveMessage` | The ARN of the SQS queue specified by `sqs.queue_url` |
| `sqs:DeleteMessageBatch` | The ARN of the SQS queue specified by `sqs.queue_url` |
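As a sketch, an IAM policy covering the table above could look like the following. The bucket name and queue ARN are placeholders, and note that `DeleteMessageBatch` calls are authorized by the `sqs:DeleteMessage` IAM action:
```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::my-log-bucket/*"
    },
    {
      "Effect": "Allow",
      "Action": [ "sqs:ReceiveMessage", "sqs:DeleteMessage" ],
      "Resource": "arn:aws:sqs:us-east-2:123456789012:MyS3EventQueue"
    }
  ]
}
```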
## Possible Future Enhancements
### Direct SNS Notification
The `notification_type` currently only supports SQS. Some teams may want Data Prepper to receive notifications directly from SNS and thus remove the need for an SQS queue.
The `notification_type` could support an `sns` value in the future.
### Additional Codecs
As needed, Data Prepper can support other codecs. Some possible candidates to consider are:
* Multi-line
* JSON List
## Metrics
- messagesReceived (Counter)
- messagesDeleted (Counter)
- messagesFailed (Counter)
- eventsCreated (Counter)
- requestsDuration (Timer)
## Not Included
* This proposal is focused only on reading S3 objects starting from a notification. Thus, any use-case for replay is not part of this scope. Also, use-cases for reading existing logs are not covered. These use-cases can have their own issues.
* Updated S3 objects are not part of the scope. This work will only support use-cases when a log file is written once.
* Configuration of SQS queue to receive SNS topics should be done externally. Data Prepper will not manage this.
## Tasks
- [x] #1423
- [x] #1424
- [x] #1425
- [x] #1433
- [x] #1434
- [x] #1435
- [x] #1461
- [x] #1462
- [x] #1463
- [x] #1464
- [x] #1501
- [x] ~#1500~ (Not for the initial feature release)
- [x] #1515
I hope someone can tell me what I’m missing here
Cheers guys!
Patrick
dlv
February 14, 2023, 12:17am
Hello Patrick.
> As far as I understand, I have to create an IAM Role that has access to the SQS queue that I created, and I call that queue via the “sts_role_arn” parameter in the pipeline.yaml
This is not entirely accurate. You can provide an IAM role. If you do not set the `sts_role_arn` property, then Data Prepper will use the default AWS credential provider chain. If you are running Data Prepper and already have AWS credentials in place with the necessary SQS/S3 permissions, then you can omit the role.
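For example, with credentials already available on the host (environment variables, the shared credentials file, or an instance role), the relevant part of the source configuration can simply leave the role out. Something like this, where the queue URL is a placeholder and other required options such as the codec are omitted for brevity:
```
s3:
  notification_type: sqs
  sqs:
    queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyS3EventQueue"
  # no sts_role_arn: Data Prepper falls back to the default
  # AWS credential provider chain
```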
> But the IAM Role needs a principal that dictates who can assume this role. So somehow Data Prepper has to authenticate to the IAM Role if I’m correct.
If you use an IAM Role, you need to specify a trust principal. This is who can assume the role. The value you specify here depends on how you are using Data Prepper. If you are using it in the same account, you can configure the trust principal to be your account. Other scenarios may be more complicated and are really dictated by how IAM works. This documentation may help.
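For the same-account case, a minimal trust policy on the role might look like this (the account ID is a placeholder for your own):
```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::123456789012:root" },
      "Action": "sts:AssumeRole"
    }
  ]
}
```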
One last thing - you need permission to call `sts:AssumeRole` on the desired role. The trust principal alone is not enough. The credentials that Data Prepper is running with also need to grant this permission.
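In other words, whatever identity Data Prepper runs as (an IAM user, an instance profile, and so on) needs a policy along these lines, where the role ARN is a placeholder for the role you configured in `sts_role_arn`:
```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::123456789012:role/DataPrepperS3AccessRole"
    }
  ]
}
```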
I hope this helps. Please let us know how we can help next.
David