MongoDB分片技术实现
概述
MongoDB分片(Sharding)是MongoDB的水平扩展解决方案,通过将数据分布到多个分片(shard)上来处理大数据量和高吞吐量的需求。
MongoDB分片架构
1. 分片集群组件
# MongoDB sharded cluster topology (docker-compose).
# - configReplSet: three config servers (config1..config3), port 27019 inside the container
# - shard1ReplSet / shard2ReplSet: two data-bearing members each, port 27018 inside the container
# - mongos1 / mongos2: query routers published on host ports 27017 / 27026
# MongoDB processes bind to localhost only by default, so every process is started with
# --bind_ip_all; otherwise other containers and the published ports cannot reach it.
# NOTE: a two-member replica set cannot elect a primary while either member is down;
# add a third member (or an arbiter) to each shard if automatic failover is required.
version: '3.8'
services:
  # Config server replica set
  config1:
    image: mongo:5.0
    command: mongod --configsvr --replSet configReplSet --port 27019 --bind_ip_all
    ports:
      - "27019:27019"
    volumes:
      - config1_data:/data/db
  config2:
    image: mongo:5.0
    command: mongod --configsvr --replSet configReplSet --port 27019 --bind_ip_all
    ports:
      - "27020:27019"
    volumes:
      - config2_data:/data/db
  config3:
    image: mongo:5.0
    command: mongod --configsvr --replSet configReplSet --port 27019 --bind_ip_all
    ports:
      - "27021:27019"
    volumes:
      - config3_data:/data/db

  # Shard 1 replica set
  shard1_replica1:
    image: mongo:5.0
    command: mongod --shardsvr --replSet shard1ReplSet --port 27018 --bind_ip_all
    ports:
      - "27022:27018"
    volumes:
      - shard1_replica1_data:/data/db
  shard1_replica2:
    image: mongo:5.0
    command: mongod --shardsvr --replSet shard1ReplSet --port 27018 --bind_ip_all
    ports:
      - "27023:27018"
    volumes:
      - shard1_replica2_data:/data/db

  # Shard 2 replica set
  shard2_replica1:
    image: mongo:5.0
    command: mongod --shardsvr --replSet shard2ReplSet --port 27018 --bind_ip_all
    ports:
      - "27024:27018"
    volumes:
      - shard2_replica1_data:/data/db
  shard2_replica2:
    image: mongo:5.0
    command: mongod --shardsvr --replSet shard2ReplSet --port 27018 --bind_ip_all
    ports:
      - "27025:27018"
    volumes:
      - shard2_replica2_data:/data/db

  # mongos query routers
  mongos1:
    image: mongo:5.0
    command: mongos --configdb configReplSet/config1:27019,config2:27019,config3:27019 --port 27017 --bind_ip_all
    ports:
      - "27017:27017"
    depends_on:
      - config1
      - config2
      - config3
  mongos2:
    image: mongo:5.0
    command: mongos --configdb configReplSet/config1:27019,config2:27019,config3:27019 --port 27017 --bind_ip_all
    ports:
      - "27026:27017"
    depends_on:
      - config1
      - config2
      - config3

volumes:
  config1_data:
  config2_data:
  config3_data:
  shard1_replica1_data:
  shard1_replica2_data:
  shard2_replica1_data:
  shard2_replica2_data:
2. 分片集群初始化脚本
#!/bin/bash
# mongodb-cluster-init.sh
# Initializes the sharded cluster from the compose file: the config server replica set,
# both shard replica sets, then registers the shards through mongos1.
# Hostnames (config1, shard1_replica1, mongos1, ...) only resolve inside the compose
# network, so run this from a container attached to that network.
set -euo pipefail

# Shell binary. mongo:5.0 images ship the legacy "mongo" shell; override with MONGO_SHELL=mongosh.
MONGO_SHELL="${MONGO_SHELL:-mongo}"

# wait_for HOST:PORT -- poll (60 attempts, 2s apart) until the server answers a ping.
wait_for() {
  local target="$1"
  for _ in $(seq 1 60); do
    if "$MONGO_SHELL" --host "$target" --quiet --eval 'db.adminCommand({ ping: 1 })' >/dev/null 2>&1; then
      return 0
    fi
    sleep 2
  done
  echo "Timed out waiting for $target" >&2
  return 1
}

