티스토리 뷰

반응형
  • Kafka란?
  • Kafka 설치
  • Kafka Producer/Consumer
  • Kafka Connect

Apache Kafka란?

Apache Software Foundation의 Scala언어로 된 오픈 소스 메시지 브로커 프로젝트이다.

메시지 브로커는 특정한 서비스에서 다른 서비스로 메시지를 전달한다.

실시간 데이터 피드를 관리하기 위해 통일된 높은 처리량(대용량), 낮은 지연 시간을 지닌 플랫폼을 제공한다.

 

카프카가 없던 시절에는 Hadoop, SearchEngine 등 다양한 형태의 처리가 불편하였고, 확장이 편리하지 않았다.

카프카의 주요 장점은 다음과 같습니다:

카프카(Kafka)는 분산 스트리밍 플랫폼으로, 대용량의 실시간 데이터를 처리하기 위해 설계되었습니다. 주키퍼(Zookeeper)와 브로커(Broker)를 사용하여 메시지를 처리하며, Producer(생산자)와 Consumer(소비자)는 메시지를 생성하고 소비합니다. 또한, Kafka Connect Source와 Kafka Connect Sink는 데이터를 외부 시스템과 카프카 간에 이동시키는 역할을 담당하는 Kafka Connect를 이용합니다.

  1. 고성능: 카프카는 높은 처리량과 지연 시간을 제공합니다. 수백만 개의 메시지를 초당 처리할 수 있으며, 대규모의 데이터 스트림을 효율적으로 처리할 수 있습니다.
  2. 내구성: 카프카는 데이터의 내구성을 보장합니다. 메시지는 여러 브로커에 복제되므로 브로커 중 하나가 중단되어도 데이터가 손실되지 않습니다. 이를 통해 데이터의 안정성과 신뢰성을 보장할 수 있습니다.
  3. 확장성: 카프카는 분산 시스템으로 설계되어 있어, 브로커, 프로듀서, 컨슈머를 추가하여 시스템을 확장할 수 있습니다. 이는 대용량 데이터 처리에 필요한 확장성과 유연성을 제공합니다.
  4. 다양한 클라이언트 지원: 카프카는 다양한 프로그래밍 언어로 작성된 클라이언트를 제공합니다. 이를 통해 프로듀서와 컨슈머를 구현하고 실시간 데이터 처리를 수행할 수 있습니다.
  5. 유연한 데이터 유형: 카프카는 다양한 유형의 데이터를 처리할 수 있습니다. 구조적인 데이터뿐만 아니라 비구조적인 데이터도 처리할 수 있습니다. 또한, 데이터 변환 및 저장을 위한 유연성을 제공하여 다양한 데이터 처리 요구 사항을 충족시킬 수 있습니다.

이러한 장점들은 카프카가 대규모 데이터 처리 및 스트리밍 데이터 파이프라인을 구축하는 데 널리 사용되는 이유입니다.

 

Kafka Broker

Kafka Broker는 실행된 Kafka 애플리케이션 서버로써, 보통 3대 이상의 Broker Cluster 구성을 권장한다.

Zookeeper 연동

  • 역할 : 메타데이터 (Broker ID, Controller ID 등)저장하고, Controller의 정보를 저장한다.

n개 Broker 중 1대는 Controller의 기능을 수행한다.

  • Controller의 역할 : 각 Boker에게 담당 파티션 할당한다.
    • Broker 정상 동작 모니터링 관리

 

카프카(Kafka)와 주키퍼(ZooKeeper)는 밀접한 관련이 있는 두 가지 독립적인 시스템입니다. 
주키퍼는 카프카의 의존성 관리와 클러스터의 상태 관리를 담당하는 중앙 집중식의 분산 코디네이션 서비스입니다. 

이들 간의 연관관계는 다음과 같이 설명할 수 있습니다:

