首页 > 编程知识 正文

pgsql数据同步,数据库 pgsql

时间:2023-05-03 20:16:07 阅读:268516 作者:2515

思想

  ES数据库同步PG数据库在本文主要用到的思想是:在PG数据库的数据录入以及更新时,如下图所示,会有其对应的字段modify_time记录最后的修改时间。程序会记录同步到ES数据库中最后一条数据的修改时间,利用线程间隔10s检查一次PG数据库是否有modify_time > 记录的最后修改时间。如果有,将最新的数据同步到ES数据库,并修改记录时间。如果没有,继续每隔10s检查一次。

最终的实现效果: 程序实现的主要代码 def update_pg2es(): global last_modify_time print(type(last_modify_time), last_modify_time) print("检测PG数据库是否有数据更新...") #连接数据库 pg_conn = psycopg2.connect(config.CONNECT_PG_DB_URL) pg_cursor = pg_conn.cursor() select_sql = "select * from test where modify_time > '{}' order by modify_time".format(last_modify_time) pg_cursor.execute(select_sql) datas = pg_cursor.fetchall() # 关闭连接 pg_conn.close pg_cursor.close es_data_list = [] for row in datas: # 获取当前行数据的列与值得字典 column_values = get_row_colum_info(pg_cursor, row) es_data = translate_entity_date_to_es(column_values) es_data_list.append(es_data) last_modify_time = column_values['modify_time'] index_entity = config.entity_index type_entity = config.entity_type num = 10000 # 批量存入的个数 while (len(es_data_list) / num >= 0): if (math.floor(len(es_data_list) / num) == 0): # 最后了 es_result = es_service.insert_data_list(index_entity, type_entity, es_data_list[:]) # 取剩下的 print("存入{}个数据到ES".format(len(es_data_list))) break print("存入{}个数据到ES".format(num)) print("前五个数据:") print(index_entity, type_entity, es_data_list[:5]) es_result = es_service.insert_data_list(index_entity, type_entity, es_data_list[:num]) # 存储一定的数量 es_data_list = es_data_list[num:] # 取剩下的 if "ok" != es_result: print("存储实体数据到es中失败") break # 如果某一次存入有问题,直接退出 # 线程定时执行 t = threading.Timer(10, update_pg2es) t.start() if __name__ == '__main__':update_pg2es()

  注意:

select now()::timestamp(6)without time zone SQL语句可以查询当前时间(不带时区,精度为秒后面保存6位)select_sql = "select * from test where modify_time > '{}' order by modify_time".format(last_modify_time)将sql语句赋值给select_sql,sql语句的意思为选择出修改时间大于记录时间的数据,并将数据按照修改时间进行排序。此处注意记录时间两边需要加单引号才能保证sql语句的正常运行。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。