有效数据生成以及插入数据库方案#
先产生insert数据并存到备份文件中#
因为有效数据生成的数量不大, 按照压测那边给我的需求大概每个交易4千笔数据左右,需要并发量比较高的交易也不过4万笔数据,涉及到转账的交易数据量比较高一点。并且需要做一个备份为了以后压测可以备用,因此我选择先生成到不同的表对应的表名文件中,然后再写一个批量执行SQL的程序执行这些文件中的insert语句
下面的代码做了脱敏处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class MultiThreadScript {
private static final int THREAD_POOL_SIZE = 4; // 线程池大小
private static final String INPUT_FILE_PATH = "input.sql"; // 输入文件路径
private static final String OUTPUT_FILE_PATH = "output.sql"; // 输出文件路径
private static final String INSERT_REGEX = "(?i)^insert into .* values\\s*\\((.*)\\);?$"; // insert语句的正则表达式
private static final String PK_REGEX = "'[0-9A-Za-z]+'"; // 主键的正则表达式
private static final int PK_INDEX = 0; // 主键在值列表中的索引
public static void main(String[] args) throws Exception {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
try (BufferedReader reader = new BufferedReader(new FileReader(INPUT_FILE_PATH))) {
String line;
while ((line = reader.readLine()) != null) {
if (isInsertStatement(line)) {
executor.execute(new InsertTask(line));
}
}
}
// 关闭线程池并等待所有任务完成
executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);
}
// 判断一行文本是否为insert语句
private static boolean isInsertStatement(String line) {
return line.matches(INSERT_REGEX);
}
// 插入任务
private static class InsertTask implements Runnable {
private final String originalSql;
public InsertTask(String originalSql) {
this.originalSql = originalSql;
}
@Override
public void run() {
try {
// 提取主键
Pattern pkPattern = Pattern.compile(PK_REGEX);
Matcher pkMatcher = pkPattern.matcher(originalSql);
pkMatcher.find();
String originalPk = pkMatcher.group();
// 提取值列表
String valueList = originalSql.replaceAll(INSERT_REGEX, "$1");
String[] values = valueList.split(",");
// 递增主键并生成新的SQL语句
StringBuilder newSqlBuilder = new StringBuilder();
for (int i = 0; i < 40000; i++) {
String newPk = getNextPk(originalPk);
String newValueList = valueList.replace(originalPk, newPk);
String newSql = originalSql.replaceAll(valueList, newValueList);
newSqlBuilder.append(newSql).append("\n");
}
// 写入输出文件
synchronized (MultiThreadScript.class) {
try (BufferedWriter writer = new BufferedWriter(new FileWriter(OUTPUT_FILE_PATH, true))) {
writer.write(newSqlBuilder.toString());
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 获取下一个主键
private String getNextPk (String originalPk) {
String prefix = originalPk.substring(0, originalPk.length() - 1);
String suffix = originalPk.substring(originalPk.length() - 1);
String newSuffix = getNextSuffix(suffix);
return prefix + newSuffix;
}
// 获取下一个主键后缀
private String getNextSuffix(String suffix) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < suffix.length(); i++) {
char c = suffix.charAt(i);
if (Character.isDigit(c)) {
int digit = Character.getNumericValue(c);
if (digit == 9) {
sb.append('A');
} else if (digit == 35) {
sb.append('a');
} else {
sb.append(Character.forDigit(digit + 1, 36));
}
} else if (Character.isLetter(c)) {
if (c == 'Z') {
sb.append('0');
} else if (c == 'z') {
sb.append('0');
} else {
sb.append((char) (c + 1));
}
} else {
sb.append(c);
}
}
return sb.toString();
}
}
|
上面的代码中,MultiThreadScript
类是脚本的主类,它负责读取输入文件并创建线程池来处理每条insert语句。InsertTask
类是插入任务类,它实现了Runnable
接口,用于递增主键并生成新的SQL语句。为了避免多个线程同时写入输出文件,InsertTask
类中使用了synchronized
关键字来进行同步。
在getNextPk()
方法中,我使用了类似于Excel中列名的递增方式来递增主键。首先,将原始主键分为前缀和后缀两部分,其中前缀是主键的前面部分,后缀是主键的最后一位字符。然后,对后缀进行递增,并根据递增后的后缀重新生成新的主键。
最后,需要注意的是,由于主键可能包含字母和数字,因此使用36进制来对主键进行递增。例如,对于主键值为"001",它的下一个值为"002";对于主键值为"AZ9",它的下一个值为"BA0"。
进行插入操作(脱敏处理后的代码):#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class InsertExecutor {
private static final String URL = "jdbc:mysql://localhost:3306/mydatabase";
private static final String USER = "myuser";
private static final String PASSWORD = "mypassword";
private static final int THREAD_POOL_SIZE = 10;
public static void main(String[] args) {
try {
// 读取insert语句文件
BufferedReader reader = new BufferedReader(new FileReader("inserts.sql"));
String line;
Queue<String> inserts = new LinkedList<>();
while ((line = reader.readLine()) != null) {
inserts.add(line);
}
reader.close();
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
// 执行insert语句
while (!inserts.isEmpty()) {
String insert = inserts.poll();
executorService.execute(new InsertWorker(insert));
}
// 关闭线程池
executorService.shutdown();
} catch (IOException e) {
e.printStackTrace();
}
}
static class InsertWorker implements Runnable {
private String insert;
public InsertWorker(String insert) {
this.insert = insert;
}
@Override
public void run() {
try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
PreparedStatement statement = conn.prepareStatement(insert)) {
// 执行insert语句
statement.executeUpdate();
} catch (SQLException e) {
// 主键冲突,跳过该语句
if (e.getErrorCode() == 1062) {
System.out.println("Skip duplicate insert: " + insert);
} else {
e.printStackTrace();
}
}
}
}
}
|
上面代码中我们把insert.sql取代为我们想要进行批量insert的sql文件即可,线程数量可根据CPU的情况来看,在不进行其他工作任务的情况下,可尽量压榨CPU的使用率以达到最高的效率。
亿级别的无效数据生成并插入数据库方案#
这里因为涉及到的数据量特别大, 一般是模拟生产环境,因此一张表可能有千万级别以及亿级别的数据量,因此我选择一边生成一边做insert操作。也就是一个生产者一个消费者,当然,这里都是多线程来操作的。一开始我是每次达到20个事务一次提交的,后来换了OceanBase后,只能一次提交一个事务了,效率也变满了一点点.
对于多线程插入数据库,将生成的SQL语句分配给多个线程,每个线程使用单独的数据库连接插入数据库,可以使用线程池来管理多个线程
下面是脱敏后的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class MultiThreadedSqlInsert {
// 数据库连接信息
private static final String DB_URL = "jdbc:mysql://localhost:3306/mydb";
private static final String DB_USER = "root";
private static final String DB_PASSWORD = "mypassword";
// 主键列名和初始值
private static final String PK_COLUMN_NAME = "id";
private static final String PK_INITIAL_VALUE = "1000";
// 线程数和每个线程处理的主键值个数
private static final int THREAD_COUNT = 10;
private static final int KEYS_PER_THREAD = 10000000;
// 文件名和队列大小
private static final String FILE_NAME = "data.sql";
private static final int QUEUE_SIZE = 10000;
public static void main(String[] args) throws Exception {
// 读取文件中的 SQL 语句
String sql = readSqlFromFile(FILE_NAME);
// 创建线程池和队列
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
BlockingQueue<String> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
// 创建多个线程,为每个线程分配一段主键值的区间
for (int i = 0; i < THREAD_COUNT; i++) {
int start = i * KEYS_PER_THREAD;
int end = (i + 1) * KEYS_PER_THREAD - 1;
executor.submit(new SqlGenerator(sql, start, end, queue));
}
// 创建多个数据库连接,为每个连接分配一个线程
for (int i = 0; i < THREAD_COUNT; i++) {
Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
executor.submit(new SqlExecutor(conn, queue));
}
// 等待所有线程执行完毕
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
// 从文件中读取 SQL 语句
private static String readSqlFromFile(String fileName) throws Exception {
try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
StringBuilder sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
sb.append(line).append("\n");
} return sb.toString();
}
}
// 生成新的 SQL 语句
private static String generateSql(String sql, int key) {
String pkValue = PK_INITIAL_VALUE + key;
return sql.replaceFirst(PK_COLUMN_NAME, pkValue);
}
// 生成新的 SQL 语句的线程
private static class SqlGenerator implements Runnable {
private final String sql;
private final int start;
private final int end;
private final BlockingQueue<String> queue;
public SqlGenerator(String sql, int start, int end, BlockingQueue<String> queue) {
this.sql = sql;
this.start = start;
this.end = end;
this.queue = queue;
}
@Override
public void run() {
for (int i = start; i <= end; i++) {
String newSql = generateSql(sql, i);
try {
queue.put(newSql);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
// 执行 SQL 语句的线程
private static class SqlExecutor implements Runnable {
private final Connection conn;
private final BlockingQueue<String> queue;
public SqlExecutor(Connection conn, BlockingQueue<String> queue) {
this.conn = conn;
this.queue = queue;
}
@Override
public void run() {
try (PreparedStatement stmt = conn.prepareStatement("")) {
while (true) {
String sql = queue.take();
if (sql == null) {
break;
}
stmt.addBatch(sql);
if (stmt.getBatchSize() >= 1000) {
stmt.executeBatch();
}
}
stmt.executeBatch();
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
}
}
}
|
这里使用了两个线程池,一个用于生成新的 SQL 语句,一个用于执行 SQL 语句。生成 SQL 语句的线程将生成的 SQL 语句存储到一个线程安全的队列中,执行 SQL 语句的线程从队列中取出 SQL 语句并执行插入操作。程序使用了 JDBC 连接 MySQL 数据库,并使用了 PreparedStatement 批量执行 SQL 语句,以提高插入效率。
需要注意的是,为了避免多个线程同时操作数据库导致数据不一致的问题,每个线程使用了自己的数据库连接。此外,程序还使用了线程安全的队列和加锁机制来保证线程安全。
以上就是我在工作中遇到的问题之一,做一个小结,用到了很多线程和线程池的地方,以及操作数据库相关的知识。