首页 > 编程知识 正文

实现一个最简单的rpc框架,rpc使用

时间:2023-05-05 12:49:53 阅读:257216 作者:3701

RPC框架——傻瓜式教程(五)

完成了前面的几篇,我们的RPC框架已经很不错了,接下来就是优化部分了

不知道大家发现没有,我们的服务端地址是固化在代码中的,也就是说,对于一个客户端,它只会去寻找那么一个服务提供者,如果这个提供者挂了或者换了地址,那就没有办法了。

在分布式架构中,有一个重要的组件,就是服务注册中心,它用于保存多个服务提供者的信息,每个服务提供者在启动时都需要向注册中心注册自己所拥有的服务。这样客户端在发起 RPC 时,就可以直接去向注册中心请求服务提供者的信息,如果拿来的这个挂了,还可以重新请求,并且在这种情况下可以很方便地实现负载均衡。

常见的注册中心有 Eureka、Zookeeper 和 Nacos,这里我们选用nacos

Nacos

啊,我懒得写教程了,不过考虑到我弄这个nacos弄了很久,还是说说吧

第一步
nacos下载
哪个版本都行,我下的是1.3.1

第二步
下载完以后直接解压,解压到的路径不能含有中文,要不然会报错
然后运行zzdxg->startup.cmd就行了,等一等不要急,他要加载蛮多东西

完成第二步就可以访问这里了
http://127.0.0.1:8848/nacos/index.html

第三步

在mysql里创建数据库,名字随你,我的是nacos_config到conf->nacos-mysql.sql,把这里面的sql写进数据库里到conf->application.properties,找到这个地方添加配置 spring.datasource.platform=mysqldb.num=1db.url.0=jdbc:mysql://127.0.0.1:3306/nacos_config?注意看这里的文字,这段文字前是我起的数据库名字nacos_config记得换成你的再把这段文字删掉呆萌的乐曲=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTCdb.user=你的用户名db.password=你的密码db mysql

配置完以后再启动,还是一样的界面,你就成功了,对了,nacos的首页默认用户名和密码都是nacos。

开始我们的项目

引入 nacos-client 依赖

这里的pom

<dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>1.3.0</version> </dependency>

然后我们把我们的ServiceRegistry,改为ServiceProvider,重新定义一个ServiceRegistry接口作为远程注册表(Nacos)使用。

public interface ServiceRegistry { void register(String serviceName, InetSocketAddress inetSocketAddress); InetSocketAddress lookupService(String serviceName);}

两个方法很好理解,register 方法将服务的名称和地址注册进服务注册中心,lookupService 方法则是根据服务名称从注册中心获取到一个服务提供者的地址。

接口有了,我们就可以写实现类了,我们实现一个 Nacos 作为注册中心的实现类:
NacosServiceRegistry

public class NacosServiceRegistry implements ServiceRegistry { private static final Logger logger = LoggerFactory.getLogger(NacosServiceRegistry.class); private static final String SERVER_ADDR = "127.0.0.1:8848"; private static final NamingService namingService; static { try { namingService = NamingFactory.createNamingService(SERVER_ADDR); } catch (NacosException e) { logger.error("连接到Nacos时有错误发生: ", e); throw new RpcException(RpcError.FAILED_TO_CONNECT_TO_SERVICE_REGISTRY); }m,. } @Override public void register(String serviceName, InetSocketAddress inetSocketAddress) { try { namingService.registerInstance(serviceName, inetSocketAddress.getHostName(), inetSocketAddress.getPort()); } catch (NacosException e) { logger.error("注册服务时有错误发生:", e); throw new RpcException(RpcError.REGISTER_SERVICE_FAILED); } } @Override public InetSocketAddress lookupService(String serviceName) { try { List<Instance> instances = namingService.getAllInstances(serviceName); Instance instance = instances.get(0); return new InetSocketAddress(instance.getIp(), instance.getPort()); } catch (NacosException e) { logger.error("获取服务时有错误发生:", e); } return null; }}

Nacos 的使用很简单,通过 NamingFactory 创建 NamingService 连接 Nacos,连接的过程写在了静态代码块中,在类加载时自动连接。namingService 提供了两个很方便的接口,registerInstance 和 getAllInstances 方法,前者可以直接向 Nacos 注册服务,后者可以获得提供某个服务的所有提供者的列表。所以接口的这两个方法只需要包装一下就好了。

在 lookupService 方法中,通过 getAllInstance 获取到某个服务的所有提供者列表后,需要选择一个,这里就涉及到负载均衡策略,我们先选择第 0 个,后面会讲。

服务端

RpcServer
zddbd加上一个这个方法

<T> void publishService(Object service, Class<T> serviceClass);

然后在其他用到这个接口的地方都实现一下,举个例子
NettyServer
构造器改成这样,加上了两行

public NettyServer(String host, int port) { this.host = host; this.port = port; serviceRegistry = new NacosServiceRegistry(); serviceProvider = new ServiceProviderImpl(); }

怕你们迷茫,我就把这两个代码也贴出来把

public interface ServiceProvider { <T> void addServiceProvider(T service, String serviceName); Object getServiceProvider(String serviceName);} public class ServiceProviderImpl implements ServiceProvider { private static final Logger logger = LoggerFactory.getLogger(ServiceProviderImpl.class); private static final Map<String, Object> serviceMap = new ConcurrentHashMap<>(); private static final Set<String> registeredService = ConcurrentHashMap.newKeySet(); @Override public <T> void addServiceProvider(T service, String serviceName) { if (registeredService.contains(serviceName)) return; registeredService.add(serviceName); serviceMap.put(serviceName, service); logger.info("向接口: {} 注册服务: {}", service.getClass().getInterfaces(), serviceName); } @Override public Object getServiceProvider(String serviceName) { Object service = serviceMap.get(serviceName); if (service == null) { throw new RpcException(RpcError.SERVICE_NOT_FOUND); } return service; }}

实现publishService

public <T> void publishService(Object service, Class<T> serviceClass) { if(serializer == null) { logger.error("未设置序列化器"); throw new RpcException(RpcError.SERIALIZER_NOT_FOUND); } serviceProvider.addServiceProvider(service); serviceRegistry.register(serviceClass.getCanonicalName(), new InetSocketAddress(host, port)); start(); } 客户端

客户端的修改以 NettyClient 为例,在过去创建 NettyClient 时,需要传入 host 和 port,现在这个 host 和 port 是通过 Nacos 获取的,sendRequest 修改如下:

public Object sendRequest(RpcRequest rpcRequest) { if(serializer == null) { logger.error("未设置序列化器"); throw new RpcException(RpcError.SERIALIZER_NOT_FOUND); } AtomicReference<Object> result = new AtomicReference<>(null); try { InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName()); Channel channel = ChannelProvider.get(inetSocketAddress, serializer);... Test

又到了紧张刺激的测试环节

==NettyTestClient ==

public class NettyTestClient { public static void main(String[] args) { RpcClient client = new NettyClient(); client.setSerializer(new KryoSerializer()); RpcClientProxy rpcClientProxy = new RpcClientProxy(client); HelloService helloService = rpcClientProxy.getProxy(HelloService.class); HelloObject object = new HelloObject(12, "This is a message"); String res = helloService.hello(object); System.out.println(res); }}

NettyTestServer

public class NettyTestServer { public static void main(String[] args) { HelloService helloService = new HelloServiceImpl(); NettyServer server = new NettyServer("127.0.0.1", 9999); server.setSerializer(new KryoSerializer()); server.publishService(helloService, HelloService.class); }}

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。