# wait_for_primary HOST:PORT -- poll until the replica set reachable via HOST reports a PRIMARY.
wait_for_primary() {
  local target="$1" state
  for _ in $(seq 1 60); do
    # "|| true" keeps set -e from aborting while the set is still uninitialized.
    state="$("$MONGO_SHELL" --host "$target" --quiet --eval \
      'print(rs.status().members.some(function (m) { return m.stateStr === "PRIMARY"; }))' \
      2>/dev/null || true)"
    if [ "$state" = "true" ]; then
      return 0
    fi
    sleep 2
  done
  echo "Timed out waiting for a primary via $target" >&2
  return 1
}

echo "初始化MongoDB分片集群..."

# Wait for every mongod to accept connections instead of sleeping a fixed time.
for host in config1:27019 config2:27019 config3:27019 \
            shard1_replica1:27018 shard1_replica2:27018 \
            shard2_replica1:27018 shard2_replica2:27018; do
  wait_for "$host"
done

echo "初始化Config Server副本集..."
"$MONGO_SHELL" --host config1:27019 --eval '
rs.initiate({
  _id: "configReplSet",
  configsvr: true,
  members: [
    { _id: 0, host: "config1:27019" },
    { _id: 1, host: "config2:27019" },
    { _id: 2, host: "config3:27019" }
  ]
})'

echo "初始化分片1副本集..."
"$MONGO_SHELL" --host shard1_replica1:27018 --eval '
rs.initiate({
  _id: "shard1ReplSet",
  members: [
    { _id: 0, host: "shard1_replica1:27018" },
    { _id: 1, host: "shard1_replica2:27018" }
  ]
})'

echo "初始化分片2副本集..."
"$MONGO_SHELL" --host shard2_replica1:27018 --eval '
rs.initiate({
  _id: "shard2ReplSet",
  members: [
    { _id: 0, host: "shard2_replica1:27018" },
    { _id: 1, host: "shard2_replica2:27018" }
  ]
})'

# addShard needs a primary in each shard replica set, and mongos needs the config servers.
wait_for_primary config1:27019
wait_for_primary shard1_replica1:27018
wait_for_primary shard2_replica1:27018
wait_for mongos1:27017

echo "添加分片到集群..."
"$MONGO_SHELL" --host mongos1:27017 --eval '
sh.addShard("shard1ReplSet/shard1_replica1:27018,shard1_replica2:27018")
sh.addShard("shard2ReplSet/shard2_replica1:27018,shard2_replica2:27018")
'

echo "MongoDB分片集群初始化完成!"
Java应用集成
1. Spring Boot配置
@Configuration
public class MongoShardingConfig {@Value("${spring.data.mongodb.uri}")private String mongoUri;@Beanpublic MongoClient mongoClient() {// 连接到mongos路由服务ConnectionString connectionString = new ConnectionString(mongoUri);MongoClientSettings settings = MongoClientSettings.builder().applyConnectionString(connectionString).readPreference(ReadPreference.secondaryPreferred()) // 读写分离.writeConcern(WriteConcern.MAJORITY) // 写关注.readConcern(ReadConcern.MAJORITY) // 读关注.retryWrites(true) // 重试写入.retryReads(true) // 重试读取.applyToConnectionPoolSettings(builder -> {builder.maxSize(100) // 最大连接数.minSize(10) // 最小连接数.maxWaitTime(30, TimeUnit.SECONDS) // 最大等待时间.maxConnectionIdleTime(60, TimeUnit.SECONDS); // 连接空闲时间}).build();return MongoClients.create(settings);}@Beanpublic MongoTemplate mongoTemplate() {return new MongoTemplate(mongoClient(), "sharded_database");}
}
2. 分片键设计
/**
 * User document. Stored in "users", which the sharding initializer shards on a hashed userId.
 */
@Document(collection = "users")
public class User {
    @Id
    private String id;

    /** Shard key of the users collection. */
    @Indexed
    private String userId;

    private String username;
    private String email;
    private Date createTime;

    /** Geographic region of the user. */
    private String region;

    // constructors, getters, setters
}

/**
 * Order document. Stored in "orders", which the sharding initializer range-shards on customerId.
 */
@Document(collection = "orders")
public class Order {
    @Id
    private String id;

    /** Shard key of the orders collection. */
    @Indexed
    private String customerId;

    private String orderId;
    private BigDecimal amount;
    private Date orderTime;
    private String status;

    // constructors, getters, setters
}

/**
 * Product document. Stored in "products", which the sharding initializer shards on the
 * compound key {categoryId: 1, productId: 1}.
 */
@Document(collection = "products")
public class Product {
    @Id
    private String id;

    /** First component of the compound shard key. */
    @Indexed
    private String categoryId;

    /**
     * Business product id; second component of the compound shard key. Without this field
     * every product in a category would have the same shard-key value, so a large category
     * could never be split across chunks.
     */
    private String productId;

