dolphin scheduler 代码乱看
2024-01-16
sql查询解析字段
挺有意思的写法, 从sql里判断查询的字段名称.
- 首先使用alibaba的druid解析sql, 直接从sql语法里判断查询字段. 感觉挺繁琐的, 如果对druid和sql解析不了解, 压根写不出来
- 如果语法解析失败, 则直接到数据库里执行查询: 把原sql包成子查询并加上 `WHERE 0 = 1` 条件, 这样不会真正查到任何数据, 但从查询结果的metadata里可以直接拿到sql引擎解析出的结果列名(header).
算是工程实践里的又一个案例.
package org.apache.dolphinscheduler.plugin.task.datax;
// Resolve the column names selected by the configured SQL via parsingSqlColumnNames,
// passing the source and target database types, the source connection parameters and the SQL text.
String[] columns = parsingSqlColumnNames(dataxTaskExecutionContext.getSourcetype(),
dataxTaskExecutionContext.getTargetType(),
dataSourceCfg, dataXParameters.getSql());
/**
 * Resolves the column names selected by a synchronization SQL statement.
 *
 * <p>Grammatical analysis of the SQL text is attempted first. When that yields no columns
 * (null or empty result), the statement is executed against the data source and the names are
 * read from the result metadata instead. The resolved names are finally passed through
 * {@code DataxUtils.convertKeywordsColumns} for the target database type.
 *
 * @param sourceType the database type of the data source
 * @param targetType the database type of the data target
 * @param dataSourceCfg the database connection parameters of the data source
 * @param sql sql for data synchronization
 * @return keyword-converted column names
 */
private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg,
                                       String sql) {
    String[] resolved = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql);

    boolean resolvedByGrammar = resolved != null && resolved.length > 0;
    if (!resolvedByGrammar) {
        // Fallback: let the database itself report the result columns.
        log.info("try to execute sql analysis query column name");
        resolved = tryExecuteSqlResolveColumnNames(sourceType, dataSourceCfg, sql);
    }

    // The execute-SQL fallback returns null on failure; notNull is expected to reject that
    // (NOTE(review): notNull is declared outside this snippet - confirm it throws on null).
    String failureMessage = String.format("parsing sql columns failed : %s", sql);
    notNull(resolved, failureMessage);

    return DataxUtils.convertKeywordsColumns(targetType, resolved);
}
使用alibaba druid框架解析sql, 不过解析后也没法直接得到查询的header栏, 还需要处理很多细节(别名、`*`通配符、union查询等).
package org.apache.dolphinscheduler.plugin.task.datax;
import org.apache.dolphinscheduler.spi.enums.DbType;
import com.alibaba.druid.sql.dialect.clickhouse.parser.ClickhouseStatementParser;
import com.alibaba.druid.sql.dialect.hive.parser.HiveStatementParser;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
import com.alibaba.druid.sql.dialect.presto.parser.PrestoStatementParser;
import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
import com.alibaba.druid.sql.parser.SQLStatementParser;
/**
 * Creates the Druid statement parser that matches the given database type.
 *
 * @param dbType database type used to select the SQL dialect parser
 * @param sql    SQL text handed to the parser's constructor
 * @return the dialect-specific parser, or {@code null} when no parser is mapped for {@code dbType}
 */
public static SQLStatementParser getSqlStatementParser(DbType dbType, String sql) {
    final SQLStatementParser dialectParser;
    switch (dbType) {
        case MYSQL:
            dialectParser = new MySqlStatementParser(sql);
            break;
        case POSTGRESQL:
            dialectParser = new PGSQLStatementParser(sql);
            break;
        case ORACLE:
            dialectParser = new OracleStatementParser(sql);
            break;
        case SQLSERVER:
            dialectParser = new SQLServerStatementParser(sql);
            break;
        case CLICKHOUSE:
            dialectParser = new ClickhouseStatementParser(sql);
            break;
        case HIVE:
            dialectParser = new HiveStatementParser(sql);
            break;
        case PRESTO:
            dialectParser = new PrestoStatementParser(sql);
            break;
        default:
            // No dialect parser available; callers must handle the null result.
            dialectParser = null;
            break;
    }
    return dialectParser;
}
/**
 * Tries to derive the selected column names from the SQL text alone, without touching the database.
 *
 * <p>The statement is parsed with the dialect parser returned by
 * {@code DataxUtils.getSqlStatementParser}. For each select item the alias is used when present,
 * otherwise the name of a property expression ({@code t.col}) or identifier expression
 * ({@code col}). For a union query the select list of the right-hand query block is used.
 *
 * <p>This method does not propagate exceptions: any failure during parsing or analysis
 * (including the {@code RuntimeException}s raised internally for unresolvable select items)
 * is logged at warn level and reported as an empty array so the caller can fall back to
 * another strategy.
 *
 * @param dbType database type
 * @param sql sql for data synchronization
 * @return column names in select-list order, or an empty array when they cannot be determined
 *         (no parser for the database type, a wildcard select item, or any parse/analysis error)
 */
