ranger-download-cache
ranger plugin 组件里的ranger插件初始化的时候, init方法就会启动线程定时从ranger admin server里下载policy策略.
policy refresher的启动
bash plugin
package org.apache.ranger.plugin.service;
public class RangerBasePlugin {
public void init() {
cleanup();
AuditProviderFactory providerFactory = AuditProviderFactory.getInstance();
if (!providerFactory.isInitDone()) {
if (pluginConfig.getProperties() != null) {
providerFactory.init(pluginConfig.getProperties(), getAppId());
} else {
LOG.error("Audit subsystem is not initialized correctly. Please check audit configuration. ");
LOG.error("No authorization audits will be generated. ");
}
}
if (!pluginConfig.getPolicyEngineOptions().disablePolicyRefresher) {
refresher = new PolicyRefresher(this);
LOG.info("Created PolicyRefresher Thread(" + refresher.getName() + ")");
refresher.setDaemon(true);
refresher.startRefresher();
}
for (RangerChainedPlugin chainedPlugin : chainedPlugins) {
chainedPlugin.init();
}
}
}
线程通讯定期同步下载cache
package org.apache.ranger.plugin.util;
public final class DownloadTrigger {
private boolean isNotified = false;
public synchronized void waitForCompletion() throws InterruptedException {
while (!isNotified) {
wait();
}
isNotified = false;
}
public synchronized void signalCompletion() {
isNotified = true;
notifyAll();
}
}
package org.apache.ranger.plugin.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
public final class DownloaderTask extends TimerTask {
private static final Logger LOG = LoggerFactory.getLogger(DownloaderTask.class);
private final DownloadTrigger timerTrigger = new DownloadTrigger();
private final BlockingQueue<DownloadTrigger> queue;
public DownloaderTask(BlockingQueue<DownloadTrigger> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
queue.put(timerTrigger);
timerTrigger.waitForCompletion();
} catch (InterruptedException excp) {
LOG.error("Caught exception. Exiting thread");
}
}
}
- policy refresher
public void startRefresher() {
loadRoles();
loadPolicy();
super.start();
policyDownloadTimer = new Timer("policyDownloadTimer", true);
try {
policyDownloadTimer.schedule(new DownloaderTask(policyDownloadQueue), pollingIntervalMs, pollingIntervalMs);
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduled policyDownloadRefresher to download policies every " + pollingIntervalMs + " milliseconds");
}
} catch (IllegalStateException exception) {
LOG.error("Error scheduling policyDownloadTimer:", exception);
LOG.error("*** Policies will NOT be downloaded every " + pollingIntervalMs + " milliseconds ***");
policyDownloadTimer = null;
}
}
public void run() {
if(LOG.isDebugEnabled()) {
LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").run()");
}
while(true) {
DownloadTrigger trigger = null;
try {
trigger = policyDownloadQueue.take();
loadRoles();
loadPolicy();
} catch(InterruptedException excp) {
LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
break;
} finally {
if (trigger != null) {
trigger.signalCompletion();
}
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").run()");
}
}
public void syncPoliciesWithAdmin(DownloadTrigger token) throws InterruptedException {
policyDownloadQueue.put(token);
token.waitForCompletion();
}
wait()虚假唤醒问题
policy 的同步机制
- 默认的policy version和update time, 只是用于判断是否需要全量下载策略进行缓存, 并没有支持增量更新缓存.
cache默认周期是30秒, 每30秒检查一次配置是否有更新, 如果versionId没有变更, 就没有额外的工作了.
this.pollingIntervalMs = pluginConfig.getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);
如果没有更新, 则不会进行处理
private ServicePolicies loadPolicyfromPolicyAdmin() throws RangerServiceNotFoundException {
if(LOG.isDebugEnabled()) {
LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadPolicyfromPolicyAdmin()");
}
ServicePolicies svcPolicies = null;
RangerPerfTracer perf = null;
if(RangerPerfTracer.isPerfTraceEnabled(PERF_POLICYENGINE_INIT_LOG)) {
perf = RangerPerfTracer.getPerfTracer(PERF_POLICYENGINE_INIT_LOG, "PolicyRefresher.loadPolicyFromPolicyAdmin(serviceName=" + serviceName + ")");
}
try {
svcPolicies = rangerAdmin.getServicePoliciesIfUpdated(lastKnownVersion, lastActivationTimeInMillis);
boolean isUpdated = svcPolicies != null;
if(isUpdated) {
long newVersion = svcPolicies.getPolicyVersion() == null ? -1 : svcPolicies.getPolicyVersion().longValue();
if(!StringUtils.equals(serviceName, svcPolicies.getServiceName())) {
LOG.warn("PolicyRefresher(serviceName=" + serviceName + "): ignoring unexpected serviceName '" + svcPolicies.getServiceName() + "' in service-store");
svcPolicies.setServiceName(serviceName);
}
LOG.info("PolicyRefresher(serviceName=" + serviceName + "): found updated version. lastKnownVersion=" + lastKnownVersion + "; newVersion=" + newVersion);
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("PolicyRefresher(serviceName=" + serviceName + ").run(): no update found. lastKnownVersion=" + lastKnownVersion);
}
}
} catch (RangerServiceNotFoundException snfe) {
LOG.error("PolicyRefresher(serviceName=" + serviceName + "): failed to find service. Will clean up local cache of policies (" + lastKnownVersion + ")", snfe);
throw snfe;
} catch (Exception excp) {
LOG.error("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policies. Will continue to use last known version of policies (" + lastKnownVersion + ")", excp);
svcPolicies = null;
}
RangerPerfTracer.log(perf);
if(LOG.isDebugEnabled()) {
LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadPolicyfromPolicyAdmin()");
}
return svcPolicies;
}
private ServicePolicies getServicePoliciesIfUpdatedWithCookie(final long lastKnownVersion, final long lastActivationTimeInMillis) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerAdminRESTClient.getServicePoliciesIfUpdatedWithCookie(" + lastKnownVersion + ", " + lastActivationTimeInMillis + ")");
}
final ServicePolicies ret;
final UserGroupInformation user = MiscUtil.getUGILoginUser();
final boolean isSecureMode = isKerberosEnabled(user);
final ClientResponse response = getRangerAdminPolicyDownloadResponse(lastKnownVersion, lastActivationTimeInMillis, user, isSecureMode);
if (response == null || response.getStatus() == HttpServletResponse.SC_NOT_MODIFIED || response.getStatus() == HttpServletResponse.SC_NO_CONTENT) {
if (response == null) {
policyDownloadSessionId = null;
isValidPolicyDownloadSessionCookie = false;
LOG.error("Error getting policies; Received NULL response!!. secureMode=" + isSecureMode + ", user=" + user + ", serviceName=" + serviceName);
} else {
checkAndResetSessionCookie(response);
RESTResponse resp = RESTResponse.fromClientResponse(response);
if (LOG.isDebugEnabled()) {
LOG.debug("No change in policies. secureMode=" + isSecureMode + ", user=" + user + ", response=" + resp + ", serviceName=" + serviceName);
}
}
ret = null;
} else if (response.getStatus() == HttpServletResponse.SC_OK) {
checkAndResetSessionCookie(response);
ret = response.getEntity(ServicePolicies.class);
} else if (response.getStatus() == HttpServletResponse.SC_NOT_FOUND) {
policyDownloadSessionId = null;
isValidPolicyDownloadSessionCookie = false;
ret = null;
LOG.error("Error getting policies; service not found. secureMode=" + isSecureMode + ", user=" + user
+ ", response=" + response.getStatus() + ", serviceName=" + serviceName
+ ", " + "lastKnownVersion=" + lastKnownVersion
+ ", " + "lastActivationTimeInMillis=" + lastActivationTimeInMillis);
String exceptionMsg = response.hasEntity() ? response.getEntity(String.class) : null;
RangerServiceNotFoundException.throwExceptionIfServiceNotFound(serviceName, exceptionMsg);
LOG.warn("Received 404 error code with body:[" + exceptionMsg + "], Ignoring");
} else {
policyDownloadSessionId = null;
isValidPolicyDownloadSessionCookie = false;
ret = null;
RESTResponse resp = RESTResponse.fromClientResponse(response);
LOG.warn("Error getting policies. secureMode=" + isSecureMode + ", user=" + user + ", response=" + resp + ", serviceName=" + serviceName);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== RangerAdminRESTClient.getServicePoliciesIfUpdatedWithCookie(" + lastKnownVersion + ", " + lastActivationTimeInMillis + "): " + ret);
}
return ret;
}
```
2. 后来新增了policy delta的配置, 支持按policy的变更进行缓存更新. 机制是保存policy的create/update/delete记录和时间, 然后从本地缓存时间之后的变更都通过api获取, 然后进行合并即可. 合并逻辑也非常简单, 先删除deleta变更id中本地存在的, 然后直接写入新的policy即可.
github pr, support incremental policy updates:
https://github.com/apache/ranger/commit/0f229b01e23b12d0c9e0c4ee3de817ce80d68a17?diff=unified
```java
supportsPolicyDeltas = RangerConfiguration.getInstance().get(propertyPrefix + ".policy.rest.supports.policy.deltas", "false");
public static final String RANGER_ADMIN_SUFFIX_POLICY_DELTA = ".supports.policy.deltas";
public static final String PLUGIN_CONFIG_SUFFIX_POLICY_DELTA = ".supports.policy.deltas";
public static List<RangerPolicy> applyDeltas(List<RangerPolicy> policies, List<RangerPolicyDelta> deltas, String serviceType) {
...
Map<Long, RangerPolicy> retMap = new HashMap<>();
for (RangerPolicy policy : policies) {
retMap.put(policy.getId(), policy);
}
for (RangerPolicyDelta delta : deltas) {
if (!StringUtils.equals(serviceType, delta.getServiceType()) || delta.getPolicyId() == null) {
continue;
}
int changeType = delta.getChangeType();
if (changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE && changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE && changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
LOG.warn("Found unexpected changeType in policyDelta:[" + delta + "]. Ignoring delta");
continue;
}
Long policyId = delta.getPolicyId();
RangerPolicy deletedPolicy = retMap.remove(policyId);
switch(changeType) {
case RangerPolicyDelta.CHANGE_TYPE_POLICY_CREATE: {
if (deletedPolicy != null) {
LOG.warn("Unexpected: found existing policy for CHANGE_TYPE_POLICY_CREATE: " + deletedPolicy);
}
break;
}
case RangerPolicyDelta.CHANGE_TYPE_POLICY_UPDATE: {
if (deletedPolicy == null) {
LOG.warn("Unexpected: found no existing policy for CHANGE_TYPE_POLICY_UPDATE: " + deletedPolicy);
}
break;
}
case RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE: {
if (deletedPolicy == null) {
LOG.warn("Unexpected: found no existing policy for CHANGE_TYPE_POLICY_DELETE: " + deletedPolicy);
}
break;
}
default:
break;
}
if (changeType != RangerPolicyDelta.CHANGE_TYPE_POLICY_DELETE) {
retMap.put(policyId, delta.getPolicy());
}
}
ret = new ArrayList<>(retMap.values());
ret.sort(RangerPolicy.POLICY_ID_COMPARATOR);
...
}