Data Prepper - Additional Information about Anomaly Detector Processor

Versions (relevant - OpenSearch/Dashboard/Server OS/Browser):
Data Prepper 2.5.0 - Docker

Describe the issue:
I’m trying to understand all the features of the Anomaly Detector Processor in Data Prepper, but after reviewing the documentation, installing it, and running some tests, I have some questions:

  • Is the anomaly computed on all the fields in the log, or can I configure which fields the anomaly detection model uses?
    For example, for this log line:
    {"count_qname":14,"source_ip":"10.199.0.40","tag":"dns_metrics_query_by_qname_by_ip_1m","qname":"chat.google.com"}
    Is the anomaly computed for the tuple of source_ip and qname (the tag is always the same)? I’m using count_qname as the key.
    And for this one?
    {"tag":"dns_metrics_query_by_qname_5m","qname":"chat.google.com","count_qname":5}
  • How is the anomaly calculation persisted between Docker restarts?
  • Is it possible not to overwrite the file output every time Docker restarts? Every time I restart the Data Prepper Docker container, the output file is deleted or overwritten.
  • Is there any blog post or additional documentation I can read to understand this processor better?

Configuration:
FluentBit HTTP Output → Data Prepper HTTP Input → Anomaly Detector Processor → File Output

Thanks in advance for your help.

Thanks for the interest! The solution overview in Enable cost-efficient operational analytics with Amazon OpenSearch Ingestion | AWS Big Data Blog should be helpful in configuring the parts of the log on which anomaly is computed.

Thanks @sudipto, I’m going to review the provided link.

Regards,
Alejandro

@aguida79, thank you for your interest and questions.

To configure the fields used, you can use the keys configuration. For example, if you wanted to use only count_qname you could configure keys: [count_qname].

You do need to provide the identification_keys parameter. This parameter configures the keys that are used to group any given event. So for example, if you set identification_keys to source_ip and qname, Data Prepper will detect anomalies by grouping all events with the same values for those fields. Each combination of identification_keys creates a different RCF.
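
As a rough illustration of that grouping (a sketch, not Data Prepper's implementation), the Python below keeps one model per distinct combination of identification_keys values. `RunningStats` and `GroupedDetector` are hypothetical names, and the running z-score is only a stand-in for a Random Cut Forest; the point is that the number of models held in memory grows with the number of distinct key combinations seen.

```python
import math
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class RunningStats:
    """Hypothetical stand-in for an RCF model: running mean/variance (Welford)."""
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0

    def score(self, x: float) -> float:
        """Return |z-score| of x against earlier values, then learn x."""
        z = 0.0
        if self.n >= 2:
            std = math.sqrt(self.m2 / (self.n - 1))
            # With zero spread so far there is no scale to compare against.
            z = abs(x - self.mean) / std if std > 0 else 0.0
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)
        return z


class GroupedDetector:
    """One model per combination of identification_keys values."""

    def __init__(self, keys, identification_keys):
        self.keys = keys
        self.identification_keys = identification_keys
        self.models = defaultdict(RunningStats)

    def process(self, event: dict) -> float:
        group = tuple(event[k] for k in self.identification_keys)
        # For brevity, only the first entry of `keys` is scored here.
        value = float(event[self.keys[0]])
        return self.models[group].score(value)


events = [
    {"count_qname": 14, "source_ip": "10.199.0.40", "qname": "chat.google.com"},
    {"count_qname": 15, "source_ip": "10.199.0.40", "qname": "chat.google.com"},
    {"count_qname": 3, "source_ip": "10.199.0.41", "qname": "chat.google.com"},
]
detector = GroupedDetector(keys=["count_qname"],
                           identification_keys=["source_ip", "qname"])
scores = [detector.process(e) for e in events]
print(len(detector.models), "models for", len(events), "events")
```

Here the first two events share a (source_ip, qname) pair and feed the same model, while the third creates a second one. With high-cardinality fields such as client IPs or query names, the model count can become large.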

Currently the plugin does not persist or re-use existing data. This is something that we can add if there is interest. Please create a GitHub issue if you’d like it.

In addition to the fantastic blog post that @sudipto provided, you can read the documentation for the processor.

Hi @dlv ,
Thanks for your answer and your explanations.
So, after reading what you wrote, I added identification_keys to all my Anomaly Detector Processors (before, I only had keys defined), because some of them are not generating any output and I wanted to see whether adding identification_keys changes that:

dns-ip-pipeline:
#example event: {"count_source_ip":102,"source_ip":"10.199.0.150","tag":"dns_metrics_query_by_ip_5m"}
  source:
    http:
      ssl: false
      port: 2023
  processor:
    - anomaly_detector:
        identification_keys: ["source_ip"]
        keys: ["count_source_ip"]
        mode:
            random_cut_forest:
  sink:
    - file:
        path: /usr/share/data-prepper/pipelines/dns_metrics_ip_output
dns-qname-pipeline:
#example event: {"count_qname":4,"qname":"www.facebook.com","tag":"dns_metrics_query_by_qname_5m"}
  source:
    http:
      ssl: false
      port: 2022
  processor:
    - anomaly_detector:
        identification_keys: ["qname"]
        keys: ["count_qname"]
        mode:
            random_cut_forest:
  sink:
    - file:
        path: /usr/share/data-prepper/pipelines/dns_metrics_qname_output
firewall-allow-user-pipeline:
#example event: {"tag":"firewall_metrics_allow_by_user_5m","srcuser":"admings","count_srcuser":119}
  source:
    http:
      ssl: false
      port: 2025
#  processor:
#    - parse_json:
  processor:
    - anomaly_detector:
        identification_keys: ["srcuser"]
        keys: ["count_srcuser"]
        mode:
            random_cut_forest:
  sink:
    - file:
        path: /usr/share/data-prepper/pipelines/firewall_metrics_allow_user_output
firewall-deny-user-pipeline:
#example event: {"srcuser":"admings","tag":"firewall_metrics_deny_by_user_5m","count_srcuser":1}
  source:
    http:
      ssl: false
      port: 2026
  processor:
    - anomaly_detector:
        identification_keys: ["srcuser"]
        keys: ["count_srcuser"]
        mode:
            random_cut_forest:
  sink:
    - file:
        path: /usr/share/data-prepper/pipelines/firewall_metrics_deny_user_output
firewall-allow-ip-app-bytes-pipeline:
#example event: {"sum_bytes":2238,"tag":"firewall_metrics_allow_by_ip_by_app_by_bytes_5m","src":"10.11.3.239","app":"ms-update"}
  source:
    http:
      ssl: false
      port: 2027
  processor:
    - anomaly_detector:
        identification_keys: ["app","src"]
        keys: ["sum_bytes"]
        mode:
            random_cut_forest:
  sink:
    - file:
        path: /usr/share/data-prepper/pipelines/firewall_metrics_allow_ip_app_bytes_output
firewall-allow-user-app-bytes-pipeline:
#example event: {"sum_bytes":15670,"tag":"firewall_metrics_allow_by_user_by_app_by_bytes_5m","srcuser":"admings","app":"ms-teams"}
  source:
    http:
      ssl: false
      port: 2028
  processor:
    - anomaly_detector:
        identification_keys: ["app","srcuser"]
        keys: ["sum_bytes"]
        mode:
            random_cut_forest:
  sink:
    - file:
        path: /usr/share/data-prepper/pipelines/firewall_metrics_allow_user_app_bytes_output

The problem is that after adding identification_keys to the pipelines, Data Prepper runs without errors for many minutes, but then (I suppose when the anomaly_detector processor is about to generate output) it generates a lot of errors in the logs, and none of the outputs receive any data now.
Examples of the errors (there are many):

2023-11-03T20:27:44,916 [armeria-boss-http-*:2024] WARN  io.netty.channel.AbstractChannel$AbstractUnsafe - Force-closing a channel whose registration task was not accepted by an event loop: [id: 0xac361f5b, L:/172.18.0.27:2024 - R:/172.18.0.6:39800]
java.util.concurrent.RejectedExecutionException: event executor terminated
        at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:89) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:83) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor.channelRead(ServerBootstrap.java:215) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at com.linecorp.armeria.server.ConnectionLimitingHandler.channelRead(ConnectionLimitingHandler.java:70) [armeria-1.25.2.jar:?]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.AbstractEpollServerChannel$EpollServerSocketUnsafe.epollInReady(AbstractEpollServerChannel.java:120) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at java.lang.Thread.run(Unknown Source) [?:?]
2023-11-03T20:27:44,917 [armeria-boss-http-*:2024] ERROR  io.netty.util.concurrent.DefaultPromise - Failed to submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
        at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:862) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:500) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:494) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:89) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:83) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor.channelRead(ServerBootstrap.java:215) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at com.linecorp.armeria.server.ConnectionLimitingHandler.channelRead(ConnectionLimitingHandler.java:70) [armeria-1.25.2.jar:?]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.AbstractEpollServerChannel$EpollServerSocketUnsafe.epollInReady(AbstractEpollServerChannel.java:120) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at java.lang.Thread.run(Unknown Source) [?:?]
2023-11-03T20:27:44,917 [armeria-boss-http-*:2024] ERROR  io.netty.util.concurrent.DefaultPromise - Failed to submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
        at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:862) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:500) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:185) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor.channelRead(ServerBootstrap.java:215) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at com.linecorp.armeria.server.ConnectionLimitingHandler.channelRead(ConnectionLimitingHandler.java:70) [armeria-1.25.2.jar:?]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.AbstractEpollServerChannel$EpollServerSocketUnsafe.epollInReady(AbstractEpollServerChannel.java:120) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at java.lang.Thread.run(Unknown Source) [?:?]
2023-11-03T20:27:54,836 [armeria-boss-http-*:2024] WARN  io.netty.channel.AbstractChannel$AbstractUnsafe - Force-closing a channel whose registration task was not accepted by an event loop: [id: 0x4104ba52, L:/172.18.0.27:2024 - R:/172.18.0.6:38536]
java.util.concurrent.RejectedExecutionException: event executor terminated
        at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:89) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:83) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor.channelRead(ServerBootstrap.java:215) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at com.linecorp.armeria.server.ConnectionLimitingHandler.channelRead(ConnectionLimitingHandler.java:70) [armeria-1.25.2.jar:?]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.AbstractEpollServerChannel$EpollServerSocketUnsafe.epollInReady(AbstractEpollServerChannel.java:120) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) [netty-transport-classes-epoll-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at java.lang.Thread.run(Unknown Source) [?:?]
2023-11-03T20:28:10,693 [armeria-boss-http-*:2024] WARN  io.netty.channel.AbstractChannelHandlerContext - An exception 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.OutOfMemoryError: Java heap space

Maybe this is because, with identification_keys defined, the Data Prepper Java processes consume more resources and my server/config is not enough? I didn’t find any information in the Data Prepper docs on how to configure the amount of memory/CPU for the Java heap.

This is the resource usage of the Data Prepper Docker container before and after I added identification_keys:


I’m using Data Prepper 2.5.0 docker image.

Any piece of advice?

Thanks in advance for your help.

So, after adding 4 GB of RAM to the Ubuntu VM where the Data Prepper Docker container is installed, I added the identification_keys definitions again, and it is running stably, so the problem was about resources. Is there any documentation about hardware requirements for the Anomaly Detector Processor or Data Prepper in general?
Thanks in advance for your help.

The problem happened again, even with the 4 GB of RAM added. The strange thing is that it only happens if I add the identification_keys definitions. If I remove them from all my pipelines, no errors occur.
Reviewing my pipelines, can anyone tell me exactly what the difference is between using identification_keys and not using them?
Thanks in advance for your help.

An additional error that I am seeing in my logs after Data Prepper has been running for many hours:

2023-11-08T14:35:19,091 [pool-6-thread-118] ERROR org.opensearch.dataprepper.plugins.source.loghttp.LogHTTPService - Failed to write the request of size 57134 due to: Unrecognized token 'inf': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.linecorp.armeria.internal.shaded.fastutil.io.FastByteArrayInputStream); line: 1, column: 36262] (through reference chain: java.util.ArrayList[315])
2023-11-08T14:35:30,998 [pool-6-thread-15] ERROR org.opensearch.dataprepper.plugins.source.loghttp.LogHTTPService - Failed to write the request of size 57134 due to: Unrecognized token 'inf': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.linecorp.armeria.internal.shaded.fastutil.io.FastByteArrayInputStream); line: 1, column: 36262] (through reference chain: java.util.ArrayList[315])
2023-11-10T15:00:19,055 [pool-6-thread-108] ERROR org.opensearch.dataprepper.plugins.source.loghttp.LogHTTPService - Failed to write the request of size 64057 due to: Unrecognized token 'inf': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.linecorp.armeria.internal.shaded.fastutil.io.FastByteArrayInputStream); line: 1, column: 38980] (through reference chain: java.util.ArrayList[337])
2023-11-10T15:00:24,994 [pool-6-thread-14] ERROR org.opensearch.dataprepper.plugins.source.loghttp.LogHTTPService - Failed to write the request of size 64057 due to: Unrecognized token 'inf': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.linecorp.armeria.internal.shaded.fastutil.io.FastByteArrayInputStream); line: 1, column: 38980] (through reference chain: java.util.ArrayList[337])
2023-11-10T21:13:33,019 [pool-6-thread-73] ERROR org.opensearch.dataprepper.plugins.source.loghttp.LogHTTPService - Failed to write the request of size 29950 due to: Unrecognized token 'inf': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.linecorp.armeria.internal.shaded.fastutil.io.FastByteArrayInputStream); line: 1, column: 8930] (through reference chain: java.util.ArrayList[78])
2023-11-10T21:13:39,007 [pool-6-thread-146] ERROR org.opensearch.dataprepper.plugins.source.loghttp.LogHTTPService - Failed to write the request of size 29950 due to: Unrecognized token 'inf': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.linecorp.armeria.internal.shaded.fastutil.io.FastByteArrayInputStream); line: 1, column: 8930] (through reference chain: java.util.ArrayList[78])
2023-11-13T17:14:32,024 [pool-6-thread-77] ERROR org.opensearch.dataprepper.plugins.source.loghttp.LogHTTPService - Failed to write the request of size 53844 due to: Unrecognized token 'inf': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.linecorp.armeria.internal.shaded.fastutil.io.FastByteArrayInputStream); line: 1, column: 52304] (through reference chain: java.util.ArrayList[448])
2023-11-13T17:14:41,000 [pool-6-thread-153] ERROR org.opensearch.dataprepper.plugins.source.loghttp.LogHTTPService - Failed to write the request of size 53844 due to: Unrecognized token 'inf': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.linecorp.armeria.internal.shaded.fastutil.io.FastByteArrayInputStream); line: 1, column: 52304] (through reference chain: java.util.ArrayList[448])

Any idea of what that error message means?
Thanks in advance for your help.
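
One plausible reading of this message, offered as a sketch and not a confirmed diagnosis: the HTTP source parses each request body as JSON, and a bare `inf` token is not valid JSON, so the whole batch containing it is rejected. The Python below reproduces the same class of failure with the standard `json` module and shows one way a sender could replace non-finite numbers before serializing. `sanitize` is a hypothetical helper, the sample record is made up for illustration, and where the `inf` value originates upstream is an assumption.

```python
import json
import math


def sanitize(value):
    """Recursively replace non-finite floats (inf, -inf, nan) with None."""
    if isinstance(value, float) and not math.isfinite(value):
        return None
    if isinstance(value, dict):
        return {k: sanitize(v) for k, v in value.items()}
    if isinstance(value, list):
        return [sanitize(v) for v in value]
    return value


# A bare `inf` token is not valid JSON, so a strict parser rejects the body.
bad_body = '[{"sum_bytes": inf, "app": "ms-update"}]'
try:
    json.loads(bad_body)
    parsed_ok = True
except json.JSONDecodeError:
    parsed_ok = False

# Sanitizing before serializing yields a body that a JSON parser accepts.
record = {"sum_bytes": float("inf"), "app": "ms-update"}
good_body = json.dumps([sanitize(record)], allow_nan=False)
print(parsed_ok, good_body)
```

The `column` and `ArrayList[...]` index in each log line point at the offending record within the batch, which may help locate the value in the sender's output.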