private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) {
    String[] columnNames;

    try {
        SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql);
        if (parser == null) {
            log.warn("database driver [{}] is not support grammatical analysis sql", dbType);
            return new String[0];
        }

        SQLStatement sqlStatement = parser.parseStatement();
        // A non-SELECT statement fails this cast; the ClassCastException is handled by the catch below.
        SQLSelectStatement sqlSelectStatement = (SQLSelectStatement) sqlStatement;
        SQLSelect sqlSelect = sqlSelectStatement.getSelect();

        List<SQLSelectItem> selectItemList = null;
        if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) {
            SQLSelectQueryBlock block = (SQLSelectQueryBlock) sqlSelect.getQuery();
            selectItemList = block.getSelectList();
        } else if (sqlSelect.getQuery() instanceof SQLUnionQuery) {
            SQLUnionQuery unionQuery = (SQLUnionQuery) sqlSelect.getQuery();
            // NOTE(review): names are taken from the right-hand branch of the union;
            // confirm this matches the names the database reports for the union result.
            SQLSelectQueryBlock block = (SQLSelectQueryBlock) unionQuery.getRight();
            selectItemList = block.getSelectList();
        }

        notNull(selectItemList,
                String.format("select query type [%s] is not support", sqlSelect.getQuery().toString()));

        columnNames = new String[selectItemList.size()];
        for (int i = 0; i < selectItemList.size(); i++) {
            SQLSelectItem item = selectItemList.get(i);

            String columnName = null;

            if (item.getAlias() != null) {
                columnName = item.getAlias();
            } else if (item.getExpr() != null) {
                if (item.getExpr() instanceof SQLPropertyExpr) {
                    SQLPropertyExpr expr = (SQLPropertyExpr) item.getExpr();
                    columnName = expr.getName();
                } else if (item.getExpr() instanceof SQLIdentifierExpr) {
                    SQLIdentifierExpr expr = (SQLIdentifierExpr) item.getExpr();
                    columnName = expr.getName();
                }
            } else {
                throw new RuntimeException(
                        String.format("grammatical analysis sql column [ %s ] failed", item));
            }

            // Wildcards cannot be expanded from the SQL text. Besides a bare wildcard item,
            // also catch a qualified wildcard (e.g. "t.*"), which arrives here as a property
            // expression whose name is the wildcard character and would otherwise be returned
            // as a literal column name. SELECT_ALL_CHARACTER is declared elsewhere (presumably "*").
            if (SELECT_ALL_CHARACTER.equals(item.toString()) || SELECT_ALL_CHARACTER.equals(columnName)) {
                log.info("sql contains *, grammatical analysis failed");
                return new String[0];
            }

            // Neither an alias nor a plain column reference (e.g. a function call without alias).
            if (columnName == null) {
                throw new RuntimeException(
                        String.format("grammatical analysis sql column [ %s ] failed", item));
            }

            columnNames[i] = columnName;
        }
    } catch (Exception e) {
        // Best effort: report "unknown" instead of failing so the caller can fall back.
        log.warn(e.getMessage(), e);
        return new String[0];
    }

    return columnNames;
}
这种做法就比较像工程上取巧的做法了: 把sql交给数据库引擎去执行, 再从结果集的metaData里直接获取列名.
/**
 * Tries to resolve the column names by executing the SQL against the data source.
 *
 * <p>The statement is wrapped as {@code SELECT t.* FROM ( sql ) t WHERE 0 = 1} (with every
 * {@code ;} removed), so the query is planned by the database but its predicate can never
 * match a row. The column names are then read from the {@link ResultSetMetaData} of the result.
 *
 * @param sourceType the database type of the data source
 * @param baseDataSource the database connection parameters
 * @param sql sql for data synchronization
 * @return column name array, or {@code null} when the query could not be executed
 */
public String[] tryExecuteSqlResolveColumnNames(DbType sourceType, BaseConnectionParam baseDataSource, String sql) {
    // Qualifier some drivers put in front of the column name because of the "t" alias used below.
    final String aliasPrefix = "t.";

    String[] columnNames;
    sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql);
    sql = sql.replace(";", "");

    try (
            Connection connection =
                    DataSourceClientProvider.getAdHocConnection(sourceType, baseDataSource);
            PreparedStatement stmt = connection.prepareStatement(sql);
            ResultSet resultSet = stmt.executeQuery()) {

        ResultSetMetaData md = resultSet.getMetaData();
        int num = md.getColumnCount();
        columnNames = new String[num];
        // JDBC column indexes are 1-based.
        for (int i = 1; i <= num; i++) {
            String name = md.getColumnName(i);
            // Strip only a leading alias qualifier; a plain replace would also delete "t."
            // occurring inside the name itself (e.g. "amount.usd" -> "amounusd").
            columnNames[i - 1] = name.startsWith(aliasPrefix) ? name.substring(aliasPrefix.length()) : name;
        }
    } catch (SQLException | ExecutionException e) {
        log.error(e.getMessage(), e);
        return null;
    }

    return columnNames;
}