본문 바로가기
Programming/JAVA

Apache Kafka 정리

by 코딩의성지 2021. 8. 19.

하이 .. 최근에 프로젝트를 진행하면서 대용량 실시간 데이터를 Apache Kafka를 이용하여 사용해 보았다.

 

간단하게 Kafka는 링크드인에서 2011년 오픈소스로 공개된  실시간 분산 데이터 처리 플랫폼이라고 할 수 있다. 예전에 이렇게 대용량 실시간 스트리밍 데이터를 처리하기 위해서는 redis나 rabbitMQ등의 다양한 어플리케이션을 사용하곤 했었는데, 이런 어플리케이션은 각각 특징이 다 달라서 데이터 파편화 현상이 있어서 유지보수에 어려움을 줬다. 그래서 이리한 문제를 해결해줄 수 있는게 바로 Kafka 이다. Kafka를 사용하면 Source 어플리케이션과 Target 어플리케이션간의 커플링을 약하게 할 수 있다.

 

Kafka의 경우, 데이터 스트림을 각 어플리케이션에서 처리하는 것이 아닌 한곳에서 한곳에 중앙집중화하여 처리한다. 

Kafka는 세가지 특징을 가진다.

 

1. High throughput message capacity

이는 짧은 시간에 대용량 데이터를 쉽게 전달할수 있다는 것을 의미한다. 파티션을 통해 분산처리가 가능하여 데이터가 아무리 많아도 컨슈머를 늘려서 병렬처리를 통해 빠르게 분산 처리가 가능하다.

 

2. Scalability & Fault tolerant 

카프카는 이미 사용중인 브로커 서버 이외에도 신규 서버를 스케일 아웃할 수 있다. 그리고 여러대의 브로커 서버는 서로 레플리카, 즉 복제되어 있어 한대가 죽는다해도 운용이 가능하다.

 

3. Undeleted log

카프카 토픽에 들어간 데이터는 컨슈머가 데이터를 소비해도 바로 사라지지 않는다. 이는 다양한 어플리케이션에서 다양한 용도로 데이터를 처리할 때 굉장히 효율적이다.

 

카프카의 동작원리를 간단하게 말씀드리겠다.

위의 그림을 보면 가운데 카프카를 기준으로 양옆에 Producer 와 Consumer 가 있다. Producer는 실시간으로 데이터를 Kafka에 보내는 놈이고, Consumer는 Kafka에 있는 데이터를 가져다 쓰는 것이라 생각하면된다. 그리고 Kafka 내부에는 Topic이라는게 있는데 각각의 목적을 가진 큐라고 생각하면된다. (더 쉽게 이해하시려면 데이터 베이스의 테이블 역할을 한다 생각하시면 된다.)

 

Topic

 

Topic 에 대해 조금더 자세히 말씀드리겠다. Topic은 큐라고 말씀드렸는데 큐의 특징이 먼저 들어온놈이 먼저 나가는건데 각 Consumer는 먼저 들어온 데이터를 순차적으로 읽어낸다(0,1,2,3,4...). 조금 특이한게 토픽에 쌓인 데이터는 일반 큐처럼 바로 데이터가 날라가는게 아니다. 그렇기에 Consumer1 에서 데이터를 썼다해도. Consumer2에서 또 사용할 수가 있다. 다만 이렇게 사용하기 위해서는 Consumer 그룹이 달라야하고 auto.offset.reset = earliest 설정이 되어있어야한다.

 

 

그리고 위는 파티션이 하나인 경우인데 아래 그림처럼 파티션을 두개 이상 사용이 가능하다.

이렇게 파티션을 나눌 때 , 키를 넣어주는 방식과 아닌 방식이 있는데  키를 넣어주고 기본 파티셔너를 사용한다면 키의 해시 값을 이용해 특정 파티션에 데이터를 넣는게 가능하지만, 키가 없고(null) 기본 파티셔너를 사용할경우에는 위처럼 라운드 로빈 방식으로 데이터가 할당된다. 

 

이렇게 파티션을 늘리면 Consumer 갯수를 늘려서 데이터를 분산처리 시킬수가 있다. 그런데 파티션을 늘리는건 가능하지만 줄이는게 불가능하기 때문에 잘 설계해서 사용하도록하자.

 

이렇게 파티션에 쌓인 데이터의 삭제 주기를 우리는 설정할 수 있는데

