php实现分库
PHP 实现分库的方法
数据库中间件方案
使用数据库中间件如MySQL Router、ProxySQL或ShardingSphere可以实现透明的分库分表。这些中间件负责将SQL请求路由到正确的数据库实例,应用程序无需直接处理分库逻辑。
配置中间件后,PHP代码只需连接中间件的入口点,中间件会根据分片键自动路由请求。
手动分库路由
在PHP应用中实现自定义分库逻辑,通过判断分片键值决定使用哪个数据库连接。

// 定义分库配置
$shards = [
0 => ['host' => 'db1.example.com', 'user' => 'user1', 'pass' => 'pass1'],
1 => ['host' => 'db2.example.com', 'user' => 'user2', 'pass' => 'pass2']
];
// 根据分片键选择数据库
function getShardConnection($shardKey) {
global $shards;
$shardIndex = $shardKey % count($shards);
$shard = $shards[$shardIndex];
return new PDO(
"mysql:host={$shard['host']};dbname=my_db",
$shard['user'],
$shard['pass']
);
}
// 使用示例
$userId = 12345;
$db = getShardConnection($userId);
ORM框架集成
主流PHP ORM框架如Doctrine、Laravel Eloquent支持分库配置。以Laravel为例:
// config/database.php
'connections' => [
'shard1' => [
'driver' => 'mysql',
'host' => 'shard1.example.com',
// ...其他配置
],
'shard2' => [
'driver' => 'mysql',
'host' => 'shard2.example.com',
// ...其他配置
],
],
// 模型中使用
class User extends Model {
public function getConnectionName() {
return 'shard' . ($this->id % 2 + 1);
}
}
分布式事务处理
跨分库事务需要使用XA协议或Saga模式等分布式事务解决方案。PHP可通过PDO支持XA事务:

$db1->exec("XA START 'transaction_id'");
$db2->exec("XA START 'transaction_id'");
// 执行各分库操作
$db1->exec("XA END 'transaction_id'");
$db2->exec("XA END 'transaction_id'");
$db1->exec("XA PREPARE 'transaction_id'");
$db2->exec("XA PREPARE 'transaction_id'");
$db1->exec("XA COMMIT 'transaction_id'");
$db2->exec("XA COMMIT 'transaction_id'");
数据迁移与扩容
实现动态扩容需要数据迁移工具和分片策略调整。一致性哈希算法可以减少扩容时的数据迁移量。
class ConsistentHashing {
private $nodes = [];
private $virtualNodes = [];
public function addNode($node, $virtualCount = 64) {
for ($i = 0; $i < $virtualCount; $i++) {
$hash = crc32("{$node}#{$i}");
$this->virtualNodes[$hash] = $node;
ksort($this->virtualNodes);
}
}
public function getNode($key) {
$hash = crc32($key);
foreach ($this->virtualNodes as $k => $v) {
if ($k >= $hash) {
return $v;
}
}
return reset($this->virtualNodes);
}
}
跨分库查询
对于需要跨分库聚合的查询,可以采用:
- 并行查询各分库后合并结果
- 使用预计算的物化视图
- 将结果写入中央数据库
// 并行查询示例
$results = [];
$mh = curl_multi_init();
$handles = [];
foreach ($shards as $shard) {
$ch = curl_init("http://{$shard['host']}/api/query");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_multi_add_handle($mh, $ch);
$handles[] = $ch;
}
do {
curl_multi_exec($mh, $running);
} while ($running > 0);
foreach ($handles as $ch) {
$results[] = json_decode(curl_multi_getcontent($ch), true);
curl_multi_remove_handle($mh, $ch);
}
curl_multi_close($mh);
// 合并结果
$mergedResult = array_merge(...$results);