1. 의존성 관리: 
	카프카는 주키퍼에 의존합니다. 
	주키퍼는 카프카 브로커들과 컨슈머 그룹 간의 통신을 조정하고 필요한 정보를 제공합니다. 
	카프카 브로커들은 주키퍼를 통해 
	클러스터 구성 정보, 토픽(partition)의 상태, 브로커의 동적 할당 등을 확인하고 관리합니다.

2. 클러스터 상태 관리: 
	주키퍼는 카프카 클러스터의 상태를 관리합니다. 
	브로커의 생존 여부, 토픽의 파티션 할당, 리더 선출 등의 정보를 주키퍼가 유지하고 있습니다. 
	카프카 브로커는 이러한 정보를 주키퍼로부터 읽어와 클러스터의 상태를 파악하고, 필요한 조치를 취할 수 있습니다.

3. 리더 선출: 
	카프카의 토픽(partition)은 여러 개의 복제된 리플리카(replica)를 가집니다. 
	이 중 한 개의 리플리카가 리더(leader)로 선출되어 데이터의 읽기 및 쓰기 작업을 담당합니다. 
	주키퍼는 리더 선출과 관련된 메타데이터를 관리하며, 브로커들 간에 리더 선출이 필요한 경우 이를 조정합니다.

4. 동적 확장: 
	주키퍼는 카프카의 동적 확장을 지원합니다. 
	새로운 브로커가 클러스터에 추가되거나 기존 브로커가 중지될 때, 
	주키퍼를 통해 클러스터의 상태와 구성이 업데이트됩니다. 
	이를 통해 카프카 클러스터는 주키퍼의 도움으로 확장 가능하고 안정적으로 운영될 수 있습니다.

주키퍼는 카프카 클러스터의 상태 관리와 리더 선출을 포함한 분산 시스템의 조정과 협업을 담당합니다.
주키퍼는 
	- 카프카 클러스터의 구성 정보
	- 토픽의 파티션 할당
	- 브로커의 동적 할당 등을 관리하며
	- 클러스터의 안정성과 일관성을 보장합니다. 

이를 통해 카프카는 데이터의 내구성과 처리량을 유지하면서 신뢰할 수 있는 분산 메시징 시스템을 구축할 수 있습니다.
여기서 잠깐, Dubbo에 사용되는 Zookeeper와, Kafka에서 사용되는 Zookeeper?!

Dubbo에서 사용되는 주키퍼(ZooKeeper)와 
카프카(Kafka)에서 사용되는 주키퍼는 같은 주키퍼 시스템입니다.

주키퍼는 분산 코디네이션 서비스로서, 다양한 분산 시스템에서 사용됩니다. 
Dubbo와 카프카는 모두 주키퍼를 사용하여 분산 시스템의 필요한 기능을 지원합니다.

[Dubbo에서 주키퍼]
	서비스 디스커버리와 로드 밸런싱을 위한 용도로 사용됩니다. 
	주키퍼는 Dubbo의 레지스트리 센터로서, 서비스 프로바이더들이 주키퍼에 자신의 정보를 등록하고, 
	컨슈머들은 주키퍼를 통해 서비스 프로바이더들의 위치 정보를 얻어와 통신을 수행합니다.

[Kafka에서 주키퍼]
	메시징 시스템 내에서의 분산 코디네이션과 리더 선출을 위한 용도로 사용됩니다. 
	주키퍼는 카프카의 브로커들 사이의 조정과 협업을 담당하며, 리더 선출과 같은 중요한 작업을 수행합니다.
	또한, 주키퍼는 카프카의 토픽(partition)의 구성 정보와 상태를 관리합니다.

따라서, Dubbo와 카프카 모두 같은 주키퍼 시스템을 사용하지만, 
각각의 시스템에서 주키퍼는 다른 목적으로 활용됩니다. 
Dubbo에서는 서비스 디스커버리와 로드 밸런싱을 위한 용도로 주키퍼를 사용하고, 
카프카에서는 분산 메시징 시스템과 관련된 기능을 위해 주키퍼를 사용합니다.

 

Kafka 설치

