跳到主要内容

DolphinScheduler 代码乱看

2024-01-16

解析 SQL 查询的字段名

一种挺有意思的写法：从 SQL 语句中推断查询结果的字段名称。

  • 首先使用阿里巴巴的 Druid 解析 SQL，直接从语法树中提取查询字段。这种写法挺繁琐，如果不了解 Druid 和 SQL 解析，基本写不出来。
  • 如果语法解析失败，则把原 SQL 包装成子查询 `SELECT t.* FROM ( sql ) t WHERE 0 = 1`，直接到数据库里执行。由于 WHERE 条件 0 = 1 恒为假，不会真正查出任何数据行，但可以从结果集的 metadata（ResultSetMetaData）中直接获取 SQL 引擎解析出的结果列名（header）。

算是工程实践中的又一个案例。


package org.apache.dolphinscheduler.plugin.task.datax;


// Resolve the column names selected by the task SQL: the source DB type drives how the SQL is
// parsed / probed, the target DB type drives the keyword conversion of the resulting names.
String[] columns = parsingSqlColumnNames(dataxTaskExecutionContext.getSourcetype(),
dataxTaskExecutionContext.getTargetType(),
dataSourceCfg, dataXParameters.getSql());

/**
 * Resolve the column names produced by a data-synchronization SQL statement.
 *
 * <p>Grammatical analysis of the SQL text is attempted first. When that yields {@code null} or
 * no names at all, the SQL is probed against the source database instead. A {@code null} result
 * from both strategies is rejected via {@code notNull} (assumed to throw; confirm against its
 * definition). The names are finally passed through {@code DataxUtils.convertKeywordsColumns}
 * for the target database type.
 *
 * @param sourceType    the database type of the data source
 * @param targetType    the database type of the data target
 * @param dataSourceCfg the database connection parameters of the data source
 * @param sql           sql for data synchronization
 * @return keyword-converted column names
 */
private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg,
                                       String sql) {
    String[] resolved = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql);

    boolean parsedFromText = resolved != null && resolved.length > 0;
    if (!parsedFromText) {
        // fall back to letting the database describe the result set
        log.info("try to execute sql analysis query column name");
        resolved = tryExecuteSqlResolveColumnNames(sourceType, dataSourceCfg, sql);
    }

    String failureMessage = String.format("parsing sql columns failed : %s", sql);
    notNull(resolved, failureMessage);

    return DataxUtils.convertKeywordsColumns(targetType, resolved);
}

使用阿里巴巴的 Druid 框架解析 SQL。不过解析完成后也无法直接得到查询结果的列名（header），还有很多细节需要处理，例如 UNION、别名、`t.col` 这样的限定列名以及 `*`。


package org.apache.dolphinscheduler.plugin.task.datax;

import org.apache.dolphinscheduler.spi.enums.DbType;

import com.alibaba.druid.sql.dialect.clickhouse.parser.ClickhouseStatementParser;
import com.alibaba.druid.sql.dialect.hive.parser.HiveStatementParser;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
import com.alibaba.druid.sql.dialect.presto.parser.PrestoStatementParser;
import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
import com.alibaba.druid.sql.parser.SQLStatementParser;

/**
 * Create the Druid dialect parser matching a data source type.
 *
 * @param dbType the data source type; passing {@code null} throws a NullPointerException,
 *               because Java's {@code switch} on a null enum does so
 * @param sql    the SQL text the parser is bound to
 * @return a parser for MYSQL, POSTGRESQL, ORACLE, SQLSERVER, CLICKHOUSE, HIVE or PRESTO;
 *         {@code null} for any other type, so callers can skip grammatical analysis
 */
public static SQLStatementParser getSqlStatementParser(DbType dbType, String sql) {
    SQLStatementParser dialectParser;
    switch (dbType) {
        case MYSQL:
            dialectParser = new MySqlStatementParser(sql);
            break;
        case POSTGRESQL:
            dialectParser = new PGSQLStatementParser(sql);
            break;
        case ORACLE:
            dialectParser = new OracleStatementParser(sql);
            break;
        case SQLSERVER:
            dialectParser = new SQLServerStatementParser(sql);
            break;
        case CLICKHOUSE:
            dialectParser = new ClickhouseStatementParser(sql);
            break;
        case HIVE:
            dialectParser = new HiveStatementParser(sql);
            break;
        case PRESTO:
            dialectParser = new PrestoStatementParser(sql);
            break;
        default:
            // no Druid dialect wired up for this source type
            dialectParser = null;
            break;
    }
    return dialectParser;
}

/**
 * Try to resolve the selected column names by parsing the SQL with a Druid dialect parser,
 * without touching the database.
 *
 * <p>For a UNION query the names are taken from the left-most SELECT, because that branch names
 * the result columns (e.g. {@code SELECT a AS x FROM t1 UNION SELECT b AS y FROM t2} yields
 * {@code x}). For each select item, its alias wins. Without an alias, plain identifiers
 * ({@code col}) and qualified identifiers ({@code t.col}) are supported.
 *
 * <p>This method does not throw. Every failure is reported as an empty array so the caller can
 * fall back to another strategy. Failures include:
 * <ul>
 *   <li>a dialect without a parser;</li>
 *   <li>a non-SELECT statement or an unsupported query shape;</li>
 *   <li>a {@code *} or {@code t.*} projection;</li>
 *   <li>an item that is neither aliased nor a plain column reference.</li>
 * </ul>
 * Everything except the {@code *} case is logged at warn level.
 *
 * @param dbType database type, used to pick the Druid dialect parser
 * @param sql    sql for data synchronization
 * @return column name array, or an empty array if grammatical analysis is not possible
 */
