Skip to content

Commit b3114ae

Browse files
committed
Add support for multiple hosts configuration
- Allow to use Mono for user and password - Add multiple hosts connection strategy - Add HA protocol support for multiple hosts - Add DNS SRV driver for HA protocol
1 parent d533e1e commit b3114ae

32 files changed

+2411
-734
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright 2024 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql;
18+
19+
import io.asyncer.r2dbc.mysql.client.Client;
20+
import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
21+
import io.asyncer.r2dbc.mysql.constant.SslMode;
22+
import io.netty.channel.ChannelOption;
23+
import io.netty.resolver.AddressResolver;
24+
import io.netty.resolver.AddressResolverGroup;
25+
import io.netty.resolver.DefaultNameResolver;
26+
import io.netty.resolver.RoundRobinInetAddressResolver;
27+
import io.netty.util.concurrent.EventExecutor;
28+
import io.netty.util.internal.logging.InternalLogger;
29+
import io.netty.util.internal.logging.InternalLoggerFactory;
30+
import reactor.core.publisher.Mono;
31+
import reactor.netty.resources.LoopResources;
32+
import reactor.netty.tcp.TcpClient;
33+
34+
import java.net.InetSocketAddress;
35+
import java.time.Duration;
36+
import java.util.Set;
37+
38+
/**
39+
* An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object.
40+
*
41+
* @since 1.2.0
42+
*/
43+
@FunctionalInterface
44+
interface ConnectionStrategy {
45+
46+
InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionStrategy.class);
47+
48+
/**
49+
* Establish a connection to a target server that is determined by this connection strategy.
50+
*
51+
* @return a logged-in {@link Client} object.
52+
*/
53+
Mono<Client> connect();
54+
55+
/**
56+
* Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}.
57+
* <p>
58+
* Note: Unix Domain Socket also uses this method to create a general-purpose {@link TcpClient client}.
59+
*
60+
* @param configuration socket client configuration.
61+
* @return a general-purpose {@link TcpClient client}.
62+
*/
63+
static TcpClient createTcpClient(SocketClientConfiguration configuration, boolean balancedDns) {
64+
LoopResources loopResources = configuration.getLoopResources();
65+
Duration connectTimeout = configuration.getConnectTimeout();
66+
TcpClient client = TcpClient.newConnection();
67+
68+
if (loopResources != null) {
69+
client = client.runOn(loopResources);
70+
}
71+
72+
if (connectTimeout != null) {
73+
client = client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
74+
}
75+
76+
if (balancedDns) {
77+
client = client.resolver(BalancedResolverGroup.INSTANCE);
78+
}
79+
80+
return client;
81+
}
82+
83+
/**
84+
* Logins to a MySQL server with the given {@link TcpClient}, {@link Credential} and configurations.
85+
*
86+
* @param tcpClient a TCP client to connect to a MySQL server.
87+
* @param credential user and password to log in to a MySQL server.
88+
* @param configuration a configuration that affects login behavior.
89+
* @return a logged-in {@link Client} object.
90+
*/
91+
static Mono<Client> login(
92+
TcpClient tcpClient,
93+
Credential credential,
94+
MySqlConnectionConfiguration configuration
95+
) {
96+
MySqlSslConfiguration ssl = configuration.getSsl();
97+
SslMode sslMode = ssl.getSslMode();
98+
boolean createDbIfNotExist = configuration.isCreateDatabaseIfNotExist();
99+
String database = configuration.getDatabase();
100+
String loginDb = createDbIfNotExist ? "" : database;
101+
Set<CompressionAlgorithm> compressionAlgorithms = configuration.getCompressionAlgorithms();
102+
int zstdLevel = configuration.getZstdCompressionLevel();
103+
ConnectionContext context = new ConnectionContext(
104+
configuration.getZeroDateOption(),
105+
configuration.getLoadLocalInfilePath(),
106+
configuration.getLocalInfileBufferSize(),
107+
configuration.isPreserveInstants(),
108+
configuration.retrieveConnectionZoneId()
109+
);
110+
111+
return Client.connect(tcpClient, ssl, context).flatMap(client ->
112+
QueryFlow.login(client, sslMode, loginDb, credential, compressionAlgorithms, zstdLevel));
113+
}
114+
}
115+
116+
/**
117+
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
118+
*
119+
* @since 1.2.0
120+
*/
121+
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> {
122+
123+
BalancedResolverGroup() {
124+
}
125+
126+
public static final BalancedResolverGroup INSTANCE;
127+
128+
static {
129+
INSTANCE = new BalancedResolverGroup();
130+
Runtime.getRuntime().addShutdownHook(new Thread(
131+
INSTANCE::close,
132+
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
133+
));
134+
}
135+
136+
@Override
137+
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
138+
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
139+
}
140+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2024 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql;
18+
19+
import org.jetbrains.annotations.Nullable;
20+
21+
import java.util.Objects;
22+
23+
/**
24+
* A value object representing a user with an optional password.
25+
*/
26+
final class Credential {
27+
28+
private final String user;
29+
30+
@Nullable
31+
private final CharSequence password;
32+
33+
Credential(String user, @Nullable CharSequence password) {
34+
this.user = user;
35+
this.password = password;
36+
}
37+
38+
String getUser() {
39+
return user;
40+
}
41+
42+
@Nullable
43+
CharSequence getPassword() {
44+
return password;
45+
}
46+
47+
@Override
48+
public boolean equals(Object o) {
49+
if (this == o) {
50+
return true;
51+
}
52+
if (!(o instanceof Credential)) {
53+
return false;
54+
}
55+
56+
Credential that = (Credential) o;
57+
58+
return user.equals(that.user) && Objects.equals(password, that.password);
59+
}
60+
61+
@Override
62+
public int hashCode() {
63+
return 31 * user.hashCode() + Objects.hashCode(password);
64+
}
65+
66+
@Override
67+
public String toString() {
68+
return "Credential{user=" + user + ", password=REDACTED}";
69+
}
70+
}

0 commit comments

Comments
 (0)