Kafka 우분투에 설치 방법(kafka 설치, topic 생성, 통신 테스트)

~/kafka_2.13-3.2.1/config

connect-console-sink.properties		
connect-console-source.properties	
connect-distributed.properties		
connect-file-sink.properties		
connect-file-source.properties	
connect-log4j.properties	
connect-mirror-maker.properties		
connect-standalone.properties		

server.properties     # 카프카 구동 설정
zookeeper.properties  # 주키퍼 구동 설정
consumer.properties
producer.properties

kraft
tools-log4j.properties
trogdor.conf
log4j.properties

~/kafka_2.13-3.2.1/bin

connect-distributed.sh			
kafka-mirror-maker.sh
connect-mirror-maker.sh			
kafka-producer-perf-test.sh
connect-standalone.sh			
kafka-reassign-partitions.sh
kafka-acls.sh				
kafka-replica-verification.sh
kafka-broker-api-versions.sh		
kafka-run-class.sh
kafka-cluster.sh			
kafka-configs.sh	
kafka-server-start.sh  # Zookeeper Kafka Start
kafka-server-stop.sh   # Zookeeper Kafka Stop
kafka-console-consumer.sh		
kafka-storage.sh
kafka-console-producer.sh		
kafka-streams-application-reset.sh
kafka-consumer-groups.sh		
kafka-topics.sh
kafka-consumer-perf-test.sh		
kafka-transactions.sh
kafka-delegation-tokens.sh		
kafka-verifiable-consumer.sh
kafka-delete-records.sh			
kafka-verifiable-producer.sh
kafka-dump-log.sh			
trogdor.sh
kafka-features.sh			
windows     # window에서 사용되는 bin
kafka-get-offsets.sh			
zookeeper-security-migration.sh
kafka-leader-election.sh		
zookeeper-server-start.sh  # Zookeeper Server Start
zookeeper-server-stop.sh.  # Zookeeper Server Stop

kafka-log-dirs.sh			
kafka-metadata-shell.sh			
zookeeper-shell.sh

Apache Kafka 사용하기 - Producer/Consumer

  1. Kafka 메세지 전송 → Consumer에게 메시지 전달
  2. DB 변경이 생겼을 경우, Kafka가 다른 서비스에게 전달하기

https://cwiki.apache.org/confluence/display/kafka/clients

 

Clients - Apache Kafka - Apache Software Foundation

How The Kafka Project Handles Clients Starting with the 0.8 release we are maintaining all but the jvm client external to the main code base. The reason for this is that it allows a small group of implementers who know the language of that client to quickl

cwiki.apache.org

 


카프카, 주키퍼, 토픽, 메시지, Producer, Consumer, 구독

Kafka 서버 기동

./zookeeper-server-start.sh ../config/zookeeper.properties 
	## zookeeper 먼저 실행
	## INFO clientPortAddress is 0.0.0.0:2181 ## 2181 PORT로 기동됨

./kafka-server-start.sh ../config/server.properties
  ## INFO Awaiting socket connections on 0.0.0.0:9092 ## 9092 PORT로 기동

topick 조회 및 생성

% pwd
/Users/geumbit/study/tools/kafka_2.13-3.2.1

## kafka topic list 조회
% ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

## kafka topic 생성 (topic명: quickstart-events, 
## partitions 1로 설정함으로써 kafka-broker 한곳에만 topic 저장하겠다.
% ./bin/kafka-topics.sh --bootstrap-server localhost:9092 
		--create --topic quickstart-events --partitions 1
Created topic quickstart-events.

% ./bin/kafka-topics.sh --bootstrap-server localhost:9092 
		--list 
quickstart-events ## topic 리스트 조회 시 [quickstart-events] topic 하나가 존재한다

% ./bin/kafka-topics.sh --bootstrap-server localhost:9092 
    --describe --topic quickstart-events

