跳到主要内容

Ranger Elasticsearch 操作相关

ranger elasticsearch 连接使用

Ranger 在启动 embedded webserver 时会自动连接 Elasticsearch（用作审计数据库），并检查审计所用的 ES index 是否已经创建，如果没有则自动创建。

其中配置密码的加解密、通过 Kerberos keytab 访问 ES 等做法可以借鉴，另外还包含默认的重试逻辑。


package org.apache.ranger.server.tomcat;

import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;

public class ElasticSearchIndexBootStrapper extends Thread {

// Excerpt of initialisation logic; the enclosing method/constructor header is not shown here.
// The Ranger home directory is taken as three directory levels above the file at jarLocation.
String rangerHomeDir = new File(jarLocation).getParentFile().getParentFile().getParentFile().getPath();
// Location of the audit index schema: <rangerHome>/contrib/elasticsearch_for_audit_setup/conf/<schema file>.
Path es_schema_path = Paths.get(rangerHomeDir, "contrib", "elasticsearch_for_audit_setup", "conf",
ES_RANGER_AUDIT_SCHEMA_FILE);
// Read the whole schema file into memory as UTF-8 text; it is later passed to the create-index request as the mapping.
es_ranger_audit_schema_json = new String(Files.readAllBytes(es_schema_path), StandardCharsets.UTF_8);

// Password resolution: first try the credential store, then fall back to the plain config value.
String providerPath = EmbeddedServerUtil.getConfig(ES_CREDENTIAL_PROVIDER_PATH);
String credentialAlias = EmbeddedServerUtil.getConfig(ES_CREDENTIAL_ALIAS, ES_CONFIG_PASSWORD);
String keyStoreFileType = EmbeddedServerUtil.getConfig("ranger.keystore.file.type", KeyStore.getDefaultType());
if (providerPath != null && credentialAlias != null) {
// Decrypt the password stored under the alias in the credential provider.
password = CredentialReader.getDecryptedString(providerPath.trim(), credentialAlias.trim(), keyStoreFileType);
// A blank result or the literal "none" (any case) is treated as "no stored password":
// use the value configured under ES_CONFIG_PASSWORD instead.
if (StringUtils.isBlank(password) || "none".equalsIgnoreCase(password.trim())) {
password = EmbeddedServerUtil.getConfig(ES_CONFIG_PASSWORD);
}
// NOTE(review): the closing brace of the outer `if` is not part of this excerpt.


/**
 * Describes the configured Elasticsearch endpoint for use in log messages.
 *
 * @return text of the form {@code User:<user>, <protocol>://<hosts>:<port>/<index>}
 */
private String connectionString() {
    final String template = "User:%s, %s://%s:%s/%s";
    final Object[] parts = {user, protocol, hosts, port, index};
    // Locale.ROOT keeps the rendering independent of the JVM's default locale.
    return String.format(Locale.ROOT, template, parts);
}

/**
 * Thread body: connects to Elasticsearch and makes sure the audit index exists.
 *
 * <p>Attempts are repeated until one succeeds, or until {@code retry_counter} reaches
 * {@code max_retry} (unless {@code max_retry} equals {@code TRY_UNTIL_SUCCESS}, in which case
 * there is no limit). Every failed attempt is reported through
 * {@code logErrorMessageAndWait}; that helper is defined outside this block and presumably
 * sleeps and advances {@code retry_counter} - confirm against its definition.
 */
public void run() {
    LOG.info("Started run method");

    // Nothing can be done without at least one configured host.
    if (!StringUtils.isNotBlank(hosts)) {
        LOG.severe("elasticsearch hosts values are empty. Please set property " + ES_CONFIG_URLS);
        return;
    }

    LOG.info("Elastic search hosts=" + hosts + ", index=" + index);
    while (!is_completed && (max_retry == TRY_UNTIL_SUCCESS || retry_counter < max_retry)) {
        try {
            LOG.info("Trying to acquire elastic search connection");
            if (!connect()) {
                logErrorMessageAndWait(
                        "Cannot connect to elasticsearch kindly check the elasticsearch related configs. ",
                        null);
                continue;
            }

            LOG.info("Connection to elastic search established successfully");
            if (!createIndex()) {
                logErrorMessageAndWait("Error while performing operations on elasticsearch. ", null);
                continue;
            }

            // Index is present: record success and leave the retry loop.
            is_completed = true;
            break;
        } catch (Exception ex) {
            logErrorMessageAndWait("Error while validating elasticsearch index ", ex);
        }
    }
}

/**
 * Ensures an Elasticsearch client is available, creating it on first use.
 *
 * <p>The method holds the instance monitor (it is {@code synchronized}) and additionally takes
 * the class-level monitor before re-checking {@code client}, so at most one thread at a time
 * runs {@link #createClient()}. The locking is kept exactly as it was; whether both monitors
 * are required depends on how {@code client} is declared, which is not visible here.
 *
 * @return {@code true} if {@code client} is non-null after the attempt, {@code false} otherwise
 */
private synchronized boolean connect() {
    if (client == null) {
        synchronized (ElasticSearchIndexBootStrapper.class) {
            // Re-check under the class lock: another thread may have created the client meanwhile.
            if (client == null) {
                try {
                    createClient();
                } catch (Exception ex) {
                    // Separate the exception from the index name so the message stays readable.
                    LOG.severe("Can't connect to elasticsearch server. host=" + hosts + ", index=" + index
                            + ", error=" + ex);
                }
            }
        }
    }
    return client != null;
}

/**
 * Builds the high-level REST client from the configured hosts, protocol, user, password and port
 * and stores it in {@code client}.
 *
 * <p>Any {@link Throwable} raised while building the client is swallowed; {@code client} is then
 * left unchanged. The failure is logged at most once per minute, tracked through
 * {@code lastLoggedAt}.
 */
private void createClient() {
    try {
        client = new RestHighLevelClient(getRestClientBuilder(hosts, protocol, user, password, port));
    } catch (Throwable failure) {
        final long throttleMillis = TimeUnit.MINUTES.toMillis(1);
        lastLoggedAt.updateAndGet(previous -> {
            final long current = System.currentTimeMillis();
            // Still inside the quiet period: keep the old timestamp and stay silent.
            if (current - previous <= throttleMillis) {
                return previous;
            }
            LOG.severe("Can't connect to ElasticSearch server: " + connectionString() + failure);
            return current;
        });
    }
}

/**
 * Creates a {@link RestClientBuilder} for the given Elasticsearch hosts and configures
 * authentication on it.
 *
 * <ul>
 *   <li>If user or password is blank or equals "NONE" (case-insensitive), an error is logged and
 *       a {@code null} credentials provider is set.</li>
 *   <li>If the password contains "keytab" and names an existing file, Kerberos credentials are
 *       obtained for the user and keytab path and the SPNEGO auth scheme is registered.</li>
 *   <li>Otherwise basic credentials built from user and password are used.</li>
 * </ul>
 *
 * @param urls     comma-separated host names
 * @param protocol scheme used for every host
 * @param user     user name or Kerberos principal
 * @param password password, or path to a keytab file
 * @param port     port used for every host
 * @return the configured builder
 */
public static RestClientBuilder getRestClientBuilder(String urls, String protocol, String user, String password, int port) {
    final HttpHost[] endpoints = EmbeddedServerUtil.toArray(urls, ",").stream()
            .map(host -> new HttpHost(host, port, protocol))
            .toArray(HttpHost[]::new);
    final RestClientBuilder builder = RestClient.builder(endpoints);

    final boolean userGiven = StringUtils.isNotBlank(user) && !user.equalsIgnoreCase("NONE");
    final boolean passwordGiven = StringUtils.isNotBlank(password) && !password.equalsIgnoreCase("NONE");
    if (!(userGiven && passwordGiven)) {
        LOG.severe("ElasticSearch Credentials not provided!!");
        final CredentialsProvider noCredentials = null;
        builder.setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setDefaultCredentialsProvider(noCredentials));
        return builder;
    }

    // A password that looks like a keytab path and exists on disk selects Kerberos/SPNEGO.
    final boolean keytabAuth = password.contains("keytab") && new File(password).exists();
    if (!keytabAuth) {
        final CredentialsProvider basicCredentials =
                CredentialsProviderUtil.getBasicCredentials(user, password);
        builder.setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setDefaultCredentialsProvider(basicCredentials));
        return builder;
    }

