
Ranger and Impala: granting and checking privileges

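I looked through the Impala source. It ships with built-in Ranger support, so it can invoke Ranger's authorization logic while executing SQL, without going through HiveServer as a Ranger proxy. The Impala plugin appears to reuse Hive's policies, so a single set of Ranger policies can probably take effect for both Hive and Impala.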

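Impala native authorization

Impala also has its own built-in authorization mechanism: the policies live in a file on HDFS and are reloaded every 5 minutes. Alternatively, Impala can use Ranger directly for access control.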

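The commit that added native authorization to Impala

When objects are accessed through the Impala Catalog API, the call has to carry the user and the requested privilege, and that privilege is checked.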

https://github.com/apache/impala/commit/7ac88e1fa9473ee3920d0159bdd426be41e6abfb

IMPALA-400: Add support for SQL statement authorization

This changes adds support for SQL statement authorization in Impala. The authorization works by updating the Catalog API to require a User + Privilege when getting Table/Db objects (and in the future can be extended to cover columns as well). If the user doesn't have permission to access the object, an AuthorizationException is thrown. The authorization checks are done during analysis as new Catalog objects are encountered.

These changes build on top of the Hive Access code which handles the actually processing of authorization requests. The authorization is currently based on a "policy file" which will be stored in HDFS. This policy file is read once on startup and then reloaded every 5 minutes. It can also be reloaded on a specific impalad by executing a "refresh" command.

Authorization is enabled by setting: --server_name='server1'

and then pointing the impalad to the policy file using the flag: --authorization_policy_file=/path/to/policy/file
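To make the "read once on startup, reload every 5 minutes, or reload on refresh" behavior concrete, here is a minimal sketch. It is not Impala code: PolicyCacheSketch, its loader, and the explicit nowMs clock parameter are all hypothetical names I made up, and I swapped the periodic reload for a lazy staleness check on read so the example stays deterministic.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a policy cache; none of these names come from Impala.
final class PolicyCacheSketch {
  // The 5 minute interval is the one mentioned in the commit message.
  static final long RELOAD_INTERVAL_MS = 5 * 60 * 1000L;

  private final Supplier<String> loader; // stands in for reading the HDFS policy file
  private String policy;
  private long loadedAtMs;

  PolicyCacheSketch(Supplier<String> loader, long nowMs) {
    this.loader = loader;
    refresh(nowMs); // read once on startup
  }

  // Explicit reload, comparable to running a "refresh" command.
  void refresh(long nowMs) {
    policy = loader.get();
    loadedAtMs = nowMs;
  }

  // Lazy variant of the periodic reload: reload only when the copy is stale.
  String get(long nowMs) {
    if (nowMs - loadedAtMs >= RELOAD_INTERVAL_MS) refresh(nowMs);
    return policy;
  }
}
```

Passing the clock in as a parameter is only a convenience for the sketch; a real implementation would more likely reload from a background thread.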

Impala Ranger authorization

https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/authorization/ranger/RangerImpalaPlugin.java


package org.apache.impala.authorization.ranger;


/**
* An implementation of Ranger Impala plugin. Make this a singleton since each process
* should have only one ranger plugin instance. Impalad and Catalogd already satisfy this
* requirement. The main purpose is to avoid test classes that has embedded catalogd and
* frontend create multiple ranger plugins.
*/
public class RangerImpalaPlugin extends RangerBasePlugin {
private static volatile RangerImpalaPlugin INSTANCE = null;
private static String SERVICE_TYPE = null;
private static String APP_ID = null;
private static boolean BLOCK_UPDATE_IF_TABLE_MASK_SPECIFIED = true;

private RangerImpalaPlugin(String serviceType, String appId) {
super(serviceType, appId);
}

@Override
public void init() {
super.init();
RangerImpalaPlugin.BLOCK_UPDATE_IF_TABLE_MASK_SPECIFIED = getConfig().getBoolean(
RangerHadoopConstants
.HIVE_BLOCK_UPDATE_IF_ROWFILTER_COLUMNMASK_SPECIFIED_PROP,
RangerHadoopConstants
.HIVE_BLOCK_UPDATE_IF_ROWFILTER_COLUMNMASK_SPECIFIED_DEFAULT_VALUE);
}
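The excerpt above stops before the accessor, so here is how a singleton with a private constructor and a volatile INSTANCE field is commonly completed. This is a sketch of the general double-checked locking pattern, not the actual Impala method: PluginSingletonSketch, getInstance and describe are hypothetical names, and the argument values in the usage note are placeholders.

```java
// Hypothetical stand-in for a process-wide plugin singleton; illustrative only.
final class PluginSingletonSketch {
  // volatile so that a fully constructed instance is visible to all threads.
  private static volatile PluginSingletonSketch INSTANCE = null;

  private final String serviceType;
  private final String appId;

  private PluginSingletonSketch(String serviceType, String appId) {
    this.serviceType = serviceType;
    this.appId = appId;
  }

  // Double-checked locking: the lock is only taken until the instance exists.
  static PluginSingletonSketch getInstance(String serviceType, String appId) {
    PluginSingletonSketch local = INSTANCE;
    if (local == null) {
      synchronized (PluginSingletonSketch.class) {
        local = INSTANCE;
        if (local == null) {
          local = new PluginSingletonSketch(serviceType, appId);
          INSTANCE = local;
        }
      }
    }
    return local;
  }

  String describe() {
    return serviceType + "/" + appId;
  }
}
```

In this sketch a second call such as getInstance("other", "other") returns the instance created by the first call and silently ignores its arguments, which is the property the Javadoc cares about: embedded catalogd and frontend test classes end up sharing one plugin.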

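Impala Ranger grant privilege

Impala supports GRANT privilege TO ROLE, which under the hood calls Ranger's API for creating policies. Impala also supports adding a group to a role and adding a user to a role, and to my surprise these all call Ranger APIs as well. Does Ranger really support this many SQL-style grant operations now?

I then went and looked at the Ranger Hive plugin and found that it supports all of these too. I simply had not read the Hive source, so I was making a fuss over nothing...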

https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/authorization/ranger/RangerImpaladAuthorizationManager.java

package org.apache.impala.authorization.ranger;

/**
* An implementation of {@link AuthorizationManager} for Catalogd using Ranger.
*
* Operations here make requests to Ranger via the {@link RangerImpalaPlugin} to
* manage privileges for users.
*
* Operations not supported by Ranger will throw an {@link UnsupportedFeatureException}.
*/
public class RangerCatalogdAuthorizationManager implements AuthorizationManager {


@Override
public void createRole(User requestingUser, TCreateDropRoleParams params,
TDdlExecResponse response) throws ImpalaException {
RangerRole role = new RangerRole();
role.setName(params.getRole_name());
role.setCreatedByUser(requestingUser.getShortName());

try {
plugin_.get().createRole(role, /*resultProcessor*/ null);
} catch (Exception e) {
LOG.error("Error creating role {} by user {} in Ranger.", params.getRole_name(),
requestingUser.getShortName());
throw new InternalException("Error creating role " + params.getRole_name() +
" by user " + requestingUser.getShortName() +
" in Ranger. Ranger error message: " + e.getMessage());
}
// Update the authorization refresh marker so that the Impalads can refresh their
// Ranger caches.
refreshAuthorization(response);
}


@Override
public void grantPrivilegeToRole(TCatalogServiceRequestHeader header,
TGrantRevokePrivParams params, TDdlExecResponse response) throws ImpalaException {
List<GrantRevokeRequest> requests = createGrantRevokeRequests(
new User(header.getRequesting_user()).getShortName(), /*isGrant*/ true,
/*user*/ null, Collections.emptyList(),
Collections.singletonList(params.getPrincipal_name()),
plugin_.get().getClusterName(), header.getClient_ip(), params.getPrivileges());

grantPrivilege(requests, header.getRedacted_sql_stmt(), header.getClient_ip());
// Update the authorization refresh marker so that the Impalads can refresh their
// Ranger caches.
refreshAuthorization(response);
}


@VisibleForTesting
public void grantPrivilege(List<GrantRevokeRequest> requests, String sqlStmt,
String clientIp) throws ImpalaException {
try {
for (GrantRevokeRequest request : requests) {
try (AutoFlush auditHandler = RangerBufferAuditHandler.autoFlush(sqlStmt,
plugin_.get().getClusterName(), clientIp)) {
plugin_.get().grantAccess(request, auditHandler);
}
}
} catch (Exception e) {
LOG.error("Error granting a privilege in Ranger: ", e);
throw new InternalException("Error granting a privilege in Ranger. " +
"Ranger error message: " + e.getMessage());
}
}

}

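How Impala checks privileges through Ranger

Impala's access-check logic mostly consists of filling in metadata to build the parameters a Ranger request needs and then submitting that request to Ranger. The decision itself is made locally, inside the plugin.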


/**
* Returns true if the given user has permission to execute the given
* request, false otherwise. Always returns true if authorization is disabled or the
* given user is an admin user.
*/
@Override
public boolean hasAccess(User user, PrivilegeRequest request) throws InternalException {
// We don't want to do an audit log or profile events logged here. This method is used
// by "show databases", "show tables", "describe" to filter out unauthorized database,
// table, or column names.
return hasAccess(createAuthorizationContext(false /*no audit log*/,
null /*no SQL statement*/, null /*no session state*/,
Optional.empty()), user, request);
}

private boolean hasAccess(AuthorizationContext authzCtx, User user,
PrivilegeRequest request) throws InternalException {
Preconditions.checkNotNull(user);
Preconditions.checkNotNull(request);

// If authorization is not enabled the user will always have access. If this is
// an internal request, the user will always have permission.
if (!config_.isEnabled() || user instanceof ImpalaInternalAdminUser) {
return true;
}
return authorizeResource(authzCtx, user, request);
}



@Override
protected boolean authorizeResource(AuthorizationContext authzCtx, User user,
PrivilegeRequest request) throws InternalException {
Preconditions.checkArgument(authzCtx instanceof RangerAuthorizationContext);
Preconditions.checkNotNull(user);
Preconditions.checkNotNull(request);
RangerAuthorizationContext rangerAuthzCtx = (RangerAuthorizationContext) authzCtx;
List<RangerAccessResourceImpl> resources = new ArrayList<>();
Authorizable authorizable = request.getAuthorizable();
Privilege privilege = request.getPrivilege();
switch (authorizable.getType()) {
case SERVER:
// Hive service definition does not have a concept of server. So we define
// server to mean all access to all resource sets.
resources.add(new RangerImpalaResourceBuilder()
.database("*").table("*").column("*").build());
resources.add(new RangerImpalaResourceBuilder()
.database("*").function("*").build());
resources.add(new RangerImpalaResourceBuilder().uri("*").build());
if (privilege == Privilege.ALL || privilege == Privilege.OWNER ||
privilege == Privilege.RWSTORAGE) {
resources.add(new RangerImpalaResourceBuilder()
.storageType("*").storageUri("*").build());
}
break;
case DB:
resources.add(new RangerImpalaResourceBuilder()
.database(authorizable.getDbName())
.owner(authorizable.getOwnerUser())
.build());
break;
case TABLE:
resources.add(new RangerImpalaResourceBuilder()
.database(authorizable.getDbName())
.table(authorizable.getTableName())
.owner(authorizable.getOwnerUser())
.build());
break;
case COLUMN:
RangerImpalaResourceBuilder builder = new RangerImpalaResourceBuilder();
builder.database(authorizable.getDbName());
// * in Ranger means "all". For example, to check access for all columns, we need
// to create a request, such as:
// [server=server1, database=foo, table=bar, column=*]
//
// "Any" column access is special in Ranger. For example if we want to check if
// we have access to "any" column on a particular table, we need to build a
// request without the column defined in the resource and use a special
// ANY_ACCESS access type.
//
// For example any column in foo.bar table:
// access type: RangerPolicyEngine.ANY_ACCESS
// resources: [server=server1, database=foo, table=bar]
if (privilege != Privilege.ANY ||
!DefaultAuthorizableFactory.ALL.equals(authorizable.getTableName())) {
builder.table(authorizable.getTableName());
}
if (privilege != Privilege.ANY ||
!DefaultAuthorizableFactory.ALL.equals(authorizable.getColumnName())) {
builder.column(authorizable.getColumnName());
}
builder.owner(authorizable.getOwnerUser());
resources.add(builder.build());
break;
case FUNCTION:
resources.add(new RangerImpalaResourceBuilder()
.database(authorizable.getDbName())
.function(authorizable.getFnName())
.build());
break;
case URI:
resources.add(new RangerImpalaResourceBuilder()
.uri(authorizable.getName())
.build());
break;
case STORAGEHANDLER_URI:
resources.add(new RangerImpalaResourceBuilder()
.storageType(authorizable.getStorageType())
.storageUri(authorizable.getStorageUri())
.build());
break;
default:
throw new IllegalArgumentException(String.format("Invalid authorizable type: %s",
authorizable.getType()));
}

for (RangerAccessResourceImpl resource: resources) {
if (privilege == Privilege.ANY) {
if (!authorizeResource(rangerAuthzCtx, user, resource, authorizable, privilege,
rangerAuthzCtx.getAuditHandler())) {
return false;
}
} else {
boolean authorized = privilege.hasAnyOf() ?
authorizeAny(rangerAuthzCtx, resource, authorizable, user, privilege) :
authorizeAll(rangerAuthzCtx, resource, authorizable, user, privilege);
if (!authorized) {
return false;
}
}
}
return true;
}
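The part of authorizeResource I found least obvious is how one authorizable becomes one or more Ranger resources: SERVER fans out into wildcard resources, and an "any" check on a column leaves the wildcard keys out instead of sending "*". Below is a toy model of just that mapping. ResourceSketch and its plain string maps are hypothetical stand-ins for RangerImpalaResourceBuilder, the storage handler resource is omitted, and I am assuming DefaultAuthorizableFactory.ALL is the "*" string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the resource mapping; not the real Impala or Ranger classes.
final class ResourceSketch {
  enum Type { SERVER, TABLE, COLUMN }

  // Builds the resources to check. "*" means "all"; when anyAccess is true,
  // a "*" table or column is omitted so the check means "any" instead.
  static List<Map<String, String>> build(Type type, boolean anyAccess,
      String db, String table, String column) {
    List<Map<String, String>> out = new ArrayList<>();
    switch (type) {
      case SERVER:
        // No server concept in the Hive service definition: cover every resource set.
        out.add(res("database", "*", "table", "*", "column", "*"));
        out.add(res("database", "*", "function", "*"));
        out.add(res("uri", "*"));
        break;
      case TABLE:
        out.add(res("database", db, "table", table));
        break;
      case COLUMN:
        Map<String, String> r = res("database", db);
        if (!anyAccess || !"*".equals(table)) r.put("table", table);
        if (!anyAccess || !"*".equals(column)) r.put("column", column);
        out.add(r);
        break;
    }
    return out;
  }

  // Small helper that turns key/value pairs into an ordered map.
  private static Map<String, String> res(String... kv) {
    Map<String, String> m = new LinkedHashMap<>();
    for (int i = 0; i < kv.length; i += 2) m.put(kv[i], kv[i + 1]);
    return m;
  }
}
```

With this model, build(Type.COLUMN, true, "foo", "bar", "*") yields a resource with only database and table set, matching the "any column in foo.bar" example in the source comment, while the same call with anyAccess set to false keeps column=* and therefore asks about all columns.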

Once the access check is done, the Ranger audit information is submitted



@Override
public void postAuthorize(AuthorizationContext authzCtx, boolean authzOk,
boolean analysisOk) {
Preconditions.checkArgument(authzCtx instanceof RangerAuthorizationContext);
super.postAuthorize(authzCtx, authzOk, analysisOk);
// Consolidate the audit log entries and apply the deduplicated column masking events
// to update the List of all AuthzAuditEvent's only if the authorization is
// successful.
if (authzOk) {
// We consolidate the audit log entries before invoking
// applyDeduplicatedAuthzEvents() where we add the deduplicated table
// masking-related log entries to 'auditHandler' because currently a Ranger column
// masking policy can be applied to only one single column in a table. Thus, we do
// not need to combine log entries produced due to column masking policies.
((RangerAuthorizationContext) authzCtx).consolidateAuthzEvents();
((RangerAuthorizationContext) authzCtx).applyDeduplicatedAuthzEvents();
}
RangerBufferAuditHandler auditHandler =
((RangerAuthorizationContext) authzCtx).getAuditHandler();
if (authzOk && !analysisOk) {
// When the query was authorized, we do not send any audit log entry to the Ranger
// server if there was an AnalysisException during query analysis.
// We still have to call clear() to remove audit log entries in this case because
// the current test framework checks the contents in auditHandler.getAuthzEvents()
// to determine whether the correct audit events are collected.
auditHandler.getAuthzEvents().clear();
} else {
// We send audit log entries to the Ranger server only if authorization failed or
// analysis succeeded.
auditHandler.flush();
}
}
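The flush-or-discard rule at the end of postAuthorize is easy to misread, so here it is reduced to a pure function. AuditDecisionSketch and its Action enum are hypothetical names; the condition itself is copied from the code above.

```java
// Hypothetical reduction of the audit decision in postAuthorize.
final class AuditDecisionSketch {
  enum Action { FLUSH, DISCARD }

  // Audit entries are dropped only when authorization passed but analysis failed.
  static Action decide(boolean authzOk, boolean analysisOk) {
    return (authzOk && !analysisOk) ? Action.DISCARD : Action.FLUSH;
  }
}
```

So a denied request is always audited, even if analysis also failed, and an authorized request is audited only when the statement analyzed cleanly.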

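The main work in Impala's access check is filling in parameters

Most of the work in Impala's access check is filling in parameters; following the call chain once makes this clear.

That said, the naming of these functions is a bit off. A name like authorizeTableAccess sounds like an operation that grants access, when it actually checks access. I was thoroughly confused when I first saw it.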

package org.apache.impala.authorization.ranger;

public class RangerAuthorizationChecker extends BaseAuthorizationChecker {

@Override
protected void authorizeTableAccess(AuthorizationContext authzCtx,
AnalysisResult analysisResult, FeCatalog catalog, List<PrivilegeRequest> requests)
throws AuthorizationException, InternalException {
RangerAuthorizationContext originalCtx = (RangerAuthorizationContext) authzCtx;
RangerBufferAuditHandler originalAuditHandler = originalCtx.getAuditHandler();
// case 1: table (select) OK, columns (select) OK --> add the table event,
// add the column events
// case 2: table (non-select) ERROR --> add the table event
// case 3: table (select) ERROR, columns (select) OK -> only add the column events
// case 4: table (select) ERROR, columns (select) ERROR --> add the table event
// case 5: table (select) ERROR --> add the table event
// This could happen when the select request for a non-existing table fails
// the authorization.
// case 6: table (select) OK, columns (select) ERROR --> add the first column event
// This could happen when the requesting user is granted the select privilege
// on the table but is denied access to a column in the same table in Ranger.
RangerAuthorizationContext tmpCtx = new RangerAuthorizationContext(
originalCtx.getSessionState(), originalCtx.getTimeline());
tmpCtx.setAuditHandler(new RangerBufferAuditHandler(originalAuditHandler));
AuthorizationException authorizationException = null;
try {
super.authorizeTableAccess(tmpCtx, analysisResult, catalog, requests);
} catch (AuthorizationException e) {
authorizationException = e;
tmpCtx.getAuditHandler().getAuthzEvents().stream()
.filter(evt ->
// case 2: get the first failing non-select table
(!"select".equalsIgnoreCase(evt.getAccessType()) &&
"@table".equals(evt.getResourceType())) ||
// case 4 & 5 & 6: get the table or a column event
(("@table".equals(evt.getResourceType()) ||
"@column".equals(evt.getResourceType())) &&
evt.getAccessResult() == 0))
.findFirst()
.ifPresent(evt -> originalCtx.getAuditHandler().getAuthzEvents().add(evt));
throw e;
} finally {
// We should not add successful events when there was an AuthorizationException.
// Specifically, we should not add the successful table event in case 6.
if (authorizationException == null) {
// case 1 & 3: we only add the successful events.
// TODO(IMPALA-11381): Consider whether we should keep the table-level event which
// corresponds to a check only for short-circuiting authorization.
List<AuthzAuditEvent> events = tmpCtx.getAuditHandler().getAuthzEvents().stream()
.filter(evt -> evt.getAccessResult() != 0)
.collect(Collectors.toList());
originalCtx.getAuditHandler().getAuthzEvents().addAll(events);
}
}
}
}
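The six cases in the comment boil down to two filters over the buffered audit events, one for the failure path and one for the success path. Here is a small model of them. AuditFilterSketch and its Event class are hypothetical stand-ins for AuthzAuditEvent, and I am reading accessResult 0 as "denied", as the comparisons in the code above imply.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Toy model of the audit event selection; Event stands in for AuthzAuditEvent.
final class AuditFilterSketch {
  static final class Event {
    final String resourceType;
    final String accessType;
    final int accessResult; // assumed: 0 = denied, non-zero = allowed

    Event(String resourceType, String accessType, int accessResult) {
      this.resourceType = resourceType;
      this.accessType = accessType;
      this.accessResult = accessResult;
    }
  }

  // Failure path: keep only the first non-select table event or denied table/column event.
  static Optional<Event> onFailure(List<Event> buffered) {
    return buffered.stream()
        .filter(e ->
            (!"select".equalsIgnoreCase(e.accessType) && "@table".equals(e.resourceType))
            || (("@table".equals(e.resourceType) || "@column".equals(e.resourceType))
                && e.accessResult == 0))
        .findFirst();
  }

  // Success path: keep only the events that were allowed.
  static List<Event> onSuccess(List<Event> buffered) {
    return buffered.stream()
        .filter(e -> e.accessResult != 0)
        .collect(Collectors.toList());
  }
}
```

For case 6, a buffer holding an allowed table select followed by a denied column select gives back just the column event from onFailure. For case 3, a denied table select followed by an allowed column select gives back just the column event from onSuccess.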


/**
* A base class for the {@link AuthorizationChecker}.
*/
public abstract class BaseAuthorizationChecker implements AuthorizationChecker {

/**
* Authorize a list of privilege requests associated with a single table.
* It checks if the user has sufficient table-level privileges and if that is
* not the case, it falls back on checking column-level privileges, if any. This
* function requires 'SELECT' requests to be ordered by table and then by column
* privilege requests. Throws an AuthorizationException if the user doesn't have
* sufficient privileges.
*/
protected void authorizeTableAccess(AuthorizationContext authzCtx,
AnalysisResult analysisResult, FeCatalog catalog, List<PrivilegeRequest> requests)
throws AuthorizationException, InternalException {
Preconditions.checkArgument(!requests.isEmpty());
Analyzer analyzer = analysisResult.getAnalyzer();
// Deny access of columns containing column masking policies when column masking
// feature is disabled. Deny access of the tables containing row filtering policies
// when row filtering feature is disabled. This is to prevent data leak since we do
// not want Impala to show any information when these features are disabled.
authorizeRowFilterAndColumnMask(analyzer.getUser(), requests);

boolean hasTableSelectPriv = true;
boolean hasColumnSelectPriv = false;
for (PrivilegeRequest request: requests) {
if (request.getAuthorizable().getType() == Authorizable.Type.TABLE) {
try {
authorizePrivilegeRequest(authzCtx, analysisResult, catalog, request);
} catch (AuthorizationException e) {
// Authorization fails if we fail to authorize any table-level request that is
// not a SELECT privilege (e.g. INSERT).
if (request.getPrivilege() != Privilege.SELECT) throw e;
hasTableSelectPriv = false;
}
} else {
Preconditions.checkState(
request.getAuthorizable().getType() == Authorizable.Type.COLUMN);
// In order to support deny policies on columns
if (hasTableSelectPriv &&
request.getPrivilege() != Privilege.SELECT &&
request.getPrivilege() != Privilege.INSERT) {
continue;
}
if (hasAccess(authzCtx, analyzer.getUser(), request)) {
hasColumnSelectPriv = true;
continue;
}
// Make sure we don't reveal any column names in the error message.
throw new AuthorizationException(String.format("User '%s' does not have " +
"privileges to execute '%s' on: %s", analyzer.getUser().getName(),
request.getPrivilege().toString(),
request.getAuthorizable().getFullTableName()));
}
}
if (!hasTableSelectPriv && !hasColumnSelectPriv) {
throw new AuthorizationException(String.format("User '%s' does not have " +
"privileges to execute 'SELECT' on: %s", analyzer.getUser().getName(),
requests.get(0).getAuthorizable().getFullTableName()));
}
}

}
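To convince myself how the table-level check falls back to column-level checks, I reduced the loop above to a toy model in which every request already carries its Ranger verdict. TableAccessSketch and Req are hypothetical names, the row filter and column mask pre-check is left out, and the method returns false where the real code throws an AuthorizationException.

```java
import java.util.List;

// Toy model of the table-then-column fallback; not the real Impala classes.
final class TableAccessSketch {
  static final class Req {
    final boolean isTable;  // true for a table-level request, false for a column
    final String privilege; // e.g. "SELECT", "INSERT"
    final boolean allowed;  // the verdict the policy engine would return

    Req(boolean isTable, String privilege, boolean allowed) {
      this.isTable = isTable;
      this.privilege = privilege;
      this.allowed = allowed;
    }
  }

  // Requests must be ordered table first, then columns, as in the original.
  static boolean isAuthorized(List<Req> requests) {
    boolean hasTableSelect = true;
    boolean hasColumnSelect = false;
    for (Req r : requests) {
      if (r.isTable) {
        if (!r.allowed) {
          // Only a failed table-level SELECT may fall back to column checks.
          if (!"SELECT".equals(r.privilege)) return false;
          hasTableSelect = false;
        }
      } else {
        // Column SELECT/INSERT is still checked so that column deny policies apply.
        if (hasTableSelect && !"SELECT".equals(r.privilege)
            && !"INSERT".equals(r.privilege)) {
          continue;
        }
        if (r.allowed) {
          hasColumnSelect = true;
          continue;
        }
        return false;
      }
    }
    return hasTableSelect || hasColumnSelect;
  }
}
```

The point the model makes visible is that a table-level SELECT grant does not skip the column checks: a denied column still fails the whole request, while a denied table-level SELECT can be rescued by allowed columns.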


/**
* Authorize a privilege request.
* Throws an AuthorizationException if the user doesn't have sufficient privileges for
* this request. Also, checks if the request references a system database.
*/
private void authorizePrivilegeRequest(AuthorizationContext authzCtx,
AnalysisResult analysisResult, FeCatalog catalog, PrivilegeRequest request)
throws AuthorizationException, InternalException {
Preconditions.checkNotNull(request);
String dbName = null;
if (request.getAuthorizable() != null) {
dbName = request.getAuthorizable().getDbName();
}
// If this is a system database, some actions should always be allowed
// or disabled, regardless of what is in the auth policy.
if (dbName != null && checkSystemDbAccess(catalog, dbName, request.getPrivilege())) {
return;
}
// Populate column names to check column masking policies in blocking updates.
if (config_.isEnabled() && request.getAuthorizable() != null
&& request.getAuthorizable().getType() == Type.TABLE) {
Preconditions.checkNotNull(dbName);
AuthorizableTable authorizableTable = (AuthorizableTable) request.getAuthorizable();
FeDb db = catalog.getDb(dbName);
if (db != null) {
// 'db', 'table' could be null for an unresolved table ref. 'table' could be
// null for target table of a CTAS statement. Don't need to populate column
// names in such cases since no column masking policies will be checked.
FeTable table = db.getTable(authorizableTable.getTableName());
if (table != null && !(table instanceof FeIncompleteTable)) {
authorizableTable.setColumns(table.getColumnNames());
}
}
}
checkAccess(authzCtx, analysisResult.getAnalyzer().getUser(), request);
}

After going around in a circle, everything still ends up calling hasAccess, which in turn calls authorizeResource.