## --describe 명령어를 사용하여 
## quickstart-events 토픽의 상세 정보를 조회하면 
## 해당 토픽의 파티션 정보와 리더 브로커 정보
Topic: quickstart-events	TopicId: A0PJoFXcQcGO3I6jNDTB-w	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: quickstart-events	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
파티션 수를 늘린다는 것은 Kafka 클러스터 내의 브로커들에게 토픽을 분산하여 저장하도록 하는 것을 의미합니다. 
각 파티션은 Kafka 클러스터의 여러 브로커 중 하나에 할당됩니다.

예를 들어, 
Kafka 클러스터에 2개의 브로커가 있고 토픽의 파티션 수를 2로 설정한다면, 
각 브로커에는 하나의 파티션씩 할당됩니다. 
이렇게 토픽을 파티션으로 분산 저장함으로써 데이터 처리와 확장성을 향상시킬 수 있습니다.

각 브로커에 할당되는 파티션은 토픽의 복제본(replica)과 관련이 있습니다. 
Kafka는 파티션의 복제본을 유지함으로써 내고장성을 제공합니다. 
각 파티션에는 한 개 이상의 복제본이 있을 수 있으며, 이 복제본은 다른 브로커에 저장됩니다. 
따라서 파티션 수를 늘리면 해당 토픽의 복제본 수도 증가하게 됩니다.

각 파티션은 독립적으로 데이터를 저장하고 처리하기 때문에, 
컨슈머 그룹의 멤버들은 분산된 파티션에서 병렬로 작업을 수행할 수 있습니다. 
이를 통해 데이터 처리량과 성능을 향상시킬 수 있습니다.

따라서, 파티션 수를 늘린다는 것은 
Kafka 클러스터의 브로커들에게 토픽의 파티션을 분산하여 저장하도록 하는 것입니다. 
이를 통해 데이터 처리량, 내고장성, 병렬 처리 등이 개선될 수 있습니다.

 

Producer

./bin/kafka-console-producer.sh 
		--broker-list localhost:9092 
		--topic quickstart-events

 

Consumer

./bin/kafka-console-consumer.sh 
		--bootstrap-server localhost:9092 
		--topic quickstart-events 
		--from-beginning

 

위 내용을 입력하는순간 kafka-9092서버에 아래와 같은 로그가 찍힌다.

[2023-06-04 17:00:23,810] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-28 in 24 milliseconds for epoch 0, of which 24 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2023-06-04 17:00:23,877] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group console-consumer-27572 in Empty state. Created a new member id console-consumer-4ebef3f3-bbe8-44ab-a4ba-1d7da2ebc503 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-06-04 17:00:23,889] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-27572 in state PreparingRebalance with old generation 0 (__consumer_offsets-44) (reason: Adding new member console-consumer-4ebef3f3-bbe8-44ab-a4ba-1d7da2ebc503 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2023-06-04 17:00:23,900] INFO [GroupCoordinator 0]: Stabilized group console-consumer-27572 generation 1 (__consumer_offsets-44) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-06-04 17:00:23,918] INFO [GroupCoordinator 0]: Assignment received from leader console-consumer-4ebef3f3-bbe8-44ab-a4ba-1d7da2ebc503 for group console-consumer-27572 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

위 Producer은 [quickstart-events] 이라는 토픽으로 설정해놓았고,

Consumer는 모두 [quickstart-events]라는 토픽을 Subscribe(구독)하고 있다.

그렇기에, Producer의 [quickstart-events] topic의 메시지를 입력하면

Consumer는 구독하고있는 topic의 메시지를 응답받을 수 있다.

만약, Consumer를 하나 더 만들어서 topic 구독한다면, —from-beginning 을 사용하였기에 처음부터 전달했던 메시지를 받아온다

 

 

Zookeeper [2181] Kafka [9092]
Producer[quickstart-events] Consumer [quickstart-events 구독]
  Consumer [quickstart-events 구독]

Kafka connect [8083]

 


Apache Kafka 사용 -kafka Connect

Kafka Connect를 통해 Data를 Import/Export 가능하며, 코드 없이 Configuration으로 데이터를 이동