private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) {
    String[] columnNames;

    try {
        SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql);
        if (parser == null) {
            log.warn("database driver [{}] is not support grammatical analysis sql", dbType);
            return new String[0];
        }

        SQLStatement sqlStatement = parser.parseStatement();
        // A non-SELECT statement fails this cast; the exception is handled by the catch below.
        SQLSelectStatement sqlSelectStatement = (SQLSelectStatement) sqlStatement;
        SQLSelect sqlSelect = sqlSelectStatement.getSelect();

        // The column names of a UNION come from its first (left-most) SELECT. Unions may nest
        // (a UNION b UNION c), so keep descending into the left branch until a plain block.
        Object query = sqlSelect.getQuery();
        while (query instanceof SQLUnionQuery) {
            query = ((SQLUnionQuery) query).getLeft();
        }

        List<SQLSelectItem> selectItemList = null;
        if (query instanceof SQLSelectQueryBlock) {
            selectItemList = ((SQLSelectQueryBlock) query).getSelectList();
        }

        notNull(selectItemList,
                String.format("select query type [%s] is not support", sqlSelect.getQuery().toString()));

        columnNames = new String[selectItemList.size()];
        for (int i = 0; i < selectItemList.size(); i++) {
            SQLSelectItem item = selectItemList.get(i);

            String columnName = null;

            if (item.getAlias() != null) {
                columnName = item.getAlias();
            } else if (item.getExpr() != null) {
                if (item.getExpr() instanceof SQLPropertyExpr) {
                    // qualified reference such as t.col
                    SQLPropertyExpr expr = (SQLPropertyExpr) item.getExpr();
                    columnName = expr.getName();
                } else if (item.getExpr() instanceof SQLIdentifierExpr) {
                    SQLIdentifierExpr expr = (SQLIdentifierExpr) item.getExpr();
                    columnName = expr.getName();
                }
            } else {
                throw new RuntimeException(
                        String.format("grammatical analysis sql column [ %s ] failed", item));
            }

            // Star projections cannot be expanded from the SQL text alone. Besides a bare "*",
            // a qualified star (t.*) resolves to the property name "*", which is no real column.
            if (SELECT_ALL_CHARACTER.equals(item.toString()) || SELECT_ALL_CHARACTER.equals(columnName)) {
                log.info("sql contains *, grammatical analysis failed");
                return new String[0];
            }

            // e.g. an un-aliased function call or literal: not resolvable from the text alone
            if (columnName == null) {
                throw new RuntimeException(
                        String.format("grammatical analysis sql column [ %s ] failed", item));
            }

            columnNames[i] = columnName;
        }
    } catch (Exception e) {
        // Deliberately best-effort: the caller treats an empty array as "try another strategy".
        log.warn(e.getMessage(), e);
        return new String[0];
    }

    return columnNames;
}

这种做法就比较取巧了：把 SQL 交给数据库引擎执行，再从结果集的 metadata 中直接获取列名。

    /**
     * Try to resolve the selected column names by asking the source database itself.
     *
     * <p>The SQL is wrapped as a derived table filtered by {@code WHERE 0 = 1}, so no rows are
     * returned. The column names are read from the {@link ResultSetMetaData} of the empty result.
     *
     * @param sourceType     the database type of the data source
     * @param baseDataSource the database connection parameters
     * @param sql            sql for data synchronization
     * @return column name array, or {@code null} if {@code sql} is null or the probe query fails
     *         with a {@link SQLException} / {@link ExecutionException} (the error is logged)
     */
    public String[] tryExecuteSqlResolveColumnNames(DbType sourceType, BaseConnectionParam baseDataSource, String sql) {
        if (sql == null) {
            log.error("sql for column name resolution is null");
            return null;
        }
        String probeSql = buildEmptyResultProbeSql(sql);

        try (
                Connection connection =
                        DataSourceClientProvider.getAdHocConnection(sourceType, baseDataSource);
                PreparedStatement stmt = connection.prepareStatement(probeSql);
                ResultSet resultSet = stmt.executeQuery()) {

            ResultSetMetaData md = resultSet.getMetaData();
            int num = md.getColumnCount();
            String[] columnNames = new String[num];
            for (int i = 1; i <= num; i++) {
                columnNames[i - 1] = stripDerivedTableQualifier(md.getColumnName(i));
            }
            return columnNames;
        } catch (SQLException | ExecutionException e) {
            log.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Wrap {@code sql} as a derived table aliased {@code t} that keeps its projection but
     * matches no rows.
     */
    private static String buildEmptyResultProbeSql(String sql) {
        // Only trailing statement terminators are removed. A blanket replace(";", "") would also
        // corrupt semicolons inside string literals, e.g. WHERE c = 'a;b'.
        String body = sql.trim();
        while (body.endsWith(";")) {
            body = body.substring(0, body.length() - 1).trim();
        }
        // The line break before ')' stops a trailing "-- comment" from swallowing the paren.
        return String.format("SELECT t.* FROM ( %s\n) t WHERE 0 = 1", body);
    }

    /**
     * Remove a leading "t." (the probe's derived-table alias) if the driver reports one,
     * leaving any other occurrence of "t." in the name intact.
     */
    private static String stripDerivedTableQualifier(String columnName) {
        return columnName.startsWith("t.") ? columnName.substring(2) : columnName;
    }