Page tree
Skip to end of metadata
Go to start of metadata

Quickstart

Step 1 : 다운로드

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.12-2.2.0.tgz

카프카 압축 해제
> tar -xzf kafka_2.12-2.2.0.tgz
> cd kafka_2.12-2.2.0


Step 2 : 서버 실행

Kafka는 ZooKeeper를 사용하여 컨슈머 클라이언트와 카프카 클러스터에 관한 메타데이터를 저장한다. 주키퍼는 카프카 배포판에 포함되어 있으며, 카프카 배포판에서 제공해주는 스크립트를 사용해서 설치 및 실행 할 수 있다.

 ZooKeeper 개요

주키퍼는 분산 애플리케이션을 위한 코디네이션 시스템이다. 분산 애플리케이션이 안정적인 서비스를 할 수 있도록 분산되어 있는 각 애플리케이션의 정보를 중앙에 집중하고 구성 관리, 그룹 관리 네이밍, 동기화 등의 서비스를 제공한다.

그림과 같이 서버 여러 대를 앙상블(클러스터)로 구성하고, 분산 애플리케이션들이 각 클라이언트가 되어 주키퍼 서버들과 커넥션을 맺은 후 상태 정보 등을 주고 받게 된다. 상태 정보들은 주키퍼의 지노드라 불리는 곳에 Key-Value 형태로 저장하고, 지노드에 키-값이 저장된 것을 이용하여 분산 애플리케이션들은 서로 데이터를 주고 받는다.

주키퍼에서 사용되는 지노드는 데이터를 저장하기 위한 공간 이름을 말하는 것으로, 일반 컴퓨터의 파일이나 폴더 개념이라고 생각하면 쉽다. 지노드는 계층형 구조로 구성되어 있다.

주키퍼의 각 지노드는 데이터 변경 등에 대한 유효성 검사 등을 위해 버전 번호를 관리하며, 지노드의 데이터가 변경 될 때마다 지노드의 버전 번호가 증가한다.

주키퍼에 저장되는 데이터는 모두 메모리에 저장되어 처리량이 매우 크고 속도 또한 빠르다.

주키퍼의 클러스터를 앙상블이라고 하며 하나의 서버에만 서비스가 집중 되지 않게 분산하여 동시 처리하며, 한 서버에서 처리한 결과를 다른 서버와 동기화시켜 데이터의 안전성을 보장한다.

앙상블은 홀수 개의 서버를 멤버로 갖으며 앙상블의 서버 중 과반수가 작동 가능해야한다.

주키퍼 앙상블은 다섯 개의 서버 노드를 갖는것을 추천한다. 앙상블의 구성을 변경하려면 한 번에 하나의 서버 노드를 중단했다가 다시 로딩해야 한다. 하나 이상의 노드를 중단 할 수 없는 앙상블이라면 유지보수 작업에 위험이 따르고 서버가 너무 많을 경우 오히려 성능이 저하 될 수 있다.(카프카 핵심 가이드)

주키퍼, 카프카 실행
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties


Step 3 : 토픽 생성 및 확인

토픽 생성 및 확인
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • --topic : 토픽 이름
  • --partitions : 파티션 수
  • --replication-factor : 복제 수
  • --create : 토픽 생성
  • --list : 토픽 목록 확인
  • --describe : 토픽 상세 확인

Step 4 : 메시지 전송

메시지 전송
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  • --broker-list : 브로커의 호스트명:포트번호, 호스트명:포트번호 형식으로 카프카 클러스터 내 모든 브로커 리스트를 입력
  • --topic : 메시지를 보내고자 하는 토픽 이름

Step 5 : 컨슈머 실행

컨슈머 실행
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • --from-beginning : 처음부터 메시지를 가지고오도록 설정

Step 6 : 멀티 브로커 클러스터

멀티 브로커 클러스터 구성
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
서버 설정 파일 수정
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2
카프카 브로커 실행 및 토픽 생성
> bin/kafka-server-start.sh config/server-1.properties &
> bin/kafka-server-start.sh config/server-2.properties &

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
  • Leader : 토픽의 리더
  • Replicas : 토픽이 리플리케이션 되고 있는 브로커 위치
  • Isr(In Sync Replica) : 현재 리플리케이션되고 있는 리플리케이션 그룹, ISR 그룹에 속해 있는 구성원만이 리더의 자격을 가질수 있음

카프카 토픽 발행 및 소비
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
카프카 브로커 중지 (Fault tolerance 테스트)
> ps aux | grep server-1.properties
> kill -9 XXXX
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic


Step 7 : Kafka Connect를 이용한 Data 가져오기/내보내기

테스트 데이터 생성
> echo -e "foo\nbar" > test.txt
커넥트 실행
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
컨슈머 실행
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
데이터 추가
> echo Another line>> test.txt

Apache Kafka with Spring

pom.xml
<dependency>
	<groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>
application.properties
kafka.bootstrapAddress=localhost:9092
토픽 설정
@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic slippTopic() {
        return new NewTopic("slipp", 1, (short) 1);
    }
}
메시지 발행 설정
@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
메시지 컨슈머 설정
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                "foo");
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Application
	public static final String topicName = "slipp";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/send")
    public void sendMessage(@RequestBody String msg) {
        kafkaTemplate.send(topicName, msg);
    }


	@KafkaListener(topics = topicName, groupId = "foo")
    public void listen(String message) {
        System.out.println("Received Message in group foo1: " + message);
    }

    @KafkaListener(topics = topicName, groupId = "foo")
    public void listen1(String message) {
        System.out.println("Received Message in group foo2: " + message);
    }

    @KafkaListener(topics = topicName, groupId = "bar")
    public void listen2(String message) {
        System.out.println("Received Message in group bar1: " + message);
    }

    @KafkaListener(topics = topicName, groupId = "bar")
    public void listen3(String message) {
        System.out.println("Received Message in group bar2: " + message);
    }

    @KafkaListener(topics = topicName, groupId = "baz")
    public void listenWithHeaders(
            @Payload String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(
                "Received Message in group baz1: " + message + " from partition: " + partition);
    }

참조

아파치 카프가 : https://kafka.apache.org/quickstart

아파치 카프카 스트림 : https://kafka.apache.org/22/documentation/streams/quickstart

주키퍼 : https://zookeeper.apache.org/doc/current/zookeeperOver.html

스프링 카프카 Reference : https://docs.spring.io/spring-kafka/reference/

스프링 카프카 API : https://docs.spring.io/spring-kafka/api/overview-summary.html

Intro to Apache Kafka with Spring : https://www.baeldung.com/spring-kafka

고승범님 글 : https://bit.ly/2KRuGEe

카프카 핵심가이드 : http://www.yes24.com/Product/goods/65418488

카프카, 데이터 플랫폼의 최강자 : https://github.com/onlybooks/kafka/

  • No labels