MongoDB分片技术实现

概述

MongoDB分片(Sharding)是MongoDB的水平扩展解决方案,通过将数据分布到多个分片(shard)上来处理大数据量和高吞吐量的需求。

MongoDB分片架构

1. 分片集群组件

# MongoDB分片集群架构
version: '3.8'
services:# Config Server副本集config1:image: mongo:5.0command: mongod --configsvr --replSet configReplSet --port 27019ports:- "27019:27019"volumes:- config1_data:/data/dbconfig2:image: mongo:5.0command: mongod --configsvr --replSet configReplSet --port 27019ports:- "27020:27019"volumes:- config2_data:/data/dbconfig3:image: mongo:5.0command: mongod --configsvr --replSet configReplSet --port 27019ports:- "27021:27019"volumes:- config3_data:/data/db# 分片1副本集shard1_replica1:image: mongo:5.0command: mongod --shardsvr --replSet shard1ReplSet --port 27018ports:- "27022:27018"volumes:- shard1_replica1_data:/data/dbshard1_replica2:image: mongo:5.0command: mongod --shardsvr --replSet shard1ReplSet --port 27018ports:- "27023:27018"volumes:- shard1_replica2_data:/data/db# 分片2副本集shard2_replica1:image: mongo:5.0command: mongod --shardsvr --replSet shard2ReplSet --port 27018ports:- "27024:27018"volumes:- shard2_replica1_data:/data/dbshard2_replica2:image: mongo:5.0command: mongod --shardsvr --replSet shard2ReplSet --port 27018ports:- "27025:27018"volumes:- shard2_replica2_data:/data/db# mongos路由服务mongos1:image: mongo:5.0command: mongos --configdb configReplSet/config1:27019,config2:27019,config3:27019 --port 27017ports:- "27017:27017"depends_on:- config1- config2- config3mongos2:image: mongo:5.0command: mongos --configdb configReplSet/config1:27019,config2:27019,config3:27019 --port 27017ports:- "27026:27017"depends_on:- config1- config2- config3volumes:config1_data:config2_data:config3_data:shard1_replica1_data:shard1_replica2_data:shard2_replica1_data:shard2_replica2_data:

2. 分片集群初始化脚本

#!/bin/bash
# mongodb-cluster-init.shecho "初始化MongoDB分片集群..."# 等待服务启动
sleep 30# 初始化Config Server副本集
echo "初始化Config Server副本集..."
mongo --host config1:27019 --eval '
rs.initiate({_id: "configReplSet",configsvr: true,members: [{ _id: 0, host: "config1:27019" },{ _id: 1, host: "config2:27019" },{ _id: 2, host: "config3:27019" }]
})'# 等待副本集初始化完成
sleep 20# 初始化分片1副本集
echo "初始化分片1副本集..."
mongo --host shard1_replica1:27018 --eval '
rs.initiate({_id: "shard1ReplSet",members: [{ _id: 0, host: "shard1_replica1:27018" },{ _id: 1, host: "shard1_replica2:27018" }]
})'# 初始化分片2副本集
echo "初始化分片2副本集..."
mongo --host shard2_replica1:27018 --eval '
rs.initiate({_id: "shard2ReplSet",members: [{ _id: 0, host: "shard2_replica1:27018" },{ _id: 1, host: "shard2_replica2:27018" }]
})'# 等待分片副本集初始化完成
sleep 30# 添加分片到集群
echo "添加分片到集群..."
mongo --host mongos1:27017 --eval '
sh.addShard("shard1ReplSet/shard1_replica1:27018,shard1_replica2:27018")
sh.addShard("shard2ReplSet/shard2_replica1:27018,shard2_replica2:27018")
'echo "MongoDB分片集群初始化完成!"

Java应用集成

1. Spring Boot配置

