首页 > 编程知识 正文

Python 批量推送微信公众号模板消息,批量取消公众号消息

时间:2023-05-03 16:48:56 阅读:272194 作者:339

目录 1. 依赖包2. 消息推送的步骤3. 功能描述3.1. 注意事项3.2. 消息推送模板的参数说明3.3. 消息推送模块代码3.4. 消息推送模块的调用

1. 依赖包 Django==3.1.3django-redis==4.12.1redis==3.5.3requests==2.24.0 2. 消息推送的步骤 获取 access_token;构建数据;发送推送请求。 3. 功能描述 3.1. 注意事项 要注意模板消息的类型,一次性订阅消息长期订阅消息。 一次性订阅消息

一次性订阅消息用于解决用户使用小程序后,后续服务环节的通知问题。用户自主订阅后,开发者可不限时间地下发一条对应的服务消息;每条消息可单独订阅或退订。

长期订阅消息

一次性订阅消息可满足小程序的大部分服务场景需求,但线下公共服务领域存在一次性订阅无法满足的场景,如航班延误,需根据航班实时动态来多次发送消息提醒。为便于服务,我们提供了长期性订阅消息,用户订阅一次后,开发者可长期下发多条消息。

目前长期性订阅消息仅向政务民生、医疗、交通、金融、教育等线下公共服务开放,后期将逐步支持到其他线下公共服务业务。

access_token的有效期目前为2个小时,需定时刷新,重复获取将导致上次获取的access_token失效。 3.2. 消息推送模板的参数说明 参数是否必填说明touser是接收者openidtemplate_id是模板IDurl否模板跳转链接(海外帐号没有跳转能力)miniprogram否跳小程序所需数据,不需跳小程序可不用传该数据appid是所需跳转到的小程序appid(该小程序appid必须与发模板消息的公众号是绑定关联关系,暂不支持小游戏)pagepath否所需跳转到小程序的具体页面路径,支持带参数,(示例index?foo=bar),暂不支持小游戏data是模板数据color否模板内容字体颜色,不填默认为黑色3.3. 消息推送模块代码 import copyimport jsonimport redisimport requestsfrom django.conf import settingsfrom django_redis import get_redis_connectionclass WeChatMsg: def __init__(self, app_id=settings.WX_APP_ID, secret=settings.WX_APP_SECRET, template=None): self.app_id = app_id self.secret = secret self.template = { "touser": "", "template_id": "", "url": "", "data": { "first": { "value": "", }, "keyword1": { "value": "", }, "keyword2": { "value": "", }, "remark": { "value": "", }, } } if template: self.template = template self.access_token = None self.req_list = list() self.user_list = list() self.data_list = list() def _get_token(self, force_update=False): key_name = 'wechat_token_{}'.format(self.app_id) if force_update: self.access_token = None else: self.access_token = get_data(key_name) if not self.access_token: url = "https://api.weixin.qq.com/cgi-bin/token?" payload = { 'grant_type': 'client_credential', 'appid': self.app_id, 'secret': self.secret, } response = requests.get(url, params=payload, timeout=50) access_token = response.json().get("access_token") if access_token: set_data(key_name, access_token, ex=7100) self.access_token = get_data(key_name) print('access_token', self.access_token) def make_data_list(self): from main.models import User user_openid_list = User.objects.filter( pk__in=self.user_list ).exclude( openid='' ).exclude( openid__isnull=True ).values_list('openid', flat=True) for openid in user_openid_list: user_template = copy.deepcopy(self.template) user_template['touser'] = openid self.data_list.append(user_template) def post_data(self): # 获取 token self._get_token() url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={}".format(self.access_token) # 准备数据 if self.user_list: self.make_data_list() # 发送请求 for data in self.data_list: json_template = json.dumps(data) res = requests.post(url, data=json_template) print('post_data', res.text)def get_redis(): """ 获取 Redis 的连接 """ pool = get_redis_connection('default').connection_pool r = redis.Redis(connection_pool=pool) return rdef set_data(name, value, **kwargs): # 将单条数据存入redis缓存 r = get_redis() value = json.dumps(value) r.set(name, value, **kwargs)def get_data(name): # 取出单条数据 r = get_redis() value = r.get(name) if value: value = json.loads(value) return value 3.4. 消息推送模块的调用 def send_wechat_msg(): w = WeChatMsg() w.template = { "touser": "", "template_id": "", "url": "", "data": { "first": { "value": "", }, "keyword1": { "value": "", }, "keyword2": { "value": "", }, "remark": { "value": "", }, } } w.user_list = [1, 2] w.post_data()

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。