    private String productName;
    private BigDecimal price;
    private String description;

    // constructors, getters, setters
}
3. 分片管理服务
@Service
public class MongoShardingService {

    /** Sharding administration commands (enableSharding, shardCollection, listShards, ...) must target this database. */
    private static final String ADMIN_DATABASE = "admin";

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * Enables sharding for a database.
     *
     * @param database name of the database
     * @throws IllegalArgumentException if {@code database} is null or blank
     */
    public void enableSharding(String database) {
        requireName(database, "database");
        runAdminCommand(new Document("enableSharding", database));
        log.info("已启用数据库分片: {}", database);
    }

    /**
     * Shards a collection on a single ascending (range) key.
     *
     * @param database   database name
     * @param collection collection name
     * @param shardKey   field to shard on
     * @throws IllegalArgumentException if any argument is null or blank
     */
    public void shardCollection(String database, String collection, String shardKey) {
        requireName(shardKey, "shardKey");
        runShardCollection(database, collection, new Document(shardKey, 1));
        log.info("已对集合进行分片: {}.{}, 分片键: {}", database, collection, shardKey);
    }

    /**
     * Shards a collection on a hashed key.
     *
     * @param database   database name
     * @param collection collection name
     * @param shardKey   field whose hash is used as the shard key
     * @throws IllegalArgumentException if any argument is null or blank
     */
    public void createHashedSharding(String database, String collection, String shardKey) {
        requireName(shardKey, "shardKey");
        runShardCollection(database, collection, new Document(shardKey, "hashed"));
        log.info("已创建哈希分片: {}.{}, 分片键: {}", database, collection, shardKey);
    }

    /**
     * Shards a collection on a single ascending (range) key.
     *
     * @param database   database name
     * @param collection collection name
     * @param shardKey   field to shard on
     * @throws IllegalArgumentException if any argument is null or blank
     */
    public void createRangeSharding(String database, String collection, String shardKey) {
        requireName(shardKey, "shardKey");
        runShardCollection(database, collection, new Document(shardKey, 1));
        log.info("已创建范围分片: {}.{}, 分片键: {}", database, collection, shardKey);
    }

    /**
     * Shards a collection on a compound key.
     *
     * <p>The key document is built in the map's iteration order. Field order is significant for
     * a compound shard key, so pass an insertion-ordered map such as {@code LinkedHashMap}.
     *
     * @param database   database name
     * @param collection collection name
     * @param shardKeys  field name to key type (e.g. 1 or "hashed"), in key order
     * @throws IllegalArgumentException if {@code shardKeys} is null/empty or a name is blank
     */
    public void createCompoundSharding(String database, String collection, Map<String, Object> shardKeys) {
        if (shardKeys == null || shardKeys.isEmpty()) {
            throw new IllegalArgumentException("shardKeys must not be empty");
        }
        Document keyDoc = new Document();
        shardKeys.forEach(keyDoc::append);
        runShardCollection(database, collection, keyDoc);
        log.info("已创建复合分片: {}.{}, 分片键: {}", database, collection, shardKeys);
    }

    /**
     * Returns a cluster overview.
     *
     * <p>{@code sh.status()} is a shell helper, not a server command, so it cannot be sent via
     * runCommand. This combines the {@code listShards} result (under "shards") with the
     * {@code balancerStatus} result (under "balancer").
     *
     * @return a document with "shards" and "balancer" entries
     */
    public Document getShardingStatus() {
        Document shards = runAdminCommand(new Document("listShards", 1));
        Document balancer = runAdminCommand(new Document("balancerStatus", 1));
        return new Document("shards", shards.get("shards")).append("balancer", balancer);
    }

    /**
     * Runs {@code collStats} for a collection in the given database.
     *
     * @param database   database containing the collection
     * @param collection collection name
     * @return the raw collStats result
     */
    public Document getCollectionShardInfo(String database, String collection) {
        requireName(database, "database");
        requireName(collection, "collection");
        Document command = new Document("collStats", collection).append("verbose", true);
        return mongoTemplate.getMongoDatabaseFactory().getMongoDatabase(database).runCommand(command);
    }

    /** Sends shardCollection for database.collection with the given key document. */
    private void runShardCollection(String database, String collection, Document key) {
        requireName(database, "database");
        requireName(collection, "collection");
        Document command = new Document("shardCollection", database + "." + collection).append("key", key);
        runAdminCommand(command);
    }

    /** Runs a command against the admin database, where sharding commands must be sent. */
    private Document runAdminCommand(Document command) {
        return mongoTemplate.getMongoDatabaseFactory().getMongoDatabase(ADMIN_DATABASE).runCommand(command);
    }