Standalone mode, Distribution mode를 지원한다.

  • Restful API 통해 지원
  • Stream 또는 Batch 형태로 데이터 전송 가능
  • 커스텀 Connector를 통해 다양한 Plugin을 제공(Mysql, S3, File, Hive …)

 

Kafka와 Kafka Connect는 Apache Kafka를 기반으로 한 데이터 플랫폼의 구성 요소입니다. 다음은 각각의 개념과 역할에 대한 설명입니다:

  1. Kafka: Kafka는 분산 스트리밍 플랫폼으로, 대량의 실시간 데이터를 안정적이고 확장 가능한 방식으로 처리하는 데 사용됩니다. Kafka는 대용량의 데이터를 안정적으로 저장하고, 실시간으로 데이터를 스트리밍하며, 다양한 애플리케이션 간에 데이터를 연결하는 역할을 합니다. 메시지 큐 시스템으로 분류되며, pub-sub (publisher-subscriber) 패턴과 메시지 로그 기반의 영구 저장소로 동작합니다.
  2. Kafka Connect: Kafka Connect는 Kafka와 상호 작용하여 데이터를 외부 시스템과 연결하는 기능을 제공하는 도구입니다. Kafka Connect는 데이터 흐름을 처리하고 변환하며, 다른 시스템과의 데이터 이동을 담당합니다. Kafka Connect는 표준화된 커넥터 구성을 통해 다양한 데이터 소스 및 데이터 대상 시스템과 통합될 수 있습니다. 예를 들어, 데이터베이스, 파일 시스템, 메시징 시스템 등 다양한 소스 및 대상 시스템과의 연결을 지원합니다.

Kafka Connect는 데이터 통합 솔루션으로, Kafka를 사용하여 데이터를 캡처하고 이동시키는 기능을 제공합니다. 데이터 소스로부터 데이터를 읽어들이고 Kafka 토픽으로 전송하거나, Kafka 토픽에서 데이터를 소비하여 외부 시스템으로 전달하는 등의 작업을 수행할 수 있습니다. 이를 통해 Kafka Connect는 데이터 파이프라인을 구축하고 다양한 데이터 소스 및 대상 시스템 간에 데이터를 효율적으로 이동시키는 데 도움을 줍니다.

Confluent는 Kafka의 기업 지원을 제공하는 회사로, Confluent Platform은 Kafka와 관련된 다양한 도구와 기능을 포함한 플랫폼입니다. Kafka Connect는 Confluent Platform의 일부로 제공되는 구성 요소 중 하나입니다.

 

Kafka Connect 설치

  1. Kafka Connect Install
curl -O <http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz>
  1. JDBC Kafka Connect Install

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/overview.html

Install the connector manually 를 찾아간 후 Download and extract the ZIP file  Click

→ 페이지 이동 후 Download 클릭

  1. topic 조회
geumbit@gimgeumbich-ui-MacBookPro kafka_2.13-3.2.1 % 
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
quickstart-events

위와같이 2개의 topic이 존재한다.

  1. Zookeeper와 Kafka 실행된 상태에서 kafka connect를 실행시킨다.
geumbit@gimgeumbich-ui-MacBookPro confluent-6.1.0 % 
	./bin/connect-distributed ./etc/kafka/connect-distributed.properties
  1. 이후 다시 topic을 조회해본다
geumbit@gimgeumbich-ui-MacBookPro kafka_2.13-3.2.1 % ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
connect-configs  ## 생성1
connect-offsets  ## 생성2
connect-status   ## 생성3
quickstart-events

생성된 topic들을 확인해보면 위 3가지가 생성되었다.

  1. 위에서 다운받은 JDBC Connect를 설치한다.

그럼 confluentinc-kafka-connect-jdbc-10.7.2.zip 해당 파일이 설치되었다.

해당 파일을 풀고나면 kafka-connect-jdbc-10.7.2.jar 파일이 존재한다.

 

vi /Users/geumbit/study/tools/confluent-6.1.0/etc/kafka/connect-distributed.properties

