前言
本文主要介绍springboot集成kafka。
安装kafka
创建项目
将名为springboot-kafka的pom项目创建为父项目,删除所有main和资源文件夹,并将配置添加到pom文件中
xsi :方案定位=' http://maven.Apache.org/POM/4.0.0http://maven.Apache.org/xsd/maven-4.0.0.xsssd
4.0.0
com.lxg
springboot-kafka
1.0-快照
pom
springboot-kafka
springboot-kafka-common
springboot-kafka-consumer
springboot-kafka-producer
org.springframework.boot
spring-boot-starter-parent
2.1.3 .发布
org.springframework.boot
spring-boot-starter-web
org.springframework.kafka
spring-kafka
org.projectlombok
lombok
1.18.12
com.alibaba
快速强森
1.2.67
3 .建立公共服务模块
创建名为springboot-kafka-common的微服务。 包是jar,用于存储公共配置和公共类,如util
1 .配置POM文件
xsi :方案定位=' http://maven.Apache.org/POM/4.0.0http://maven.Apache.org/xsd/maven-4.0.0.xsssd
com.lxg
springboot-kafka
1.0-快照
4.0.0
springboot-kafka-common
通过在pom文件中使用父项目作为父项依存关系,无需添加依存关系
创建新的user实体类
@Data
publicclassuserimplementsserializable {
//*
* id
*/
私有integer id;
//*
*用户名
*/
私有字符串用户名称;
//*
*密码
*/
私有字符串密码;
}
3 .创建APP应用程序-common.yml配置文件,主要添加kafka的公共配置文件
spring:
kafka:
#kafka配置
bootstrap-servers :192.168.56.10233609092
producer:
retries: 0
#一次大量发送的消息数
batch-size: 16384
buffer-memory: 33554432
#指定消息key和消息主体的编解码方式
key-serializer 3360 org.Apache.Kafka.com mon.serialization.string serializer
value-serializer 3360 org.Apache.Kafka.com mon.serialization.string serializer
消费者:
#指定默认消费者组id
组- id :测试用户组
自动偏移重置:电子仓库
启用自动提交:真
自动提交间隔: 5000
#指定消息key和消息主体的编解码方式
key-deserializer 3360 org.Apache.Kafka.com mon.serialization.string deserializer
value-deserializer 3360 org.Apache.Kafka.com mon.serialization.string deserializer
#在微服务中使用Value注释注入调用,如果kafka中没有该主题,则会自动创建您定义的主题名称
顶级的
ic:userTopic: userInfo
4.创建消息生产者,即创建一个名为springboot-kafka-producer的普通springboot项目
1.pom文件配置
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
com.lxg
springboot-kafka-producer
1.0-SNAPSHOT
4.0.0
com.lxg
springboot-kafka-common
1.0-SNAPSHOT
2.application.yml配置文件,配置端口,设置微服务名称,引入公共服务模块中的application-common.yml
server:
port: 8081
spring:
application:
name: kafka-producer
profiles:
active: common
3.controller层
创建UserController
@Slf4j
@Controller
@RequestMapping("/api/user")
public class UserController {
@Autowired
private UserService userService;
@ResponseBody
@GetMapping("/getUser")
public void getUser() {
userService.sendUserMsg();
log.info("getUser");
}
}
4.service层
创建UserService
public interface UserService {
/**
* 发送用户信息
*
* @return
*/
Boolean sendUserMsg();
}
创建UserServiceImpl
@Slf4j
@Service
public class UserServiceImpl implements UserService {
@Value("${spring.kafka.topic.userTopic}")
private String userTopic;
@Autowired
KafkaTemplate kafkaTemplate;
@Override
public Boolean sendUserMsg() {
User user = new User();
user.setId(1);
user.setUsername("lxg");
user.setPassword("6767167");
kafkaTemplate.send(userTopic, JSONObject.toJSONString(user));
log.info("lxg");
return Boolean.TRUE;
}
}
5.创建启动类
@SpringBootApplication
public class WebApplication {
public static void main(String[] args) {
SpringApplication.run(WebApplication.class, args);
}
}
5.创建消息消费者
1.pom文件
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
com.lxg
springboot-kafka-consumer
1.0-SNAPSHOT
4.0.0
com.lxg
springboot-kafka-common
1.0-SNAPSHOT
2.创建yml配置文件
server:
port: 8082
spring:
application:
name: kafka-consumer
profiles:
active: common
3.创建consumer消费者类
@Slf4j
@Component
public class UserConsumer {
@KafkaListener(topics = {"${spring.kafka.topic.userTopic}"})
public void userConsumer(String message) {
log.info("receive msg " + message);
}
}
4.启动类
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
测试
启动producer和consumer两个服务模块
访问producer微服务中的接口 http://localhost:8081/api/user/getUser
会发现consumer微服务中的控制台打印了producer中创建并推送过来的的user实体