Skip to main content

ranger-download-cache

各组件中的 Ranger 插件在初始化时, init 方法会启动一个线程, 定时从 Ranger Admin Server 下载 policy 策略.

policy refresher的启动

base plugin


package org.apache.ranger.plugin.service;

/**
 * Excerpt of the plugin base class showing only the start-up path.
 */
public class RangerBasePlugin {

    /**
     * Initializes the plugin.
     *
     * <p>Steps, in order: run {@code cleanup()}, initialize the audit provider
     * factory if it has not been initialized yet, create and start the
     * {@link PolicyRefresher} unless the policy-engine options disable it, and
     * finally initialize every chained plugin.
     */
    public void init() {
        cleanup();

        final AuditProviderFactory auditFactory = AuditProviderFactory.getInstance();

        if (!auditFactory.isInitDone()) {
            if (pluginConfig.getProperties() == null) {
                // Without properties the audit factory is left uninitialized; only report it.
                LOG.error("Audit subsystem is not initialized correctly. Please check audit configuration. ");
                LOG.error("No authorization audits will be generated. ");
            } else {
                auditFactory.init(pluginConfig.getProperties(), getAppId());
            }
        }

        final boolean refresherDisabled = pluginConfig.getPolicyEngineOptions().disablePolicyRefresher;

        if (!refresherDisabled) {
            refresher = new PolicyRefresher(this);
            LOG.info("Created PolicyRefresher Thread(" + refresher.getName() + ")");
            // Marked as a daemon thread before it is started.
            refresher.setDaemon(true);
            refresher.startRefresher();
        }

        for (RangerChainedPlugin chained : chainedPlugins) {
            chained.init();
        }
    }
}

线程通讯定期同步下载cache


package org.apache.ranger.plugin.util;

/**
 * Re-armable completion signal built on this object's monitor.
 *
 * <p>One side calls {@link #waitForCompletion()} and blocks; the other side
 * calls {@link #signalCompletion()} to release it. Consuming the signal
 * resets it, so the same instance can be used for the next round.
 */
public final class DownloadTrigger {
    /** True between a signalCompletion() call and the moment a waiter consumes it. */
    private boolean completed = false;

    /**
     * Blocks until the completion flag is set, then clears the flag.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public synchronized void waitForCompletion() throws InterruptedException {
        while (true) {
            // The flag is re-checked after every wake-up, so a wake-up without
            // a preceding signalCompletion() simply goes back to waiting.
            if (completed) {
                completed = false;
                return;
            }
            wait();
        }
    }

    /**
     * Sets the completion flag and wakes every thread waiting on this trigger.
     */
    public synchronized void signalCompletion() {
        completed = true;
        notifyAll();
    }
}

package org.apache.ranger.plugin.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;

/**
 * Timer task that requests one download round and waits for it to finish.
 *
 * <p>Each execution puts this task's own {@link DownloadTrigger} on the shared
 * queue and then blocks until that trigger is signalled complete.
 */
public final class DownloaderTask extends TimerTask {
    private static final Logger LOG = LoggerFactory.getLogger(DownloaderTask.class);

    /** The single trigger instance this task enqueues on every run. */
    private final DownloadTrigger timerTrigger = new DownloadTrigger();
    private final BlockingQueue<DownloadTrigger> queue;

    /**
     * @param queue queue on which the trigger is placed on every execution
     */
    public DownloaderTask(BlockingQueue<DownloadTrigger> queue) {
        this.queue = queue;
    }

    /**
     * Enqueues the trigger and blocks until it is signalled complete.
     *
     * <p>If the executing thread is interrupted, the exception is logged and
     * the thread's interrupt status is restored so code further up the stack
     * can still observe the interruption.
     */
    @Override
    public void run() {
        try {
            queue.put(timerTrigger);
            timerTrigger.waitForCompletion();
        } catch (InterruptedException excp) {
            LOG.error("Caught exception. Exiting thread", excp);
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
  • policy refresher

/**
 * Performs an initial load of roles and policies on the calling thread,
 * starts this refresher thread, and schedules the periodic download task.
 *
 * <p>If the task cannot be scheduled the failure is logged and
 * {@code policyDownloadTimer} is reset to {@code null}.
 */
public void startRefresher() {
    // First load happens synchronously, before the thread is started.
    loadRoles();
    loadPolicy();

    super.start();

    // Second constructor argument makes the timer thread a daemon.
    policyDownloadTimer = new Timer("policyDownloadTimer", true);

    final DownloaderTask periodicDownload = new DownloaderTask(policyDownloadQueue);

    try {
        // Same value is used for the initial delay and for the period.
        policyDownloadTimer.schedule(periodicDownload, pollingIntervalMs, pollingIntervalMs);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Scheduled policyDownloadRefresher to download policies every " + pollingIntervalMs + " milliseconds");
        }
    } catch (IllegalStateException schedulingFailure) {
        LOG.error("Error scheduling policyDownloadTimer:", schedulingFailure);
        LOG.error("*** Policies will NOT be downloaded every " + pollingIntervalMs + " milliseconds ***");

        policyDownloadTimer = null;
    }
}

/**
 * Refresher loop: takes one trigger at a time from {@code policyDownloadQueue},
 * reloads roles and policies, and signals the trigger complete.
 *
 * <p>The loop ends when the thread is interrupted while taking from the queue
 * or while loading.
 */
public void run() {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").run()");
    }