log.retention.ms (최대 record 보존 시간), log.retention.byte (최대 record 보존 크기(byte)) 로 설정이 가능하다.

 

* Replication

여기서 각 카프카 브로커 마다 복수의 파티션이 존재할 수 있는데, 대부분 대용량 처리 서비스에서는 파티션의 고가용성을 위해 Replicate(복제)를 한다. 즉 브로커 하나가 죽어도 복제본이 있기에 가용 가능하게 되는 것이다.

 

 

이러한 고가용성은 Producer의 ack 옵션을 통해 유지가 가능하다. ack는 0,1,all 이렇게 옵션 3개 중 하나를 선택하여 사용할수 있다.

0일 경우에는 Leader Partition에 Request만 하고 별도의 Response를 받지 않는다. 이렇게 하면 데이터가 잘 request 전송되었는지와 나머지 Partition에 잘 복제가 되었는지 알수가 없어 데이터 유실이 발생할 수도 있다. 속도는 굉장히 빠르다.

1일 경우에는 Leader Partition에 Request한 내용과 정상적으로 갔는지에 대한 Response를 받을 수 있다. 하지만 아직까지도 나머지 Partion에 잘 복제가 되었는지는 알 수가 없다.

all 일경우에는 Leader Partition 뿐만 아니라 Follow Partition의 Request 와 Response 정보를 받는다. 이렇게하면 완전히 잘 복제되었는지는 알수가 있다. 데이터유실은 없으나 속도가 느리다는 단점이 있다.

 

Producer

 

다음은 Producer 에 대해서 조금더 말씀드리겠다. 앞서 말씀드린것처럼 Producer는 Topic에 맞는 메시지를 생성하여 Topic에 pubish 하고, 만약 실패시 재시도 하는 역할까지 하는 놈이라 보면 된다.

 

간단하게 Spring에서 Kafka Producer를 간단하게 구현하는 예제소스를 작성해 보겠다. 일단 환경은 gradle이라 가정한다. gradle 파일에 아래 내용을 넣어 라이브러리를 가져오자.

compile group: 'org.apache.kafka', name: 'kafka-clients', version: 'X.X.X'

위에서 가장 중요한건 버전인데 버전은 꼭 Producer 버전과 Kafka 브로커가 서로 호환되는 버전으로 잡아줘야한다.

 

public class MyProducer {
	public static void main(String[] args) throws IOException {
    	Properties configs = new Properties();
        configs.put("bootstrap.servers", "localhost:9082");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        KafkaProducer <String, String> producer = new KafkaProducer <String, String> (configs);
        
        ProducerRecord record = new ProducerRecord <String, String> ("topic1", "log1"); //키 없을때
	// ProducerRecord record = new ProducerRecord <String, String> ("topic1, "1", "log1"); // 키 있을때producer.send(record);
        
        producer.close();
    }
}

이렇게 간단하게 Producer 예제를 짤수 가 있다.

 

먼저 보시면 자바의 Properties 객체를 이용하여 Producer의 설정을 정의한다. 순차적으로 설명을 드리면

configs.put("bootstrap.servers", "localhost:9082");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

bootstarp.servers 설정을 기존에 설치된 localhost 카프카를 바라보게 하고(kafka의 브로커 설정은 보통 세개이상의 ip와 port를 설정하도록 권장(브로커가 비정상일경우 지속가능하게 하기 위해) 이지만 예제라 하나만 세팅), 그리고 key 와 value를 String 직렬화 시켜서 저장해두었는데, String 말고도 Byte Array, Interger 도 직렬화 하여 저장할 수 있다. 여기서 키를 세팅하여 사용하면 임의의 파티션으로 데이터를 보낼수가 있다.

 

 KafkaProducer <String, String> producer = new KafkaProducer <String, String> (configs);

다음은 설정을 이용하여 Producer 객체를 만드는 부분이다.

 

ProducerRecord record = new ProducerRecord <String, String> ("topic1", "log1"); //키 없을때
// ProducerRecord record = new ProducerRecord <String, String> ("topic1, "1", "log1"); // 키 있을때
producer.send(record);

다음은 어떤 토픽에 데이터를 넣을것인지, 어떤 키와 밸류를 넣을것인지를 설정하는 ProducerRecord 객체를 만들고 데이터를 보내는 send 메서드를 날리는 부분의 코드이다.

 

