Basic Kafka Installation and Cluster Setup with Docker Compose

01. Background Concepts

(01) Kafka Distributions

  • Apache Kafka [ free to use ]
  • Confluent Kafka
    • Even the Community edition offers features that plain Apache Kafka does not (the premium connectors being the main draw)

(02) What Is a Data Pipeline?

A process that builds an end-to-end flow so that data can be collected, stored, and run through ETL without human intervention and without defects such as corruption, duplication, or loss.

(03) System Monitoring Approach

Kafka monitoring

(04) Messaging Hub

[Before] In a conventional architecture, the failure of one system can bring down the entire service.

[After] When a specific process fails, only that process is affected, and each service can be configured and run at a pace matching its own capacity.

02. Installing and Configuring Kafka

01) Installing Apache Kafka (Single Node)

(01) Go to the download site

Download link: Apache Kafka

(02) Extract the archive

tar -xvf kafka_2.13-2.8.0.tgz

[Result]

jay@JayLees-MacBook-Pro  ~/kafka-test  ls
kafka_2.13-2.8.0.tgz
 jay@JayLees-MacBook-Pro  ~/kafka-test  tar -xvf kafka_2.13-2.8.0.tgz
x kafka_2.13-2.8.0/
x kafka_2.13-2.8.0/LICENSE
x kafka_2.13-2.8.0/NOTICE
x kafka_2.13-2.8.0/bin/
x kafka_2.13-2.8.0/bin/zookeeper-shell.sh
x kafka_2.13-2.8.0/bin/kafka-log-dirs.sh
x kafka_2.13-2.8.0/bin/zookeeper-server-stop.sh
x kafka_2.13-2.8.0/bin/kafka-configs.sh
x kafka_2.13-2.8.0/bin/kafka-server-stop.sh
x kafka_2.13-2.8.0/bin/windows/
x kafka_2.13-2.8.0/bin/windows/kafka-delegation-tokens.bat
x kafka_2.13-2.8.0/bin/windows/kafka-producer-perf-test.bat
x kafka_2.13-2.8.0/bin/windows/kafka-run-class.bat
x kafka_2.13-2.8.0/bin/windows/kafka-server-stop.bat
x kafka_2.13-2.8.0/bin/windows/kafka-streams-application-reset.bat
x kafka_2.13-2.8.0/bin/windows/kafka-dump-log.bat
x kafka_2.13-2.8.0/bin/windows/kafka-server-start.bat

(03) Add a symbolic link for quick access

ln -s kafka_2.13-2.8.0 kafka
 jay@JayLees-MacBook-Pro  ~/kafka-test  ln -s kafka_2.13-2.8.0 kafka
 jay@JayLees-MacBook-Pro  ~/kafka-test  cd kafka
 jay@JayLees-MacBook-Pro  ~/kafka-test/kafka  ls
LICENSE   NOTICE    bin       config    libs      licenses  site-docs

(04) ZooKeeper Configuration

[Looking at the ZooKeeper configuration file]

  • Serves clients on port 2181 (see the snippet below)
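For reference, the stock config/zookeeper.properties shipped in the 2.8.0 tarball is only a few lines; check your own copy, since defaults can differ between versions:

# where ZooKeeper stores snapshot data
dataDir=/tmp/zookeeper
# the port clients connect to
clientPort=2181
# disable the per-IP connection limit (fine for a non-production setup)
maxClientCnxns=0
# disable the AdminServer to avoid port conflicts
admin.enableServer=false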

[Start ZooKeeper]

bin/zookeeper-server-start.sh config/zookeeper.properties

[Output]

 jay@JayLees-MacBook-Pro  ~/kafka-test/kafka  bin/zookeeper-server-start.sh config/zookeeper.properties
[2023-08-01 10:26:07,293] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-08-01 10:26:07,294] WARN config/zookeeper.properties is relative. Prepend ./ to indicate that you're sure! (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-08-01 10:26:07,300] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-08-01 10:26:07,300] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2023-08-01 10:26:07,301] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-08-01 10:26:07,301] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2023-08-01 10:26:07,301] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
...
[2023-08-01 10:26:07,334] INFO binding to port 0.0.0.0/0.0.0.0:2181 
....
[2023-08-01 10:26:07,355] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.Cont

[Check 1] Port 2181 is bound

[Check 2] Telnet connection test

 jay@JayLees-MacBook-Pro  ~  telnet 0 2181
Trying 0.0.0.0...
Connected to 0.
Escape character is '^]'.
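If telnet is not installed, nc performs the same reachability check (a quick alternative; -z scans without sending data, -v prints the result):

nc -vz localhost 2181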

[Accessing the ZooKeeper shell]

 bin/zookeeper-shell.sh localhost:2181

[Result]

 ✘ jay@JayLees-MacBook-Pro  ~/kafka-test/kafka   bin/zookeeper-shell.sh localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /
[zookeeper]

-> Confirms ZooKeeper is running normally; the zookeeper znode is visible under the root.

(05) Starting the Kafka Broker

[Edit the configuration file]

vim config/server.properties

Enable the listener on 127.0.0.1 so the broker serves local connections:

...
31 #listeners=PLAINTEXT://:9092 
---
31 listeners=PLAINTEXT://127.0.0.1:9092 

[Start the broker]

bin/kafka-server-start.sh config/server.properties
 jay@JayLees-MacBook-Pro  ~/kafka-test/kafka  bin/kafka-server-start.sh config/server.properties
[2023-08-01 10:37:25,242] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-08-01 10:37:25,406] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2023-08-01 10:37:25,451] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2023-08-01 10:37:25,453] INFO starting (kafka.server.KafkaServer)
[2023-08-01 10:37:25,453] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2023-08-01 10:37:25,462] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
...
[2023-08-01 10:37:26,086] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT://127.0.0.1:9092, czxid (broker epoch): 27 (kafka.zk.KafkaZkClient)
[2023-08-01 10:37:26,117] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
...
[2023-08-01 10:37:26,193] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2023-08-01 10:37:26,251] INFO [broker-0-to-controller-send-thread]: Recorded new controller, from now on will use broker 127.0.0.1:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)

(06) Creating a “test” Topic

[Create the topic]

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
 ✘ jay@JayLees-MacBook-Pro  ~/kafka-test/kafka  bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Created topic test.
 jay@JayLees-MacBook-Pro  ~/kafka-test/kafka 

[List topics]

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
 ✘ jay@JayLees-MacBook-Pro  ~/kafka-test/kafka  bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

Things to watch out for when creating topics (see the example below):

  1. Topic names have a length limit (249 characters).
  2. Avoid special characters such as “.”: topic names appear in metric names, so names containing “.” or “_” can collide with one another.
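As an illustration, creating a dotted topic name succeeds but triggers this collision warning (my.test is a made-up name for this example; the warning wording may vary slightly by version):

bin/kafka-topics.sh --create --topic my.test --bootstrap-server localhost:9092
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic my.test.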

(07) Producing Data (Trying the Producer)

[Command]

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

[Enter messages]

test msg 01
test msg 02

[Result]

 jay@JayLees-MacBook-Pro  ~/kafka-test/kafka  bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>test msg 01
>test msg 02

(08) Consuming Data (Trying the Consumer)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • Why --from-beginning is added:
    By default, a console consumer reads only the data produced after the moment it attaches. Without this flag older records are skipped, so add it whenever you need the data from the start of the topic.

[Result]

 ✘ jay@JayLees-MacBook-Pro  ~/kafka-test/kafka  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
test msg 01
test msg 02
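To see how consumption progress is tracked, you could attach with an explicit consumer group and then inspect its committed offsets (a minimal sketch; the group name test-group is arbitrary):

# consume as part of a named group so offsets are committed
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group --from-beginning

# in another shell: per-partition offsets and lag for that group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group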

(09) Re-checking ZooKeeper

bin/zookeeper-shell.sh localhost:2181
 ✘ jay@JayLees-MacBook-Pro  ~/kafka-test/kafka  bin/zookeeper-shell.sh localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
ls /brokers
[ids, seqid, topics]
ls /brokers/ids
[0] # the broker ID that was registered when the broker started
 ls /brokers/topics
[__consumer_offsets, test] # the __consumer_offsets topic is created automatically by default

The shell shows that a variety of Kafka-related znodes have been created.
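You can also inspect an individual broker's registration; get on its znode returns a JSON blob with the advertised endpoints (fields vary by version, so the output below is indicative only):

get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://127.0.0.1:9092"],"jmx_port":-1,"host":"127.0.0.1","port":9092,"version":5,"timestamp":"..."}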

02) Installing Confluent Kafka (Cluster Mode)

  • We will use Docker Compose for this setup

(01) Getting the docker-compose files

[Compatible with both M1 and Intel] (ARM support began with the 7.x releases, and from 7.2 both architectures are covered by a single unified image.)

git clone https://github.com/conduktor/kafka-stack-docker-compose.git
cd [cloned git folder] 

[ Looking at the compose file ]

version: '2.1'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888

  zoo2:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo2
    container_name: zoo2
    ports:
      - "2182:2182"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888

  zoo3:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo3
    container_name: zoo3
    ports:
      - "2183:2183"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2183
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888



  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
      - zoo2
      - zoo3
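A note on the three listeners defined above: INTERNAL (kafka1:19092 and so on) is what brokers and other containers on the compose network use, EXTERNAL (localhost:9092 ~ 9094) is for clients on the host machine, and DOCKER (host.docker.internal:29092 ~ 29094) is for containers that reach the brokers via the host. Roughly (a sketch, assuming the Kafka CLI tools are on your PATH):

# from the host machine (EXTERNAL listener)
kafka-topics --bootstrap-server localhost:9092 --list

# from another container on the same compose network (INTERNAL listener)
kafka-topics --bootstrap-server kafka1:19092 --list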

(02) Running docker-compose

docker compose -f zk-multiple-kafka-multiple.yml up

Checking the docker compose result

docker ps 
jay@JayLees-MacBook-Pro  ~/kafka-test/resources   main  docker ps
CONTAINER ID   IMAGE                             COMMAND                  CREATED          STATUS          PORTS                                                                                                        NAMES
cc0c291695a7   confluentinc/cp-kafka:6.2.0       "/etc/confluent/dock…"   55 seconds ago   Up 52 seconds   9092/tcp, 0.0.0.0:39092->39092/tcp                                                                           resources-kafka-3-1
b52a917cdea5   confluentinc/cp-kafka:6.2.0       "/etc/confluent/dock…"   55 seconds ago   Up 52 seconds   9092/tcp, 0.0.0.0:29092->29092/tcp                                                                           resources-kafka-2-1
3f4e7494df25   confluentinc/cp-kafka:6.2.0       "/etc/confluent/dock…"   55 seconds ago   Up 52 seconds   9092/tcp, 0.0.0.0:19092->19092/tcp                                                                           resources-kafka-1-1
2909c071e03e   confluentinc/cp-zookeeper:6.2.0   "/etc/confluent/dock…"   56 seconds ago   Up 53 seconds   2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 0.0.0.0:42888->42888/tcp, 3888/tcp, 0.0.0.0:43888->43888/tcp   resources-zookeeper-3-1
1ca731a489ce   confluentinc/cp-zookeeper:6.2.0   "/etc/confluent/dock…"   56 seconds ago   Up 53 seconds   2181/tcp, 2888/tcp, 0.0.0.0:12181->12181/tcp, 0.0.0.0:22888->22888/tcp, 3888/tcp, 0.0.0.0:23888->23888/tcp   resources-zookeeper-1-1
f40153edf6f2   confluentinc/cp-zookeeper:6.2.0   "/etc/confluent/dock…"   56 seconds ago   Up 53 seconds   2181/tcp, 2888/tcp, 0.0.0.0:22181->22181/tcp, 0.0.0.0:32888->32888/tcp, 3888/tcp, 0.0.0.0:33888->33888/tcp   resources-zookeeper-2-1
(The capture above is from an older 6.2.0 run of the stack with different port mappings; with the 7.3.2 compose file shown earlier, the ports are:)

  • Zookeeper
    Ports: 2181 ~ 2183 (3 nodes)
  • Kafka
    Client ports: 9092 ~ 9094 / 29092 ~ 29094

(03) Verifying Normal Operation

Confluent download: Previous Versions – Confluent

[01] Download the tarball

[02] Extract it and move into the bin folder

[Topic creation test]

./kafka-topics --bootstrap-server localhost:9092 --create --topic test --partitions 2 --replication-factor 3
  • Partitions distribute the data across brokers, and the replication factor maintains copies for HA

[Result]

 ✘ jay@JayLees-MacBook-Pro  ~/kafka-test/confluent-kafka/confluent-7.4.0/bin  ./kafka-topics --bootstrap-server localhost:9092 --create --topic test --partitions 2 --replication-factor 3
Created topic test.

[Producing]

./kafka-console-producer --bootstrap-server localhost:9092 --topic test
 ✘ jay@JayLees-MacBook-Pro  ~/kafka-test/confluent-kafka/confluent-7.4.0/bin  ./kafka-console-producer --bootstrap-server localhost:9092 --topic test
>hello
>test123
>good
>

[Checking topic / partition distribution]

./kafka-topics --describe --bootstrap-server localhost:9093 --topic test
 ✘ jay@JayLees-MacBook-Pro  ~/kafka-test/confluent-kafka/confluent-7.4.0/bin  ./kafka-topics --describe --bootstrap-server localhost:9093 --topic test
Topic: test	TopicId: G5YT3ayRRRSer0X5CfB-mQ	PartitionCount: 2	ReplicationFactor: 3	Configs:
	Topic: test	Partition: 0	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
	Topic: test	Partition: 1	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2

The describe output gives each partition's leader, the distribution of replicas across brokers, and the ISR (in-sync replica) list.

The partition count can only be increased, never decreased. If producers route messages by key, it is best not to increase it: the default partitioner hashes the key modulo the partition count, so adding partitions changes which partition existing keys map to and breaks per-key ordering. An increase looks like the sketch below.
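For reference, increasing the count uses --alter on the same CLI (a sketch; the new count 4 is arbitrary, and the operation cannot be undone):

./kafka-topics --alter --bootstrap-server localhost:9092 --topic test --partitions 4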

(04) HA Test (Killing One Kafka Broker)

Find the Kafka container ID

docker ps 
>  docker ps
CONTAINER ID   IMAGE                             COMMAND                  CREATED          STATUS          PORTS                                                        NAMES
f0425f3ec9cd   confluentinc/cp-kafka:7.3.2       "/etc/confluent/dock…"   17 minutes ago   Up 17 minutes   0.0.0.0:9094->9094/tcp, 9092/tcp, 0.0.0.0:29094->29094/tcp   kafka3
cd158bad385a   confluentinc/cp-kafka:7.3.2       "/etc/confluent/dock…"   17 minutes ago   Up 17 minutes   0.0.0.0:9093->9093/tcp, 9092/tcp, 0.0.0.0:29093->29093/tcp   kafka2
988faffd3e9c   confluentinc/cp-kafka:7.3.2       "/etc/confluent/dock…"   17 minutes ago   Up 17 minutes   0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp             kafka1
2b9c500d121d   confluentinc/cp-zookeeper:7.3.2   "/etc/confluent/dock…"   17 minutes ago   Up 17 minutes   2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2182->2182/tcp         zoo2
529a197f2cae   confluentinc/cp-zookeeper:7.3.2   "/etc/confluent/dock…"   17 minutes ago   Up 17 minutes   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp                   zoo1
ad37e8d72e92   confluentinc/cp-zookeeper:7.3.2   "/etc/confluent/dock…"   17 minutes ago   Up 17 minutes   2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2183->2183/tcp         zoo3

Kill broker 2 (kafka2)

docker kill -s SIGKILL cd158bad385a

Check the topic status

 ✘ jay@JayLees-MacBook-Pro  ~/kafka-test/confluent-kafka/confluent-7.4.0/bin  ./kafka-topics --describe --bootstrap-server localhost:9092 --topic test
Topic: test	TopicId: G5YT3ayRRRSer0X5CfB-mQ	PartitionCount: 2	ReplicationFactor: 3	Configs:
	Topic: test	Partition: 0	Leader: 3	Replicas: 2,3,1	Isr: 3,1
	Topic: test	Partition: 1	Leader: 3	Replicas: 3,1,2	Isr: 3,1

-> The leader for both partitions has moved to broker 3, and broker 2 has dropped out of the ISR.

(Note) What happens if we connect to broker 2?

jay@JayLees-MacBook-Pro  ~/kafka-test/confluent-kafka/confluent-7.4.0/bin  ./kafka-topics --describe --bootstrap-server localhost:9093 --topic test
[2023-08-01 15:59:57,955] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2023-08-01 15:59:58,059] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

The connection fails, since only the killed broker was listening on port 9093.
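Recovery is just a restart of the container; once the broker catches up it rejoins the ISR (a sketch; whether leadership moves back automatically depends on auto.leader.rebalance.enable, which defaults to true):

# bring the killed broker back and re-check the topic
docker start cd158bad385a
./kafka-topics --describe --bootstrap-server localhost:9092 --topic test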

03. A Look at Kafka Configuration

GitHub link: kafka/config at trunk · apache/kafka · GitHub

01) server.properties

############################# Server Basics #############################

# The broker's ID
broker.id=0

############################# Socket Server Settings #############################

# (listeners and advertised.listeners are kept separate so that internal and
# external traffic can be split, which improves both efficiency and security)
# The address the broker itself binds to
#listeners=PLAINTEXT://:9092

# The address that producers / consumers use to reach the broker
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads the server uses for receiving requests from and sending responses to the network
num.network.threads=3

# The number of threads the server uses for processing client requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# Directory where the broker stores its data (log segments)
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
# Deletion policy for logs, based on age or size

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

[Settings commonly tuned in production] (example values sketched below)

  • auto.create.topics.enable : whether topics are created automatically on first use
  • compression.type : message compression codec (the producer compresses and sends; the broker stores the data as-is, and the consumer decompresses when reading)
  • delete.topic.enable : whether topic deletion is allowed
  • message.max.bytes : the largest message payload, i.e. the maximum record batch size the broker will accept
  • replica.lag.time.max.ms : how long a follower may lag behind the leader before it is dropped from the ISR [default: 30 seconds]
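Put together in server.properties, these might look like the sketch below; the values are illustrative, not recommendations:

# require topics to be created explicitly
auto.create.topics.enable=false
# keep whatever codec the producer used (other options: gzip, snappy, lz4, zstd, uncompressed)
compression.type=producer
# allow kafka-topics --delete
delete.topic.enable=true
# largest record batch the broker will accept (this is the default value)
message.max.bytes=1048588
# how far a follower may lag before being dropped from the ISR
replica.lag.time.max.ms=30000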

04. Miscellaneous

01) AKHQ (GUI Tool)

Provides efficient management of Kafka resources through a GUI

(01) How to apply it

Add the following at the very bottom of the existing docker-compose file:

  akhq:
    image: tchiotludo/akhq:latest
    hostname: akhq
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            kafka:
              properties:
                bootstrap.servers: kafka1:29092,kafka2:29093,kafka3:29094
    ports:
      - 8080:8080      

(02) The UI

Broker and topic settings can be managed through the GUI (with the port mapping above, AKHQ is served at http://localhost:8080).

02) Handy Command Reference

# Create a Kafka topic
./kafka-topics --bootstrap-server localhost:19092 --create --topic fastcampus --partitions 20 --replication-factor 3

# List the topics that exist in the cluster
./kafka-topics --bootstrap-server localhost:19092 --list

# Show a topic's details: partition count, replica count, and so on
./kafka-topics --describe --bootstrap-server localhost:19092 --topic fastcampus

# Run a console consumer
./kafka-console-consumer --bootstrap-server localhost:19092 --topic fastcampus --from-beginning

# Run a console producer
./kafka-console-producer --bootstrap-server localhost:19092 --topic fastcampus
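Two related commands that often come in handy alongside these (same assumptions about host and port; the group name is whatever your consumer used):

# List all consumer groups known to the cluster
./kafka-consumer-groups --bootstrap-server localhost:19092 --list

# Show per-partition committed offsets and lag for one group
./kafka-consumer-groups --bootstrap-server localhost:19092 --describe --group <group-name>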