    /** Rejects null or blank names before they reach the server. */
    private static void requireName(String value, String label) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(label + " must not be blank");
        }
    }
}
4. 分片初始化配置
@Component
public class ShardingInitializer {@Autowiredprivate MongoShardingService shardingService;@EventListener(ApplicationReadyEvent.class)public void initializeSharding() {try {// 启用数据库分片shardingService.enableSharding("sharded_database");// 用户集合 - 使用userId哈希分片shardingService.createHashedSharding("sharded_database", "users", "userId");// 订单集合 - 使用customerId范围分片shardingService.createRangeSharding("sharded_database", "orders", "customerId");// 产品集合 - 使用复合分片键Map<String, Object> productShardKeys = new HashMap<>();productShardKeys.put("categoryId", 1);productShardKeys.put("productId", 1);shardingService.createCompoundSharding("sharded_database", "products", productShardKeys);log.info("MongoDB分片初始化完成");} catch (Exception e) {log.error("MongoDB分片初始化失败", e);}}
}
分片策略优化
1. 智能分片键选择
@Service
public class ShardKeyOptimizer {@Autowiredprivate MongoTemplate mongoTemplate;/*** 分析集合的查询模式*/public ShardKeyRecommendation analyzeQueryPatterns(String collection) {// 分析查询日志List<Document> queryLogs = getQueryLogs(collection);Map<String, Integer> fieldUsageCount = new HashMap<>();Map<String, Double> fieldSelectivity = new HashMap<>();for (Document log : queryLogs) {Document query = log.get("command", Document.class);if (query != null && query.containsKey("find")) {Document filter = query.get("filter", Document.class);if (filter != null) {analyzeFilterFields(filter, fieldUsageCount);}}}// 计算字段选择性for (String field : fieldUsageCount.keySet()) {double selectivity = calculateFieldSelectivity(collection, field);fieldSelectivity.put(field, selectivity);}return recommendShardKey(fieldUsageCount, fieldSelectivity);}private void analyzeFilterFields(Document filter, Map<String, Integer> fieldUsageCount) {for (String field : filter.keySet()) {fieldUsageCount.merge(field, 1, Integer::sum);}}private double calculateFieldSelectivity(String collection, String field) {// 计算字段的选择性(不重复值的比例)Aggregation aggregation = Aggregation.newAggregation(Aggregation.group(field),Aggregation.count().as("distinctCount"));AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, collection, Document.class);long distinctCount = results.getMappedResults().size();long totalCount = mongoTemplate.count(new Query(), collection);return totalCount > 0 ? 
(double) distinctCount / totalCount : 0;}private ShardKeyRecommendation recommendShardKey(Map<String, Integer> fieldUsageCount, Map<String, Double> fieldSelectivity) {// 综合考虑使用频率和选择性String recommendedField = fieldUsageCount.entrySet().stream().max((e1, e2) -> {double score1 = e1.getValue() * fieldSelectivity.getOrDefault(e1.getKey(), 0.0);double score2 = e2.getValue() * fieldSelectivity.getOrDefault(e2.getKey(), 0.0);return Double.compare(score1, score2);}).map(Map.Entry::getKey).orElse("_id");return new ShardKeyRecommendation(recommendedField, fieldSelectivity.getOrDefault(recommendedField, 0.0));}private List<Document> getQueryLogs(String collection) {// 从MongoDB profiler获取查询日志Query query = new Query(Criteria.where("ns").is("sharded_database." + collection).and("ts").gte(new Date(System.currentTimeMillis() - 24 * 60 * 60 * 1000))); // 最近24小时return mongoTemplate.find(query, Document.class, "system.profile");}public static class ShardKeyRecommendation {private String field;private double selectivity;public ShardKeyRecommendation(String field, double selectivity) {this.field = field;this.selectivity = selectivity;}// getter和setter}
}
2. 数据平衡监控
@Service
public class ShardBalanceMonitor {@Autowiredprivate MongoTemplate mongoTemplate;@Autowiredprivate MeterRegistry meterRegistry;/*** 监控分片数据分布*/@Scheduled(fixedRate = 300000) // 5分钟检查一次public void monitorShardDistribution() {try {Document shardStats = mongoTemplate.getDb().runCommand(new Document("shardDistribution", 1));analyzeShardBalance(shardStats);} catch (Exception e) {log.error("分片分布监控失败", e);}}private void analyzeShardBalance(Document shardStats) {Document shards = shardStats.get("shards", Document.class);if (shards == null) return;Map<String, Long> shardSizes = new HashMap<>();long totalSize = 0;for (String shardName : shards.keySet()) {Document shardInfo = shards.get(shardName, Document.class);long size = shardInfo.getLong("size");shardSizes.put(shardName, size);totalSize += size;}// 计算分布不均衡度double imbalanceRatio = calculateImbalanceRatio(shardSizes, totalSize);// 记录指标Gauge.builder("mongodb.shard.imbalance.ratio").register(meterRegistry, imbalanceRatio);// 如果不均衡度超过阈值,触发重新平衡if (imbalanceRatio > 0.3) { // 30%的不均衡度log.warn("检测到分片数据不均衡,不均衡度: {:.2f}", imbalanceRatio);triggerRebalance();}}private double calculateImbalanceRatio(Map<String, Long> shardSizes, long totalSize) {if (shardSizes.isEmpty() || totalSize == 0) return 0;double avgSize = (double) totalSize / shardSizes.size();double maxDeviation = shardSizes.values().stream().mapToDouble(size -> Math.abs(size - avgSize) / avgSize).max().orElse(0);return maxDeviation;}private void triggerRebalance() {try {// 启动平衡器mongoTemplate.getDb().runCommand(new Document("balancerStart", 1));log.info("已启动分片重新平衡");} catch (Exception e) {log.error("启动分片重新平衡失败", e);}}/*** 监控chunk分布*/@Scheduled(fixedRate = 600000) // 10分钟检查一次public void monitorChunkDistribution() {try {// 查询chunks集合Query query = new Query();List<Document> chunks = mongoTemplate.find(query, Document.class, "chunks");Map<String, Integer> shardChunkCount = new HashMap<>();for (Document chunk : chunks) {String shard = 
chunk.getString("shard");shardChunkCount.merge(shard, 1, Integer::sum);}// 记录每个分片的chunk数量for (Map.Entry<String, Integer> entry : shardChunkCount.entrySet()) {Gauge.builder("mongodb.shard.chunk.count").tag("shard", entry.getKey()).register(meterRegistry, entry.getValue());}} catch (Exception e) {log.error("Chunk分布监控失败", e);}}
}
3. 查询路由优化
@Service
public class QueryRoutingOptimizer {@Autowiredprivate MongoTemplate mongoTemplate;/*** 优化查询以避免跨分片操作*/public <T> List<T> optimizedFind(Query query, Class<T> entityClass, String collection) {// 分析查询是否包含分片键if (containsShardKey(query, collection)) {// 包含分片键,可以路由到特定分片return mongoTemplate.find(query, entityClass, collection);} else {// 不包含分片键,需要广播查询log.warn("查询不包含分片键,将执行跨分片查询: {}", query);return mongoTemplate.find(query, entityClass, collection);}}/*** 批量查询优化*/public <T> List<T> optimizedBatchFind(List<String> shardKeyValues, String shardKeyField, Class<T> entityClass, String collection) {// 按分片键分组Map<String, List<String>> shardGroups = groupByShardKey(shardKeyValues, shardKeyField);List<T> results = new ArrayList<>();// 并行查询各分片shardGroups.entrySet().parallelStream().forEach(entry -> {Query query = new Query(Criteria.where(shardKeyField).in(entry.getValue()));List<T> shardResults = mongoTemplate.find(query, entityClass, collection);synchronized (results) {results.addAll(shardResults);}});return results;}/*** 聚合查询优化*/public <T> AggregationResults<T> optimizedAggregate(Aggregation aggregation, String collection, Class<T> outputType) {// 检查聚合管道是否可以下推到分片if (canPushDownToShards(aggregation)) {return mongoTemplate.aggregate(aggregation, collection, outputType);} else {// 需要在mongos层进行聚合log.warn("聚合操作需要在mongos层执行,可能影响性能");return mongoTemplate.aggregate(aggregation, collection, outputType);}}private boolean containsShardKey(Query query, String collection) {// 获取集合的分片键信息String shardKey = getShardKey(collection);if (shardKey == null) return false;// 检查查询条件是否包含分片键Document queryDoc = query.getQueryObject();return queryDoc.containsKey(shardKey);}private String getShardKey(String collection) {try {// 从config.collections获取分片键信息Query query = new Query(Criteria.where("_id").is("sharded_database." 
+ collection));Document collectionInfo = mongoTemplate.findOne(query, Document.class, "collections");if (collectionInfo != null) {Document key = collectionInfo.get("key", Document.class);if (key != null && !key.isEmpty()) {return key.keySet().iterator().next();}}} catch (Exception e) {log.error("获取分片键失败", e);}return null;}private Map<String, List<String>> groupByShardKey(List<String> values, String shardKeyField) {// 根据分片键值计算目标分片Map<String, List<String>> groups = new HashMap<>();for (String value : values) {String targetShard = calculateTargetShard(value, shardKeyField);groups.computeIfAbsent(targetShard, k -> new ArrayList<>()).add(value);}return groups;}private String calculateTargetShard(String shardKeyValue, String shardKeyField) {// 简化的分片计算逻辑int hash = shardKeyValue.hashCode();int shardCount = getShardCount();int shardIndex = Math.abs(hash) % shardCount;return "shard" + shardIndex;}private int getShardCount() {try {Document listShards = mongoTemplate.getDb().runCommand(new Document("listShards", 1));List<Document> shards = listShards.getList("shards", Document.class);return shards != null ? shards.size() : 1;} catch (Exception e) {log.error("获取分片数量失败", e);return 1;}}private boolean canPushDownToShards(Aggregation aggregation) {// 检查聚合管道是否包含可以下推到分片的操作List<AggregationOperation> operations = aggregation.getOperations();for (AggregationOperation operation : operations) {if (operation instanceof GroupOperation || operation instanceof SortOperation ||operation instanceof LimitOperation) {// 这些操作通常需要在mongos层执行return false;}}return true;}
}
性能优化
1. 连接池优化
@Configuration
public class MongoConnectionOptimization {@Beanpublic MongoClientSettings mongoClientSettings() {return MongoClientSettings.builder().applyToConnectionPoolSettings(builder -> {builder.maxSize(200) // 最大连接数.minSize(20) // 最小连接数.maxWaitTime(30, TimeUnit.SECONDS) // 最大等待时间.maxConnectionLifeTime(60, TimeUnit.MINUTES) // 连接最大生存时间.maxConnectionIdleTime(30, TimeUnit.MINUTES) // 连接最大空闲时间.maintenanceInitialDelay(0, TimeUnit.SECONDS).maintenanceFrequency(30, TimeUnit.SECONDS); // 维护频率}).applyToSocketSettings(builder -> {builder.connectTimeout(10, TimeUnit.SECONDS) // 连接超时.readTimeout(30, TimeUnit.SECONDS); // 读取超时}).applyToServerSettings(builder -> {builder.heartbeatFrequency(10, TimeUnit.SECONDS) // 心跳频率.minHeartbeatFrequency(500, TimeUnit.MILLISECONDS); // 最小心跳频率}).build();}
}
2. 批量操作优化
@Service
public class MongoBatchOptimization {@Autowiredprivate MongoTemplate mongoTemplate;/*** 批量插入优化*/public <T> void optimizedBatchInsert(List<T> documents, String collection) {if (documents.isEmpty()) return;// 按分片键分组Map<String, List<T>> shardGroups = groupDocumentsByShardKey(documents, collection);// 并行插入各分片shardGroups.entrySet().parallelStream().forEach(entry -> {List<T> shardDocuments = entry.getValue();// 分批插入,避免单次操作过大int batchSize = 1000;for (int i = 0; i < shardDocuments.size(); i += batchSize) {int endIndex = Math.min(i + batchSize, shardDocuments.size());List<T> batch = shardDocuments.subList(i, endIndex);mongoTemplate.insert(batch, collection);}});}/*** 批量更新优化*/public void optimizedBatchUpdate(List<UpdateRequest> updateRequests, String collection) {BulkOperations bulkOps = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, collection);for (UpdateRequest request : updateRequests) {bulkOps.updateOne(request.getQuery(), request.getUpdate());}BulkWriteResult result = bulkOps.execute();log.info("批量更新完成,匹配: {}, 修改: {}", result.getMatchedCount(), result.getModifiedCount());}/*** 批量删除优化*/public void optimizedBatchDelete(List<Query> deleteQueries, String collection) {BulkOperations bulkOps = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, collection);for (Query query : deleteQueries) {bulkOps.remove(query);}BulkWriteResult result = bulkOps.execute();log.info("批量删除完成,删除数量: {}", result.getDeletedCount());}private <T> Map<String, List<T>> groupDocumentsByShardKey(List<T> documents, String collection) {// 根据分片键对文档进行分组Map<String, List<T>> groups = new HashMap<>();String shardKey = getShardKey(collection);if (shardKey == null) {groups.put("default", documents);return groups;}for (T document : documents) {String shardKeyValue = extractShardKeyValue(document, shardKey);String targetShard = calculateTargetShard(shardKeyValue);groups.computeIfAbsent(targetShard, k -> new ArrayList<>()).add(document);}return groups;}private String extractShardKeyValue(Object 
document, String shardKey) {// 使用反射或其他方式提取分片键值try {Field field = document.getClass().getDeclaredField(shardKey);field.setAccessible(true);Object value = field.get(document);return value != null ? value.toString() : "";} catch (Exception e) {log.error("提取分片键值失败", e);return "";}}private String getShardKey(String collection) {// 获取集合的分片键return "userId"; // 简化实现}private String calculateTargetShard(String shardKeyValue) {// 计算目标分片return "shard0"; // 简化实现}public static class UpdateRequest {private Query query;private Update update;public UpdateRequest(Query query, Update update) {this.query = query;this.update = update;}// getter和setter}
}
3. 索引优化
@Service
public class MongoIndexOptimization {@Autowiredprivate MongoTemplate mongoTemplate;/*** 创建分片友好的索引*/public void createShardFriendlyIndexes(String collection) {// 1. 分片键索引(自动创建)// 2. 复合索引(包含分片键)Index compoundIndex = new Index().on("userId", Sort.Direction.ASC) // 分片键.on("createTime", Sort.Direction.DESC).on("status", Sort.Direction.ASC);mongoTemplate.indexOps(collection).ensureIndex(compoundIndex);// 3. 查询优化索引Index queryIndex = new Index().on("userId", Sort.Direction.ASC) // 分片键.on("email", Sort.Direction.ASC).sparse(); // 稀疏索引mongoTemplate.indexOps(collection).ensureIndex(queryIndex);// 4. 地理位置索引Index geoIndex = new Index().on("userId", Sort.Direction.ASC) // 分片键.on("location", "2dsphere");mongoTemplate.indexOps(collection).ensureIndex(geoIndex);}/*** 监控索引使用情况*/@Scheduled(fixedRate = 3600000) // 1小时检查一次public void monitorIndexUsage() {List<String> collections = Arrays.asList("users", "orders", "products");for (String collection : collections) {try {// 获取索引统计信息Document indexStats = mongoTemplate.getDb().getCollection(collection).aggregate(Arrays.asList(new Document("$indexStats", new Document()))).first();if (indexStats != null) {analyzeIndexUsage(collection, indexStats);}} catch (Exception e) {log.error("监控索引使用情况失败: {}", collection, e);}}}private void analyzeIndexUsage(String collection, Document indexStats) {Document accesses = indexStats.get("accesses", Document.class);if (accesses != null) {long ops = accesses.getLong("ops");Date since = accesses.getDate("since");if (ops == 0 && since != null) {long daysSinceLastUse = (System.currentTimeMillis() - since.getTime()) / (24 * 60 * 60 * 1000);if (daysSinceLastUse > 30) {log.warn("索引 {} 在集合 {} 中超过30天未使用,考虑删除", indexStats.getString("name"), collection);}}}}/*** 自动创建查询优化索引*/public void autoCreateQueryIndexes(String collection, List<Document> queryPatterns) {Map<String, Integer> fieldUsageCount = new HashMap<>();// 分析查询模式for (Document query : queryPatterns) {analyzeQueryFields(query, fieldUsageCount);}// 
创建高频查询字段的索引fieldUsageCount.entrySet().stream().filter(entry -> entry.getValue() > 100) // 使用次数超过100.forEach(entry -> {String field = entry.getKey();if (!field.equals("_id")) { // 跳过默认索引Index index = new Index().on(field, Sort.Direction.ASC);mongoTemplate.indexOps(collection).ensureIndex(index);log.info("为字段 {} 创建索引,使用频率: {}", field, entry.getValue());}});}private void analyzeQueryFields(Document query, Map<String, Integer> fieldUsageCount) {for (String field : query.keySet()) {if (!field.startsWith("$")) { // 跳过操作符fieldUsageCount.merge(field, 1, Integer::sum);}}}
}
监控与运维
1. 分片集群监控
@Component
public class MongoShardingMonitor {@Autowiredprivate MongoTemplate mongoTemplate;@Autowiredprivate MeterRegistry meterRegistry;/*** 监控分片集群健康状态*/@Scheduled(fixedRate = 30000)public void monitorClusterHealth() {try {// 检查mongos状态Document isMaster = mongoTemplate.getDb().runCommand(new Document("isMaster", 1));boolean isMongos = isMaster.getBoolean("ismaster", false);// 检查分片状态Document listShards = mongoTemplate.getDb().runCommand(new Document("listShards", 1));List<Document> shards = listShards.getList("shards", Document.class);int healthyShards = 0;int totalShards = shards.size();for (Document shard : shards) {String state = shard.getString("state");if ("1".equals(state)) {healthyShards++;}}// 记录指标Gauge.builder("mongodb.cluster.shards.total").register(meterRegistry, totalShards);Gauge.builder("mongodb.cluster.shards.healthy").register(meterRegistry, healthyShards);// 检查平衡器状态Document balancerStatus = mongoTemplate.getDb().runCommand(new Document("balancerStatus", 1));boolean balancerEnabled = balancerStatus.getBoolean("mode", false);Gauge.builder("mongodb.cluster.balancer.enabled").register(meterRegistry, balancerEnabled ? 
1 : 0);} catch (Exception e) {log.error("MongoDB集群健康监控失败", e);}}/*** 监控分片性能指标*/@Scheduled(fixedRate = 60000)public void monitorShardPerformance() {try {Document serverStatus = mongoTemplate.getDb().runCommand(new Document("serverStatus", 1));// 连接数Document connections = serverStatus.get("connections", Document.class);if (connections != null) {int current = connections.getInteger("current", 0);int available = connections.getInteger("available", 0);Gauge.builder("mongodb.connections.current").register(meterRegistry, current);Gauge.builder("mongodb.connections.available").register(meterRegistry, available);}// 操作计数Document opcounters = serverStatus.get("opcounters", Document.class);if (opcounters != null) {long insert = opcounters.getLong("insert");long query = opcounters.getLong("query");long update = opcounters.getLong("update");long delete = opcounters.getLong("delete");Counter.builder("mongodb.operations.insert").register(meterRegistry).increment(insert);Counter.builder("mongodb.operations.query").register(meterRegistry).increment(query);Counter.builder("mongodb.operations.update").register(meterRegistry).increment(update);Counter.builder("mongodb.operations.delete").register(meterRegistry).increment(delete);}} catch (Exception e) {log.error("MongoDB性能监控失败", e);}}
}
2. 自动故障恢复
@Service
public class MongoFailoverService {@Autowiredprivate MongoTemplate mongoTemplate;@Autowiredprivate NotificationService notificationService;/*** 检测并处理分片故障*/@Scheduled(fixedRate = 15000)public void detectAndHandleFailures() {try {Document listShards = mongoTemplate.getDb().runCommand(new Document("listShards", 1));List<Document> shards = listShards.getList("shards", Document.class);for (Document shard : shards) {String shardId = shard.getString("_id");String host = shard.getString("host");String state = shard.getString("state");if (!"1".equals(state)) {handleShardFailure(shardId, host);}}} catch (Exception e) {log.error("分片故障检测失败", e);}}private void handleShardFailure(String shardId, String host) {log.error("检测到分片故障: {} ({})", shardId, host);// 发送告警notificationService.sendAlert("MongoDB分片故障",String.format("分片 %s (%s) 发生故障,请及时处理", shardId, host));// 尝试自动恢复attemptAutoRecovery(shardId, host);}private void attemptAutoRecovery(String shardId, String host) {try {// 检查副本集状态if (host.contains("/")) {String[] parts = host.split("/");String replSetName = parts[0];String[] hosts = parts[1].split(",");// 尝试连接副本集的其他成员for (String memberHost : hosts) {if (testConnection(memberHost)) {log.info("副本集 {} 的成员 {} 仍然可用", replSetName, memberHost);return;}}}// 如果所有成员都不可用,尝试重启服务log.warn("分片 {} 的所有成员都不可用,需要手动干预", shardId);} catch (Exception e) {log.error("自动恢复失败", e);}}private boolean testConnection(String host) {try {MongoClient testClient = MongoClients.create("mongodb://" + host);testClient.getDatabase("admin").runCommand(new Document("ping", 1));testClient.close();return true;} catch (Exception e) {return false;}}
}
总结
MongoDB分片技术是处理大规模数据的重要解决方案。成功实施需要考虑:
分片键设计:选择合适的分片键是关键,需要平衡查询性能和数据分布
架构规划:合理规划Config Server、分片和mongos的部署
查询优化:尽量包含分片键以避免跨分片查询
监控运维:建立完善的监控体系,及时发现和处理问题
最佳实践:
选择高基数、取值分布均匀且查询频繁的字段作为分片键;避免用单调递增的字段(如时间戳、自增ID)做范围分片键,否则新写入会集中到同一个分片上
使用复合分片键:将低基数字段(如categoryId)与高基数字段组合,既能提高分片键基数、避免无法拆分的大chunk,又能让包含分片键前缀的查询被定向路由到少数分片
定期监控数据分布和性能指标
建立自动化的故障检测和恢复机制
通过合理的设计和实施,MongoDB分片可以为应用提供优秀的水平扩展能力。