prodcuer.close();

마지막으로 여느 io 객체처럼 다쓰고 나면 close를 해줘서 마무리한다.

 

 

Consumer

Consumer는 Partition에 저장된 데이터를 Polling 해오는 역할을 한다. 이때 Partition offset 위치를 기록( Commit) 할수도 있다. 그리고 Consumer가 여러개 일때, Consumer group을 통해 여러개의 Partion에 대한 병렬처리가 가능하다. 

 

Producer와 마찬가지로 간단한 예시를 작성해 보도록 하겠다.

compile group: 'org.apache.kafka', name: 'kafka-clients', version: 'X.X.X'

 

버전은 Producer 처럼 Consumer 버전과 Kafka 브로커가 서로 호환되는 버전으로 잡아줘야한다.

public class MyConsumer {
	public static void main(String[] args) throws IOException {
    	Properties configs = new Properties();
        configs.put("bootstrap.servers", "localhost:9082");
        configs.put("group.id", "topic1_group");
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        KafkaConsumer <String, String> consumer = new KafkaConsumer <String, String> (configs);
        
        consumer.subscribe(Arrays.asList("topic1"));
        
        while(true) {
        	ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
            	// 데이터 사용
            }
        }
     
    }
}

 

Consumer도 Producer와 마찬가지로 Properties 객체를 이용하여 설정을 해주는데,

Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9082");
configs.put("group.id", "topic1_group");
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer 역시 broker 서버 여러개를 연결해주는게 권장 사항이나 예제라 로컬호스트 하나만 설정을 한 것이고,

여기서 Consumer 그룹을 설정해줘야 한다. Consumer 그룹은 특정 아이디 값을 통해 묶어줄수 있다.

그리고 아까전에 Producer에서 key와 value에 대한 직렬화 설정을 해줬으니 여기선 반대로 역직렬화 설정을 해주면 된다. 

 

KafkaConsumer <String, String> consumer = new KafkaConsumer <String, String> (configs);
        
consumer.subscribe(Arrays.asList("topic1"));

다음은 설정값을 이용한 Consumer 객체 생성하는 부분과 어느토픽을 대상으로 데이터를 가져올지를 선언하는 subscribe 메서드 호출하는 부분의 코드이다. 

 

여기서 특정 토픽의 전체파티션이 아니라 key를 이용해 일부 파티션만 가지고 싶으면 아래 코드처럼 구현을 해주면된다.

TopicPartition partition0 = new TopicPartition(topicName, 0);
TopicPartition partition1 = new TopicPartition(topicName, 1);
consumer.assign(Arrays.asList(partition0, partition1));

 

다음은 폴링루프에 대한 코드이다.

while(true) {
	ConsumerRecords<String, String> records = consumer.poll(500);
	for (ConsumerRecord<String, String> record : records) {
		System.out.println(record.value());
      		  // 데이터 사용
	}
}

위의 코드는 Consumer를 구현하는데 있어 굉장히 핵심적인 로직이다. 위의 코드는 500ms 마다 데이터를 주기적으로 records 객체에 저장한다 ( 데이터가 없으면 null). 그리고 가져온 records를 for문으로 돌리고 내부에서 개개별의 데이터를 사용할 로직을 구현해주면된다.  value() 메서드를 사용해서 나온 값이 바로 실제로 처리해야하는 데이터가 된다. 

 

아 그리고 아까 말씀드린 것처럼 offset 정보를 저장한다고 했는데 조금더 자세히말하면 __consumer_offset에 마지막으로 가져온 데이터의 위치정보가 저장되기때문에 혹시나 Consumer가 망가지고 복구 된다고 하더라도, 그 다음위치부터 데이터를 가져오는게 가능해진다. 

 

그리고 Consumer 그룹에 포함된 Consumer 갯수는 Partition 갯수보다 작거나 같아야한다. Partition 갯수보다 많아지면 Consumer가 동작하지 않기때문이다. 여기까지는 알아두도록 하자.

 

조금 길긴 하지만 이렇게 Kafka의 구동하는 방식에 대해 간단하게 알아보았다. 다들 글을 쭉 재밌게 읽어주셨으면 좋겠다.

 

참조 도서 

<아파치 카프카 애플리케이션 프로그래밍 with 자바, 최원영, 비제이퍼블릭>

 

반응형

댓글