plugin.path=/Users/geumbit/study/tools/confluentinc-kafka-connect-jdbc-10.7.2/lib
  1. /Users/geumbit/study/tools/confluent-6.1.0/share/java/kafka 해당 경로에 mysql connect jar 파일을 복사한다.
geumbit@gimgeumbich-ui-MacBookPro 8.0.28 % pwd
/Users/geumbit/.m2/repository/mysql/mysql-connector-java/8.0.28

geumbit@gimgeumbich-ui-MacBookPro 8.0.28 % cp ./mysql-connector-java-8.0.28.jar /Users/geumbit/study/tools/confluent-6.1.0/share/java/kafka

 


Kafka Source Connect 사용

우리가 가지고 있는 카프카 코어에서 Connect Source와 Connect Sink를 등록할 수 있다.

Connect Source에 DB데이터를 가져오고 Kafka Cluster에 저장을하고,

관심있다고 등록한 Kafka Connect Sink에 데이터를 보낸다??????????

Kafka Source Connect 테스트를 해보자.

일단, mysql에 아래와 같이 테이블 만들어 놓는다

CREATE TABLE kafkatest (
  id INT AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(20),
  city VARCHAR(100),
  createAt DATE
);
echo '
{
		"name" : "my-source-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url":"jdbc:mysql://localhost:3306/testdb",
        "connection.user": "root",
        "connection.password": "gbitkim",
        "mode": " incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist":"kafkatest",
        "topic.prefix" : "mysql_kafka_test_",
        "tasks.max" : "1"
    }
}
' | curl -X POST -d @- <http://localhost:8083/connectors> --header "content-Type:application/json"

 

현재는 해당 테이블에 데이터가 없지만 수정되거나 추가되면

해당 토픽을 구독한 Consumer는 데이터를 바로바로 받을 수 있을것이다.

해당 connector 상세 조회

<http://localhost:8083/connectors/my-source-connect>

 

토픽의 정보를 확인해보자,

./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
mysql_kafka_test_kafkatest // 추가됨 확인

그렇다면 토픽 컨슈머를 확인해보자

./bin/kafka-console-consumer.sh \\n
	--bootstrap-server localhost:9092 --topic mysql_kafka_test_kafkatest \\n
	--from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"city"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Date","version":1,"field":"createAt"}],"optional":false,"name":"kafkatest"},"payload":{"id":1,"name":"John Doe","city":"New York","createAt":19514}}

데이터를 넣으면 실시간으로 Consumer가 확인함을 알 수 있다.

 

{
   "schema":{
      "type":"struct",
      "fields":[
         { "type":"int32", "optional":false, "field":"id" },
         { "type":"string", "optional":true, "field":"name" },
         { "type":"string", "optional":true, "field":"city" },
         { "type":"int32", "optional":true, 
						"name":"org.apache.kafka.connect.data.Date",
            "version":1, "field":"createAt" }
      ],
      "optional":false,
      "name":"kafkatest"
   },
   "payload":{ "id":1, "name":"John Doe", "city":"New York", "createAt":19514 }
}

 


Kafka Sink Connect 테스트

kafka Sink는mysql_kafka_test_kafkatest 에 전달된 topic의 데이터를 사용하는 역할을한다.

우선 kafka topic list를 확인해본다

kafka_2.13-3.2.1 % ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

__consumer_offsets
connect-configs
connect-offsets
connect-status
mysql_kafka_test_kafkatest

 

