Ingest Kafka topic content into Elasticsearch using Logstash

Hi All,

Let me confess first: I am new to the ELK stack, and this is my first POC with it.
I am trying to push the contents of a Kafka topic to Elasticsearch via Logstash, with everything running in Docker on my local Mac.
However, nothing is getting ingested into Elasticsearch; not even the index is being created. The same Logstash settings work against stock Elasticsearch 7.11.2, but I cannot get them working with Open Distro. The frustrating part is that I get no error logs at all, and absolutely no log output when I post content to the topic.
Here is my docker-compose.yml:

version: '3.4'
services:
  es01:
    image: amazon/opendistro-for-elasticsearch:1.13.2
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536 
        hard: 65536
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 19200:9200
      - 19600:9600
    networks:
      - elastic-net

  es02:
    image: amazon/opendistro-for-elasticsearch:1.13.2
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536 
        hard: 65536
    volumes:
      - data02:/usr/share/elasticsearch/data
    networks:
      - elastic-net

  es03:
    image: amazon/opendistro-for-elasticsearch:1.13.2
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536 
        hard: 65536
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic-net

  kib01:
    image: amazon/opendistro-for-elasticsearch-kibana:1.13.2
    container_name: kib01
    ports:
      - 15601:5601
    depends_on: 
      - es01
      - es02
      - es03
    environment:
      ELASTICSEARCH_URL: https://es01:9200
      ELASTICSEARCH_HOSTS: '["https://es01:9200","https://es02:9200","https://es03:9200"]'
    networks:
      - elastic-net

  logstash-kafka:
    image: docker.elastic.co/logstash/logstash-oss:7.10.2
    container_name: logstash-kafka
    ports: 
      - 19601:9600
    depends_on: 
      - es01
      - es02
      - es03
    volumes:
      - /Users/gxm0/TechPOCs/elk-kafka/logstash/pipeline/:/usr/share/logstash/pipeline/
      - /Users/gxm0/TechPOCs/elk-kafka/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
      - /Users/gxm0/TechPOCs/elk-kafka/logstash/config/log4j2.properties:/usr/share/logstash/config/log4j2.properties
    networks:
     - elastic-net
     - kafka-net

volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic-net:
  kafka-net:
    external:
      name: kafka_kafka-network

And here is the Logstash pipeline:

input {
  kafka {
    bootstrap_servers => "kafka1:9092"
    topics => ["poc_test"]
    codec => "json"
    decorate_events => true
    exclude_internal_topics => "true"
    group_id => "elk_fo_poc"
    id => "poc_plugin_kafka"
  }
}

output {
  elasticsearch {
    id => "poc_plugin_es"
    hosts => "https://es01:9200"
    ssl => true
    ssl_certificate_verification => false
    user => "logstash"
    password => "logstash"
    index => "test_index"
    ecs_compatibility => "disabled"
    ilm_enabled => false
  }
}

Any help pointing out a mistake I may be making would be much appreciated.

You're using a URI with "https://". Have you tried putting the host in as a list, e.g. ["https://es01:9200"]?

Per the docs, valid examples include:

"127.0.0.1"
["127.0.0.1:9200","127.0.0.2:9200"]
["http://127.0.0.1"]
["https://127.0.0.1:9200"]
["https://127.0.0.1:9200/mypath"] (If using a proxy on a subpath)
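Applied to your output block, that would look something like the sketch below (keeping the rest of your settings as they are, and quoting the credentials):

```
output {
  elasticsearch {
    id => "poc_plugin_es"
    hosts => ["https://es01:9200", "https://es02:9200", "https://es03:9200"]
    ssl => true
    ssl_certificate_verification => false
    user => "logstash"
    password => "logstash"
    index => "test_index"
    ecs_compatibility => "disabled"
    ilm_enabled => false
  }
}
```

Listing all three nodes also lets Logstash fail over if es01 is down.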

Hi, thanks for the response.
After a long investigation, it turned out to be an issue with Kafka, which I was also running via Docker. Once I recreated it, cleaning out all of its data, the ingest started working.
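In case it helps anyone else debugging a similarly silent Logstash: raising the log level in logstash.yml (a generic Logstash setting, not specific to Open Distro) makes Kafka and Elasticsearch connection problems much more visible in the container logs:

```
# /usr/share/logstash/config/logstash.yml
# Raise verbosity so connection and pipeline errors show up in `docker logs logstash-kafka`.
log.level: debug
```

For "cleaning all data", recreating the Kafka compose project with its volumes removed (e.g. docker-compose down -v, then docker-compose up -d) is one way to clear the stale state.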