
How to Fix Timeouts When Multiple Programs Operate on the Same Table

Date: 2023-11-20 22:11:27 · Views: 290,947 · Author: LRZS

When multiple programs operate on the same table at the same time, timeouts are common, and reads or writes fail as a result. The sections below cover several ways to address the problem.
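Before restructuring anything, it often helps to make callers resilient to transient timeouts, since a retried statement frequently succeeds once contention passes. A minimal retry helper with exponential backoff might look like this (a hypothetical sketch, not code from any specific library; the attempt count and backoff are illustrative, not tuned advice):

```java
import java.util.concurrent.Callable;

public class RetryHelper {
    // Retry an operation a few times, backing off longer after each failure.
    public static <T> T withRetry(int attempts, long backoffMillis, Callable<T> op) throws Exception {
        Exception last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                Thread.sleep(backoffMillis << i);  // 1x, 2x, 4x, ... the base delay
            }
        }
        throw last;  // all attempts failed; surface the last error
    }
}
```

Wrapping a database call in `withRetry(3, 100, () -> dao.query(...))` would retry twice before giving up. Retries only mask the symptom, though; the approaches below attack the causes.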

I. Code Optimization

When reading and writing large volumes of data, code efficiency matters a great deal, so optimizing the code is an effective first step. Common techniques include:

1. Cache frequently accessed data. This reduces the number of database reads and writes and speeds the program up.


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheHelper {
    // ConcurrentHashMap so concurrent readers and writers do not corrupt the map
    private static final Map<String, Object> cache = new ConcurrentHashMap<>();

    public static Object get(String key) {
        return cache.get(key);
    }

    public static void set(String key, Object value) {
        cache.put(key, value);
    }

    public static void remove(String key) {
        cache.remove(key);
    }
}

// Usage example
Object obj = CacheHelper.get("key");
if (obj == null) {
    obj = getDataFromDatabase();
    CacheHelper.set("key", obj);
} 
return obj;
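One caveat: the cache above never evicts entries, so stale data can be served indefinitely. A minimal time-to-live variant (a sketch under the same assumptions, not the article's code) could look like:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TtlCache {
    private static class Entry {
        final Object value;
        final long expiresAt;  // absolute expiry time in epoch millis
        Entry(Object value, long expiresAt) { this.value = value; this.expiresAt = expiresAt; }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public TtlCache(long ttlMillis) { this.ttlMillis = ttlMillis; }

    public Object get(String key) {
        Entry e = cache.get(key);
        if (e == null) return null;
        if (System.currentTimeMillis() > e.expiresAt) {
            cache.remove(key);  // lazily drop expired entries on read
            return null;
        }
        return e.value;
    }

    public void set(String key, Object value) {
        cache.put(key, new Entry(value, System.currentTimeMillis() + ttlMillis));
    }
}
```

Expired entries are dropped lazily on the next read; a production cache would also need a background sweep or a library such as Caffeine.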

2. Batch database operations, merging many SQL statements into one round trip. This reduces the number of database interactions, which pays off most when processing large data sets.


public void batchInsert(List<Object[]> data) throws SQLException {
    String sql = "insert into TABLE_NAME (col1, col2, col3, ... ) values(?, ?, ?, ...)";
    // try-with-resources closes the connection and statement even on failure
    try (Connection conn = getConnection();
         PreparedStatement ps = conn.prepareStatement(sql)) {
        conn.setAutoCommit(false);
        for (Object[] row : data) {
            for (int j = 0; j < row.length; j++) {
                ps.setObject(j + 1, row[j]);
            }
            ps.addBatch();
        }
        ps.executeBatch();  // one round trip instead of one per row
        conn.commit();
    }
}

II. Distributed Locks

When multiple programs write to the same table at once, avoiding duplicate writes is a key concern. A distributed lock guarantees that at any given moment only one program is performing the write.

1. Implementing a distributed lock with Redis:


import java.util.concurrent.TimeUnit;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;

public class RedisLock {
    private final RedisTemplate<String, String> redisTemplate;

    public RedisLock(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public boolean lock(String key) {
        ValueOperations<String, String> ops = redisTemplate.opsForValue();
        // Set an expiry so the lock is eventually released even if the holder crashes
        return Boolean.TRUE.equals(ops.setIfAbsent(key, "1", 30, TimeUnit.SECONDS));
    }

    public void unlock(String key) {
        redisTemplate.delete(key);
    }
}

// Usage example
RedisLock lock = new RedisLock(redisTemplate);
if (lock.lock("lock_key")) {
    try {
        // do something
    } finally {
        lock.unlock("lock_key");  // always release, even if the work throws
    }
} else {
    // wait or return directly
}
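A caveat with the delete-based unlock above: if the lock expires while its holder is still working, that holder's `unlock` can delete a lock that now belongs to someone else. The usual Redis fix is to store a random owner token and delete only when the token matches (typically via a Lua script, since the check and delete must be atomic). The principle can be sketched in a single JVM with a plain map (a hypothetical illustration of the idea, not Redis code):

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TokenLock {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Returns an owner token on success, or null if the lock is already held
    public String lock(String key) {
        String token = UUID.randomUUID().toString();
        return store.putIfAbsent(key, token) == null ? token : null;
    }

    // Only the holder of the matching token may release the lock;
    // remove(key, value) deletes atomically only when the value matches
    public boolean unlock(String key, String token) {
        return store.remove(key, token);
    }
}
```

In Redis the same compare-and-delete is done server-side so no other client can sneak in between the read and the delete.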

2. Implementing a distributed lock with ZooKeeper:


import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class ZooKeeperLock {
    private ZooKeeper zooKeeper;
    private String createdPath;  // the sequential node created by lock()

    public ZooKeeperLock(String zkUrl) throws IOException, InterruptedException {
        CountDownLatch connectedSignal = new CountDownLatch(1);
        zooKeeper = new ZooKeeper(zkUrl, 1000, new Watcher() {
            public void process(WatchedEvent we) {
                if (we.getState() == Event.KeeperState.SyncConnected) {
                    connectedSignal.countDown();
                }
            }
        });
        connectedSignal.await();
    }

    // Assumes the "/locks" parent node already exists
    public boolean lock(String key) throws KeeperException, InterruptedException {
        String path = "/locks/" + key + "-";
        createdPath = zooKeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        List<String> siblings = zooKeeper.getChildren("/locks", false);

        // Sort by the sequence suffix after "-", smallest first
        Collections.sort(siblings, new Comparator<String>() {
            public int compare(String s1, String s2) {
                return s1.substring(s1.indexOf("-") + 1).compareTo(s2.substring(s2.indexOf("-") + 1));
            }
        });

        if (createdPath.equals("/locks/" + siblings.get(0))) {
            return true;  // our node has the smallest sequence number: lock acquired
        } else {
            // Watch only the node immediately before ours and wait for it to go away
            String prevSibling = siblings.get(siblings.indexOf(createdPath.substring("/locks/".length())) - 1);
            final CountDownLatch lockSignal = new CountDownLatch(1);
            Stat stat = zooKeeper.exists("/locks/" + prevSibling, new Watcher() {
                public void process(WatchedEvent we) {
                    if (we.getType() == Event.EventType.NodeDeleted) {
                        lockSignal.countDown();
                    } else if (we.getType() == Event.EventType.None
                            && we.getState() == Event.KeeperState.Expired) {
                        lockSignal.countDown();
                    }
                }
            });
            if (stat != null) {
                lockSignal.await();  // block until the predecessor is deleted
            }
            return true;
        }
    }

    public void unlock() throws KeeperException, InterruptedException {
        // Delete the sequential node we actually created, not the bare key path
        zooKeeper.delete(createdPath, -1);
    }
}

// Usage example (lock() blocks until the lock is acquired)
ZooKeeperLock lock = new ZooKeeperLock("zk_url");
if (lock.lock("lock_key")) {
    try {
        // do something
    } finally {
        lock.unlock();
    }
}

III. Read/Write Splitting

When heavy read and write traffic arrives at the same time, read/write splitting is an effective remedy. Sending reads and writes to different nodes takes significant load off any single database node.

1. Read/write splitting with MySQL:


Write connection (master only):

jdbc:mysql://master_server/database?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&rewriteBatchedStatements=true

Read/write splitting via Connector/J's replication protocol (writes go to the first host, reads are balanced across the slaves):

jdbc:mysql:replication://master_server,slave_server1,slave_server2/database?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&rewriteBatchedStatements=true&readFromMasterWhenNoSlaves=true
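When driver-level URLs are not an option, the routing decision can also live in application code: writes always hit the master, reads rotate across the replicas. A minimal round-robin router sketch (hypothetical class and names; in a Spring application this role is usually played by AbstractRoutingDataSource):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ReadWriteRouter {
    private final String masterUrl;
    private final List<String> slaveUrls;
    private final AtomicInteger next = new AtomicInteger();

    public ReadWriteRouter(String masterUrl, List<String> slaveUrls) {
        this.masterUrl = masterUrl;
        this.slaveUrls = slaveUrls;
    }

    // Writes always go to the master; reads rotate round-robin across the slaves
    public String route(boolean isWrite) {
        if (isWrite || slaveUrls.isEmpty()) {
            return masterUrl;
        }
        int i = Math.floorMod(next.getAndIncrement(), slaveUrls.size());
        return slaveUrls.get(i);
    }
}
```

Note that replicas lag the master, so reads that must observe a just-committed write should still be routed to the master.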

2. Read/write splitting with MongoDB:


@Bean
public MongoClient mongoClient() {
    // readPreference=secondaryPreferred routes reads to replica-set
    // secondaries when available; writes still go to the primary
    MongoClientURI uri = new MongoClientURI(
        "mongodb://username:password@host1,host2,host3/?replicaSet=rs0&readPreference=secondaryPreferred");
    return new MongoClient(uri);
}

@Bean(name = "mongoTemplate")
public MongoTemplate mongoTemplate() {
    return new MongoTemplate(mongoClient(), "database_name");
}

IV. Partitioned Tables

When the data volume grows too large, splitting the data into smaller physical pieces works well. Partitioning keeps any single piece from growing so large that read and write performance degrades.

1. Range-partitioning a table in MySQL:


CREATE TABLE sales (
     sale_id INT NOT NULL AUTO_INCREMENT,
     sale_date DATE NOT NULL,
     sale_amount DECIMAL(12,2) NOT NULL,
     PRIMARY KEY (sale_id,sale_date)
) 
PARTITION BY RANGE( YEAR(sale_date) ) (
     PARTITION p0 VALUES LESS THAN (2010),
     PARTITION p1 VALUES LESS THAN (2011),
     PARTITION p2 VALUES LESS THAN (2012),
     PARTITION p3 VALUES LESS THAN MAXVALUE
);

2. Sharding a collection in MongoDB (its closest analogue of partitioning):


db.runCommand( { shardCollection: "db.collection", key: { "_id": "hashed" } } );

V. Database Connection Pools

Under heavy read/write load, the connection pool is a critical component: it reuses connections so utilization stays high during concurrency peaks, instead of paying the cost of opening a new connection for every request.

1. Connection pooling with Druid:


@Bean
public DataSource dataSource() {
    DruidDataSource dataSource = new DruidDataSource();
    dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
    dataSource.setUrl("jdbc:mysql://localhost:3306/test");
    dataSource.setUsername("root");
    dataSource.setPassword("root");
    dataSource.setMaxActive(20);
    dataSource.setInitialSize(1);
    dataSource.setMinIdle(1);
    dataSource.setMaxWait(60000);
    dataSource.setTimeBetweenEvictionRunsMillis(60000);
    dataSource.setMinEvictableIdleTimeMillis(300000);
    dataSource.setValidationQuery("SELECT 1 FROM DUAL");
    dataSource.setTestWhileIdle(true);
    dataSource.setTestOnBorrow(false);
    dataSource.setTestOnReturn(false);
    return dataSource;
}

@Bean
public JdbcTemplate jdbcTemplate() {
    return new JdbcTemplate(dataSource());
}

2. Connection pooling with HikariCP:


@Bean
public DataSource dataSource() {
    HikariConfig config = new HikariConfig();
    config.setDriverClassName("com.mysql.cj.jdbc.Driver");
    config.setJdbcUrl("jdbc:mysql://localhost:3306/test");
    config.setUsername("root");
    config.setPassword("root");
    config.setMaximumPoolSize(20);
    config.setMinimumIdle(1);
    config.setConnectionTestQuery("SELECT 1 FROM DUAL");
    config.setAutoCommit(false);
    config.addDataSourceProperty("cachePrepStmts", "true");
    config.addDataSourceProperty("prepStmtCacheSize", "250");
    config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
    config.addDataSourceProperty("useServerPrepStmts", "true");
    config.addDataSourceProperty("useLocalSessionState", "true");
    config.addDataSourceProperty("rewriteBatchedStatements", "true");
    config.addDataSourceProperty("cacheResultSetMetadata", "true");
    config.addDataSourceProperty("cacheServerConfiguration", "true");
    config.addDataSourceProperty("elideSetAutoCommits", "true");
    config.addDataSourceProperty("maintainTimeStats", "false");
    return new HikariDataSource(config);
}

@Bean
public JdbcTemplate jdbcTemplate() {
    return new JdbcTemplate(dataSource());
}