    final KerberosCredentialsProvider kerberosCredentials =
            CredentialsProviderUtil.getKerberosCredentials(user, password);
    final Lookup<AuthSchemeProvider> spnegoRegistry = RegistryBuilder.<AuthSchemeProvider>create()
            .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
            .build();
    builder.setHttpClientConfigCallback(httpClientBuilder -> {
        httpClientBuilder.setDefaultCredentialsProvider(kerberosCredentials);
        httpClientBuilder.setDefaultAuthSchemeRegistry(spnegoRegistry);
        return httpClientBuilder;
    });
    return builder;
}

/**
 * Makes sure the audit index is present, creating it from the loaded schema when necessary.
 *
 * <p>Presence is probed by sending an open-index request for {@code index}; a failure of that
 * request is treated as "index not available". When the index has to be created, the shard and
 * replica counts are applied only if both are non-negative, and the index is opened again after
 * creation to confirm it.
 *
 * @return {@code true} if the open-index request was acknowledged (before or after creation);
 *         {@code false} if no client is available or the index could not be opened or created
 */
private boolean createIndex() {
    if (client == null) {
        connect();
    }
    if (client == null) {
        return false;
    }

    boolean exists = false;
    try {
        exists = client.indices().open(new OpenIndexRequest(this.index), RequestOptions.DEFAULT)
                .isShardsAcknowledged();
    } catch (Exception e) {
        // Usually this means the index is missing; keep the reason so that other failures
        // (authentication, connectivity) can be told apart in the log.
        LOG.info("Index " + this.index + " not available. Reason: " + e);
    }

    if (exists) {
        LOG.info("Index " + this.index + " is already created.");
        return true;
    }

    LOG.info("Index does not exist. Attempting to create index:" + this.index);
    CreateIndexRequest request = new CreateIndexRequest(this.index);
    if (this.no_of_shards >= 0 && this.no_of_replicas >= 0) {
        request.settings(Settings.builder().put("index.number_of_shards", this.no_of_shards)
                .put("index.number_of_replicas", this.no_of_replicas));
    }
    request.mapping(es_ranger_audit_schema_json, XContentType.JSON);
    request.setMasterTimeout(TimeValue.timeValueMinutes(1));
    request.setTimeout(TimeValue.timeValueMinutes(2));

    try {
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        if (createIndexResponse != null) {
            // Confirm the freshly created index can actually be opened.
            exists = client.indices().open(new OpenIndexRequest(this.index), RequestOptions.DEFAULT)
                    .isShardsAcknowledged();
            if (exists) {
                LOG.info("Index " + this.index + " created successfully.");
            }
        }
    } catch (Exception e) {
        // Send the stack trace through the logger instead of printing it to stderr.
        LOG.log(java.util.logging.Level.SEVERE, "Unable to create Index. Reason:" + e, e);
    }
    return exists;
}
}