.net core 集成 kafka的步驟
最近維護(hù)的一個系統(tǒng)并發(fā)有點(diǎn)高,所以想引入一個消息隊(duì)列來進(jìn)行削峰。考察了一些產(chǎn)品,最終決定使用kafka來當(dāng)做消息隊(duì)列。以下是關(guān)于kafka的一些知識的整理筆記。
kafka
kafka 是分布式流式平臺。它由linkedin開發(fā),后貢獻(xiàn)給了apache開源組織并成為頂級開源項(xiàng)目。它可以應(yīng)用在高并發(fā)場景下的日志系統(tǒng),也可以當(dāng)作消息隊(duì)列來使用,也可以當(dāng)作消息服務(wù)對系統(tǒng)進(jìn)行解耦。
流處理平臺有以下三種特性:
- 可以讓你發(fā)布和訂閱流式的記錄。這一方面與消息隊(duì)列或者企業(yè)消息系統(tǒng)類似。
- 可以儲存流式的記錄,并且有較好的容錯性。
- 可以在流式記錄產(chǎn)生時(shí)就進(jìn)行處理。
一般它可以應(yīng)用于兩個場景:
- 構(gòu)造實(shí)時(shí)流數(shù)據(jù)管道,它可以在系統(tǒng)或應(yīng)用之間可靠地獲取數(shù)據(jù)。 (相當(dāng)于message queue)
- 構(gòu)建實(shí)時(shí)流式應(yīng)用程序,對這些流數(shù)據(jù)進(jìn)行轉(zhuǎn)換或者影響。 (就是流處理,通過kafka stream topic和topic之間內(nèi)部進(jìn)行變化)
broker
kafka中的每個節(jié)點(diǎn)即每個服務(wù)器就是一個broker 。
topic
kafka中的topic是一個分類的概念,表示一類消息。生產(chǎn)者在生產(chǎn)消息的時(shí)候需要指定topic,消費(fèi)者在消費(fèi)消息的時(shí)候也需要指定topic。
partition
partition是分區(qū)的概念。kafka的一個topic可以有多個partition。每個partition會分散到不同的broker上,起到負(fù)載均衡的作用。生產(chǎn)者的消息會通過算法均勻的分散在各個partition上。
consumer group
kafka的消費(fèi)者有個組的概念。一個partition可以被多consumer group訂閱。每個消息會廣播到每一個group中。但是每個消息只會被group中的一個consumer消費(fèi)。相當(dāng)于每個group,一個partition只能有一個consumer訂閱,所以group中的consumer數(shù)量不可以超過topic中partition的數(shù)量。并且消息的消費(fèi)的順序在每個partition中是保證有序的,但是在多個partition之間是不保證的,因?yàn)閏onsumer的消費(fèi)速度是有快慢的。
所以如果要用kafka實(shí)現(xiàn)嚴(yán)格的消息隊(duì)列點(diǎn)對點(diǎn)模式那么我們可以設(shè)置一個partition并且設(shè)置一個consumer。如果對消息消費(fèi)的順序不是那么敏感,那么可以設(shè)置多個partition來并行消費(fèi)消息,提高吞吐量。
安裝kafka
為了能體驗(yàn)下kafka,我們還是要實(shí)際安裝一下kafka,畢竟空想是沒有用的?,F(xiàn)在有了docker,安裝起來也是相當(dāng)?shù)魏唵?。我們只需要定義好docker-compose的yml就行了。
version: '3' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka depends_on: - zookeeper ports: - "9092:9092" environment: kafka_advertised_host_name: 192.168.0.117 kafka_create_topics: "test:3:1" kafka_zookeeper_connect: zookeeper:2181
1.我們在yml里定義2個service:
2.zookeeper,kafka的分布式依賴zookeeper,所以我需要先定義它。
kafka ,kafka的定義有幾個地方要注意的。
- depends_on:zookeeper 指定kafka依賴zookeeper這個service,當(dāng)啟動kafka的時(shí)候自動會啟動zookeeper。
- kafka_advertised_host_name 這里要指定宿主機(jī)的ip
- kafka_create_topics 這個變量只是的默認(rèn)創(chuàng)建的topic。"test:3:1"代表創(chuàng)建一個名為test的topic并且創(chuàng)建3個分區(qū)1個復(fù)制。
定義好這些之后我們只需要使用docker-compose命令運(yùn)行它:
sudo docker-compose up -d
.net 操作 kafka
安裝好kafka的docker環(huán)境之后,下面演示下如何使用.net操作kafka,進(jìn)行消息的生產(chǎn)與消費(fèi)。
生產(chǎn)者
static async task main(string[] args) { console.writeline("hello world producer!"); var config = new producerconfig { bootstrapservers = "192.168.0.117:9092", clientid = dns.gethostname(), }; using (var producer = new producerbuilder<null, string>(config).build()) { string topic = "test"; for (int i = 0; i < 100; i++) { var msg = "message " + i; console.writeline($"send message: value {msg}"); var result = await producer.produceasync(topic, new message<null, string> { value = msg }); console.writeline($"result: key {result.key} value {result.value} partition:{result.topicpartition}"); thread.sleep(500); } } console.readline(); }
新建一個控制臺項(xiàng)目,從nuget安裝kafka的官方client。
install-package confluent.kafka
代碼非常簡單,使用producerbuilder構(gòu)造一個producer,然后調(diào)用produceasync方法發(fā)送消息。
其中需要注意的是如果你的場景并發(fā)非常之高,官方文檔推薦的方法是produce而不是produceasync。這是一個比較迷的地方。按常理使用produceasync應(yīng)該比使用同步方法produce能獲得更高的并發(fā)才對。但是文檔確確實(shí)實(shí)說高并發(fā)場景請使用produce??赡苁菫榱吮苊鈖roduceasync結(jié)果返回的時(shí)候異步線程上下文切換造成的性能開銷。
原文:
there are a couple of additional benefits of using the produce method. first, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. with produceasync, this is not the case because tasks may complete on any thread pool thread. second, produce is more performant because there is unavoidable overhead in the higher level task based api.
消費(fèi)者
static void main(string[] args) { console.writeline("hello world kafka consumer !"); var config = new consumerconfig { bootstrapservers = "192.168.0.117:9092", groupid = "foo", autooffsetreset = autooffsetreset.earliest }; var cancel = false; using (var consumer = new consumerbuilder<ignore, string>(config).build()) { var topic = "test"; consumer.subscribe(topic); while (!cancel) { var consumeresult = consumer.consume(cancellationtoken.none); console.writeline($"consumer message: { consumeresult.message.value} topic: {consumeresult.topic} partition: {consumeresult.partition}"); } consumer.close(); } }
消費(fèi)者的演示代碼同樣很簡單。我們需要指定groupid,然后訂閱topic。使用consumerbuilder構(gòu)造一個consumer,然后調(diào)用consume方法進(jìn)行消費(fèi)就可以。
注意:
這里默認(rèn)是自動commit消費(fèi)。你也可以根據(jù)情況手動提交commit。
運(yùn)行一下
我們運(yùn)行一個生產(chǎn)者進(jìn)程,按照500ms的速度生產(chǎn)消息。運(yùn)行三個consumer進(jìn)行消費(fèi),可以看到消息被均勻的推送到三個consumer上去。
總結(jié)
以上簡單的介紹了kafka的背景、安裝方法、使用場景。還簡單演示了如何使用.net來操作kafka。它可以當(dāng)作流式計(jì)算平臺來使用,也可以當(dāng)作傳統(tǒng)的消息隊(duì)列使用。它當(dāng)前非常流行,網(wǎng)上的資料也多如牛毛。官方也提供了簡單易用的.net sdk ,為.net 平臺集成kafka提供了便利。
以上就是.net core 集成 kafka的步驟的詳細(xì)內(nèi)容,更多關(guān)于.net core 集成 kafka的資料請關(guān)注碩編程其它相關(guān)文章!