首页 > 编程知识 正文

java批量推送消息,spring kafka配置

时间:2023-05-06 16:21:29 阅读:131949 作者:1381

前言

本文主要介绍springboot集成kafka。

安装kafka

创建项目

将名为springboot-kafka的pom项目创建为父项目,删除所有main和资源文件夹,并将配置添加到pom文件中

xsi :方案定位=' http://maven.Apache.org/POM/4.0.0http://maven.Apache.org/xsd/maven-4.0.0.xsssd

4.0.0

com.lxg

springboot-kafka

1.0-快照

pom

springboot-kafka

springboot-kafka-common

springboot-kafka-consumer

springboot-kafka-producer

org.springframework.boot

spring-boot-starter-parent

2.1.3 .发布

org.springframework.boot

spring-boot-starter-web

org.springframework.kafka

spring-kafka

org.projectlombok

lombok

1.18.12

com.alibaba

快速强森

1.2.67

3 .建立公共服务模块

创建名为springboot-kafka-common的微服务。 包是jar,用于存储公共配置和公共类,如util

1 .配置POM文件

xsi :方案定位=' http://maven.Apache.org/POM/4.0.0http://maven.Apache.org/xsd/maven-4.0.0.xsssd

com.lxg

springboot-kafka

1.0-快照

4.0.0

springboot-kafka-common

通过在pom文件中使用父项目作为父项依存关系,无需添加依存关系

创建新的user实体类

@Data

publicclassuserimplementsserializable {

//*

* id

*/

私有integer id;

//*

*用户名

*/

私有字符串用户名称;

//*

*密码

*/

私有字符串密码;

}

3 .创建APP应用程序-common.yml配置文件,主要添加kafka的公共配置文件

spring:

kafka:

#kafka配置

bootstrap-servers :192.168.56.10233609092

producer:

retries: 0

#一次大量发送的消息数

batch-size: 16384

buffer-memory: 33554432

#指定消息key和消息主体的编解码方式

key-serializer 3360 org.Apache.Kafka.com mon.serialization.string serializer

value-serializer 3360 org.Apache.Kafka.com mon.serialization.string serializer

消费者:

#指定默认消费者组id

组- id :测试用户组

自动偏移重置:电子仓库

启用自动提交:真

自动提交间隔: 5000

#指定消息key和消息主体的编解码方式

key-deserializer 3360 org.Apache.Kafka.com mon.serialization.string deserializer

value-deserializer 3360 org.Apache.Kafka.com mon.serialization.string deserializer

#在微服务中使用Value注释注入调用,如果kafka中没有该主题,则会自动创建您定义的主题名称

顶级的

ic:

userTopic: userInfo

4.创建消息生产者,即创建一个名为springboot-kafka-producer的普通springboot项目

1.pom文件配置

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

com.lxg

springboot-kafka-producer

1.0-SNAPSHOT

4.0.0

com.lxg

springboot-kafka-common

1.0-SNAPSHOT

2.application.yml配置文件,配置端口,设置微服务名称,引入公共服务模块中的application-common.yml

server:

port: 8081

spring:

application:

name: kafka-producer

profiles:

active: common

3.controller层

创建UserController

@Slf4j

@Controller

@RequestMapping("/api/user")

public class UserController {

@Autowired

private UserService userService;

@ResponseBody

@GetMapping("/getUser")

public void getUser() {

userService.sendUserMsg();

log.info("getUser");

}

}

4.service层

创建UserService

public interface UserService {

/**

* 发送用户信息

*

* @return

*/

Boolean sendUserMsg();

}

创建UserServiceImpl

@Slf4j

@Service

public class UserServiceImpl implements UserService {

@Value("${spring.kafka.topic.userTopic}")

private String userTopic;

@Autowired

KafkaTemplate kafkaTemplate;

@Override

public Boolean sendUserMsg() {

User user = new User();

user.setId(1);

user.setUsername("lxg");

user.setPassword("6767167");

kafkaTemplate.send(userTopic, JSONObject.toJSONString(user));

log.info("lxg");

return Boolean.TRUE;

}

}

5.创建启动类

@SpringBootApplication

public class WebApplication {

public static void main(String[] args) {

SpringApplication.run(WebApplication.class, args);

}

}

5.创建消息消费者

1.pom文件

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

com.lxg

springboot-kafka-consumer

1.0-SNAPSHOT

4.0.0

com.lxg

springboot-kafka-common

1.0-SNAPSHOT

2.创建yml配置文件

server:

port: 8082

spring:

application:

name: kafka-consumer

profiles:

active: common

3.创建consumer消费者类

@Slf4j

@Component

public class UserConsumer {

@KafkaListener(topics = {"${spring.kafka.topic.userTopic}"})

public void userConsumer(String message) {

log.info("receive msg " + message);

}

}

4.启动类

@SpringBootApplication

public class ConsumerApplication {

public static void main(String[] args) {

SpringApplication.run(ConsumerApplication.class, args);

}

}

测试

启动producer和consumer两个服务模块

访问producer微服务中的接口 http://localhost:8081/api/user/getUser

会发现consumer微服务中的控制台打印了producer中创建并推送过来的的user实体

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。