echo '
{
    "name" : "my-sink-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/testdb",
        "connection.user": "root",
        "connection.password": "gbitkim",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "false",
        "tasks.max" : "1",
		"topics": "mysql_kafka_test_kafkatest"
    }
}
' | curl -X POST -d @- <http://localhost:8083/connectors> --header "content-Type:application/json"
echo '
{
	"name" : "my-sink-connect", // Sink Connector의 이름
	"config" : {
    
		// JdbcSinkConnector를 사용하여 MySQL 데이터베이스로 데이터를 전송
		"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
		"connection.url":"jdbc:mysql://localhost:3306/testdb",
		"connection.user": "root",
		"connection.password": "gbitkim",
		"auto.create": "true", // 테이블을 자동으로 생성
		"auto.evolve": "true", // 스키마 변경에 따라 테이블을 자동으로 수정
		"delete.enabled": "false", // 데이터 삭제를 비활성화
		"tasks.max" : "1", 
		"topics": "mysql_kafka_test_kafkatest" // 데이터 전송할 topic이름
    }
}
' | curl -X POST -d @- <http://localhost:8083/connectors> --header "content-Type:application/json"

 

 

 

위 생성되었으며, 함께  mysql_kafka_test_kafkatest  table과, 데이터도 생성되었다.

 

 

위 Sink는 mysql_kafka_test_kafkatest 라는 topic에 구독되어진상태이기때문에

아래 Table을 등록하면

INSERT INTO kafkatest (name, city, createAt)
VALUES ('test', 'Busan', '2023-06-06');

INSERT INTO kafkatest (name, city, createAt)
VALUES ('admin', 'Super', '2023-06-06');

이전에 실행했던 source 로 인하여 mysql_kafka_test_kafkatest 해당 토픽에 전달되며,

echo '
{
	"name" : "my-source-connect",
	"config" : {
		"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
		"connection.url":"jdbc:mysql://localhost:3306/testdb",
		"connection.user": "root",
		"connection.password": "gbitkim",
		"mode": " incrementing",
		"incrementing.column.name" : "id",
		"table.whitelist":"kafkatest",
		"topic.prefix" : "mysql_kafka_test_",
		"tasks.max" : "1"
    }
}
' | curl -X POST -d @- <http://localhost:8083/connectors> --header "content-Type:application/json"

mysql_kafka_test_kafkatest 해당 토픽을 구독하고 있는 Consumer 역시 데이터가 쌓이고

geumbit@gimgeumbich-ui-MacBookPro kafka_2.13-3.2.1 % ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mysql_kafka_test_kafkatest --fro
m-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"city"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Date","version":1,"field":"createAt"}],"optional":false,"name":"kafkatest"},"payload":{"id":1,"name":"John Doe","city":"New York","createAt":19514}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"city"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Date","version":1,"field":"createAt"}],"optional":false,"name":"kafkatest"},"payload":{"id":2,"name":"gbitkim","city":"Seoul","createAt":19514}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"city"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Date","version":1,"field":"createAt"}],"optional":false,"name":"kafkatest"},"payload":{"id":3,"name":"test","city":"Busan","createAt":19514}}

// 추가됨
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"city"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Date","version":1,"field":"createAt"}],"optional":false,"name":"kafkatest"},"payload":{"id":4,"name":"admin","city":"Super","createAt":19514}}

my-sink-connect 또한 해당 topic을 바라보고있고, 해당 토픽이 변경될때마다 테이블 및 데이터를 생성한다고 설정하였기에 아래와 같이 해당 테이블의 데이터가 생성되었다.

 

 

my-sink-connect

데이터를 추출하여 가공하여 다른쪽에 데이터를 전달하거나 가공하기에 좋다.

자, 그럼 이번에는 Producer를 생성하여 topic에 전달하여

위 만들었던 my-sink-connect 와 Consumer에게 전달해보자

geumbit@gimgeumbich-ui-MacBookPro kafka_2.13-3.2.1 % ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mysql_kafka_test_kafkatest
>{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"city"},{"type":"int32","optional":true,"name":"org.apache.kafka.connect.data.Date","version":1,"field":"createAt"}],"optional":false,"name":"kafkatest"},"payload":{"id":5,"name":"ProducerSend","city":"Send","createAt":19514}}

 

아래 테이블은 구독한 topic을 설정한 connect가 없기에 데이터가 없지만

 

아래는 구독한내용이 있기에 데이터가 동기화된다.

 

반응형
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/10   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31
글 보관함