Kyuubi authorization and Ranger
kyuubi ranger
A look at the Ranger plugin code bundled with Kyuubi: its main job is to parse Spark's logical plan and translate it into the parameters of a Ranger access request, i.e. fill in which databases/tables/columns are touched, what the operation is (select/update, ...), and which user is performing it (user information). After that it simply invokes the Ranger plugin as usual, which completes the whole authorization chain.
As for whether extra policy entries are needed in Ranger Admin, apparently not: the existing Hive policies can be used directly. In principle, which service a SQL-type policy belongs to in Ranger has nothing to do with the actual authorization flow. The Hive plugin also just downloads policies from the Ranger admin server and evaluates them locally, and the Kyuubi plugin is no different. What actually matters is the metadata available when defining policies, e.g. which databases exist and which tables they contain, and that is covered as long as Kyuubi shares a metastore with Hive.
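To make that chain concrete, here is a minimal sketch of the idea using the plain Apache Ranger plugin API (this is not the Kyuubi plugin's actual code; the service type / app id, user, and resource values are made-up placeholders):

import org.apache.ranger.plugin.policyengine.{RangerAccessRequestImpl, RangerAccessResourceImpl}
import org.apache.ranger.plugin.service.RangerBasePlugin

// Downloads and caches policies from the Ranger admin server, then evaluates them locally.
val ranger = new RangerBasePlugin("spark", "sparkSql") // service type / app id are placeholders
ranger.init()

// The resource and access type would be extracted from the Spark logical plan.
val resource = new RangerAccessResourceImpl()
resource.setValue("database", "db1")
resource.setValue("table", "t1")
resource.setValue("column", "name")

val request = new RangerAccessRequestImpl()
request.setResource(resource)
request.setAccessType("select")   // the simplified access type, see AccessType below
request.setUser("alice")          // the session user identified by Kyuubi

val result = ranger.isAccessAllowed(request)
if (result == null || !result.getIsAllowed) {
  throw new RuntimeException("Permission denied: alice cannot select db1.t1.name")
}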
Kyuubi's official authorization documentation
Storage-based Authorization
Because Kyuubi enforces strict user identification, the storage layer's own authorization can take effect directly, e.g. HDFS Ranger policies; this is what storage-based authorization means.
Unlike Hive, many big data engines are hard to govern with Ranger SQL policies, so HDFS storage policies are often the only fallback. Quoting the docs:
A so called Storage-based authorization mode is supported by Kyuubi by default. In this model, all objects, such as databases, tables, partitions, in meta layer are mapping to folders or files in the storage layer, as well as their permissions.
Storage-based authorization offers users with database, table and partition-level coarse-grained access control.
SQL-standard authorization with Ranger
This plugin enables Kyuubi with data and metadata control access ability for Spark SQL Engines, including,
- Column-level fine-grained authorization
- Row-level fine-grained authorization, a.k.a. Row-level filtering
- Data masking
The Kyuubi Spark AuthZ Ranger plugin
The Kyuubi Spark AuthZ plugin can provide Ranger authorization for any Spark SQL job; deploying the Kyuubi component itself is not strictly required. However, authorization presupposes authentication, i.e. knowing who the user is, so the Ranger authorization plugin works best together with Kyuubi.
Kyuubi Spark Authz Plugin itself provides general purpose for ACL management for data & metadata while using Spark SQL. It is not necessary to deploy it with the Kyuubi server and engine, and can be used as an extension for any Spark SQL jobs. However, the authorization always requires a robust authentication layer and multi tenancy support, so Kyuubi is a perfect match.
How to install the plugin:
https://kyuubi.readthedocs.io/en/v1.6.0-incubating/security/authorization/spark/install.html
Settings for Spark Session Extensions
Add org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension to the spark configuration spark.sql.extensions.
spark.sql.extensions=org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension
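Equivalently, for an ad-hoc Spark job (with the kyuubi-spark-authz jar and its Ranger config files on the classpath), the same setting can be applied when building the SparkSession; the snippet below is just an illustration, not part of the official docs:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("authz-demo") // hypothetical app name
  .config(
    "spark.sql.extensions",
    "org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension")
  .getOrCreate()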
Kyuubi Ranger plugin source code
I previously looked at the Spark Atlas lineage plugin: Spark exposes a listener interface that makes it easy to read information from each execution stage, and the core of parsing the Spark engine is walking the logical plan to extract lineage. Reading the Kyuubi Spark Ranger source, it follows the same principle, so you need to get familiar with Spark's query plans.
https://github.com/apache/kyuubi/blob/master/extensions/spark/kyuubi-spark-authz/README.md
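To get a feel for what the plugin actually sees, you can print a query's logical plans yourself in spark-shell (the table name here is made up):

val qe = spark.sql("SELECT name FROM db1.t1 WHERE age > 18").queryExecution
println(qe.analyzed)      // resolved logical plan: Project / Filter / relation nodes
println(qe.optimizedPlan) // the plan after optimizer rules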
Collapsing many operation types into a few access types
The many Spark operation types are collapsed into a small set of CRUD-like access types for authorization; for example, ALTERTABLE_RENAME simply becomes ALTER, as the code below shows.
package org.apache.kyuubi.plugin.spark.authz.ranger

import org.apache.kyuubi.plugin.spark.authz.{PrivilegeObject, PrivilegeObjectActionType}
import org.apache.kyuubi.plugin.spark.authz.OperationType._
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectType._

object AccessType extends Enumeration {

  type AccessType = Value

  val NONE, CREATE, ALTER, DROP, SELECT, UPDATE, USE, READ, WRITE, ALL, ADMIN = Value

  def apply(obj: PrivilegeObject, opType: OperationType, isInput: Boolean): AccessType = {
    obj.actionType match {
      case PrivilegeObjectActionType.OTHER => opType match {
          case CREATEDATABASE if obj.privilegeObjectType == DATABASE => CREATE
          case CREATEFUNCTION if obj.privilegeObjectType == FUNCTION => CREATE
          case CREATETABLE | CREATEVIEW | CREATETABLE_AS_SELECT
              if obj.privilegeObjectType == TABLE_OR_VIEW =>
            if (isInput) SELECT else CREATE
          case ALTERDATABASE |
              ALTERDATABASE_LOCATION |
              ALTERTABLE_ADDCOLS |
              ALTERTABLE_ADDPARTS |
              ALTERTABLE_DROPPARTS |
              ALTERTABLE_LOCATION |
              ALTERTABLE_RENAME |
              ALTERTABLE_PROPERTIES |
              ALTERTABLE_RENAMECOL |
              ALTERTABLE_RENAMEPART |
              ALTERTABLE_REPLACECOLS |
              ALTERTABLE_SERDEPROPERTIES |
              ALTERVIEW_RENAME |
              MSCK => ALTER
          case ALTERVIEW_AS => if (isInput) SELECT else ALTER
          case DROPDATABASE | DROPTABLE | DROPFUNCTION | DROPVIEW => DROP
          case LOAD => if (isInput) SELECT else UPDATE
          case QUERY |
              SHOW_CREATETABLE |
              SHOW_TBLPROPERTIES |
              SHOWPARTITIONS |
              ANALYZE_TABLE => SELECT
          case SHOWCOLUMNS | DESCTABLE => SELECT
          case SHOWDATABASES |
              SWITCHDATABASE |
              DESCDATABASE |
              SHOWTABLES |
              SHOWFUNCTIONS |
              DESCFUNCTION => USE
          case TRUNCATETABLE => UPDATE
          case _ => NONE
        }
      case PrivilegeObjectActionType.DELETE => DROP
      case _ => UPDATE
    }
  }
}
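A hypothetical use of this mapping, with the constructor shapes inferred from the code quoted in this post (so treat it as a sketch rather than a tested snippet):

import org.apache.kyuubi.plugin.spark.authz.{OperationType, PrivilegeObject}
import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
import org.apache.kyuubi.plugin.spark.authz.serde.Table

// A PrivilegeObject for table db1.t1, assuming the default action type OTHER.
val obj = PrivilegeObject(Table(None, Some("db1"), "t1", None))
// ALTERTABLE_RENAME on an output object collapses to the coarse ALTER access type.
val access = AccessType(obj, OperationType.ALTERTABLE_RENAME, isInput = false) // => ALTER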
buildQuery: assembling the objects to authorize
This is the heart of the plan analysis: it walks the logical plan and collects the privilege objects (tables plus the columns actually referenced) that a single authorization request needs.
package org.apache.kyuubi.plugin.spark.authz

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.slf4j.LoggerFactory

import org.apache.kyuubi.plugin.spark.authz.OperationType.OperationType
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.serde._
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.util.reflect.ReflectUtils._

object PrivilegesBuilder {

  final private val LOG = LoggerFactory.getLogger(getClass)

  private def collectLeaves(expr: Expression): Seq[NamedExpression] = {
    expr.collect { case p: NamedExpression if p.children.isEmpty => p }
  }

  private def setCurrentDBIfNecessary(
      tableIdent: Table,
      spark: SparkSession): Table = {
    if (tableIdent.database.isEmpty) {
      tableIdent.copy(database = Some(spark.catalog.currentDatabase))
    } else {
      tableIdent
    }
  }

  /**
   * Build PrivilegeObjects from Spark LogicalPlan
   *
   * @param plan a Spark LogicalPlan used to generate SparkPrivilegeObjects
   * @param privilegeObjects input or output spark privilege object list
   * @param projectionList Projection list after pruning
   */
  def buildQuery(
      plan: LogicalPlan,
      privilegeObjects: ArrayBuffer[PrivilegeObject],
      projectionList: Seq[NamedExpression] = Nil,
      conditionList: Seq[NamedExpression] = Nil,
      spark: SparkSession): Unit = {

    def mergeProjection(table: Table, plan: LogicalPlan): Unit = {
      if (projectionList.isEmpty) {
        privilegeObjects += PrivilegeObject(table, plan.output.map(_.name))
      } else {
        val cols = (projectionList ++ conditionList).flatMap(collectLeaves)
          .filter(plan.outputSet.contains).map(_.name).distinct
        privilegeObjects += PrivilegeObject(table, cols)
      }
    }

    plan match {
      case p: Project => buildQuery(p.child, privilegeObjects, p.projectList, conditionList, spark)

      case j: Join =>
        val cols =
          conditionList ++ j.condition.map(expr => collectLeaves(expr)).getOrElse(Nil)
        buildQuery(j.left, privilegeObjects, projectionList, cols, spark)
        buildQuery(j.right, privilegeObjects, projectionList, cols, spark)

      case f: Filter =>
        val cols = conditionList ++ collectLeaves(f.condition)
        buildQuery(f.child, privilegeObjects, projectionList, cols, spark)

      case w: Window =>
        val orderCols = w.orderSpec.flatMap(orderSpec => collectLeaves(orderSpec))
        val partitionCols = w.partitionSpec.flatMap(partitionSpec => collectLeaves(partitionSpec))
        val cols = conditionList ++ orderCols ++ partitionCols
        buildQuery(w.child, privilegeObjects, projectionList, cols, spark)

      case s: Sort =>
        val sortCols = s.order.flatMap(sortOrder => collectLeaves(sortOrder))
        val cols = conditionList ++ sortCols
        buildQuery(s.child, privilegeObjects, projectionList, cols, spark)

      case a: Aggregate =>
        val aggCols =
          (a.aggregateExpressions ++ a.groupingExpressions).flatMap(e => collectLeaves(e))
        val cols = conditionList ++ aggCols
        buildQuery(a.child, privilegeObjects, projectionList, cols, spark)

      case scan if isKnownScan(scan) && scan.resolved =>
        getScanSpec(scan).tables(scan, spark).foreach(mergeProjection(_, scan))

      case u if u.nodeName == "UnresolvedRelation" =>
        val parts = invokeAs[String](u, "tableName").split("\\.")
        val db = quote(parts.init)
        val table = Table(None, Some(db), parts.last, None)
        privilegeObjects += PrivilegeObject(table)

      case p =>
        for (child <- p.children) {
          buildQuery(child, privilegeObjects, projectionList, conditionList, spark)
        }
    }
  }

  /**
   * Build PrivilegeObjects from Spark LogicalPlan
   *
   * @param plan a Spark LogicalPlan used to generate Spark PrivilegeObjects
   * @param inputObjs input privilege object list
   * @param outputObjs output privilege object list
   */
  private def buildCommand(
      plan: LogicalPlan,
      inputObjs: ArrayBuffer[PrivilegeObject],
      outputObjs: ArrayBuffer[PrivilegeObject],
      spark: SparkSession): OperationType = {

    def getTablePriv(tableDesc: TableDesc): Seq[PrivilegeObject] = {
      try {
        val maybeTable = tableDesc.extract(plan, spark)
        maybeTable match {
          case Some(table) =>
            val newTable = if (tableDesc.setCurrentDatabaseIfMissing) {
              setCurrentDBIfNecessary(table, spark)
            } else {
              table
            }
            if (tableDesc.tableTypeDesc.exists(_.skip(plan))) {
              Nil
            } else {
              val actionType = tableDesc.actionTypeDesc.map(_.extract(plan)).getOrElse(OTHER)
              val columnNames = tableDesc.columnDesc.map(_.extract(plan)).getOrElse(Nil)
              Seq(PrivilegeObject(newTable, columnNames, actionType))
            }
          case None => Nil
        }
      } catch {
        case e: Exception =>
          LOG.debug(tableDesc.error(plan, e))
          Nil
      }
    }

    plan.getClass.getName match {
      case classname if DB_COMMAND_SPECS.contains(classname) =>
        val desc = DB_COMMAND_SPECS(classname)
        desc.databaseDescs.foreach { databaseDesc =>
          try {
            val database = databaseDesc.extract(plan)
            if (databaseDesc.isInput) {
              inputObjs += PrivilegeObject(database)
            } else {
              outputObjs += PrivilegeObject(database)
            }
          } catch {
            case e: Exception =>
              LOG.debug(databaseDesc.error(plan, e))
          }
        }
        desc.operationType

      case classname if TABLE_COMMAND_SPECS.contains(classname) =>
        val spec = TABLE_COMMAND_SPECS(classname)
        spec.tableDescs.foreach { td =>
          if (td.isInput) {
            inputObjs ++= getTablePriv(td)
          } else {
            outputObjs ++= getTablePriv(td)
          }
        }
        spec.queries(plan).foreach(buildQuery(_, inputObjs, spark = spark))
        spec.operationType

      case classname if FUNCTION_COMMAND_SPECS.contains(classname) =>
        val spec = FUNCTION_COMMAND_SPECS(classname)
        spec.functionDescs.foreach { fd =>
          try {
            val function = fd.extract(plan)
            if (!fd.functionTypeDesc.exists(_.skip(plan, spark))) {
              if (fd.isInput) {
                inputObjs += PrivilegeObject(function)
              } else {
                outputObjs += PrivilegeObject(function)
              }
            }
          } catch {
            case e: Exception =>
              LOG.debug(fd.error(plan, e))
          }
        }
        spec.operationType

      case _ => OperationType.QUERY
    }
  }

  type PrivilegesAndOpType = (Seq[PrivilegeObject], Seq[PrivilegeObject], OperationType)

  /**
   * Build input privilege objects from a Spark's LogicalPlan for hive permanent udf
   *
   * @param plan A Spark LogicalPlan
   */
  def buildFunctions(
      plan: LogicalPlan,
      spark: SparkSession): PrivilegesAndOpType = {
    val inputObjs = new ArrayBuffer[PrivilegeObject]
    plan match {
      case command: Command if isKnownTableCommand(command) =>
        val spec = getTableCommandSpec(command)
        val functionPrivAndOpType = spec.queries(plan)
          .map(plan => buildFunctions(plan, spark))
        functionPrivAndOpType.map(_._1)
          .reduce(_ ++ _)
          .foreach(functionPriv => inputObjs += functionPriv)

      case plan => plan transformAllExpressions {
          case hiveFunction: Expression if isKnownFunction(hiveFunction) =>
            val functionSpec: ScanSpec = getFunctionSpec(hiveFunction)
            if (functionSpec.functionDescs
                .exists(!_.functionTypeDesc.get.skip(hiveFunction, spark))) {
              functionSpec.functions(hiveFunction).foreach(func =>
                inputObjs += PrivilegeObject(func))
            }
            hiveFunction
        }
    }
    (inputObjs, Seq.empty, OperationType.QUERY)
  }

  /**
   * Build input and output privilege objects from a Spark's LogicalPlan
   *
   * For `Command`s, build outputs if it has an target to write, build inputs for the
   * inside query if exists.
   *
   * For other queries, build inputs.
   *
   * @param plan A Spark LogicalPlan
   */
  def build(
      plan: LogicalPlan,
      spark: SparkSession): PrivilegesAndOpType = {
    val inputObjs = new ArrayBuffer[PrivilegeObject]
    val outputObjs = new ArrayBuffer[PrivilegeObject]
    val opType = plan match {
      // RunnableCommand
      case cmd: Command => buildCommand(cmd, inputObjs, outputObjs, spark)
      // Queries
      case _ =>
        buildQuery(plan, inputObjs, spark = spark)
        OperationType.QUERY
    }
    (inputObjs, outputObjs, opType)
  }
}
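A rough way to see what the builder extracts, e.g. in spark-shell with the authz jar on the classpath (the database/table names below are made up):

import org.apache.kyuubi.plugin.spark.authz.PrivilegesBuilder

val plan = spark.sql("SELECT id, name FROM db1.t1 WHERE age > 18").queryExecution.analyzed
val (inputs, outputs, opType) = PrivilegesBuilder.build(plan, spark)
// inputs : one PrivilegeObject for db1.t1 with the referenced columns (id, name, age)
// outputs: empty, since a plain SELECT writes nothing
// opType : OperationType.QUERY
inputs.foreach(println)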
Kyuubi Ranger row-level filtering and data masking
The excerpt below shows how a Ranger row-filter policy is turned into a SQL predicate (getFilterExpr) and how a data-masking policy is turned into a replacement column expression (getMaskingExpr), which the plugin then applies to the user's query.
package org.apache.kyuubi.plugin.spark.authz.ranger

def getFilterExpr(req: AccessRequest): Option[String] = {
  val result = evalRowFilterPolicies(req, null)
  Option(result)
    .filter(_.isRowFilterEnabled)
    .map(_.getFilterExpr)
    .filter(fe => fe != null && fe.nonEmpty)
}

def getMaskingExpr(req: AccessRequest): Option[String] = {
  val col = req.getResource.asInstanceOf[AccessResource].getColumn
  val result = evalDataMaskPolicies(req, null)
  Option(result).filter(_.isMaskEnabled).map { res =>
    if ("MASK_NULL".equalsIgnoreCase(res.getMaskType)) {
      "NULL"
    } else if ("CUSTOM".equalsIgnoreCase(result.getMaskType)) {
      val maskVal = res.getMaskedValue
      if (maskVal == null) {
        "NULL"
      } else {
        s"${maskVal.replace("{col}", col)}"
      }
    } else if (result.getMaskTypeDef != null) {
      result.getMaskTypeDef.getName match {
        case "MASK" => regexp_replace(col)
        case "MASK_SHOW_FIRST_4" if isSparkV31OrGreater =>
          regexp_replace(col, hasLen = true)
        case "MASK_SHOW_FIRST_4" =>
          val right = regexp_replace(s"substr($col, 5)")
          s"concat(substr($col, 0, 4), $right)"
        case "MASK_SHOW_LAST_4" =>
          val left = regexp_replace(s"left($col, length($col) - 4)")
          s"concat($left, right($col, 4))"
        case "MASK_HASH" => s"md5(cast($col as string))"
        case "MASK_DATE_SHOW_YEAR" => s"date_trunc('YEAR', $col)"
        case _ => result.getMaskTypeDef.getTransformer match {
            case transformer if transformer != null && transformer.nonEmpty =>
              s"${transformer.replace("{col}", col)}"
            case _ => null
          }
      }
    } else {
      null
    }
  }
}

private def regexp_replace(expr: String, hasLen: Boolean = false): String = {
  val pos = if (hasLen) ", 5" else ""
  val upper = s"regexp_replace($expr, '[A-Z]', 'X'$pos)"
  val lower = s"regexp_replace($upper, '[a-z]', 'x'$pos)"
  val digits = s"regexp_replace($lower, '[0-9]', 'n'$pos)"
  val other = s"regexp_replace($digits, '[^A-Za-z0-9]', 'U'$pos)"
  other
}