clickhouse-jdbc中负载均衡数据源的实现。
基本逻辑如下:
1.通过配置的url串,来切分构造url列表;
2.通过一个定时线程任务,不断地去ping url列表,以更新可用的url列表;
3.在可用列表中随机返回一个可用url;
/*** 提供负载均衡能力的datasource实现*/
public class BalancedClickhouseDataSource implements DataSource {private static final Logger log = LoggerFactory.getLogger(BalancedClickhouseDataSource.class);private static final Pattern URL_TEMPLATE = Pattern.compile("jdbc:clickhouse://([a-zA-Z0-9_:,.-]+)(/[a-zA-Z0-9_]+([?][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+([&][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+)*)?)?");private PrintWriter printWriter;private int loginTimeoutSeconds;//随机数private final ThreadLocal<Random> randomThreadLocal;//所有的urlprivate final List<String> allUrls;//可用的urlprivate volatile List<String> enabledUrls;private final ClickHouseProperties properties;private final ClickHouseDriver driver;public BalancedClickhouseDataSource(String url) {this(splitUrl(url), getFromUrl(url));}public BalancedClickhouseDataSource(String url, Properties properties) {this(splitUrl(url), new ClickHouseProperties(properties));}public BalancedClickhouseDataSource(String url, ClickHouseProperties properties) {this(splitUrl(url), properties.merge(getFromUrlWithoutDefault(url)));}private BalancedClickhouseDataSource(List<String> urls) {this(urls, new ClickHouseProperties());}private BalancedClickhouseDataSource(List<String> urls, Properties info) {this(urls, new ClickHouseProperties(info));}private BalancedClickhouseDataSource(List<String> urls, ClickHouseProperties properties) {this.loginTimeoutSeconds = 0;this.randomThreadLocal = new ThreadLocal();this.driver = new ClickHouseDriver();if (urls.isEmpty()) {throw new IllegalArgumentException("Incorrect ClickHouse jdbc url list. 
It must be not empty");} else {try {//解析配置文件ClickHouseProperties localProperties = ClickhouseJdbcUrlParser.parse((String)urls.get(0), properties.asProperties());localProperties.setHost((String)null);localProperties.setPort(-1);this.properties = localProperties;} catch (URISyntaxException var8) {throw new IllegalArgumentException(var8);}List<String> allUrls = new ArrayList(urls.size());Iterator var4 = urls.iterator();while(var4.hasNext()) {String url = (String)var4.next();try {//如果合法urlif (this.driver.acceptsURL(url)) {//添加到所有的url列表allUrls.add(url);} else {log.error("that url is has not correct format: {}", url);}} catch (SQLException var7) {throw new IllegalArgumentException("error while checking url: " + url, var7);}}if (allUrls.isEmpty()) {throw new IllegalArgumentException("there are no correct urls");} else {//所有urlthis.allUrls = Collections.unmodifiableList(allUrls);//可用urlthis.enabledUrls = this.allUrls;}}}/*** 切割url* @param url* @return*/static List<String> splitUrl(String url) {//校验url合法性Matcher m = URL_TEMPLATE.matcher(url);if (!m.matches()) {throw new IllegalArgumentException("Incorrect url");} else {String database = m.group(2);if (database == null) {database = "";}//切割url串String[] hosts = m.group(1).split(",");List<String> result = new ArrayList(hosts.length);String[] var5 = hosts;int var6 = hosts.length;//遍历,添加切割后的urlfor(int var7 = 0; var7 < var6; ++var7) {String host = var5[var7];result.add("jdbc:clickhouse://" + host + database);}return result;}}/*** ping url看是否可用* @param url* @return*/private boolean ping(String url) {try {//执行简单sql测试url链接可用性this.driver.connect(url, this.properties).createStatement().execute("SELECT 1");return true;} catch (Exception var3) {return false;}}/*** 遍历所有url,通过ping的方式,选择出可用的url* @return*/public synchronized int actualize() {//新建可用url列表List<String> enabledUrls = new ArrayList(this.allUrls.size());Iterator var2 = this.allUrls.iterator();while(var2.hasNext()) {String url = (String)var2.next();log.debug("Pinging disabled url: 
{}", url);if (this.ping(url)) {log.debug("Url is alive now: {}", url);//ping通的才添加进可用的enabledUrls.add(url);} else {log.debug("Url is dead now: {}", url);}}//重置可用url列表this.enabledUrls = Collections.unmodifiableList(enabledUrls);return enabledUrls.size();}/*** 随机获取可用url返回* @return* @throws SQLException*/private String getAnyUrl() throws SQLException {//可用url列表List<String> localEnabledUrls = this.enabledUrls;if (localEnabledUrls.isEmpty()) {throw new SQLException("Unable to get connection: there are no enabled urls");} else {Random random = (Random)this.randomThreadLocal.get();if (random == null) {this.randomThreadLocal.set(new Random());//产生一个随机数random = (Random)this.randomThreadLocal.get();}int index = random.nextInt(localEnabledUrls.size());//用随机数选择一个可用的url返回return (String)localEnabledUrls.get(index);}}public ClickHouseConnection getConnection() throws SQLException {return this.driver.connect(this.getAnyUrl(), this.properties);}public ClickHouseConnection getConnection(String username, String password) throws SQLException {return this.driver.connect(this.getAnyUrl(), this.properties.withCredentials(username, password));}public <T> T unwrap(Class<T> iface) throws SQLException {if (iface.isAssignableFrom(this.getClass())) {return iface.cast(this);} else {throw new SQLException("Cannot unwrap to " + iface.getName());}}public boolean isWrapperFor(Class<?> iface) throws SQLException {return iface.isAssignableFrom(this.getClass());}public PrintWriter getLogWriter() throws SQLException {return this.printWriter;}public void setLogWriter(PrintWriter printWriter) throws SQLException {this.printWriter = printWriter;}public void setLoginTimeout(int seconds) throws SQLException {this.loginTimeoutSeconds = seconds;}public int getLoginTimeout() throws SQLException {return this.loginTimeoutSeconds;}public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {throw new SQLFeatureNotSupportedException();}/*** 定期清理无用url链接* @param rate* @param timeUnit* 
@return*/public BalancedClickhouseDataSource withConnectionsCleaning(int rate, TimeUnit timeUnit) {this.driver.scheduleConnectionsCleaning(rate, timeUnit);return this;}/*** 定期确认url,通过定时任务实现,以定时更新可用url列表* @param delay* @param timeUnit* @return*/public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {public void run() {try {BalancedClickhouseDataSource.this.actualize();} catch (Exception var2) {BalancedClickhouseDataSource.log.error("Unable to actualize urls", var2);}}}, 0L, (long)delay, timeUnit);return this;}public List<String> getAllClickhouseUrls() {return this.allUrls;}public List<String> getEnabledClickHouseUrls() {return this.enabledUrls;}/*** 返回不可用url集合* 通过all 和 enable的差值来找* * @return*/public List<String> getDisabledUrls() {List<String> enabledUrls = this.enabledUrls;if (!this.hasDisabledUrls()) {return Collections.emptyList();} else {List<String> disabledUrls = new ArrayList(this.allUrls);disabledUrls.removeAll(enabledUrls);return disabledUrls;}}public boolean hasDisabledUrls() {return this.allUrls.size() != this.enabledUrls.size();}public ClickHouseProperties getProperties() {return this.properties;}private static ClickHouseProperties getFromUrl(String url) {return new ClickHouseProperties(getFromUrlWithoutDefault(url));}private static Properties getFromUrlWithoutDefault(String url) {if (StringUtils.isBlank(url)) {return new Properties();} else {int index = url.indexOf("?");return index == -1 ? new Properties() : ClickhouseJdbcUrlParser.parseUriQueryPart(url.substring(index + 1), new Properties());}}
}
新需求:每次获取连接实例的时候,打印出该连接的ip。
每次获取连接后,可以通过连接的元数据拿到它的url:
```java
// Usage fragment: take a connection from the balanced data source and read back its url.
// NOTE(review): `source` is not initialised and `connection` is not declared in this fragment.
BalancedClickhouseDataSource source ;
connection=source.getConnection();
// Url of the connection that was handed out, read from its metadata
// (presumably contains the host that was picked - confirm against the driver).
String url = connection.getMetaData().getURL();