跳到主要内容

ranger-admin policy 策略的缓存与锁

有一个现象, 修改 ranger policy 之后, 下一个 read policy 的 ranger api 请求耗时总是非常长, 容易导致依赖模块的 api 超时.

测试: 普通读取全量策略为0.2s, 修改policy后再读取, 耗时变为2s, 如果策略数多可能耗时更长.

curl -u admin:admin -H "Accept: application/json" -H "Content-Type: application/json"  -o /dev/null -w 'Total: %{time_total}s\n' -v -i -s -X GET http://1.1.1.1:6080/service/plugins/policies/service/name/hive

看了一下ranger的代码, 结合之前看过网易数帆的技术文档, 确认这就是ranger policy 缓存与锁导致的问题.

网易怎么解决这个问题呢? 从文档看, 他们额外构建了一套自己的缓存体系, 所有读取操作都从缓存里获取, 并且定期从 ranger 同步. 写操作则以 ranger 作为唯一操作对象, 避免多个服务状态不一致.

ranger 读取策略时判断 cache 版本

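ranger 查询 service 的策略列表时, 先按 service name 找到该 service 的缓存对象 (如果 service id 发生变化, 会重建这个缓存对象), 再比较数据库中该 service 的策略版本 (policy version) 与缓存中的版本. 如果版本相同, 则直接从缓存里读取; 如果不相同, 则从数据库加载: 能拿到增量 (delta) 时把增量应用到缓存上, 拿不到增量 (例如 service / service-def 变化) 时全量重新读取. 由于一个 service (比如 hive) 的策略可能有好几十 MB, 因此一次全量重新读取可能会耗费几秒时间.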


/**
 * REST endpoint returning the policies of the service identified by the {@code name} path segment.
 *
 * <p>Search/sort parameters are read from the HTTP request into a {@link SearchFilter}; the lookup
 * itself is delegated to {@code getServicePolicies(String, SearchFilter)} and its result is returned
 * unchanged (the debug log below tolerates a {@code null} result).
 *
 * @param serviceName name of the service whose policies are requested
 * @param request     incoming HTTP request carrying the filter/sort parameters
 * @return the delegate's policy list
 */
@GET
@Path("/policies/service/name/{name}")
@Produces({ "application/json" })
public RangerPolicyList getServicePoliciesByName(@PathParam("name") String serviceName,
        @Context HttpServletRequest request) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> ServiceREST.getServicePolicies(" + serviceName + ")");
    }

    final RangerPolicyList policyList =
            getServicePolicies(serviceName, searchUtil.getSearchFilter(request, policyService.sortFields));

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== ServiceREST.getServicePolicies(" + serviceName + "): count="
                + (policyList != null ? policyList.getListSize() : 0));
    }

    return policyList;
}


// Excerpt (calling code shown out of context): lists the policies of service `serviceName`
// through the service store, using the search filter built from the request.
List<RangerPolicy> servicePolicies = svcStore.getServicePolicies(serviceName, filter);
// Excerpt from a separate call site (note: it reuses the variable name with a different type).
// Reads the service's policies via RangerServicePoliciesCache with lastKnownVersion = -1L and
// needsBackwardCompatibility = true, i.e. it always asks for the complete cached policy set, not deltas.
ServicePolicies servicePolicies = RangerServicePoliciesCache.getInstance().getServicePolicies(service.getName(), service.getId(), -1L, true, this);
    /**
     * Returns the policies of service {@code serviceName}, served through that service's cache wrapper.
     *
     * <p>The wrapper is looked up (and created when missing, or replaced when it was registered for a
     * different service id) while holding this object's monitor; loading the policies happens outside it.
     *
     * @param serviceName                name of the service; must not be null/blank
     * @param serviceId                  id of the service; must not be null
     * @param lastKnownVersion           policy version the caller already has; passed to the wrapper
     * @param needsBackwardCompatibility passed to the wrapper
     * @param serviceStore               store to load from; when null, the wrapper's current policies are returned
     * @return the wrapper's policies, or {@code null} when serviceName/serviceId are invalid
     * @throws Exception propagated from the wrapper
     */
    public ServicePolicies getServicePolicies(String serviceName, Long serviceId, Long lastKnownVersion, boolean needsBackwardCompatibility, ServiceStore serviceStore) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> RangerServicePoliciesCache.getServicePolicies(" + serviceName + ", " + serviceId + ", " + lastKnownVersion + ", " + needsBackwardCompatibility + ")");
        }

        ServicePolicies result = null;

        if (StringUtils.isBlank(serviceName) || serviceId == null) {
            LOG.error("getServicePolicies() failed to get policies as serviceName is null or blank and/or serviceId is null!");
        } else {
            final ServicePoliciesWrapper wrapper = lookupServicePoliciesWrapper(serviceName, serviceId);

            if (serviceStore == null) {
                LOG.error("getServicePolicies(" + serviceName + "): failed to get latest policies as service-store is null! Returning cached servicePolicies!");
                result = wrapper.getServicePolicies();
            } else {
                result = wrapper.getLatestOrCached(serviceName, serviceStore, lastKnownVersion, needsBackwardCompatibility);
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("<== RangerServicePoliciesCache.getServicePolicies(" + serviceName + ", " + serviceId + ", " + lastKnownVersion + ", " + needsBackwardCompatibility + "): ret:[" + result + "]");
        }

        return result;
    }

    /**
     * Returns the cache wrapper registered for {@code serviceName}. A wrapper registered under a different
     * service id is dropped and replaced; a missing one is created and registered.
     */
    private synchronized ServicePoliciesWrapper lookupServicePoliciesWrapper(String serviceName, Long serviceId) {
        final ServicePoliciesWrapper current = servicePoliciesMap.get(serviceName);

        if (current != null && serviceId.equals(current.getServiceId())) {
            return current;
        }

        if (current != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Service [" + serviceName + "] changed service-id from " + current.getServiceId()
                        + " to " + serviceId);
                LOG.debug("Recreating servicePoliciesWrapper for serviceName [" + serviceName + "]");
            }
            servicePoliciesMap.remove(serviceName);
        }

        final ServicePoliciesWrapper fresh = new ServicePoliciesWrapper(serviceId);
        servicePoliciesMap.put(serviceName, fresh);
        return fresh;
    }

全量读取策略加锁

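估计因为一次全量读取策略消耗过多, 没有必要让两个线程同时进行, 于是在读取的时候加了个锁. 拿到锁的线程要在锁内完成从数据库加载策略; 拿不到锁的线程最多等待 waitTimeInSeconds 秒, 超时后直接返回缓存中的旧策略. 这也解释了为什么修改策略后的第一次读取会变慢: 这次请求要在锁内等数据库加载完成.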

// Excerpt from getLatestOrCached: wait at most waitTimeInSeconds for the per-service lock; a caller that
// cannot acquire it falls back to the currently cached policies instead of reloading them.
lockResult = lock.tryLock(waitTimeInSeconds, TimeUnit.SECONDS);

/**
 * Returns this service's policies, first refreshing the in-memory cache from the database.
 *
 * <p>The refresh ({@code getLatest}) runs while holding {@code lock}. A caller that cannot acquire the
 * lock within {@code waitTimeInSeconds} gets the currently cached (possibly stale) policies instead.
 * The complete cached {@link ServicePolicies} is returned when backward compatibility is needed, when
 * everything is requested ({@code lastKnownVersion == -1}), when the caller is already at the cached
 * version, or when the cache was completely reloaded by a disqualifying event. Otherwise policy deltas
 * since {@code lastKnownVersion} are returned, falling back to the complete cached set if no deltas can
 * be obtained.
 *
 * @param serviceName                name of the service
 * @param serviceStore               store used to load versions, policies and deltas
 * @param lastKnownVersion           policy version the caller already has; {@code null} is treated as -1
 * @param needsBackwardCompatibility when true, always return the complete cached policy set
 * @return cached policies or deltas; {@code null} if nothing is cached or the lock wait was interrupted
 * @throws Exception propagated from the service store
 */
ServicePolicies getLatestOrCached(String serviceName, ServiceStore serviceStore, Long lastKnownVersion, boolean needsBackwardCompatibility) throws Exception {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> RangerServicePoliciesCache.getLatestOrCached(lastKnownVersion=" + lastKnownVersion + ", " + needsBackwardCompatibility + ")");
    }
    // A null version used to fail with an NPE (equals()/unboxing below); treat it as -1, i.e. "send everything".
    final Long knownVersion = (lastKnownVersion != null) ? lastKnownVersion : Long.valueOf(-1L);

    ServicePolicies ret = null;
    boolean lockResult = false;
    boolean doSaveToCache = false;

    try {
        final boolean isCacheReloadedByDQEvent;

        // Only one thread at a time refreshes this service's cache; others wait at most waitTimeInSeconds.
        lockResult = lock.tryLock(waitTimeInSeconds, TimeUnit.SECONDS);

        if (lockResult) {
            // Refresh from the database while holding the lock - this is where a policy change costs time.
            isCacheReloadedByDQEvent = getLatest(serviceName, serviceStore, knownVersion);

            if (this.servicePolicies != null) {
                if (isCacheReloadedByDQEvent) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("ServicePolicies cache was completely loaded from database because of a disqualifying event - such as service-definition change!");
                    }
                }
                if (!knownVersion.equals(servicePolicies.getPolicyVersion()) || isCacheReloadedByDQEvent) {
                    doSaveToCache = true;
                }

                if (needsBackwardCompatibility || isCacheReloadedByDQEvent
                        || knownVersion == -1L || knownVersion.equals(servicePolicies.getPolicyVersion())) {
                    // Looking for all policies, or Some disqualifying change encountered
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("All policies were requested, returning cached ServicePolicies");
                    }
                    ret = this.servicePolicies;
                } else {
                    boolean isDeltaCacheReinitialized = false;
                    ServicePolicies servicePoliciesForDeltas = this.deltaCache != null ? this.deltaCache.getServicePolicyDeltasFromVersion(knownVersion) : null;

                    if (servicePoliciesForDeltas == null) {
                        servicePoliciesForDeltas = serviceStore.getServicePolicyDeltas(serviceName, knownVersion, servicePolicies.getPolicyVersion());
                        isDeltaCacheReinitialized = true;
                    }
                    if (servicePoliciesForDeltas != null && servicePoliciesForDeltas.getPolicyDeltas() != null) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Deltas were requested. Returning deltas from lastKnownVersion:[" + knownVersion + "]");
                        }
                        if (isDeltaCacheReinitialized) {
                            this.deltaCache = new ServicePolicyDeltasCache(knownVersion, servicePoliciesForDeltas);
                        }
                        ret = servicePoliciesForDeltas;
                    } else {
                        LOG.warn("Deltas were requested for service:[" + serviceName + "], but could not get them!! lastKnownVersion:[" + knownVersion + "]; Returning cached ServicePolicies:[" + (servicePolicies != null ? servicePolicies.getPolicyVersion() : -1L) + "]");

                        this.deltaCache = null;
                        ret = this.servicePolicies;
                    }
                }
            } else {
                LOG.error("ServicePolicies object is null!");
            }
        } else {
            LOG.error("Could not get lock in [" + waitTimeInSeconds + "] seconds, returning cached ServicePolicies and wait Queue Length:[" +lock.getQueueLength() + "], servicePolicies version:[" + (servicePolicies != null ? servicePolicies.getPolicyVersion() : -1L) + "]");
            ret = this.servicePolicies;
            // NOTE(review): this path saves this.servicePolicies WITHOUT holding the lock, while the lock
            // owner may be updating it in getLatest(); nothing new was loaded here, so the save looks unneeded.
            doSaveToCache = true;
        }
    } catch (InterruptedException exception) {
        LOG.error("getLatestOrCached:lock got interrupted..", exception);
        // Restore the interrupt status so the caller / executor can still observe the interruption.
        Thread.currentThread().interrupt();
    } finally {
        try {
            // Dump cached policies to disk
            if (doSaveToCache) {
                saveToCache(this.servicePolicies);
            }
        } finally {
            // Always release the lock, even when saveToCache throws: a leaked lock would make every later
            // caller wait waitTimeInSeconds and then fall back to stale cached policies.
            if (lockResult) {
                lock.unlock();
            }
        }
    }
    if (LOG.isTraceEnabled()) {
        LOG.trace("RangerServicePoliciesCache.getLatestOrCached - Returns ServicePolicies:[" + ret +"]");
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== RangerServicePoliciesCache.getLatestOrCached(lastKnownVersion=" + lastKnownVersion + ", " + needsBackwardCompatibility + ") : " + ret);
    }
    return ret;
}

ranger admin 的 policy 缓存

不只 ranger plugin 使用 cache json, ranger admin 在提供策略 api 的时候, 也会把缓存的策略写成 json 文件 (前提是配置 ranger.admin.policy.save.to.disk=true, 默认为 false; 实际响应 api 请求用的仍是内存里的缓存).


package org.apache.ranger.common;
public class RangerServicePoliciesCache {

public void saveToCache(ServicePolicies policies) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerServicePoliciesCache(serviceName=" + policies.getServiceName() + ").saveToCache()");
}
if (policies != null) {
RangerAdminConfig config = RangerAdminConfig.getInstance();
boolean doSaveToDisk = config.getBoolean("ranger.admin.policy.save.to.disk", false);

if (doSaveToDisk) {
File cacheFile = null;

String cacheDir = config.get("ranger.admin.policy.cache.dir");
if (cacheDir != null) {
String appId = policies.getServiceDef().getName();
String serviceName = policies.getServiceName();
String cacheFileName = String.format("%s_%s.json", appId, serviceName);

cacheFileName = cacheFileName.replace(File.separatorChar, '_');
cacheFileName = cacheFileName.replace(File.pathSeparatorChar, '_');
cacheFileName = cacheFileName + "_" + policies.getPolicyVersion();

// Create the cacheDir if it doesn't already exist
File cacheDirTmp = new File(cacheDir);
if (cacheDirTmp.exists()) {
cacheFile = new File(cacheDir + File.separator + cacheFileName);
} else {
try {
cacheDirTmp.mkdirs();
cacheFile = new File(cacheDir + File.separator + cacheFileName);
} catch (SecurityException ex) {
LOG.error("Cannot create cache directory", ex);
}
}
}

if (cacheFile != null) {
try (Writer writer = new FileWriter(cacheFile)) {
gson.toJson(policies, writer);
} catch (Exception excp) {
LOG.error("failed to save policies to cache file '" + cacheFile.getAbsolutePath() + "'", excp);
}
}
}
} else {
LOG.error("ServicePolicies is null object!!");
}

if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerServicePoliciesCache(serviceName=" + policies.getServiceName() + ").saveToCache()");
}
}
}

读取策略的函数细节

这些就太琐碎了, 没仔细看了.


/**
 * Brings the cached {@code servicePolicies} up to the policy version currently stored in the database.
 *
 * <p>When the database version is unknown or differs from the cached one, policies are loaded with
 * {@code serviceStore.getServicePolicyDeltasOrPolicies(serviceName, cachedVersion)} and then:
 * <ul>
 *   <li>nothing cached yet: the loaded policies become the cache;</li>
 *   <li>loaded result has no deltas: it is a complete policy set (a "disqualifying event", e.g. a
 *       service / service-def change) and replaces the cache;</li>
 *   <li>loaded result has deltas: they are applied on top of the cached policies and, if present,
 *       cached tag policies.</li>
 * </ul>
 * The delta cache is cleared whenever something was loaded. The caller shown here
 * ({@code getLatestOrCached}) invokes this while holding {@code lock}; this method mutates the cached
 * state without further synchronization.
 *
 * @param serviceName      name of the service
 * @param serviceStore     store used to read versions and policies
 * @param lastKnownVersion caller's version; only used for logging here
 * @return true if the cache was completely replaced because of a disqualifying event
 * @throws Exception propagated from the service store
 */
boolean getLatest(String serviceName, ServiceStore serviceStore, Long lastKnownVersion) throws Exception {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> ServicePoliciesWrapper.getLatest(serviceName=" + serviceName + ", lastKnownVersion=" + lastKnownVersion + ")");
    }

    final Long servicePolicyVersionInDb = serviceStore.getServicePolicyVersion(serviceName);
    final Long cachedServicePoliciesVersion = servicePolicies != null ? servicePolicies.getPolicyVersion() : -1L;

    if (LOG.isDebugEnabled()) {
        LOG.debug("ServicePolicies version in cache[" + cachedServicePoliciesVersion + "], ServicePolicies version in database[" + servicePolicyVersionInDb + "]");
    }

    boolean isCacheReloadedByDQEvent = false;

    if (servicePolicyVersionInDb == null || !servicePolicyVersionInDb.equals(cachedServicePoliciesVersion)) {

        if (LOG.isDebugEnabled()) {
            LOG.debug("loading servicePolicies from database");
        }

        final long startTimeMs = System.currentTimeMillis();
        final ServicePolicies servicePoliciesFromDb = serviceStore.getServicePolicyDeltasOrPolicies(serviceName, cachedServicePoliciesVersion);
        final long dbLoadTime = System.currentTimeMillis() - startTimeMs;

        if (dbLoadTime > longestDbLoadTimeInMs) {
            longestDbLoadTimeInMs = dbLoadTime;
        }

        updateTime = new Date();

        if (servicePoliciesFromDb != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("loading servicePolicies from database and it took:" + TimeUnit.MILLISECONDS.toSeconds(dbLoadTime) + " seconds");
            }

            if (dedupStrings) {
                servicePoliciesFromDb.dedupStrings();
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully loaded ServicePolicies from database: ServicePolicies:[" + servicePoliciesFromDb + "]");
            }
            if (servicePolicies == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Initializing ServicePolicies cache for the first time");
                }
                servicePolicies = servicePoliciesFromDb;
                pruneUnusedAttributes();
            } else if (servicePoliciesFromDb.getPolicyDeltas() == null) {
                // service-policies are loaded because service/service-def changed
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Complete set of policies are loaded from database, because of some disqualifying event");
                }
                servicePolicies = servicePoliciesFromDb;
                pruneUnusedAttributes();
                isCacheReloadedByDQEvent = true;
            } else { // Previously cached service policies are still valid - no service/service-def change
                // Rebuild policies cache from original policies and deltas
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Retrieved policy-deltas from database. These will be applied on top of ServicePolicy version:[" + cachedServicePoliciesVersion +"], policy-deltas:[" + servicePoliciesFromDb.getPolicyDeltas() + "]");
                }

                final List<RangerPolicy> policies = servicePolicies.getPolicies() == null ? new ArrayList<>() : servicePolicies.getPolicies();
                final List<RangerPolicy> newPolicies = RangerPolicyDeltaUtil.applyDeltas(policies, servicePoliciesFromDb.getPolicyDeltas(), servicePolicies.getServiceDef().getName());

                servicePolicies.setPolicies(newPolicies);
                servicePolicies.setPolicyVersion(servicePoliciesFromDb.getPolicyVersion());

                checkCacheSanity(serviceName, serviceStore, false);

                // Rebuild tag-policies from original tag-policies and deltas
                if (servicePoliciesFromDb.getTagPolicies() != null) {
                    String tagServiceName = servicePoliciesFromDb.getTagPolicies().getServiceName();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("This service has associated tag service:[" + tagServiceName + "]. Will compute tagPolicies from corresponding policy-deltas");
                    }

                    final List<RangerPolicy> tagPolicies = (servicePolicies.getTagPolicies() == null || CollectionUtils.isEmpty(servicePolicies.getTagPolicies().getPolicies())) ? new ArrayList<>() : servicePolicies.getTagPolicies().getPolicies();
                    final List<RangerPolicy> newTagPolicies = RangerPolicyDeltaUtil.applyDeltas(tagPolicies, servicePoliciesFromDb.getTagPolicies().getServiceDef().getName() == null ? servicePoliciesFromDb.getPolicyDeltas() : servicePoliciesFromDb.getPolicyDeltas(), servicePoliciesFromDb.getTagPolicies().getServiceDef().getName());

                    if (servicePolicies.getTagPolicies() == null) {
                        // No tag policies cached yet (a case the line above already tolerates): adopt the freshly
                        // loaded tag-policies container instead of failing with an NPE on the next line.
                        servicePolicies.setTagPolicies(servicePoliciesFromDb.getTagPolicies());
                    }
                    servicePolicies.getTagPolicies().setPolicies(newTagPolicies);
                    servicePolicies.getTagPolicies().setPolicyVersion(servicePoliciesFromDb.getTagPolicies().getPolicyVersion());

                    checkCacheSanity(tagServiceName, serviceStore, true);

                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("This service has no associated tag service");
                    }
                }
            }
            this.deltaCache = null;
        } else {
            LOG.error("Could not get policies from database, from-version:[" + cachedServicePoliciesVersion + "]");
        }
        if (LOG.isDebugEnabled()) {
            // servicePolicies is still null if the very first load returned nothing
            LOG.debug("ServicePolicies old-version:[" + cachedServicePoliciesVersion + "], new-version:[" + (servicePolicies != null ? servicePolicies.getPolicyVersion() : -1L) + "]");
        }
    } else {
        if (LOG.isDebugEnabled()) {
            // equal to servicePolicies.getPolicyVersion() here, but also safe when nothing is cached
            LOG.debug("ServicePolicies Cache already has the latest version, version:[" + cachedServicePoliciesVersion + "]");
        }
    }

    if (LOG.isTraceEnabled()) {
        LOG.trace("Latest Cached ServicePolicies:[" + servicePolicies +"]");
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== ServicePoliciesWrapper.getLatest(serviceName=" + serviceName + ", lastKnownVersion=" + lastKnownVersion + ") : " + isCacheReloadedByDQEvent);
    }
    return isCacheReloadedByDQEvent;
}

/**
 * Compares the cached policy version of {@code serviceName} with the version stored in the database.
 *
 * <p>Only the case where the cache lags behind the database is reported (at INFO level). No other
 * action is taken here.
 *
 * @param serviceName  service whose database version is queried
 * @param serviceStore store providing the database version
 * @param isTagService when true, compare against the cached tag-policies version instead of the
 *                     cached service-policies version
 */
private void checkCacheSanity(String serviceName, ServiceStore serviceStore, boolean isTagService) {
    final Long versionInDb = serviceStore.getServicePolicyVersion(serviceName);
    final Long versionInCache;
    if (isTagService) {
        versionInCache = servicePolicies.getTagPolicies().getPolicyVersion();
    } else {
        versionInCache = servicePolicies.getPolicyVersion();
    }

    // A strictly smaller cached version implies the two versions differ.
    final boolean cacheLagsBehindDb = versionInCache != null && versionInDb != null && versionInCache < versionInDb;
    if (cacheLagsBehindDb) {
        LOG.info("checkCacheSanity(serviceName=" + serviceName + "): policy cache has a different version than one in the database. However, changes from " + versionInCache + " to " + versionInDb + " will be downloaded in the next download. policyVersionInDB=" + versionInDb + ", policyVersionInCache=" + versionInCache);
    }
}
created at 2023-08-29