    boolean interrupted = false;

    while (!interrupted) {
        DownloadTrigger pending = null;

        try {
            pending = policyDownloadQueue.take();
            loadRoles();
            loadPolicy();
        } catch (InterruptedException excp) {
            LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
            interrupted = true;
        } finally {
            // Runs on every path, so whoever waits on the trigger is always released.
            if (pending != null) {
                pending.signalCompletion();
            }
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").run()");
    }
}

/**
 * Requests a refresh and blocks the calling thread until the request has been
 * signalled complete.
 *
 * @param token trigger that is placed on {@code policyDownloadQueue} and then waited on
 * @throws InterruptedException if the calling thread is interrupted while enqueueing or waiting
 */
public void syncPoliciesWithAdmin(DownloadTrigger token) throws InterruptedException {
// Hand the token to whoever consumes policyDownloadQueue.
policyDownloadQueue.put(token);
// Returns only after signalCompletion() has been called on this token.
token.waitForCompletion();
}

wait()虚假唤醒问题: `waitForCompletion()` 在 `while (!isNotified)` 循环中调用 `wait()`, 线程被唤醒后会重新检查条件, 因此即使发生虚假唤醒也不会提前返回.

picture 0

policy 的同步机制

  1. 默认的policy version和update time, 只是用于判断是否需要全量下载策略进行缓存, 并没有支持增量更新缓存.

cache默认周期是30秒, 每30秒检查一次配置是否有更新, 如果versionId没有变更, 就没有额外的工作了.

// Polling interval in milliseconds, read from "<propertyPrefix>.policy.pollIntervalMs".
// NOTE(review): the second argument (30 * 1000, i.e. 30 s) is presumably the fallback used when the property is unset - confirm against getLong().
this.pollingIntervalMs = pluginConfig.getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);

如果没有更新, 则不会进行处理



/**
 * Asks the admin client for policies newer than {@code lastKnownVersion}.
 *
 * @return the downloaded policies, or {@code null} when the admin client
 *         returned nothing or when the download failed with an exception
 *         other than {@link RangerServiceNotFoundException}
 * @throws RangerServiceNotFoundException rethrown after logging when the admin client raises it
 */
private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFoundException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadPolicyfromPolicyAdmin()");
    }

    ServicePolicies fetched = null;
    RangerPerfTracer perf = null;

    if (RangerPerfTracer.isPerfTraceEnabled(PERF_POLICYENGINE_INIT_LOG)) {
        perf = RangerPerfTracer.getPerfTracer(PERF_POLICYENGINE_INIT_LOG, "PolicyRefresher.loadPolicyFromPolicyAdmin(serviceName=" + serviceName + ")");
    }

    try {
        fetched = rangerAdmin.getServicePoliciesIfUpdated(lastKnownVersion, lastActivationTimeInMillis);

        if (fetched == null) {
            // A null result is treated as "nothing changed".
            if (LOG.isDebugEnabled()) {
                LOG.debug("PolicyRefresher(serviceName=" + serviceName + ").run(): no update found. lastKnownVersion=" + lastKnownVersion);
            }
        } else {
            final long newVersion;

            if (fetched.getPolicyVersion() == null) {
                newVersion = -1;
            } else {
                newVersion = fetched.getPolicyVersion().longValue();
            }

            final boolean nameMatches = StringUtils.equals(serviceName, fetched.getServiceName());

            if (!nameMatches) {
                LOG.warn("PolicyRefresher(serviceName=" + serviceName + "): ignoring unexpected serviceName '" + fetched.getServiceName() + "' in service-store");

                // Overwrite the mismatching name with the one this refresher serves.
                fetched.setServiceName(serviceName);
            }

            LOG.info("PolicyRefresher(serviceName=" + serviceName + "): found updated version. lastKnownVersion=" + lastKnownVersion + "; newVersion=" + newVersion);
        }
    } catch (RangerServiceNotFoundException snfe) {
        LOG.error("PolicyRefresher(serviceName=" + serviceName + "): failed to find service. Will clean up local cache of policies (" + lastKnownVersion + ")", snfe);
        throw snfe;
    } catch (Exception excp) {
        LOG.error("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policies. Will continue to use last known version of policies (" + lastKnownVersion + ")", excp);
        fetched = null;
    }

    RangerPerfTracer.log(perf);

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadPolicyfromPolicyAdmin()");
    }

    return fetched;
}

