Skip to content

Commit 5b1256b

Browse files
author
Jarvis
committed
closed #91 support master/slave mode datasource
1 parent 454c8d4 commit 5b1256b

File tree

11 files changed

+612
-12
lines changed

11 files changed

+612
-12
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.springframework.data.mybatis.replication.datasource;
2+
3+
import javax.sql.DataSource;
4+
import java.util.List;
5+
6+
/**
7+
* @author Jarvis Song
8+
*/
9+
public interface DatasourceSelectPolicy {
10+
11+
int select(List<DataSource> slaves);
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
package org.springframework.data.mybatis.replication.datasource;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.beans.factory.InitializingBean;
6+
7+
import javax.sql.DataSource;
8+
import java.io.PrintWriter;
9+
import java.lang.reflect.InvocationHandler;
10+
import java.lang.reflect.InvocationTargetException;
11+
import java.lang.reflect.Method;
12+
import java.lang.reflect.Proxy;
13+
import java.sql.Connection;
14+
import java.sql.SQLException;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
18+
/**
19+
* @author Jarvis Song
20+
*/
21+
public class LazyReplicationConnectionDataSourceProxy implements DataSource, InitializingBean {
22+
private final transient static Logger logger = LoggerFactory.getLogger(LazyReplicationConnectionDataSourceProxy.class);
23+
24+
private DataSource master;
25+
private List<DataSource> slaves = new ArrayList<DataSource>();
26+
27+
private Boolean defaultAutoCommit;
28+
private Integer defaultTransactionIsolation;
29+
private DatasourceSelectPolicy datasourceSelectPolicy;
30+
31+
public LazyReplicationConnectionDataSourceProxy(DataSource master, List<DataSource> slaves) {
32+
this.master = master;
33+
this.slaves = slaves;
34+
}
35+
36+
@Override
37+
public void afterPropertiesSet() throws Exception {
38+
// Determine default auto-commit and transaction isolation
39+
// via a Connection from the target DataSource, if possible.
40+
if (this.defaultAutoCommit == null || this.defaultTransactionIsolation == null) {
41+
try {
42+
Connection con = getMaster().getConnection();
43+
try {
44+
checkDefaultConnectionProperties(con);
45+
} finally {
46+
con.close();
47+
}
48+
} catch (SQLException ex) {
49+
logger.warn("Could not retrieve default auto-commit and transaction isolation settings", ex);
50+
}
51+
}
52+
53+
if (null == datasourceSelectPolicy && !slaves.isEmpty()) {
54+
datasourceSelectPolicy = new RoundRobinDatasourceSelectPolicy();
55+
}
56+
}
57+
58+
@Override
59+
public Connection getConnection() throws SQLException {
60+
return (Connection) Proxy.newProxyInstance(
61+
ReplicationConnectionProxy.class.getClassLoader(),
62+
new Class<?>[]{ReplicationConnectionProxy.class},
63+
new LazyReplicationConnectionInvocationHandler());
64+
}
65+
66+
@Override
67+
public Connection getConnection(String username, String password) throws SQLException {
68+
return (Connection) Proxy.newProxyInstance(
69+
ReplicationConnectionProxy.class.getClassLoader(),
70+
new Class<?>[]{ReplicationConnectionProxy.class},
71+
new LazyReplicationConnectionInvocationHandler(username, password));
72+
}
73+
74+
@Override
75+
public PrintWriter getLogWriter() throws SQLException {
76+
return getMaster().getLogWriter();
77+
}
78+
79+
@Override
80+
public void setLogWriter(PrintWriter out) throws SQLException {
81+
getMaster().setLogWriter(out);
82+
getSlave().setLogWriter(out);
83+
}
84+
85+
@Override
86+
public int getLoginTimeout() throws SQLException {
87+
return getMaster().getLoginTimeout();
88+
}
89+
90+
@Override
91+
public void setLoginTimeout(int seconds) throws SQLException {
92+
getMaster().setLoginTimeout(seconds);
93+
getSlave().setLoginTimeout(seconds);
94+
}
95+
96+
97+
//---------------------------------------------------------------------
98+
// Implementation of JDBC 4.0's Wrapper interface
99+
//---------------------------------------------------------------------
100+
@Override
101+
@SuppressWarnings("unchecked")
102+
public <T> T unwrap(Class<T> iface) throws SQLException {
103+
if (iface.isInstance(this)) {
104+
return (T) this;
105+
}
106+
return getMaster().unwrap(iface);
107+
}
108+
109+
@Override
110+
public boolean isWrapperFor(Class<?> iface) throws SQLException {
111+
return (iface.isInstance(this) || getMaster().isWrapperFor(iface));
112+
}
113+
114+
//---------------------------------------------------------------------
115+
// Implementation of JDBC 4.1's getParentLogger method
116+
//---------------------------------------------------------------------
117+
public java.util.logging.Logger getParentLogger() {
118+
return java.util.logging.Logger.getLogger(java.util.logging.Logger.GLOBAL_LOGGER_NAME);
119+
}
120+
121+
public DataSource getMaster() {
122+
return master;
123+
}
124+
125+
public DataSource getSlave() {
126+
if (null == slaves || slaves.isEmpty()) {
127+
return master;
128+
}
129+
int select = datasourceSelectPolicy.select(slaves);
130+
return slaves.get(select);
131+
}
132+
133+
public void setDefaultAutoCommit(boolean defaultAutoCommit) {
134+
this.defaultAutoCommit = defaultAutoCommit;
135+
}
136+
137+
public void setDefaultTransactionIsolation(int defaultTransactionIsolation) {
138+
this.defaultTransactionIsolation = defaultTransactionIsolation;
139+
}
140+
141+
/**
142+
* Check the default connection properties (auto-commit, transaction isolation),
143+
* keeping them to be able to expose them correctly without fetching an actual
144+
* JDBC Connection from the target DataSource.
145+
* <p>This will be invoked once on startup, but also for each retrieval of a
146+
* target Connection. If the check failed on startup (because the database was
147+
* down), we'll lazily retrieve those settings.
148+
*
149+
* @param con the Connection to use for checking
150+
* @throws SQLException if thrown by Connection methods
151+
*/
152+
protected synchronized void checkDefaultConnectionProperties(Connection con) throws SQLException {
153+
if (this.defaultAutoCommit == null) {
154+
this.defaultAutoCommit = con.getAutoCommit();
155+
}
156+
if (this.defaultTransactionIsolation == null) {
157+
this.defaultTransactionIsolation = con.getTransactionIsolation();
158+
}
159+
}
160+
161+
/**
162+
* Expose the default auto-commit value.
163+
*/
164+
protected Boolean defaultAutoCommit() {
165+
return this.defaultAutoCommit;
166+
}
167+
168+
/**
169+
* Expose the default transaction isolation value.
170+
*/
171+
protected Integer defaultTransactionIsolation() {
172+
return this.defaultTransactionIsolation;
173+
}
174+
175+
private interface ReplicationConnectionProxy extends Connection {
176+
Connection getReplicationTargetConnection();
177+
}
178+
179+
/**
180+
* Invocation handler that defers fetching an actual JDBC Connection
181+
* until first creation of a Statement.
182+
*/
183+
private class LazyReplicationConnectionInvocationHandler implements InvocationHandler {
184+
185+
private String username;
186+
187+
private String password;
188+
189+
private Boolean readOnly = Boolean.FALSE;
190+
191+
private Integer transactionIsolation;
192+
193+
private Boolean autoCommit;
194+
195+
private boolean closed = false;
196+
197+
private Connection replicationTargetConnection;
198+
199+
public LazyReplicationConnectionInvocationHandler() {
200+
this.autoCommit = defaultAutoCommit();
201+
this.transactionIsolation = defaultTransactionIsolation();
202+
}
203+
204+
public LazyReplicationConnectionInvocationHandler(String username, String password) {
205+
this();
206+
this.username = username;
207+
this.password = password;
208+
}
209+
210+
@Override
211+
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
212+
// Invocation on ReplicationConnectionProxy interface coming in...
213+
214+
if (method.getName().equals("equals")) {
215+
// We must avoid fetching a target Connection for "equals".
216+
// Only consider equal when proxies are identical.
217+
return (proxy == args[0]);
218+
} else if (method.getName().equals("hashCode")) {
219+
// We must avoid fetching a target Connection for "hashCode",
220+
// and we must return the same hash code even when the target
221+
// Connection has been fetched: use hashCode of Connection proxy.
222+
return System.identityHashCode(proxy);
223+
} else if (method.getName().equals("unwrap")) {
224+
if (((Class<?>) args[0]).isInstance(proxy)) {
225+
return proxy;
226+
}
227+
} else if (method.getName().equals("isWrapperFor")) {
228+
if (((Class<?>) args[0]).isInstance(proxy)) {
229+
return true;
230+
}
231+
} else if (method.getName().equals("getReplicationTargetConnection")) {
232+
// Handle getReplicationTargetConnection method: return underlying connection.
233+
return getReplicationTargetConnection(method);
234+
}
235+
236+
if (!hasTargetConnection()) {
237+
// No physical target Connection kept yet ->
238+
// resolve transaction demarcation methods without fetching
239+
// a physical JDBC Connection until absolutely necessary.
240+
241+
if (method.getName().equals("toString")) {
242+
return "Lazy Connection proxy for target write DataSource [" + getMaster() + "] and target read DataSources [" + slaves + "]";
243+
} else if (method.getName().equals("isReadOnly")) {
244+
return this.readOnly;
245+
} else if (method.getName().equals("setReadOnly")) {
246+
this.readOnly = (Boolean) args[0];
247+
return null;
248+
} else if (method.getName().equals("getTransactionIsolation")) {
249+
if (this.transactionIsolation != null) {
250+
return this.transactionIsolation;
251+
}
252+
// Else fetch actual Connection and check there,
253+
// because we didn't have a default specified.
254+
} else if (method.getName().equals("setTransactionIsolation")) {
255+
this.transactionIsolation = (Integer) args[0];
256+
return null;
257+
} else if (method.getName().equals("getAutoCommit")) {
258+
if (this.autoCommit != null) {
259+
return this.autoCommit;
260+
}
261+
// Else fetch actual Connection and check there,
262+
// because we didn't have a default specified.
263+
} else if (method.getName().equals("setAutoCommit")) {
264+
this.autoCommit = (Boolean) args[0];
265+
return null;
266+
} else if (method.getName().equals("commit")) {
267+
// Ignore: no statements created yet.
268+
return null;
269+
} else if (method.getName().equals("rollback")) {
270+
// Ignore: no statements created yet.
271+
return null;
272+
} else if (method.getName().equals("getWarnings")) {
273+
return null;
274+
} else if (method.getName().equals("clearWarnings")) {
275+
return null;
276+
} else if (method.getName().equals("close")) {
277+
// Ignore: no target connection yet.
278+
this.closed = true;
279+
return null;
280+
} else if (method.getName().equals("isClosed")) {
281+
return this.closed;
282+
} else if (this.closed) {
283+
// Connection proxy closed, without ever having fetched a
284+
// physical JDBC Connection: throw corresponding SQLException.
285+
throw new SQLException("Illegal operation: connection is closed");
286+
}
287+
}
288+
289+
// Target Connection already fetched,
290+
// or target Connection necessary for current operation ->
291+
// invoke method on target connection.
292+
try {
293+
return method.invoke(getReplicationTargetConnection(method), args);
294+
} catch (InvocationTargetException ex) {
295+
throw ex.getTargetException();
296+
}
297+
}
298+
299+
/**
300+
* Return whether the proxy currently holds a target Connection.
301+
*/
302+
private boolean hasTargetConnection() {
303+
return (this.replicationTargetConnection != null);
304+
}
305+
306+
/**
307+
* Return the target Connection, fetching it and initializing it if necessary.
308+
*/
309+
private Connection getReplicationTargetConnection(Method operation) throws SQLException {
310+
if (this.replicationTargetConnection == null) {
311+
logger.debug("Connecting to database for operation '{}'", operation.getName());
312+
313+
logger.debug("current readOnly : {}", readOnly);
314+
DataSource targetDataSource = (readOnly == Boolean.TRUE) ? getSlave() : getMaster();
315+
316+
// Fetch physical Connection from DataSource.
317+
this.replicationTargetConnection = (this.username != null) ?
318+
targetDataSource.getConnection(this.username, this.password) :
319+
targetDataSource.getConnection();
320+
321+
// If we still lack default connection properties, check them now.
322+
checkDefaultConnectionProperties(this.replicationTargetConnection);
323+
324+
// Apply kept transaction settings, if any.
325+
if (this.readOnly) {
326+
try {
327+
this.replicationTargetConnection.setReadOnly(this.readOnly);
328+
} catch (Exception ex) {
329+
// "read-only not supported" -> ignore, it's just a hint anyway
330+
logger.debug("Could not set JDBC Connection read-only", ex);
331+
}
332+
}
333+
if (this.transactionIsolation != null &&
334+
!this.transactionIsolation.equals(defaultTransactionIsolation())) {
335+
this.replicationTargetConnection.setTransactionIsolation(this.transactionIsolation);
336+
}
337+
if (this.autoCommit != null && this.autoCommit != this.replicationTargetConnection.getAutoCommit()) {
338+
this.replicationTargetConnection.setAutoCommit(this.autoCommit);
339+
}
340+
} else {
341+
logger.debug("Using existing database connection for operation '{}'", operation.getName());
342+
}
343+
344+
return this.replicationTargetConnection;
345+
}
346+
}
347+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package org.springframework.data.mybatis.replication.datasource;
2+
3+
import javax.sql.DataSource;
4+
import java.util.List;
5+
import java.util.Random;
6+
7+
/**
8+
* @author Jarvis Song
9+
*/
10+
public class RandomDatasourceSelectPolicy implements DatasourceSelectPolicy {
11+
private Random random = new Random();
12+
13+
@Override
14+
public int select(List<DataSource> slaves) {
15+
int i = random.nextInt(slaves.size());
16+
return i;
17+
}
18+
}

0 commit comments

Comments
 (0)