首页 > 编程知识 正文

服务器怎么做集群,credits是什么意思

时间:2023-05-04 06:28:25 阅读:111102 作者:4816

更改文章目录环境的配置和基本redis.HPP redis.cppchatserviceredis消息队列中获取订阅的消息

环境构成和基本

C构建集群聊天室(十七) ngnix简介和tcp负载均衡配置

Redis环境的构建和配置

hiredis从设置到实际操作一条龙服务

这是redis交易处理机制,但如果你涉猎它

了解更多redis :我的redis专栏

该看上面的都看完了,我们一往下就进入代码!

这次的变更会有点大。

redis.hpp可以放在任何地方,毕竟redis是数据库,所以放在db文件夹下吧。

# ifndef redis _ h # define redis _ h # includehiredis/hired is.h # includefunctionalusingnamespacestd; 类redis { public : redis (; ~Redis (; 连接到redis服务器的bool connect (); 在redis指定的频道channel上投稿信息boolpublish(intChannel,string message ); 向redis指定的频道subscribe订阅消息bool subscribe (int channel ); 取消注册到redis中指定的频道unsubscribe的消息boolunsubscribe(intchannel ); //独立线程接收订阅信道消息void observer_channel_message (); //初始化待回调的void init _ notify _ handler (函数语音(int,string ) fn ),在业务层上报信道消息; publish消息redis上下文* _ publish _ context; subscribe消息redisContext *_subcribe_context; //回调操作,接收订阅的消息,并在服务层中键入functionvoid(int,string ) _notify_message_handler; (; #endif redis.cpp集封装用于轻型群集服务器之间的通信。

# include ' redis.HPP ' # includeiostreamusingnamespacestd; redis :3360 redis (: _ publish _ context ) nullptr ),_subcribe_context ) nullptr ) { } redis :360:~redis }if(subcribe_context!=nullptr(redisfree ) _subcribe_context ); } bool redis :3360 connect ((/publish _ publish _ context=re disconnect (' 127.0.0.1 ',6379 ) ) nullptr==_ publish _ context ) { cerr 'connect redis failed!' endl; 返回假; _ subcribe _ context=re disconnect (' 127.0.0.1 ',6379 )负责订阅消息的上下文连接; nullptr==_ subcribe _ context { cerr ' connect redis failed!' endl; 返回假; //在另一个线程上,侦听信道上的事件,threadt([] ) ({ observer_channel_message ) ); ); t.detach (; cout ' connect redis -服务器success!' endl; 返回真; redis在指定通道通道上消息bool redis 3360: publish (int channel,string message ) redisreply*reply=) redisreply* ) redis

t, "PUBLISH %d %s", channel, message.c_str()); if (nullptr == reply){ cerr << "publish command failed!" << endl; return false; } freeReplyObject(reply); return true;}// 向redis指定的通道subscribe订阅消息bool Redis::subscribe(int channel){ // SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息 // 通道消息的接收专门在observer_channel_message函数中的独立线程中进行 // 只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源 if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)){ cerr << "subscribe command failed!" << endl; return false; } // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1) int done = 0; while (!done){ if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){ cerr << "subscribe command failed!" << endl; return false; } } // redisGetReply return true;}// 向redis指定的通道unsubscribe取消订阅消息bool Redis::unsubscribe(int channel){ if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel)){ cerr << "unsubscribe command failed!" << endl; return false; } // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1) int done = 0; while (!done){ if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)){ cerr << "unsubscribe command failed!" << endl; return false; } } return true;}// 在独立线程中接收订阅通道中的消息void Redis::observer_channel_message(){ redisReply *reply = nullptr; while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply)){ // 订阅收到的消息是一个带三元素的数组 if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) { // 给业务层上报通道上发生的消息 _notify_message_handler(atoi(reply->element[1]->str) , reply->element[2]->str); } freeReplyObject(reply); } cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl;}void Redis::init_notify_handler(function<void(int,string)> fn){ this->_notify_message_handler = fn;} chatservice修改

头文件里面自行修改吧,这里放出源文件的修改范围。

构造函数中连接上redis:

// 连接redis服务器 if (_redis.connect()){ // 设置上报消息的回调 _redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1, _2)); }

登录成功后,向redis消息队列进行订阅:

// id用户登录成功后,向redis订阅channel(id) _redis.subscribe(id);

用户注销之后,取消订阅:

// 用户注销,相当于就是下线,在redis中取消订阅通道 _redis.unsubscribe(userid);

(客户端里以外掉线也给它来上这么一下)

单聊:

//一对一聊天void ChatService::onechat(const TcpConnectionPtr &conn,json &js,Timestamp time){// cout<<js<<endl; int toid = js["toid"].get<int>(); //这里bug // bool userstate = false; //开辟锁的作用域 { lock_guard<mutex> lock(_connMutex); auto it = _userConnMap.find(toid); if(it != _userConnMap.end()){ //用户在线,转发消息 it->second->send(js.dump()); return; } } // 查询toid是否在线 User user = _usermodel.query(toid); if (user.getstate() == "online"){ _redis.publish(toid, js.dump()); return; } // toid不在线,存储离线消息 _offlineMsgmodel.insert(toid, js.dump());}

群聊:

// 群组聊天业务void ChatService::groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time){ int userid = js["id"].get<int>(); int groupid = js["groupid"].get<int>(); vector<int> useridVec = _groupModel.queryGroupUsers(userid, groupid); lock_guard<mutex> lock(_connMutex); for (int id : useridVec){ auto it = _userConnMap.find(id); if (it != _userConnMap.end()){ // 转发群消息 it->second->send(js.dump()); } else{ User user = _usermodel.query(id); if (user.getstate() == "online"){ _redis.publish(id, js.dump()); } else{ // 存储离线群消息 _offlineMsgmodel.insert(id, js.dump()); } } }} 从redis消息队列中获取订阅的消息 void ChatService::handleRedisSubscribeMessage(int userid, string msg){ lock_guard<mutex> lock(_connMutex); auto it = _userConnMap.find(userid); if (it != _userConnMap.end()) { it->second->send(msg); return; } // 存储该用户的离线消息 _offlineMsgmodel.insert(userid, msg);}

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。