@Configuration
public class MongoShardingConfig {@Value("${spring.data.mongodb.uri}")private String mongoUri;@Beanpublic MongoClient mongoClient() {// 连接到mongos路由服务ConnectionString connectionString = new ConnectionString(mongoUri);MongoClientSettings settings = MongoClientSettings.builder().applyConnectionString(connectionString).readPreference(ReadPreference.secondaryPreferred()) // 读写分离.writeConcern(WriteConcern.MAJORITY) // 写关注.readConcern(ReadConcern.MAJORITY) // 读关注.retryWrites(true) // 重试写入.retryReads(true) // 重试读取.applyToConnectionPoolSettings(builder -> {builder.maxSize(100) // 最大连接数.minSize(10) // 最小连接数.maxWaitTime(30, TimeUnit.SECONDS) // 最大等待时间.maxConnectionIdleTime(60, TimeUnit.SECONDS); // 连接空闲时间}).build();return MongoClients.create(settings);}@Beanpublic MongoTemplate mongoTemplate() {return new MongoTemplate(mongoClient(), "sharded_database");}
}

2. 分片键设计

@Document(collection = "users")
public class User {@Idprivate String id;@Indexedprivate String userId; // 分片键private String username;private String email;private Date createTime;private String region; // 地理位置// 构造函数、getter、setter
}@Document(collection = "orders")
public class Order {@Idprivate String id;@Indexedprivate String customerId; // 分片键private String orderId;private BigDecimal amount;private Date orderTime;private String status;// 构造函数、getter、setter
}@Document(collection = "products")
public class Product {@Idprivate String id;@Indexedprivate String categoryId; // 分片键private String productName;private BigDecimal price;private String description;// 构造函数、getter、setter
}

3. 分片管理服务

@Service
public class MongoShardingService {@Autowiredprivate MongoTemplate mongoTemplate;/*** 启用数据库分片*/public void enableSharding(String database) {Document command = new Document("enableSharding", database);mongoTemplate.getDb().runCommand(command);log.info("已启用数据库分片: {}", database);}/*** 对集合进行分片*/public void shardCollection(String database, String collection, String shardKey) {Document command = new Document("shardCollection", database + "." + collection).append("key", new Document(shardKey, 1));mongoTemplate.getDb().runCommand(command);log.info("已对集合进行分片: {}.{}, 分片键: {}", database, collection, shardKey);}/*** 创建哈希分片*/public void createHashedSharding(String database, String collection, String shardKey) {Document command = new Document("shardCollection", database + "." + collection).append("key", new Document(shardKey, "hashed"));mongoTemplate.getDb().runCommand(command);log.info("已创建哈希分片: {}.{}, 分片键: {}", database, collection, shardKey);}/*** 创建范围分片*/public void createRangeSharding(String database, String collection, String shardKey) {Document command = new Document("shardCollection", database + "." + collection).append("key", new Document(shardKey, 1));mongoTemplate.getDb().runCommand(command);log.info("已创建范围分片: {}.{}, 分片键: {}", database, collection, shardKey);}/*** 创建复合分片键*/public void createCompoundSharding(String database, String collection, Map<String, Object> shardKeys) {Document keyDoc = new Document();shardKeys.forEach(keyDoc::append);Document command = new Document("shardCollection", database + "." + collection).append("key", keyDoc);mongoTemplate.getDb().runCommand(command);log.info("已创建复合分片: {}.{}, 分片键: {}", database, collection, shardKeys);}/*** 查看分片状态*/public Document getShardingStatus() {return mongoTemplate.getDb().runCommand(new Document("sh.status", 1));}/*** 查看集合分片信息*/public Document getCollectionShardInfo(String database, String collection) {Document command = new Document("collStats", collection).append("verbose", true);return mongoTemplate.getDb(database).runCommand(command);}
}

4. 分片初始化配置

@Component
public class ShardingInitializer {@Autowiredprivate MongoShardingService shardingService;@EventListener(ApplicationReadyEvent.class)public void initializeSharding() {try {// 启用数据库分片shardingService.enableSharding("sharded_database");// 用户集合 - 使用userId哈希分片shardingService.createHashedSharding("sharded_database", "users", "userId");// 订单集合 - 使用customerId范围分片shardingService.createRangeSharding("sharded_database", "orders", "customerId");// 产品集合 - 使用复合分片键Map<String, Object> productShardKeys = new HashMap<>();productShardKeys.put("categoryId", 1);productShardKeys.put("productId", 1);shardingService.createCompoundSharding("sharded_database", "products", productShardKeys);log.info("MongoDB分片初始化完成");} catch (Exception e) {log.error("MongoDB分片初始化失败", e);}}
}

分片策略优化

1. 智能分片键选择

@Service
public class ShardKeyOptimizer {@Autowiredprivate MongoTemplate mongoTemplate;/*** 分析集合的查询模式*/public ShardKeyRecommendation analyzeQueryPatterns(String collection) {// 分析查询日志List<Document> queryLogs = getQueryLogs(collection);Map<String, Integer> fieldUsageCount = new HashMap<>();Map<String, Double> fieldSelectivity = new HashMap<>();for (Document log : queryLogs) {Document query = log.get("command", Document.class);if (query != null && query.containsKey("find")) {Document filter = query.get("filter", Document.class);if (filter != null) {analyzeFilterFields(filter, fieldUsageCount);}}}// 计算字段选择性for (String field : fieldUsageCount.keySet()) {double selectivity = calculateFieldSelectivity(collection, field);fieldSelectivity.put(field, selectivity);}return recommendShardKey(fieldUsageCount, fieldSelectivity);}private void analyzeFilterFields(Document filter, Map<String, Integer> fieldUsageCount) {for (String field : filter.keySet()) {fieldUsageCount.merge(field, 1, Integer::sum);}}private double calculateFieldSelectivity(String collection, String field) {// 计算字段的选择性(不重复值的比例)Aggregation aggregation = Aggregation.newAggregation(Aggregation.group(field),Aggregation.count().as("distinctCount"));AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, collection, Document.class);long distinctCount = results.getMappedResults().size();long totalCount = mongoTemplate.count(new Query(), collection);return totalCount > 0 ? (double) distinctCount / totalCount : 0;}private ShardKeyRecommendation recommendShardKey(Map<String, Integer> fieldUsageCount, Map<String, Double> fieldSelectivity) {// 综合考虑使用频率和选择性String recommendedField = fieldUsageCount.entrySet().stream().max((e1, e2) -> {double score1 = e1.getValue() * fieldSelectivity.getOrDefault(e1.getKey(), 0.0);double score2 = e2.getValue() * fieldSelectivity.getOrDefault(e2.getKey(), 0.0);return Double.compare(score1, score2);}).map(Map.Entry::getKey).orElse("_id");return new ShardKeyRecommendation(recommendedField, fieldSelectivity.getOrDefault(recommendedField, 0.0));}private List<Document> getQueryLogs(String collection) {// 从MongoDB profiler获取查询日志Query query = new Query(Criteria.where("ns").is("sharded_database." + collection).and("ts").gte(new Date(System.currentTimeMillis() - 24 * 60 * 60 * 1000))); // 最近24小时return mongoTemplate.find(query, Document.class, "system.profile");}public static class ShardKeyRecommendation {private String field;private double selectivity;public ShardKeyRecommendation(String field, double selectivity) {this.field = field;this.selectivity = selectivity;}// getter和setter}
}

2. 数据平衡监控

@Service
public class ShardBalanceMonitor {@Autowiredprivate MongoTemplate mongoTemplate;@Autowiredprivate MeterRegistry meterRegistry;/*** 监控分片数据分布*/@Scheduled(fixedRate = 300000) // 5分钟检查一次public void monitorShardDistribution() {try {Document shardStats = mongoTemplate.getDb().runCommand(new Document("shardDistribution", 1));analyzeShardBalance(shardStats);} catch (Exception e) {log.error("分片分布监控失败", e);}}private void analyzeShardBalance(Document shardStats) {Document shards = shardStats.get("shards", Document.class);if (shards == null) return;Map<String, Long> shardSizes = new HashMap<>();long totalSize = 0;for (String shardName : shards.keySet()) {Document shardInfo = shards.get(shardName, Document.class);long size = shardInfo.getLong("size");shardSizes.put(shardName, size);totalSize += size;}// 计算分布不均衡度double imbalanceRatio = calculateImbalanceRatio(shardSizes, totalSize);// 记录指标Gauge.builder("mongodb.shard.imbalance.ratio").register(meterRegistry, imbalanceRatio);// 如果不均衡度超过阈值,触发重新平衡if (imbalanceRatio > 0.3) { // 30%的不均衡度log.warn("检测到分片数据不均衡,不均衡度: {:.2f}", imbalanceRatio);triggerRebalance();}}private double calculateImbalanceRatio(Map<String, Long> shardSizes, long totalSize) {if (shardSizes.isEmpty() || totalSize == 0) return 0;double avgSize = (double) totalSize / shardSizes.size();double maxDeviation = shardSizes.values().stream().mapToDouble(size -> Math.abs(size - avgSize) / avgSize).max().orElse(0);return maxDeviation;}private void triggerRebalance() {try {// 启动平衡器mongoTemplate.getDb().runCommand(new Document("balancerStart", 1));log.info("已启动分片重新平衡");} catch (Exception e) {log.error("启动分片重新平衡失败", e);}}/*** 监控chunk分布*/@Scheduled(fixedRate = 600000) // 10分钟检查一次public void monitorChunkDistribution() {try {// 查询chunks集合Query query = new Query();List<Document> chunks = mongoTemplate.find(query, Document.class, "chunks");Map<String, Integer> shardChunkCount = new HashMap<>();for (Document chunk : chunks) {String shard = chunk.getString("shard");shardChunkCount.merge(shard, 1, Integer::sum);}// 记录每个分片的chunk数量for (Map.Entry<String, Integer> entry : shardChunkCount.entrySet()) {Gauge.builder("mongodb.shard.chunk.count").tag("shard", entry.getKey()).register(meterRegistry, entry.getValue());}} catch (Exception e) {log.error("Chunk分布监控失败", e);}}
}

3. 查询路由优化

@Service
public class QueryRoutingOptimizer {@Autowiredprivate MongoTemplate mongoTemplate;/*** 优化查询以避免跨分片操作*/public <T> List<T> optimizedFind(Query query, Class<T> entityClass, String collection) {// 分析查询是否包含分片键if (containsShardKey(query, collection)) {// 包含分片键,可以路由到特定分片return mongoTemplate.find(query, entityClass, collection);} else {// 不包含分片键,需要广播查询log.warn("查询不包含分片键,将执行跨分片查询: {}", query);return mongoTemplate.find(query, entityClass, collection);}}/*** 批量查询优化*/public <T> List<T> optimizedBatchFind(List<String> shardKeyValues, String shardKeyField, Class<T> entityClass, String collection) {// 按分片键分组Map<String, List<String>> shardGroups = groupByShardKey(shardKeyValues, shardKeyField);List<T> results = new ArrayList<>();// 并行查询各分片shardGroups.entrySet().parallelStream().forEach(entry -> {Query query = new Query(Criteria.where(shardKeyField).in(entry.getValue()));List<T> shardResults = mongoTemplate.find(query, entityClass, collection);synchronized (results) {results.addAll(shardResults);}});return results;}/*** 聚合查询优化*/public <T> AggregationResults<T> optimizedAggregate(Aggregation aggregation, String collection, Class<T> outputType) {// 检查聚合管道是否可以下推到分片if (canPushDownToShards(aggregation)) {return mongoTemplate.aggregate(aggregation, collection, outputType);} else {// 需要在mongos层进行聚合log.warn("聚合操作需要在mongos层执行,可能影响性能");return mongoTemplate.aggregate(aggregation, collection, outputType);}}private boolean containsShardKey(Query query, String collection) {// 获取集合的分片键信息String shardKey = getShardKey(collection);if (shardKey == null) return false;// 检查查询条件是否包含分片键Document queryDoc = query.getQueryObject();return queryDoc.containsKey(shardKey);}private String getShardKey(String collection) {try {// 从config.collections获取分片键信息Query query = new Query(Criteria.where("_id").is("sharded_database." + collection));Document collectionInfo = mongoTemplate.findOne(query, Document.class, "collections");if (collectionInfo != null) {Document key = collectionInfo.get("key", Document.class);if (key != null && !key.isEmpty()) {return key.keySet().iterator().next();}}} catch (Exception e) {log.error("获取分片键失败", e);}return null;}private Map<String, List<String>> groupByShardKey(List<String> values, String shardKeyField) {// 根据分片键值计算目标分片Map<String, List<String>> groups = new HashMap<>();for (String value : values) {String targetShard = calculateTargetShard(value, shardKeyField);groups.computeIfAbsent(targetShard, k -> new ArrayList<>()).add(value);}return groups;}private String calculateTargetShard(String shardKeyValue, String shardKeyField) {// 简化的分片计算逻辑int hash = shardKeyValue.hashCode();int shardCount = getShardCount();int shardIndex = Math.abs(hash) % shardCount;return "shard" + shardIndex;}private int getShardCount() {try {Document listShards = mongoTemplate.getDb().runCommand(new Document("listShards", 1));List<Document> shards = listShards.getList("shards", Document.class);return shards != null ? shards.size() : 1;} catch (Exception e) {log.error("获取分片数量失败", e);return 1;}}private boolean canPushDownToShards(Aggregation aggregation) {// 检查聚合管道是否包含可以下推到分片的操作List<AggregationOperation> operations = aggregation.getOperations();for (AggregationOperation operation : operations) {if (operation instanceof GroupOperation || operation instanceof SortOperation ||operation instanceof LimitOperation) {// 这些操作通常需要在mongos层执行return false;}}return true;}
}

性能优化

1. 连接池优化

@Configuration
public class MongoConnectionOptimization {@Beanpublic MongoClientSettings mongoClientSettings() {return MongoClientSettings.builder().applyToConnectionPoolSettings(builder -> {builder.maxSize(200)                    // 最大连接数.minSize(20)                        // 最小连接数.maxWaitTime(30, TimeUnit.SECONDS)  // 最大等待时间.maxConnectionLifeTime(60, TimeUnit.MINUTES) // 连接最大生存时间.maxConnectionIdleTime(30, TimeUnit.MINUTES) // 连接最大空闲时间.maintenanceInitialDelay(0, TimeUnit.SECONDS).maintenanceFrequency(30, TimeUnit.SECONDS); // 维护频率}).applyToSocketSettings(builder -> {builder.connectTimeout(10, TimeUnit.SECONDS)    // 连接超时.readTimeout(30, TimeUnit.SECONDS);         // 读取超时}).applyToServerSettings(builder -> {builder.heartbeatFrequency(10, TimeUnit.SECONDS)   // 心跳频率.minHeartbeatFrequency(500, TimeUnit.MILLISECONDS); // 最小心跳频率}).build();}
}

2. 批量操作优化

@Service
public class MongoBatchOptimization {@Autowiredprivate MongoTemplate mongoTemplate;/*** 批量插入优化*/public <T> void optimizedBatchInsert(List<T> documents, String collection) {if (documents.isEmpty()) return;// 按分片键分组Map<String, List<T>> shardGroups = groupDocumentsByShardKey(documents, collection);// 并行插入各分片shardGroups.entrySet().parallelStream().forEach(entry -> {List<T> shardDocuments = entry.getValue();// 分批插入,避免单次操作过大int batchSize = 1000;for (int i = 0; i < shardDocuments.size(); i += batchSize) {int endIndex = Math.min(i + batchSize, shardDocuments.size());List<T> batch = shardDocuments.subList(i, endIndex);mongoTemplate.insert(batch, collection);}});}/*** 批量更新优化*/public void optimizedBatchUpdate(List<UpdateRequest> updateRequests, String collection) {BulkOperations bulkOps = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, collection);for (UpdateRequest request : updateRequests) {bulkOps.updateOne(request.getQuery(), request.getUpdate());}BulkWriteResult result = bulkOps.execute();log.info("批量更新完成,匹配: {}, 修改: {}", result.getMatchedCount(), result.getModifiedCount());}/*** 批量删除优化*/public void optimizedBatchDelete(List<Query> deleteQueries, String collection) {BulkOperations bulkOps = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, collection);for (Query query : deleteQueries) {bulkOps.remove(query);}BulkWriteResult result = bulkOps.execute();log.info("批量删除完成,删除数量: {}", result.getDeletedCount());}private <T> Map<String, List<T>> groupDocumentsByShardKey(List<T> documents, String collection) {// 根据分片键对文档进行分组Map<String, List<T>> groups = new HashMap<>();String shardKey = getShardKey(collection);if (shardKey == null) {groups.put("default", documents);return groups;}for (T document : documents) {String shardKeyValue = extractShardKeyValue(document, shardKey);String targetShard = calculateTargetShard(shardKeyValue);groups.computeIfAbsent(targetShard, k -> new ArrayList<>()).add(document);}return groups;}private String extractShardKeyValue(Object document, String shardKey) {// 使用反射或其他方式提取分片键值try {Field field = document.getClass().getDeclaredField(shardKey);field.setAccessible(true);Object value = field.get(document);return value != null ? value.toString() : "";} catch (Exception e) {log.error("提取分片键值失败", e);return "";}}private String getShardKey(String collection) {// 获取集合的分片键return "userId"; // 简化实现}private String calculateTargetShard(String shardKeyValue) {// 计算目标分片return "shard0"; // 简化实现}public static class UpdateRequest {private Query query;private Update update;public UpdateRequest(Query query, Update update) {this.query = query;this.update = update;}// getter和setter}
}

3. 索引优化

@Service
public class MongoIndexOptimization {@Autowiredprivate MongoTemplate mongoTemplate;/*** 创建分片友好的索引*/public void createShardFriendlyIndexes(String collection) {// 1. 分片键索引(自动创建)// 2. 复合索引(包含分片键)Index compoundIndex = new Index().on("userId", Sort.Direction.ASC)  // 分片键.on("createTime", Sort.Direction.DESC).on("status", Sort.Direction.ASC);mongoTemplate.indexOps(collection).ensureIndex(compoundIndex);// 3. 查询优化索引Index queryIndex = new Index().on("userId", Sort.Direction.ASC)  // 分片键.on("email", Sort.Direction.ASC).sparse(); // 稀疏索引mongoTemplate.indexOps(collection).ensureIndex(queryIndex);// 4. 地理位置索引Index geoIndex = new Index().on("userId", Sort.Direction.ASC)  // 分片键.on("location", "2dsphere");mongoTemplate.indexOps(collection).ensureIndex(geoIndex);}/*** 监控索引使用情况*/@Scheduled(fixedRate = 3600000) // 1小时检查一次public void monitorIndexUsage() {List<String> collections = Arrays.asList("users", "orders", "products");for (String collection : collections) {try {// 获取索引统计信息Document indexStats = mongoTemplate.getDb().getCollection(collection).aggregate(Arrays.asList(new Document("$indexStats", new Document()))).first();if (indexStats != null) {analyzeIndexUsage(collection, indexStats);}} catch (Exception e) {log.error("监控索引使用情况失败: {}", collection, e);}}}private void analyzeIndexUsage(String collection, Document indexStats) {Document accesses = indexStats.get("accesses", Document.class);if (accesses != null) {long ops = accesses.getLong("ops");Date since = accesses.getDate("since");if (ops == 0 && since != null) {long daysSinceLastUse = (System.currentTimeMillis() - since.getTime()) / (24 * 60 * 60 * 1000);if (daysSinceLastUse > 30) {log.warn("索引 {} 在集合 {} 中超过30天未使用,考虑删除", indexStats.getString("name"), collection);}}}}/*** 自动创建查询优化索引*/public void autoCreateQueryIndexes(String collection, List<Document> queryPatterns) {Map<String, Integer> fieldUsageCount = new HashMap<>();// 分析查询模式for (Document query : queryPatterns) {analyzeQueryFields(query, fieldUsageCount);}// 创建高频查询字段的索引fieldUsageCount.entrySet().stream().filter(entry -> entry.getValue() > 100) // 使用次数超过100.forEach(entry -> {String field = entry.getKey();if (!field.equals("_id")) { // 跳过默认索引Index index = new Index().on(field, Sort.Direction.ASC);mongoTemplate.indexOps(collection).ensureIndex(index);log.info("为字段 {} 创建索引,使用频率: {}", field, entry.getValue());}});}private void analyzeQueryFields(Document query, Map<String, Integer> fieldUsageCount) {for (String field : query.keySet()) {if (!field.startsWith("$")) { // 跳过操作符fieldUsageCount.merge(field, 1, Integer::sum);}}}
}

监控与运维

1. 分片集群监控

@Component
public class MongoShardingMonitor {@Autowiredprivate MongoTemplate mongoTemplate;@Autowiredprivate MeterRegistry meterRegistry;/*** 监控分片集群健康状态*/@Scheduled(fixedRate = 30000)public void monitorClusterHealth() {try {// 检查mongos状态Document isMaster = mongoTemplate.getDb().runCommand(new Document("isMaster", 1));boolean isMongos = isMaster.getBoolean("ismaster", false);// 检查分片状态Document listShards = mongoTemplate.getDb().runCommand(new Document("listShards", 1));List<Document> shards = listShards.getList("shards", Document.class);int healthyShards = 0;int totalShards = shards.size();for (Document shard : shards) {String state = shard.getString("state");if ("1".equals(state)) {healthyShards++;}}// 记录指标Gauge.builder("mongodb.cluster.shards.total").register(meterRegistry, totalShards);Gauge.builder("mongodb.cluster.shards.healthy").register(meterRegistry, healthyShards);// 检查平衡器状态Document balancerStatus = mongoTemplate.getDb().runCommand(new Document("balancerStatus", 1));boolean balancerEnabled = balancerStatus.getBoolean("mode", false);Gauge.builder("mongodb.cluster.balancer.enabled").register(meterRegistry, balancerEnabled ? 1 : 0);} catch (Exception e) {log.error("MongoDB集群健康监控失败", e);}}/*** 监控分片性能指标*/@Scheduled(fixedRate = 60000)public void monitorShardPerformance() {try {Document serverStatus = mongoTemplate.getDb().runCommand(new Document("serverStatus", 1));// 连接数Document connections = serverStatus.get("connections", Document.class);if (connections != null) {int current = connections.getInteger("current", 0);int available = connections.getInteger("available", 0);Gauge.builder("mongodb.connections.current").register(meterRegistry, current);Gauge.builder("mongodb.connections.available").register(meterRegistry, available);}// 操作计数Document opcounters = serverStatus.get("opcounters", Document.class);if (opcounters != null) {long insert = opcounters.getLong("insert");long query = opcounters.getLong("query");long update = opcounters.getLong("update");long delete = opcounters.getLong("delete");Counter.builder("mongodb.operations.insert").register(meterRegistry).increment(insert);Counter.builder("mongodb.operations.query").register(meterRegistry).increment(query);Counter.builder("mongodb.operations.update").register(meterRegistry).increment(update);Counter.builder("mongodb.operations.delete").register(meterRegistry).increment(delete);}} catch (Exception e) {log.error("MongoDB性能监控失败", e);}}
}

2. 自动故障恢复

@Service
public class MongoFailoverService {@Autowiredprivate MongoTemplate mongoTemplate;@Autowiredprivate NotificationService notificationService;/*** 检测并处理分片故障*/@Scheduled(fixedRate = 15000)public void detectAndHandleFailures() {try {Document listShards = mongoTemplate.getDb().runCommand(new Document("listShards", 1));List<Document> shards = listShards.getList("shards", Document.class);for (Document shard : shards) {String shardId = shard.getString("_id");String host = shard.getString("host");String state = shard.getString("state");if (!"1".equals(state)) {handleShardFailure(shardId, host);}}} catch (Exception e) {log.error("分片故障检测失败", e);}}private void handleShardFailure(String shardId, String host) {log.error("检测到分片故障: {} ({})", shardId, host);// 发送告警notificationService.sendAlert("MongoDB分片故障",String.format("分片 %s (%s) 发生故障,请及时处理", shardId, host));// 尝试自动恢复attemptAutoRecovery(shardId, host);}private void attemptAutoRecovery(String shardId, String host) {try {// 检查副本集状态if (host.contains("/")) {String[] parts = host.split("/");String replSetName = parts[0];String[] hosts = parts[1].split(",");// 尝试连接副本集的其他成员for (String memberHost : hosts) {if (testConnection(memberHost)) {log.info("副本集 {} 的成员 {} 仍然可用", replSetName, memberHost);return;}}}// 如果所有成员都不可用,尝试重启服务log.warn("分片 {} 的所有成员都不可用,需要手动干预", shardId);} catch (Exception e) {log.error("自动恢复失败", e);}}private boolean testConnection(String host) {try {MongoClient testClient = MongoClients.create("mongodb://" + host);testClient.getDatabase("admin").runCommand(new Document("ping", 1));testClient.close();return true;} catch (Exception e) {return false;}}
}

总结

MongoDB分片技术是处理大规模数据的重要解决方案。成功实施需要考虑:

  1. 分片键设计:选择合适的分片键是关键,需要平衡查询性能和数据分布

  2. 架构规划:合理规划Config Server、分片和mongos的部署

  3. 查询优化:尽量包含分片键以避免跨分片查询

  4. 监控运维:建立完善的监控体系,及时发现和处理问题

最佳实践:

  • 选择高基数、查询频繁的字段作为分片键

  • 使用复合分片键提高查询效率

  • 定期监控数据分布和性能指标

  • 建立自动化的故障检测和恢复机制

通过合理的设计和实施,MongoDB分片可以为应用提供优秀的水平扩展能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92118.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92118.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/92118.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python开发环境PyCharm下载与安装

python下载 python下载地址&#xff1a; Download Python | Python.org 上面的下载速度慢的话&#xff0c;用下面的地址下载&#xff08;window&#xff09;&#xff1a; https://download.csdn.net/download/liangmengbk/91580033 PyCharm下载 PyCharm下载地址&#xff1a…

汽车供应链PPAP自动化审核指南:如何用AI实现规则精准匹配与文件智能校验

在汽车行业质量管理的核心环节&#xff0c;PPAP&#xff08;生产件批准程序&#xff09;审核长期困扰着供应商与主机厂。 随着IATF 16949等标准持续升级、新能源零件复杂度激增&#xff0c;传统人工审核模式正面临系统性挑战。 行业数据显示&#xff0c;超过70%的SQE&#xf…

正则表达式在js中的应用

正则表达式在 JavaScript 中的应用非常广泛&#xff0c;尤其是在字符串处理和验证方面。以下是一些常见的正则表达式方法及其应用示例&#xff0c;包括 .test() 方法。 1. .test() 方法 .test() 方法用于测试一个字符串是否匹配正则表达式。如果匹配&#xff0c;返回 true&…

Rust视频处理开源项目精选

Rust视频处理开源项目精选 基于Rust实现的视频处理示例 以下是一些基于Rust实现的视频处理或多媒体相关的开源项目或示例,涵盖编解码、流媒体、分析工具等方向,可作为实际开发参考: 视频编解码与处理 rav1e:Rust编写的AV1视频编码器,高性能且内存安全,适合研究视频压缩…

Python爬虫实战:研究pycrumbs库,构建豆瓣读书数据采集系统

1. 引言 1.1 研究背景 在大数据与人工智能技术快速发展的背景下,互联网作为全球最大的信息载体,蕴含着海量结构化与非结构化数据。高效、合规地获取这些数据成为数据分析、业务决策的前提。网络爬虫作为自动化数据采集工具,通过模拟人类浏览行为遍历网页并提取信息,已成为…

linux的用户操作(详细介绍)

在 Linux 系统中&#xff0c;用户管理是系统管理员的核心工作之一&#xff0c;涉及用户账号的创建、修改、删除、权限分配等操作。Linux 采用多用户多任务机制&#xff0c;通过严格的用户和组管理确保系统安全性和资源分配合理性。以下是 Linux 用户操作的详细介绍&#xff1a;…

k8s常见问题

以下是 Kubernetes 常见问题&#xff08;FAQ&#xff09;的整理&#xff0c;涵盖了初学者和运维人员常遇到的痛点&#xff1a; ​一、部署与安装问题​ ​安装太复杂&#xff1f;​​ 解决方案&#xff1a;使用 ​kubeadm​&#xff08;官方工具&#xff09;、Minikube​&#…

RK Android14 新建分区恢复出厂设置分区数据不擦除及开机动画自定义(一)

文章目录 前言 一、分区创建与参数配置 二、分区挂载配置 三、SELinux 安全策略 四、系统初始化配置 五、开机动画路径重定向 总结 前言 本方案通过在 RK3568 Android 14 系统中创建一个独立的 rk_partition 分区(128MB),实现以下核心功能: 出厂设置保护:该分区在恢复出厂…

如何快速给PDF加书签--保姆级教程

买的电子书没有目录书签看着不舒服&#xff0c;手动加书签加到想吐。想有没有办法快速加书签。这要分为PDF目录部分可以被复制和不可被复制两种情况。不可复制时&#xff0c;要用到工具把目录提取出来&#xff0c;变成文字。 工具&#xff1a;Foxit Phantom福昕阅读器&#xff…

Redis面试精讲 Day 9:Redis模块开发与扩展

【Redis面试精讲 Day 9】Redis模块开发与扩展 文章标签 Redis,模块开发,扩展机制,面试技巧,Redis模块,Redis插件 文章简述 本文是"Redis面试精讲"系列第9天&#xff0c;聚焦Redis模块开发与扩展机制。文章详细解析Redis模块系统的架构设计&#xff0c;包括模块加…

八股训练--Spring

目录 一、引言 二、Spring 1.Spring框架的特性 2.介绍一下IOC和AOP 3.IOC和AOP都是如何实现的 4.怎么实现依赖注入 5.为什么AOP不用静态代理 6.介绍一下反射 7.Spring如何解决循环依赖问题 8.Spring常用注解 9.Spring事务什么情况会失效 10.Bean的生命周期 11.Bean…

无公网环境下在centos7.9上使用kk工具部署k8s平台(amd64架构)

文章目录前言一、环境列表二、思路三、环境准备四、有网环境下准备文件1.下载所需的rpm包2.准备harbor需要用到的镜像3. k8s的镜像文件4、 生成离线安装包5、harbor创建项目脚本五、无公网环境部署单点集群1、基础环境安装2、安装harbor3 、 准备k8s镜像4、安装k8s六、无公网环…

Objective-C中非传统设计模式的探索与实践

本文还有配套的精品资源&#xff0c;点击获取 简介&#xff1a;Objective-C的设计模式不仅仅局限于经典模式&#xff0c;还可以利用其动态特性实现一些非传统的模式。本文介绍了一系列基于Objective-C动态特性的设计模式&#xff0c;包括使用协议代替类继承、通过分类扩展类…

【笔记】重学单片机(51)(下)

中断系统 正常运行过程中&#xff0c;被打断进行另外工作&#xff0c;结束后回到原有进程。 5个中断源 外部中断源&#xff08;2个&#xff09;&#xff1a;INT0——由P3.2端口线引入&#xff0c;低电平或下降沿引起。INT1——由P3.3端口线引入&#xff0c;低电平或下降沿引起。…

Go实现程序启动器进而实现隐藏真实内容

注意&#xff1a; 本文内容于 2025-08-03 01:10:35 创建&#xff0c;可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容&#xff0c;请访问原文地址&#xff1a;Go实现程序启动器进而实现隐藏真实内容。感谢您的关注与支持&#xff01; 突发奇想&#xff0c;…

Fiddler 中文版怎么用 实现接口抓包调试与前后端联调闭环

API调试在现代开发流程中的地位愈发重要&#xff1a;接口数量激增、请求逻辑复杂、数据结构多变、安全校验机制加严……一个小小的参数错误、一次隐蔽的跨域问题、一个环境配置疏漏&#xff0c;都可能导致长时间的排查成本。而拥有一款既强大又易用的调试工具&#xff0c;尤其是…

ollama 多实例部署

如果我们需要在一台服务器上使用多个ollama服务&#xff0c;那么我们需要进行将ollama前端和ollama后端对应连接的操作&#xff0c;否则就会出现如下场景&#xff1a;我们可以在当前端口设置&#xff0c;这句话就是指明当前ollama实例使用哪个后端进行请求&#xff1a;export O…

orchestrator部署

场景&#xff1a; 用于管理MySQL高可用 下载jq包 每台orchestrator集群机器上都进行下载。 # wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm # rpm -ivh epel-release-latest-7.noarch.rpm # yum repolist ###检查是否已经添加到源列表 # yum i…

CentOS 6.4 上安装 Oracle 10.2.0.1 并升级到 10.2.0.4

目录 一、系统检查与设置 1. 检查系统版本与磁盘空间 2. 修改系统参数 3. 创建组和用户 4. 设置主机名 5. 检查安装软件包 6. 设置 oracle 用户环境变量 二、安装 Oracle 软件包 1. 安装 10.2.0.1 安装包 2. 安装 10.2.0.4 补丁 三、建库 四、配置监听器 1. 编辑配…

【基于C# + HALCON的工业视系统开发实战】二十六、车规级PCB全自动质检:3D SPI+AI光学检测融合方案

摘要&#xff1a;本文详细阐述基于C# .NET Core 6与HALCON 24.11开发的车规级PCB板AOI智能检测系统&#xff0c;提出3D SPI与AI光学检测融合方案。系统通过结构光3D测量技术实现锡膏印刷质量检测&#xff0c;结合多算法融合的自动光学检测完成元件缺陷识别&#xff0c;构建SPI与…