/**
 * Downloads policies from Ranger admin and maps the HTTP outcome to a result.
 *
 * <ul>
 *   <li>no response: session cookie state is cleared, returns {@code null}</li>
 *   <li>304 / 204: session cookie is checked, returns {@code null}</li>
 *   <li>200: session cookie is checked, returns the response entity</li>
 *   <li>404: session cookie state is cleared and the body is handed to
 *       {@code RangerServiceNotFoundException.throwExceptionIfServiceNotFound};
 *       if that call returns, the result is {@code null}</li>
 *   <li>any other status: session cookie state is cleared, returns {@code null}</li>
 * </ul>
 *
 * @param lastKnownVersion           policy version passed on to the download request
 * @param lastActivationTimeInMillis activation time passed on to the download request
 * @return the downloaded policies, or {@code null} in every non-200 case
 * @throws Exception propagated from the calls made here
 */
private ServicePolicies getServicePoliciesIfUpdatedWithCookie(final long lastKnownVersion, final long lastActivationTimeInMillis) throws Exception {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> RangerAdminRESTClient.getServicePoliciesIfUpdatedWithCookie(" + lastKnownVersion + ", " + lastActivationTimeInMillis + ")");
    }

    final UserGroupInformation user = MiscUtil.getUGILoginUser();
    final boolean isSecureMode = isKerberosEnabled(user);
    final ClientResponse response = getRangerAdminPolicyDownloadResponse(lastKnownVersion, lastActivationTimeInMillis, user, isSecureMode);

    final ServicePolicies ret;

    if (response == null) {
        // No response at all: drop the session cookie state.
        policyDownloadSessionId = null;
        isValidPolicyDownloadSessionCookie = false;
        LOG.error("Error getting policies; Received NULL response!!. secureMode=" + isSecureMode + ", user=" + user + ", serviceName=" + serviceName);
        ret = null;
    } else if (response.getStatus() == HttpServletResponse.SC_NOT_MODIFIED || response.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
        checkAndResetSessionCookie(response);
        final RESTResponse unchanged = RESTResponse.fromClientResponse(response);
        if (LOG.isDebugEnabled()) {
            LOG.debug("No change in policies. secureMode=" + isSecureMode + ", user=" + user + ", response=" + unchanged + ", serviceName=" + serviceName);
        }
        ret = null;
    } else if (response.getStatus() == HttpServletResponse.SC_OK) {
        checkAndResetSessionCookie(response);
        ret = response.getEntity(ServicePolicies.class);
    } else if (response.getStatus() == HttpServletResponse.SC_NOT_FOUND) {
        policyDownloadSessionId = null;
        isValidPolicyDownloadSessionCookie = false;
        ret = null;
        LOG.error("Error getting policies; service not found. secureMode=" + isSecureMode + ", user=" + user
                + ", response=" + response.getStatus() + ", serviceName=" + serviceName
                + ", " + "lastKnownVersion=" + lastKnownVersion
                + ", " + "lastActivationTimeInMillis=" + lastActivationTimeInMillis);
        final String notFoundBody = response.hasEntity() ? response.getEntity(String.class) : null;
        // NOTE(review): presumably throws only when the body identifies a missing service - confirm in RangerServiceNotFoundException.
        RangerServiceNotFoundException.throwExceptionIfServiceNotFound(serviceName, notFoundBody);
        LOG.warn("Received 404 error code with body:[" + notFoundBody + "], Ignoring");
    } else {
        policyDownloadSessionId = null;
        isValidPolicyDownloadSessionCookie = false;
        ret = null;
        final RESTResponse failure = RESTResponse.fromClientResponse(response);
        LOG.warn("Error getting policies. secureMode=" + isSecureMode + ", user=" + user + ", response=" + failure + ", serviceName=" + serviceName);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== RangerAdminRESTClient.getServicePoliciesIfUpdatedWithCookie(" + lastKnownVersion + ", " + lastActivationTimeInMillis + "): " + ret);
    }

    return ret;
}
```


2. 后来新增了 policy delta 的配置, 支持按 policy 的变更增量更新缓存. 机制是保存 policy 的 create/update/delete 记录和时间, 插件通过 api 获取本地缓存时间之后的所有变更, 然后与本地缓存合并. 合并逻辑也非常简单: 先按 delta 中的 policy id 删除本地已存在的 policy, 然后对 create/update 类型的变更直接写入新的 policy 即可 (delete 类型只删除, 不写入).

github pr, support incremental policy updates:

https://github.com/apache/ranger/commit/0f229b01e23b12d0c9e0c4ee3de817ce80d68a17?diff=unified


```java

