博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka 学习笔记(一)
阅读量:3899 次
发布时间:2019-05-23

本文共 7630 字,大约阅读时间需要 25 分钟。

Kafka 学习笔记(一)

Kafka 是什么

Kafka 最初是 LinkedIn 的内部项目,现在已经捐赠给 Apache 基金会。Kafka 是一个高性能分布式基于发布/订阅的消息系统,是一个分布式的,可划分的,冗余备份的持久性的日志服务。

为什么要使用消息队列

当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。“ 消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“ 消息队列”是在消息的传输过程中保存消息的容器。

使用消息队列的好处

提升响应速度

使用了消息队列,生产者一方,把消息往队列里一扔,就可以立马返回,响应用户了。无需等待处理结果。处理结果可以让用户稍后自己来取,如医院取化验单。也可以让生产者订阅(如:留下手机号码或让生产者实现listener接口、加入监听队列),有结果了通知。获得约定将结果放在某处,无需通知。

提升稳定性

考虑电商系统下订单,发送数据给生产系统的情况。电商系统和生产系统之间的网络有可能掉线,生产系统可能会因维护等原因暂停服务。如果不使用消息队列,电商系统数据发布出去,顾客无法下单,影响业务开展。两个系统间不应该如此紧密耦合。应该通过消息队列解耦。同时让系统更健壮、稳定。

消减服务耦合

传统的设计中一个服务的调用可能会涉及到多个下游的调用。直接造成一个服务和多个下游服务耦合,且多个下游调用将拖慢服务的运行速度。当使用了消息队列,我们的服务只关系消息是否发送成功,对于一条消息会被谁消费服务不在关心。直接消除了服务的间的耦合度,只与消息队列产生耦合。

Kafka 的入门案例

安装运行环境

安装zookeeper

因为Kafka是使用的zookeeper作为Kafka集群的注册中心,所以我们先要在自己的本机上部署zookeeper的实例。zookeeper使用的Java进行编写,所以我们的部署机器上还要有Java的运行环境

zookeeper下载地址: https://zookeeper.apache.org/releases.html
单机模式为例:
解压下载好的压缩包,进入到zookeeper的conf目录,将其中的zoo_sample.cfg重新命名为zoo.cfg
修改zoo.cfg的配置:

# The number of milliseconds of each ticktickTime=2000# The number of ticks that the initial# synchronization phase can takeinitLimit=10# The number of ticks that can pass between# sending a request and getting an acknowledgementsyncLimit=5# the directory where the snapshot is stored.# do not use /tmp for storage, /tmp here is just# example sakes.dataDir=/Users/vector/codeTools/zookeeper-3.6.2/datadataLogDir=/Users/vector/codeTools/zookeeper-3.6.2/dataLogs# the port at which the clients will connectclientPort=2181# start address and port# server.进程ID=运行IP地址:提供服务的端口:leader选举端口server.1=127.0.0.1:2181:3181

切换到bin目录下

输入 zkServer.sh start 即可启动zookeeper服务
vector@VectordeMacBook-Pro bin % ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /Users/vector/codeTools/zookeeper-3.6.2/bin/…/conf/zoo.cfg
Starting zookeeper … STARTED

kafka的运行配置

下载Kafka的运行文件,然后解压

下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz
解压后进入conf目录,编辑server.properties文件(单机部署):

############################# Server Basics ############################## The id of the broker. This must be set to a unique integer for each broker.broker.id=0############################# Socket Server Settings ############################## The number of threads that the server uses for receiving requests from the network and sending responses to the networknum.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/Onum.io.threads=8# The send buffer (SO_SNDBUF) used by the socket serversocket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket serversocket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)socket.request.max.bytes=104857600############################# Log Basics ############################## A comma separated list of directories under which to store log fileslog.dirs=/Users/vector/codeTools/data/kafka_2.13-2.8.0/log/kafka# The default number of log partitions per topic. More partitions allow greater# parallelism for consumption, but this will also result in more files across# the brokers.num.partitions=1# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.# This value is recommended to be increased for installations with data dirs located in RAID array.num.recovery.threads.per.data.dir=1############################# Internal Topic Settings  ############################## The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1############################# Log Flush Policy ########################################################## Log Retention Policy ############################## The minimum age of a log file to be eligible for deletion due to agelog.retention.hours=168# The maximum size of a log segment file. When this size is reached a new log segment will be created.log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according# to the retention policieslog.retention.check.interval.ms=300000############################# Zookeeper #############################zookeeper.connect=localhost:2181/kafka# Timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=18000############################# Group Coordinator Settings #############################group.initial.rebalance.delay.ms=0

