# docker-elk-experiment

## fle

`// filebeat-->logstash-->elasticsearch`

```bash
docker network create my-elk
sysctl -w vm.max_map_count=262144

docker pull elasticsearch:7.5.2
docker pull kibana:7.5.2
docker pull logstash:7.5.2
docker pull elastic/filebeat:7.5.2

mkdir -p $(pwd)/elasticsearch
mkdir -p $(pwd)/kibana
mkdir -p $(pwd)/logstash
mkdir -p $(pwd)/filebeat

# copy the default config out of short-lived temporary containers
docker run -itd --rm --name elasticsearch-tmp elasticsearch:7.5.2 sleep 10
docker cp elasticsearch-tmp:/usr/share/elasticsearch/config $(pwd)/elasticsearch
docker run -itd --rm --name kibana-tmp kibana:7.5.2 sleep 10
docker cp kibana-tmp:/usr/share/kibana/config $(pwd)/kibana
docker run -itd --rm --name logstash-tmp logstash:7.5.2 sleep 10
docker cp logstash-tmp:/usr/share/logstash/config $(pwd)/logstash
docker run -itd --rm --name filebeat-tmp elastic/filebeat:7.5.2 sleep 10
docker cp filebeat-tmp:/usr/share/filebeat/filebeat.yml $(pwd)/filebeat/filebeat.yml

## run elasticsearch
sed -i '$abootstrap.memory_lock: true' $(pwd)/elasticsearch/config/elasticsearch.yml
sed -i '$adiscovery.type: single-node' $(pwd)/elasticsearch/config/elasticsearch.yml
sed -i 's@-Xms1g@-Xms2g@' $(pwd)/elasticsearch/config/jvm.options
sed -i 's@-Xmx1g@-Xmx2g@' $(pwd)/elasticsearch/config/jvm.options
docker run -itd -e TZ="Asia/Shanghai" --privileged --network my-elk --name elasticsearch -p 9200:9200 -p 9300:9300 -v $(pwd)/elasticsearch/config:/usr/share/elasticsearch/config elasticsearch:7.5.2

## run logstash
sed -i 's@-Xms1g@-Xms2g@' $(pwd)/logstash/config/jvm.options
sed -i 's@-Xmx1g@-Xmx2g@' $(pwd)/logstash/config/jvm.options
cp $(pwd)/logstash/config/logstash-sample.conf $(pwd)/logstash/config/logstash.conf
sed -i 's@http://localhost:9200@http://elasticsearch:9200@' $(pwd)/logstash/config/logstash.conf
sed -i "s@^xpack@#&@" $(pwd)/logstash/config/logstash.yml
# single quotes are required here: inside double quotes the shell would expand $alog as a variable
sed -i '$alog.level: debug' $(pwd)/logstash/config/logstash.yml
docker run -itd -e TZ="Asia/Shanghai" --privileged --network my-elk --name logstash -p 9600:9600 -p 5044:5044 -v $(pwd)/logstash/config:/usr/share/logstash/config logstash:7.5.2 logstash -f /usr/share/logstash/config/logstash.conf

## run kibana
sed -i 's@xpack.monitoring.ui.container.elasticsearch.enabled: true@xpack.monitoring.ui.container.elasticsearch.enabled: false@' $(pwd)/kibana/config/kibana.yml
sed -i '$ai18n.locale: zh-CN' $(pwd)/kibana/config/kibana.yml
docker run -itd -e TZ="Asia/Shanghai" --privileged --network my-elk --name kibana -p 5601:5601 -v $(pwd)/kibana/config:/usr/share/kibana/config kibana:7.5.2

## run filebeat
cat >$(pwd)/filebeat/filebeat.yml <
```

`// filebeat-->logstash-->elasticsearch`

```bash
mkdir $(pwd)/pki
cd $(pwd)/pki

# root CA
openssl genrsa -out root-ca.key 4096
openssl req -x509 -new -nodes -sha512 -days 18250 -subj "/C=CN/ST=Beijing/L=Beijing/O=example/OU=ELK/CN=rootCA" -key root-ca.key -out root-ca.crt

# logstash key and certificate signing request
openssl genrsa -out logstash.key 4096
openssl req -sha512 -new -subj "/C=CN/ST=Beijing/L=Beijing/O=example/OU=ELK/CN=logstash" -key logstash.key -out logstash.csr

cat > v3.ext <<-EOF
authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment
extendedKeyUsage = serverAuth,clientAuth
subjectAltName = @alt_names

[alt_names]
DNS.1=logstash
DNS.2=logstash.local
DNS.3=elasticsearch
DNS.4=elasticsearch.local
EOF

openssl x509 -req -sha512 -days 18250 -extfile v3.ext -CA root-ca.crt -CAkey root-ca.key -CAcreateserial -in logstash.csr -out logstash.crt

# filebeat key and certificate
openssl genrsa -out filebeat.key 4096
openssl req -sha512 -new -subj "/C=CN/ST=Beijing/L=Beijing/O=example/OU=ELK/CN=logstash" -key filebeat.key -out filebeat.csr
openssl x509 -req -sha512 -days 18250 -extfile v3.ext -CA root-ca.crt -CAkey root-ca.key -CAcreateserial -in filebeat.csr -out filebeat.crt

# the beats input of logstash expects the private key in PKCS#8 format
openssl pkcs8 -topk8 -inform pem -in logstash.key -outform pem -nocrypt -out logstash-pkcs8.key

chown -R root:docker *.key
chown -R root:docker *.crt
cd ..
cat >$(pwd)/logstash/config/logstash.conf <<EOL
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate_authorities => ["/etc/pki/certs/root-ca.crt"]
    ssl_certificate => "/etc/pki/certs/logstash.crt"
    ssl_key => "/etc/pki/certs/logstash-pkcs8.key"
    ssl_verify_mode => "force_peer"
    client_inactivity_timeout => 36000
  }
}
output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
EOL

cat >$(pwd)/filebeat/filebeat.yml <
(ECPrivateKeyImpl.java:74) ~[jdk.crypto.ec:?]
	at sun.security.ec.ECKeyFactory.implGeneratePrivate(ECKeyFactory.java:237) ~[jdk.crypto.ec:?]
	at sun.security.ec.ECKeyFactory.engineGeneratePrivate(ECKeyFactory.java:165) ~[jdk.crypto.ec:?]
	at java.security.KeyFactory.generatePrivate(KeyFactory.java:390) ~[?:?]
	at io.netty.handler.ssl.SslContext.getPrivateKeyFromByteBuffer(SslContext.java:1142) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslContext.toPrivateKey(SslContext.java:1113) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslContextBuilder.keyManager(SslContextBuilder.java:348) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	... 22 more
```

## fkle

`// filebeat-->kafka-->logstash-->elasticsearch`

```bash
mkdir -p $(pwd)/zoo{1,2,3}/data
cat >$(pwd)/zoo1/zoo.cfg <
cat >$(pwd)/zoo2/zoo.cfg <
cat >$(pwd)/zoo3/zoo.cfg <
 (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:os.version=4.19.95 (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,307] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,307] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,307] INFO Client environment:os.memory.free=979MB (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,307] INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,307] INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,309] INFO Initiating client connection, connectString=zoo1:2181,zoo2:2181,zoo3:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@3427b02d (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,314] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2020-02-08 05:20:39,319] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
[2020-02-08 05:20:39,325] INFO zookeeper.request.timeout value is 0. feature enabled= (org.apache.zookeeper.ClientCnxn)
[2020-02-08 05:20:39,327] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-02-08 05:20:39,330] INFO Opening socket connection to server zoo3/172.19.0.8:2181.
Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2020-02-08 05:20:39,335] INFO Socket connection established, initiating session, client: /172.19.0.9:34738, server: zoo3/172.19.0.8:2181 (org.apache.zookeeper.ClientCnxn) [2020-02-08 05:20:39,372] INFO Session establishment complete on server zoo3/172.19.0.8:2181, sessionid = 0x300039611ff0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2020-02-08 05:20:39,376] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient) [2020-02-08 05:20:39,743] INFO Cluster ID = PxzeWiM2Q2ewvIap-LRVqQ (kafka.server.KafkaServer) [2020-02-08 05:20:39,746] WARN No meta.properties file under dir /kafka/kafka-logs-3448717d3a95/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2020-02-08 05:20:39,810] INFO KafkaConfig values: advertised.host.name = null advertised.listeners = null advertised.port = null alter.config.policy.class.name = null alter.log.dirs.replication.quota.window.num = 11 alter.log.dirs.replication.quota.window.size.seconds = 1 authorizer.class.name = auto.create.topics.enable = true auto.leader.rebalance.enable = true background.threads = 10 broker.id = -1 broker.id.generation.enable = true broker.rack = null client.quota.callback.class = null compression.type = producer connection.failed.authentication.delay.ms = 100 connections.max.idle.ms = 600000 connections.max.reauth.ms = 0 control.plane.listener.name = null controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 30000 create.topic.policy.class.name = null default.replication.factor = 1 delegation.token.expiry.check.interval.ms = 3600000 delegation.token.expiry.time.ms = 86400000 delegation.token.master.key = null delegation.token.max.lifetime.ms = 604800000 delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = true fetch.purgatory.purge.interval.requests = 1000 
group.initial.rebalance.delay.ms = 0 group.max.session.timeout.ms = 1800000 group.max.size = 2147483647 group.min.session.timeout.ms = 6000 host.name = inter.broker.listener.name = INSIDE inter.broker.protocol.version = 2.4-IV1 kafka.metrics.polling.interval.secs = 10 kafka.metrics.reporters = [] leader.imbalance.check.interval.seconds = 300 leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT listeners = INSIDE://:9092,OUTSIDE://:9094 log.cleaner.backoff.ms = 15000 log.cleaner.dedupe.buffer.size = 134217728 log.cleaner.delete.retention.ms = 86400000 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 log.cleaner.max.compaction.lag.ms = 9223372036854775807 log.cleaner.min.cleanable.ratio = 0.5 log.cleaner.min.compaction.lag.ms = 0 log.cleaner.threads = 1 log.cleanup.policy = [delete] log.dir = /tmp/kafka-logs log.dirs = /kafka/kafka-logs-3448717d3a95 log.flush.interval.messages = 9223372036854775807 log.flush.interval.ms = null log.flush.offset.checkpoint.interval.ms = 60000 log.flush.scheduler.interval.ms = 9223372036854775807 log.flush.start.offset.checkpoint.interval.ms = 60000 log.index.interval.bytes = 4096 log.index.size.max.bytes = 10485760 log.message.downconversion.enable = true log.message.format.version = 2.4-IV1 log.message.timestamp.difference.max.ms = 9223372036854775807 log.message.timestamp.type = CreateTime log.preallocate = false log.retention.bytes = -1 log.retention.check.interval.ms = 300000 log.retention.hours = 168 log.retention.minutes = null log.retention.ms = null log.roll.hours = 168 log.roll.jitter.hours = 0 log.roll.jitter.ms = null log.roll.ms = null log.segment.bytes = 1073741824 log.segment.delete.delay.ms = 60000 max.connections = 2147483647 max.connections.per.ip = 2147483647 max.connections.per.ip.overrides = max.incremental.fetch.session.cache.slots = 1000 
message.max.bytes = 1000012 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 min.insync.replicas = 1 num.io.threads = 8 num.network.threads = 3 num.partitions = 1 num.recovery.threads.per.data.dir = 1 num.replica.alter.log.dirs.threads = null num.replica.fetchers = 1 offset.metadata.max.bytes = 4096 offsets.commit.required.acks = -1 offsets.commit.timeout.ms = 5000 offsets.load.buffer.size = 5242880 offsets.retention.check.interval.ms = 600000 offsets.retention.minutes = 10080 offsets.topic.compression.codec = 0 offsets.topic.num.partitions = 50 offsets.topic.replication.factor = 1 offsets.topic.segment.bytes = 104857600 password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding password.encoder.iterations = 4096 password.encoder.key.length = 128 password.encoder.keyfactory.algorithm = null password.encoder.old.secret = null password.encoder.secret = null port = 9092 principal.builder.class = null producer.purgatory.purge.interval.requests = 1000 queued.max.request.bytes = -1 queued.max.requests = 500 quota.consumer.default = 9223372036854775807 quota.producer.default = 9223372036854775807 quota.window.num = 11 quota.window.size.seconds = 1 replica.fetch.backoff.ms = 1000 replica.fetch.max.bytes = 1048576 replica.fetch.min.bytes = 1 replica.fetch.response.max.bytes = 10485760 replica.fetch.wait.max.ms = 500 replica.high.watermark.checkpoint.interval.ms = 5000 replica.lag.time.max.ms = 10000 replica.selector.class = null replica.socket.receive.buffer.bytes = 65536 replica.socket.timeout.ms = 30000 replication.quota.window.num = 11 replication.quota.window.size.seconds = 1 request.timeout.ms = 30000 reserved.broker.max.id = 1000 sasl.client.callback.handler.class = null sasl.enabled.mechanisms = [GSSAPI] sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.principal.to.local.rules = [DEFAULT] sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism.inter.broker.protocol = GSSAPI sasl.server.callback.handler.class = null security.inter.broker.protocol = PLAINTEXT security.providers = null socket.receive.buffer.bytes = 102400 socket.request.max.bytes = 104857600 socket.send.buffer.bytes = 102400 ssl.cipher.suites = [] ssl.client.auth = none ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.principal.mapping.rules = DEFAULT ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000 transaction.max.timeout.ms = 900000 transaction.remove.expired.transaction.cleanup.interval.ms = 3600000 transaction.state.log.load.buffer.size = 5242880 transaction.state.log.min.isr = 1 transaction.state.log.num.partitions = 50 transaction.state.log.replication.factor = 1 transaction.state.log.segment.bytes = 104857600 transactional.id.expiration.ms = 604800000 unclean.leader.election.enable = false zookeeper.connect = zoo1:2181,zoo2:2181,zoo3:2181 zookeeper.connection.timeout.ms = 6000 zookeeper.max.in.flight.requests = 10 zookeeper.session.timeout.ms = 6000 zookeeper.set.acl = false zookeeper.sync.time.ms = 2000 (kafka.server.KafkaConfig) [2020-02-08 05:20:39,824] INFO KafkaConfig values: advertised.host.name = null advertised.listeners = null advertised.port = null alter.config.policy.class.name = null 
alter.log.dirs.replication.quota.window.num = 11 alter.log.dirs.replication.quota.window.size.seconds = 1 authorizer.class.name = auto.create.topics.enable = true auto.leader.rebalance.enable = true background.threads = 10 broker.id = -1 broker.id.generation.enable = true broker.rack = null client.quota.callback.class = null compression.type = producer connection.failed.authentication.delay.ms = 100 connections.max.idle.ms = 600000 connections.max.reauth.ms = 0 control.plane.listener.name = null controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 30000 create.topic.policy.class.name = null default.replication.factor = 1 delegation.token.expiry.check.interval.ms = 3600000 delegation.token.expiry.time.ms = 86400000 delegation.token.master.key = null delegation.token.max.lifetime.ms = 604800000 delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = true fetch.purgatory.purge.interval.requests = 1000 group.initial.rebalance.delay.ms = 0 group.max.session.timeout.ms = 1800000 group.max.size = 2147483647 group.min.session.timeout.ms = 6000 host.name = inter.broker.listener.name = INSIDE inter.broker.protocol.version = 2.4-IV1 kafka.metrics.polling.interval.secs = 10 kafka.metrics.reporters = [] leader.imbalance.check.interval.seconds = 300 leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT listeners = INSIDE://:9092,OUTSIDE://:9094 log.cleaner.backoff.ms = 15000 log.cleaner.dedupe.buffer.size = 134217728 log.cleaner.delete.retention.ms = 86400000 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 log.cleaner.max.compaction.lag.ms = 9223372036854775807 log.cleaner.min.cleanable.ratio = 0.5 log.cleaner.min.compaction.lag.ms = 0 log.cleaner.threads = 1 log.cleanup.policy = [delete] log.dir = 
/tmp/kafka-logs log.dirs = /kafka/kafka-logs-3448717d3a95 log.flush.interval.messages = 9223372036854775807 log.flush.interval.ms = null log.flush.offset.checkpoint.interval.ms = 60000 log.flush.scheduler.interval.ms = 9223372036854775807 log.flush.start.offset.checkpoint.interval.ms = 60000 log.index.interval.bytes = 4096 log.index.size.max.bytes = 10485760 log.message.downconversion.enable = true log.message.format.version = 2.4-IV1 log.message.timestamp.difference.max.ms = 9223372036854775807 log.message.timestamp.type = CreateTime log.preallocate = false log.retention.bytes = -1 log.retention.check.interval.ms = 300000 log.retention.hours = 168 log.retention.minutes = null log.retention.ms = null log.roll.hours = 168 log.roll.jitter.hours = 0 log.roll.jitter.ms = null log.roll.ms = null log.segment.bytes = 1073741824 log.segment.delete.delay.ms = 60000 max.connections = 2147483647 max.connections.per.ip = 2147483647 max.connections.per.ip.overrides = max.incremental.fetch.session.cache.slots = 1000 message.max.bytes = 1000012 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 min.insync.replicas = 1 num.io.threads = 8 num.network.threads = 3 num.partitions = 1 num.recovery.threads.per.data.dir = 1 num.replica.alter.log.dirs.threads = null num.replica.fetchers = 1 offset.metadata.max.bytes = 4096 offsets.commit.required.acks = -1 offsets.commit.timeout.ms = 5000 offsets.load.buffer.size = 5242880 offsets.retention.check.interval.ms = 600000 offsets.retention.minutes = 10080 offsets.topic.compression.codec = 0 offsets.topic.num.partitions = 50 offsets.topic.replication.factor = 1 offsets.topic.segment.bytes = 104857600 password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding password.encoder.iterations = 4096 password.encoder.key.length = 128 password.encoder.keyfactory.algorithm = null password.encoder.old.secret = null password.encoder.secret = null port = 9092 principal.builder.class = null 
producer.purgatory.purge.interval.requests = 1000 queued.max.request.bytes = -1 queued.max.requests = 500 quota.consumer.default = 9223372036854775807 quota.producer.default = 9223372036854775807 quota.window.num = 11 quota.window.size.seconds = 1 replica.fetch.backoff.ms = 1000 replica.fetch.max.bytes = 1048576 replica.fetch.min.bytes = 1 replica.fetch.response.max.bytes = 10485760 replica.fetch.wait.max.ms = 500 replica.high.watermark.checkpoint.interval.ms = 5000 replica.lag.time.max.ms = 10000 replica.selector.class = null replica.socket.receive.buffer.bytes = 65536 replica.socket.timeout.ms = 30000 replication.quota.window.num = 11 replication.quota.window.size.seconds = 1 request.timeout.ms = 30000 reserved.broker.max.id = 1000 sasl.client.callback.handler.class = null sasl.enabled.mechanisms = [GSSAPI] sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.principal.to.local.rules = [DEFAULT] sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism.inter.broker.protocol = GSSAPI sasl.server.callback.handler.class = null security.inter.broker.protocol = PLAINTEXT security.providers = null socket.receive.buffer.bytes = 102400 socket.request.max.bytes = 104857600 socket.send.buffer.bytes = 102400 ssl.cipher.suites = [] ssl.client.auth = none ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.principal.mapping.rules = DEFAULT ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null 
ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000 transaction.max.timeout.ms = 900000 transaction.remove.expired.transaction.cleanup.interval.ms = 3600000 transaction.state.log.load.buffer.size = 5242880 transaction.state.log.min.isr = 1 transaction.state.log.num.partitions = 50 transaction.state.log.replication.factor = 1 transaction.state.log.segment.bytes = 104857600 transactional.id.expiration.ms = 604800000 unclean.leader.election.enable = false zookeeper.connect = zoo1:2181,zoo2:2181,zoo3:2181 zookeeper.connection.timeout.ms = 6000 zookeeper.max.in.flight.requests = 10 zookeeper.session.timeout.ms = 6000 zookeeper.set.acl = false zookeeper.sync.time.ms = 2000 (kafka.server.KafkaConfig) [2020-02-08 05:20:39,846] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper) [2020-02-08 05:20:39,847] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper) [2020-02-08 05:20:39,847] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper) [2020-02-08 05:20:39,868] INFO Log directory /kafka/kafka-logs-3448717d3a95 not found, creating it. (kafka.log.LogManager) [2020-02-08 05:20:39,874] INFO Loading logs. (kafka.log.LogManager) [2020-02-08 05:20:39,880] INFO Logs loading complete in 6 ms. (kafka.log.LogManager) [2020-02-08 05:20:39,893] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2020-02-08 05:20:39,913] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2020-02-08 05:20:40,271] INFO Awaiting socket connections on 0.0.0.0:9092. 
(kafka.network.Acceptor) [2020-02-08 05:20:40,305] INFO [SocketServer brokerId=1001] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(INSIDE),PLAINTEXT) (kafka.network.SocketServer) [2020-02-08 05:20:40,306] INFO Awaiting socket connections on 0.0.0.0:9094. (kafka.network.Acceptor) [2020-02-08 05:20:40,314] INFO [SocketServer brokerId=1001] Created data-plane acceptor and processors for endpoint : EndPoint(null,9094,ListenerName(OUTSIDE),PLAINTEXT) (kafka.network.SocketServer) [2020-02-08 05:20:40,315] INFO [SocketServer brokerId=1001] Started 2 acceptor threads for data-plane (kafka.network.SocketServer) [2020-02-08 05:20:40,332] INFO [ExpirationReaper-1001-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-02-08 05:20:40,333] INFO [ExpirationReaper-1001-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-02-08 05:20:40,334] INFO [ExpirationReaper-1001-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-02-08 05:20:40,334] INFO [ExpirationReaper-1001-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-02-08 05:20:40,347] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler) [2020-02-08 05:20:40,366] INFO Creating /brokers/ids/1001 (is it secure? 
false) (kafka.zk.KafkaZkClient) [2020-02-08 05:20:40,394] INFO Stat of the created znode at /brokers/ids/1001 is: 4294967321,4294967321,1581139240379,1581139240379,1,0,0,216176725195685888,240,0,4294967321 (kafka.zk.KafkaZkClient) [2020-02-08 05:20:40,394] INFO Registered broker 1001 at path /brokers/ids/1001 with addresses: ArrayBuffer(EndPoint(3448717d3a95,9092,ListenerName(INSIDE),PLAINTEXT), EndPoint(3448717d3a95,9094,ListenerName(OUTSIDE),PLAINTEXT)), czxid (broker epoch): 4294967321 (kafka.zk.KafkaZkClient) [2020-02-08 05:20:40,456] INFO [ExpirationReaper-1001-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-02-08 05:20:40,463] INFO [ExpirationReaper-1001-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-02-08 05:20:40,464] INFO [ExpirationReaper-1001-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2020-02-08 05:20:40,492] INFO [GroupCoordinator 1001]: Starting up. (kafka.coordinator.group.GroupCoordinator) [2020-02-08 05:20:40,493] INFO [GroupCoordinator 1001]: Startup complete. (kafka.coordinator.group.GroupCoordinator) [2020-02-08 05:20:40,495] INFO Successfully created /controller_epoch with initial epoch 0 (kafka.zk.KafkaZkClient) [2020-02-08 05:20:40,499] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 4 milliseconds. (kafka.coordinator.group.GroupMetadataManager) [2020-02-08 05:20:40,512] INFO [ProducerId Manager 1001]: Acquired new producerId block (brokerId:1001,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager) [2020-02-08 05:20:40,532] INFO [TransactionCoordinator id=1001] Starting up. (kafka.coordinator.transaction.TransactionCoordinator) [2020-02-08 05:20:40,534] INFO [TransactionCoordinator id=1001] Startup complete. 
(kafka.coordinator.transaction.TransactionCoordinator)
[2020-02-08 05:20:40,536] INFO [Transaction Marker Channel Manager 1001]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2020-02-08 05:20:40,558] INFO [ExpirationReaper-1001-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-02-08 05:20:40,575] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2020-02-08 05:20:40,649] INFO [SocketServer brokerId=1001] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
[2020-02-08 05:20:40,652] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
[2020-02-08 05:20:40,652] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
[2020-02-08 05:20:40,653] INFO Kafka startTimeMs: 1581139240649 (org.apache.kafka.common.utils.AppInfoParser)
[2020-02-08 05:20:40,653] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
```

```bash
# filebeat.yml
#output.logstash:
output.kafka:
  enabled: true
  hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"]
  topic: beattest

cat >$(pwd)/logstash/config/logstash-kafka.conf <<EOL
input {
  kafka {
    bootstrap_servers => ["kafka1:9092,kafka2:9092,kafka3:9092"]
    group_id => "baicai"
    auto_offset_reset => "earliest"
    consumer_threads => "5"
    decorate_events => "false"
    topics => ["beattest"]
    codec => json
  }
}
output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logstash-kafka-%{+YYYY.MM.dd}"
  }
}
EOL

docker run -itd -e TZ="Asia/Shanghai" --privileged --network my-elk --name logstash -p 9600:9600 -p 5044:5044 -v $(pwd)/logstash/config:/usr/share/logstash/config logstash:7.5.2 logstash -f /usr/share/logstash/config/logstash-kafka.conf

## run another logstash instance
docker run -itd -e TZ="Asia/Shanghai" --privileged --network my-elk --name logstash1 -p 19600:9600 -p 15044:5044 -v $(pwd)/logstash/config:/usr/share/logstash/config logstash:7.5.2 logstash -f /usr/share/logstash/config/logstash-kafka.conf

mkdir -p $(pwd)/kafka{1,2,3}/log

# dump the default server.properties (comments and blank lines removed) once per broker
docker exec -it kafka-tmp bash -c "cat /opt/kafka/config/server.properties" | grep -v -e '^\s*#' -e '^\s*$' > $(pwd)/kafka1/server.properties &&
docker exec -it kafka-tmp bash -c "cat /opt/kafka/config/server.properties" | grep -v -e '^\s*#' -e '^\s*$' > $(pwd)/kafka2/server.properties &&
docker exec -it kafka-tmp bash -c "cat /opt/kafka/config/server.properties" | grep -v -e '^\s*#' -e '^\s*$' > $(pwd)/kafka3/server.properties

# give every broker a distinct id
sed -i 's@broker.id=-1@broker.id=101@' $(pwd)/kafka1/server.properties
sed -i 's@broker.id=-1@broker.id=102@' $(pwd)/kafka2/server.properties
sed -i 's@broker.id=-1@broker.id=103@' $(pwd)/kafka3/server.properties

docker run -itd --network my-elk --name kafka1 -p 10192:9092 -v $(pwd)/kafka1/server.properties:/opt/kafka/config/server.properties -v /var/run/docker.sock:/var/run/docker.sock --entrypoint "" wurstmeister/kafka:2.12-2.4.0 bash -e -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties'
docker run -itd --network my-elk --name kafka2 -p 10292:9092 -v $(pwd)/kafka2/server.properties:/opt/kafka/config/server.properties -v /var/run/docker.sock:/var/run/docker.sock --entrypoint "" wurstmeister/kafka:2.12-2.4.0 bash -e -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties'
docker run -itd --network my-elk --name kafka3 -p 10392:9092 -v $(pwd)/kafka3/server.properties:/opt/kafka/config/server.properties -v /var/run/docker.sock:/var/run/docker.sock --entrypoint "" wurstmeister/kafka:2.12-2.4.0 bash -e -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties'

# create a Kafka topic
docker exec -it kafka1 bash -c "kafka-topics.sh --bootstrap-server kafka1:9092 --create --partitions 3 --replication-factor 3 --topic test"
# list Kafka topics
docker exec -it kafka1 bash -c "kafka-topics.sh --bootstrap-server kafka1:9092 --list"
# describe a Kafka topic
docker exec -it kafka1 bash -c "kafka-topics.sh --bootstrap-server kafka1:9092 --describe --topic test"
# alter a Kafka topic
docker exec -it kafka1 bash -c "kafka-topics.sh --bootstrap-server kafka1:9092 --alter --topic test --partitions 5"
# produce messages
docker run -it --rm --network my-elk wurstmeister/kafka:2.12-2.4.0 kafka-console-producer.sh --broker-list kafka1:9092 --topic test
# consume messages
docker run -it --rm --network my-elk wurstmeister/kafka:2.12-2.4.0 kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server kafka1:9092 --group console-group
```

## kafka-broker

- broker.id: unique identifier of the broker; it must not be duplicated within a cluster
- log.dirs: a comma-separated list of one or more directories in which Kafka stores its data
- zookeeper.connect: ZooKeeper connection address; separate multiple addresses with `,`
- message.max.bytes: the maximum size of a single message that the server will accept
- https://kafka.apache.org/documentation/#brokerconfigs

## kafka-producer

- https://kafka.apache.org/documentation/#producerconfigs

## kafka-consumer

- https://kafka.apache.org/documentation/#consumerconfigs

```bash
[2020-02-11 10:36:21,340] INFO Cluster ID = mJ_AYihBTnyQkXlrslbEJw (kafka.server.KafkaServer)
[2020-02-11 10:36:21,351] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID mJ_AYihBTnyQkXlrslbEJw doesn't match stored clusterId Some(PxzeWiM2Q2ewvIap-LRVqQ) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
	at kafka.server.KafkaServer.startup(KafkaServer.scala:220)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
	at kafka.Kafka$.main(Kafka.scala:84)
	at kafka.Kafka.main(Kafka.scala)

docker exec -it zoo1 bash -c "zkCli.sh get /kafka/cluster/id"
{"version":"1","id":"mJ_AYihBTnyQkXlrslbEJw"}
cZxid = 0x10000005b
ctime = Tue Feb 11 10:36:21 UTC 2020
mZxid = 0x10000005b
mtime = Tue Feb 11 10:36:21 UTC 2020
pZxid = 0x10000005b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 45
numChildren = 0

~/kafka-logs/meta.properties
#
#Sat Feb 08 05:20:40 GMT 2020
cluster.id=PxzeWiM2Q2ewvIap-LRVqQ
version=0
broker.id=1001
```

## ref

* https://www.cnblogs.com/reblue520/p/11460584.html
* https://www.cnblogs.com/kuku0223/p/8317965.html
* https://www.cnblogs.com/zlslch/p/6622079.html
* [Intermittent connectivity issues between Filebeat and Logstash](https://discuss.elastic.co/t/intermittent-connectivity-issues-between-filebeat-and-logstash/216286)
* https://www.elastic.co/guide/en/beats/filebeat/master/filebeat-reference-yml.html
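
## cluster-id-check

The `InconsistentClusterIdException` log in the kafka-consumer section is raised when the `cluster.id` stored in a broker's `meta.properties` differs from the id registered in ZooKeeper. Below is a minimal sketch for comparing the two values before starting a broker. The function names `meta_cluster_id`, `zk_cluster_id` and `cluster_ids_match` are hypothetical helpers, not part of Kafka or ZooKeeper. The container name `zoo1`, the `/kafka/cluster/id` path and the `~/kafka-logs` directory in the usage comment are taken from the commands in this README and may differ in another setup.

```bash
#!/usr/bin/env bash
# Sketch only: these helper functions are hypothetical and not shipped with Kafka or ZooKeeper.

# Print the cluster.id value stored in a broker's meta.properties file ($1).
meta_cluster_id() {
  sed -n 's/^cluster\.id=//p' "$1"
}

# Read zkCli.sh output on stdin and print the "id" field of the cluster-id JSON.
zk_cluster_id() {
  sed -n 's/.*"id":"\([^"]*\)".*/\1/p' | head -n 1
}

# Succeed (exit status 0) only when both ids are non-empty and identical.
cluster_ids_match() {
  [ -n "$1" ] && [ "$1" = "$2" ]
}

# Usage (assumes the zoo1 container and the paths used in this README;
# -it is dropped because the output is piped rather than shown on a terminal):
#   zk_id=$(docker exec zoo1 bash -c "zkCli.sh get /kafka/cluster/id" | zk_cluster_id)
#   meta_id=$(meta_cluster_id ~/kafka-logs/meta.properties)
#   cluster_ids_match "$meta_id" "$zk_id" || echo "cluster id mismatch: $meta_id vs $zk_id"
```

When the two ids differ, the exception message itself suggests the likely cause: `zookeeper.connect` may point at a different ZooKeeper than the one the broker's data directory was first registered with.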