// Reads the "<propertyPrefix>.policy.rest.supports.policy.deltas" setting.
// NOTE(review): the second argument ("false") is presumably the default returned when the key is missing - confirm against RangerConfiguration.get().
supportsPolicyDeltas = RangerConfiguration.getInstance().get(propertyPrefix + ".policy.rest.supports.policy.deltas", "false");





// Property-name suffixes for the policy-delta switch; both constants hold the same literal.
// NOTE(review): names suggest one is used on the admin side and one on the plugin side - confirm at the call sites.
public static final String RANGER_ADMIN_SUFFIX_POLICY_DELTA = ".supports.policy.deltas";
public static final String PLUGIN_CONFIG_SUFFIX_POLICY_DELTA = ".supports.policy.deltas";
    /**
     * Merges a list of policy deltas into a list of policies.
     *
     * <p>Policies are indexed by id. For every accepted delta the policy with
     * the delta's id is removed from the index; for create and update deltas
     * the delta's policy is then stored under that id. Deltas for another
     * service type, without a policy id, or with an unknown change type are
     * skipped. The merged policies are sorted with
     * {@code RangerPolicy.POLICY_ID_COMPARATOR}.
     *
     * <p>Parts of the original method are elided in this excerpt ({@code ...}).
     *
     * @param policies    current policies
     * @param deltas      changes to apply, processed in list order
     * @param serviceType only deltas with this service type are applied
     */
    public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<RangerPolicyDelta> deltas, String serviceType) {

        ...

        Map<Long, RangerPolicy> retMap = new HashMap<>();

        for (RangerPolicy policy : policies) {
            retMap.put(policy.getId(), policy);
        }

        for (RangerPolicyDelta delta : deltas) {
            if (!StringUtils.equals(serviceType, delta.getServiceType()) || delta.getPolicyId() == null) {
                continue;
            }

            int changeType = delta.getChangeType();

            if (changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE && changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE && changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
                LOG.warn("Found unexpected changeType in policyDelta:[" + delta + "]. Ignoring delta");

                continue;
            }

            Long policyId = delta.getPolicyId();
            // Every accepted change type starts by dropping the existing entry, if any.
            RangerPolicy deletedPolicy = retMap.remove(policyId);

            switch(changeType) {
                case RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE: {
                    if (deletedPolicy != null) {
                        LOG.warn("Unexpected: found existing policy for CHANGE_TYPE_POLICY_CREATE: " + deletedPolicy);
                    }
                    break;
                }
                case RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE: {
                    if (deletedPolicy == null) {
                        // deletedPolicy is null here, so log the delta to identify which change was unexpected.
                        LOG.warn("Unexpected: found no existing policy for CHANGE_TYPE_POLICY_UPDATE: " + delta);
                    }
                    break;
                }
                case RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE: {
                    if (deletedPolicy == null) {
                        // deletedPolicy is null here, so log the delta to identify which change was unexpected.
                        LOG.warn("Unexpected: found no existing policy for CHANGE_TYPE_POLICY_DELETE: " + delta);
                    }
                    break;
                }
                default:
                    break;
            }

            // Create and update re-insert the policy carried by the delta; delete leaves the id removed.
            if (changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
                retMap.put(policyId, delta.getPolicy());
            }
        }

        ret = new ArrayList<>(retMap.values());
        ret.sort(RangerPolicy.POLICY_ID_COMPARATOR);

        ...
    }