终端中输入 以下命令即可在后台运行Kafka

bin/kafka-server-start.sh -daemon  config/server.properties

运行demo

我们使用Java语言为例

Producer

package com.vector.kafka.demo.producer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class Producer {    public static final String BrokerList = "localhost:9092";    public static final String Topic = "kafka-topic-demo";    public static void main(String[] args) {        Properties properties = new Properties();        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        properties.put("bootstrap.servers", BrokerList);        try (KafkaProducer
producer = new KafkaProducer<>(properties)) { ProducerRecord
record = new ProducerRecord<>(Topic, "hello kafka!"); producer.send(record); } catch (Exception e) { e.printStackTrace(); } }}

Consumer

package com.vector.kafka.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class Consumer {    public static final String BrokerList = "localhost:9092";    public static final String Topic = "kafka-topic-demo";    public static final String GroupID = "group.demo";    public static void main(String[] args) {        Properties properties = new Properties();        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        properties.put("bootstrap.servers", BrokerList);        properties.put("group.id", GroupID);        KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Collections.singletonList(Topic)); while (true) { ConsumerRecords
records = kafkaConsumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord
record : records) { System.out.println(record.value()); } } }}

运行截图

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.hello kafka!hello kafka!Process finished with exit code 130 (interrupted by signal 2: SIGINT)

非常感谢你能看到这里,下一期我将继续分享学习Kafka的一些内容。本次分享的内容只是入门级别,很多配置的参数也没有详细的介绍。后续我将会详细讲解!

也是很久没有分享自己的学习情况,后续将逐步恢复起来。简单讲下这段时间的实习吧,实习期间的体验还是很不错的,作为一个实习生还是处于学习的阶段,手上没有很多的事情,主要是业务的复杂度比较高,实还是以学习业务为主。组内的气氛也是很不错,是真的不讲title,平时都是叫名字,这是在其他公司很难见到的!最后新的一轮毕业季快来了,祝大家都能拿到满意的offer,遇到对的老板!

转载地址:http://wxden.baihongyu.com/

你可能感兴趣的文章
我的2015年
查看>>
Android studio 上使用aidl总结
查看>>
jquery和js实现页面返回到之前的位置
查看>>
js实现上传图片前预览效果
查看>>
spring 缓存@Cacheable的用法以及配置
查看>>
spring 后台对象为空校验@Valid的用法以及配置。以及@Valid抛出异常问题分析
查看>>
java isAssignableFrom,isInstance,AnnotationUtils.findAnnotation用法讲解
查看>>
js 正则表达式分数校验小于等于100,并且保留一位小数
查看>>
Wdatepicker限制日期用法(开始日期不大于结束日期,并且不大于当前日期)
查看>>
oracle 根据逗号拼接的数据进行查询,进行一一对应
查看>>
cmd 批量编译某个路径下的java文件
查看>>
nginx配置查看服务器日志目录文件
查看>>
tomcat性能优化配置-协议类型选择nio或APR
查看>>
用jquery实现简单的模块开发
查看>>
spring-data-redis 整合配置redis
查看>>
解决Could not resolve placeholder 'redis.maxTotal' in string value "${redis.maxTotal}
查看>>
redis+lua实现高并发商品秒杀案例
查看>>
arttemplate基础语法介绍
查看>>
FreeMarker基础语法介绍
查看>>
linux 下安装minio并配置
查看>>