From 32c0391b03dec43ba71b798e098bb7d4a3f72c65 Mon Sep 17 00:00:00 2001 From: Keith Lohnes Date: Tue, 7 Mar 2017 11:20:16 -0500 Subject: [PATCH] Use Jest for Elasticsearch http #92 Issue: #92 This adds http support to ElasticSearchIndex and removes support for the Transport and Client modes. It also refactors the add/delete scripts to use bindings since throwing a bunch of unique scripts at a Groovy compiler isn't the best idea. Signed-off-by: Keith Lohnes --- janusgraph-es/pom.xml | 19 +- .../diskstorage/es/ElasticSearchIndex.java | 932 ++++++++++-------- .../diskstorage/es/ElasticSearchSetup.java | 310 ------ .../es/BerkeleyElasticsearchTest.java | 1 - .../es/ElasticSearchConfigTest.java | 38 - .../es/ElasticSearchIndexTest.java | 5 - .../es/ThriftElasticsearchTest.java | 2 - 7 files changed, 516 insertions(+), 791 deletions(-) delete mode 100644 janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchSetup.java diff --git a/janusgraph-es/pom.xml b/janusgraph-es/pom.xml index 4dd1bef656..2d8acf0d6d 100644 --- a/janusgraph-es/pom.xml +++ b/janusgraph-es/pom.xml @@ -66,7 +66,24 @@ antlr-runtime ${antlr.version} - + + + com.google.code.gson + gson + 2.2.4 + + + io.searchbox + jest + 2.0.4 + + + com.google.code.gson + gson + + + diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java index fe9eb09989..321d22f551 100644 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java +++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java @@ -14,11 +14,30 @@ package org.janusgraph.diskstorage.es; + import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.Multimap; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.searchbox.client.JestResult; +import io.searchbox.client.JestClient; +import io.searchbox.client.JestClientFactory; +import io.searchbox.client.config.HttpClientConfig; +import io.searchbox.core.Bulk; +import io.searchbox.core.BulkResult; +import io.searchbox.core.Delete; +import io.searchbox.core.Index; +import io.searchbox.core.Search; +import io.searchbox.core.Update; +import io.searchbox.indices.CreateIndex; +import io.searchbox.indices.OpenIndex; +import io.searchbox.indices.aliases.AddAliasMapping; +import io.searchbox.indices.aliases.ModifyAliases; +import io.searchbox.indices.mapping.PutMapping; import org.janusgraph.core.Cardinality; import org.janusgraph.core.JanusGraphException; import org.janusgraph.core.attribute.*; @@ -43,32 +62,13 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.update.UpdateRequestBuilder; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.*; import org.elasticsearch.indices.IndexMissingException; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.slf4j.Logger; @@ -100,121 +100,148 @@ public class ElasticSearchIndex implements IndexProvider { public static final ConfigNamespace ELASTICSEARCH_NS = new ConfigNamespace(INDEX_NS, "elasticsearch", "Elasticsearch index configuration"); - public static final ConfigOption CLIENT_ONLY = - new ConfigOption(ELASTICSEARCH_NS, "client-only", - "The Elasticsearch node.client option is set to this boolean value, and the Elasticsearch node.data " + - "option is set to the negation of this value. True creates a thin client which holds no data. False " + - "creates a regular Elasticsearch cluster node that may store data.", - ConfigOption.Type.GLOBAL_OFFLINE, true); + public static final ConfigOption DISCOVERY_FREQUENCY = new ConfigOption( + ELASTICSEARCH_NS, + "discovery-frequency", + "How long to wait between polling for node discovery", + ConfigOption.Type.MASKABLE, + 10l + ); + + public static final ConfigOption TTL_INTERVAL = + new ConfigOption(ELASTICSEARCH_NS, "ttl-interval", + "The period of time between runs of ES's bulit-in expired document deleter. " + + "This string will become the value of ES's indices.ttl.interval setting and should " + + "be formatted accordingly, e.g. 5s or 60s.", ConfigOption.Type.MASKABLE, "5s"); public static final ConfigOption CLUSTER_NAME = new ConfigOption(ELASTICSEARCH_NS, "cluster-name", "The name of the Elasticsearch cluster. This should match the \"cluster.name\" setting " + "in the Elasticsearch nodes' configuration.", ConfigOption.Type.GLOBAL_OFFLINE, "elasticsearch"); - public static final ConfigOption LOCAL_MODE = - new ConfigOption(ELASTICSEARCH_NS, "local-mode", - "On the legacy config track, this option chooses between starting a TransportClient (false) or " + - "a Node with JVM-local transport and local data (true). On the interface config track, this option " + - "is considered by (but optional for) the Node client and ignored by the TransportClient. See the manual " + - "for more information about ES config tracks.", - ConfigOption.Type.GLOBAL_OFFLINE, false); - public static final ConfigOption CLIENT_SNIFF = new ConfigOption(ELASTICSEARCH_NS, "sniff", - "Whether to enable cluster sniffing. This option only applies to the TransportClient. " + - "Enabling this option makes the TransportClient attempt to discover other cluster nodes " + + "Whether to enable cluster sniffing." + + "Enabling this option makes the client attempt to discover other cluster nodes " + "besides those in the initial host list provided at startup.", ConfigOption.Type.MASKABLE, true); - public static final ConfigOption INTERFACE = - new ConfigOption<>(ELASTICSEARCH_NS, "interface", - "Whether to connect to ES using the Node or Transport client (see the \"Talking to Elasticsearch\" " + - "section of the ES manual for discussion of the difference). Setting this option enables the " + - "interface config track (see manual for more information about ES config tracks).", - ConfigOption.Type.MASKABLE, String.class, ElasticSearchSetup.TRANSPORT_CLIENT.toString(), - disallowEmpty(String.class)); - - public static final ConfigOption IGNORE_CLUSTER_NAME = - new ConfigOption(ELASTICSEARCH_NS, "ignore-cluster-name", - "Whether to bypass validation of the cluster name of connected nodes. " + - "This option is only used on the interface configuration track (see manual for " + - "information about ES config tracks).", ConfigOption.Type.MASKABLE, true); + public static final ConfigOption LOCAL_MODE = + new ConfigOption(ELASTICSEARCH_NS, "local-mode", + "Whether or not to boot a local node. Mostly for testing.", + ConfigOption.Type.GLOBAL_OFFLINE, false); - public static final ConfigOption TTL_INTERVAL = - new ConfigOption(ELASTICSEARCH_NS, "ttl-interval", - "The period of time between runs of ES's bulit-in expired document deleter. " + - "This string will become the value of ES's indices.ttl.interval setting and should " + - "be formatted accordingly, e.g. 5s or 60s.", ConfigOption.Type.MASKABLE, "5s"); + private static final ConfigOption ES_TRANSPORT_SCHEME = new ConfigOption( + ELASTICSEARCH_NS, + "transport-scheme", + "What scheme (http or https) to use for communications with Elasticsearch", + ConfigOption.Type.MASKABLE, + "https"); + + public static final ConfigOption TIMEOUT = new ConfigOption( + ELASTICSEARCH_NS, + "timeout", + "How long to wait (in ms) for normal operations to complete", + ConfigOption.Type.MASKABLE, + 30000 + ); + + public static final ConfigOption METADATA_TIMEOUT = new ConfigOption( + ELASTICSEARCH_NS, + "metadata-timeout", + "How long to wait (in ms) for metadata operations to complete", + ConfigOption.Type.MASKABLE, + 30000 + ); + + public static final ConfigOption SHARD_COUNT = new ConfigOption( + ELASTICSEARCH_NS, + "shard-count", + "How many primary shards to create per Elasticsearch index", + ConfigOption.Type.MASKABLE, + 5 + ); + + public static final ConfigOption REPLICA_COUNT = new ConfigOption( + ELASTICSEARCH_NS, + "replica-count", + "How many replicas to create per Elasticsearch index", + ConfigOption.Type.MASKABLE, + 2 + ); + + private static final String deletionScript = "" + + "singles.each { " + + " ctx._source.remove((String)it); " + + "}; " + + "lists.each { " + + " def index = ctx._source[(String)it[0]].indexOf(it[1]); " + + " ctx._source[(String)it[0]].remove(index); " + + "};"; + + private static final String additionScript = "" + + "singles.each { " + + " ctx._source[it[0]] = it[1]; " + + "}; " + + "lists.each { " + + " if (ctx._source[it[0]] == null) { " + + " ctx._source[it[0]] = []; " + + " }; " + + " ctx._source[it[0]].add(it[1]); " + + "};"; - public static final ConfigOption HEALTH_REQUEST_TIMEOUT = - new ConfigOption(ELASTICSEARCH_NS, "health-request-timeout", - "When JanusGraph initializes its ES backend, JanusGraph waits up to this duration for the " + - "ES cluster health to reach at least yellow status. " + - "This string should be formatted as a natural number followed by the lowercase letter " + - "\"s\", e.g. 3s or 60s.", ConfigOption.Type.MASKABLE, "30s"); - - public static final ConfigOption LOAD_DEFAULT_NODE_SETTINGS = - new ConfigOption(ELASTICSEARCH_NS, "load-default-node-settings", - "Whether ES's Node client will internally attempt to load default configuration settings " + - "from system properties/process environment variables. Only meaningful when using the Node " + - "client (has no effect with TransportClient).", ConfigOption.Type.MASKABLE, true); - - public static final ConfigOption USE_EDEPRECATED_IGNORE_UNMAPPED_OPTION = - new ConfigOption<>(ELASTICSEARCH_NS, "use-deprecated-ignore-unmapped-option", - "Elasticsearch versions before 1.4.0 supported the \"ignore_unmapped\" sort option. " + - "In 1.4.0, it was deprecated by the new \"unmapped_type\" sort option. This configuration" + - "setting controls which ES option JanusGraph uses: false for the newer \"unmapped_type\"," + - "true for the older \"ignore_unmapped\".", ConfigOption.Type.MASKABLE, false); - - public static final ConfigNamespace ES_EXTRAS_NS = - new ConfigNamespace(ELASTICSEARCH_NS, "ext", "Overrides for arbitrary elasticsearch.yaml settings", true); - - public static final ConfigNamespace ES_CREATE_NS = - new ConfigNamespace(ELASTICSEARCH_NS, "create", "Settings related to index creation"); - - public static final ConfigOption CREATE_SLEEP = - new ConfigOption(ES_CREATE_NS, "sleep", - "How long to sleep, in milliseconds, between the successful completion of a (blocking) index " + - "creation request and the first use of that index. This only applies when creating an index in ES, " + - "which typically only happens the first time JanusGraph is started on top of ES. If the index JanusGraph is " + - "configured to use already exists, then this setting has no effect.", ConfigOption.Type.MASKABLE, 200L); - - public static final ConfigNamespace ES_CREATE_EXTRAS_NS = - new ConfigNamespace(ES_CREATE_NS, "ext", "Overrides for arbitrary settings applied at index creation", true); private static final IndexFeatures ES_FEATURES = new IndexFeatures.Builder().supportsDocumentTTL() .setDefaultStringMapping(Mapping.TEXT).supportedStringMappings(Mapping.TEXT, Mapping.TEXTSTRING, Mapping.STRING).setWildcardField("_all").supportsCardinality(Cardinality.SINGLE).supportsCardinality(Cardinality.LIST).supportsCardinality(Cardinality.SET).supportsNanoseconds().build(); - public static final int HOST_PORT_DEFAULT = 9300; - private final Node node; - private final Client client; + public static final int HOST_PORT_DEFAULT = 9200; + private final String indexName; private final int maxResultsSize; - private final boolean useDeprecatedIgnoreUnmapped; + private final Configuration configuration; + private final JestClient client; + private final String metadataTimeout; + private final String timeout; public ElasticSearchIndex(Configuration config) { - indexName = config.get(INDEX_NAME); - useDeprecatedIgnoreUnmapped = config.get(USE_EDEPRECATED_IGNORE_UNMAPPED_OPTION); + this.configuration = config; + this.metadataTimeout = config.get(METADATA_TIMEOUT).toString() + "ms"; + this.timeout = config.get(TIMEOUT).toString() + "ms"; + this.indexName = config.get(INDEX_NAME); + this.maxResultsSize = config.get(INDEX_MAX_RESULT_SET_SIZE); + this.client = createClient(config); + } - checkExpectedClientVersion(); - final ElasticSearchSetup.Connection c; - if (!config.has(INTERFACE)) { - c = legacyConfiguration(config); - } else { - c = interfaceConfiguration(config); + private JestClient createClient(Configuration config) { + JestClient client; + final String scheme = config.get(ES_TRANSPORT_SCHEME); + final Integer port = config.has(INDEX_PORT) ? config.get(INDEX_PORT) : HOST_PORT_DEFAULT; + final List serverUris = new ArrayList(); + String clientKey = ""; + for (String host : config.get(INDEX_HOSTS)) { + String uri = scheme + "://" + host + ":" + port.toString(); + serverUris.add(uri); + clientKey += uri; } - node = c.getNode(); - client = c.getClient(); - - maxResultsSize = config.get(INDEX_MAX_RESULT_SET_SIZE); - log.debug("Configured ES query result set max size to {}", maxResultsSize); - - client.admin().cluster().prepareHealth().setTimeout(config.get(HEALTH_REQUEST_TIMEOUT)) - .setWaitForYellowStatus().execute().actionGet(); + JestClientFactory factory = new JestClientFactory(); + HttpClientConfig clientConfig = new HttpClientConfig.Builder(serverUris) + .multiThreaded(true) + .discoveryEnabled(config.has(CLIENT_SNIFF) && config.get(CLIENT_SNIFF)) + .discoveryFrequency(config.get(DISCOVERY_FREQUENCY), TimeUnit.SECONDS) + .readTimeout(Math.max(config.get(TIMEOUT), config.get(METADATA_TIMEOUT))) + .build(); + factory.setHttpClientConfig(clientConfig); + return factory.getObject(); + } - checkForOrCreateIndex(config); + private static String formatException(String command, JestResult result) { + return String.format( + "Unknown ES error in %s. Status code is %d; message is %s", + command, + result.getResponseCode(), + result.getErrorMessage() + ); } /** @@ -228,153 +255,71 @@ public ElasticSearchIndex(Configuration config) { * @param config the config for this ElasticSearchIndex * @throws java.lang.IllegalArgumentException if the index could not be created */ - private void checkForOrCreateIndex(Configuration config) { - Preconditions.checkState(null != client); - - //Create index if it does not already exist - IndicesExistsResponse response = client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet(); - if (!response.isExists()) { - - ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder(); - - ElasticSearchSetup.applySettingsFromJanusGraphConf(settings, config, ES_CREATE_EXTRAS_NS); - - CreateIndexResponse create = client.admin().indices().prepareCreate(indexName) - .setSettings(settings.build()).execute().actionGet(); - try { - final long sleep = config.get(CREATE_SLEEP); - log.debug("Sleeping {} ms after {} index creation returned from actionGet()", sleep, indexName); - Thread.sleep(sleep); - } catch (InterruptedException e) { - throw new JanusGraphException("Interrupted while waiting for index to settle in", e); - } - if (!create.isAcknowledged()) throw new IllegalArgumentException("Could not create index: " + indexName); - } - } - - - /** - * Configure ElasticSearchIndex's ES client according to semantics introduced in - * 0.5.1. Allows greater flexibility than the previous config semantics. See - * {@link org.janusgraph.diskstorage.es.ElasticSearchSetup} for more - * information. - *

- * This is activated by setting an explicit value for {@link #INTERFACE} in - * the JanusGraph configuration. - * - * @see #legacyConfiguration(org.janusgraph.diskstorage.configuration.Configuration) - * @param config a config passed to ElasticSearchIndex's constructor - * @return a node and client object open and ready for use - */ - private ElasticSearchSetup.Connection interfaceConfiguration(Configuration config) { - ElasticSearchSetup clientMode = ConfigOption.getEnumValue(config.get(INTERFACE), ElasticSearchSetup.class); + private void checkForOrCreateIndex() throws BackendException { + JestResult result; + OpenIndex oi = new OpenIndex.Builder(indexName) + .setParameter("master_timeout", this.metadataTimeout) + .setParameter("timeout", this.metadataTimeout) + .build(); try { - return clientMode.connect(config); + result = this.client.execute(oi); } catch (IOException e) { - throw new JanusGraphException(e); + throw new TemporaryBackendException("ES transport error in OpenIndex"); } - } + if (!result.isSucceeded()) { + if (result.getResponseCode() == 404) { + /* + * Use aliases for operational support; actual index name is + * suffixed with the index creation time. + */ + Long now = System.currentTimeMillis(); + String localIndexName = this.indexName + ":" + now.toString(); + Map sb = new HashMap(); + sb.put("number_of_shards", this.configuration.get(SHARD_COUNT)); + sb.put("number_of_replicas", this.configuration.get(REPLICA_COUNT)); + + CreateIndex ci = new CreateIndex.Builder(localIndexName) + .settings(sb) + .setParameter("master_timeout", metadataTimeout) + .setParameter("timeout", metadataTimeout) + .build(); - /** - * Configure ElasticSearchIndex's ES client according to 0.4.x - 0.5.0 semantics. - * This checks local-mode first. If local-mode is true, then it creates a Node that - * uses JVM local transport and can't talk over the network. If local-mode is - * false, then it creates a TransportClient that can talk over the network and - * uses {@link org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration#INDEX_HOSTS} - * as the server addresses. Note that this configuration method - * does not allow creating a Node that talks over the network. - *

- * This is activated by not setting an explicit value for {@link #INTERFACE} in the - * JanusGraph configuration. - * - * @see #interfaceConfiguration(org.janusgraph.diskstorage.configuration.Configuration) - * @param config a config passed to ElasticSearchIndex's constructor - * @return a node and client object open and ready for use - */ - private ElasticSearchSetup.Connection legacyConfiguration(Configuration config) { - Node node; - Client client; - - if (config.get(LOCAL_MODE)) { - - log.debug("Configuring ES for JVM local transport"); - - boolean clientOnly = config.get(CLIENT_ONLY); - boolean local = config.get(LOCAL_MODE); - - NodeBuilder builder = NodeBuilder.nodeBuilder(); - Preconditions.checkArgument(config.has(INDEX_CONF_FILE) || config.has(INDEX_DIRECTORY), - "Must either configure configuration file or base directory"); - if (config.has(INDEX_CONF_FILE)) { - String configFile = config.get(INDEX_CONF_FILE); - ImmutableSettings.Builder sb = ImmutableSettings.settingsBuilder(); - log.debug("Configuring ES from YML file [{}]", configFile); - FileInputStream fis = null; try { - fis = new FileInputStream(configFile); - sb.loadFromStream(configFile, fis); - builder.settings(sb.build()); - } catch (FileNotFoundException e) { - throw new JanusGraphException(e); - } finally { - IOUtils.closeQuietly(fis); + result = this.client.execute(ci); + } catch (IOException e) { + throw new TemporaryBackendException("ES transport error in CreateIndex"); } - } else { - String dataDirectory = config.get(INDEX_DIRECTORY); - log.debug("Configuring ES with data directory [{}]", dataDirectory); - File f = new File(dataDirectory); - if (!f.exists()) f.mkdirs(); - ImmutableSettings.Builder b = ImmutableSettings.settingsBuilder(); - for (String sub : DATA_SUBDIRS) { - String subdir = dataDirectory + File.separator + sub; - f = new File(subdir); - if (!f.exists()) f.mkdirs(); - b.put("path." + sub, subdir); + if (!result.isSucceeded()) { + throw new PermanentBackendException(formatException("CreateIndex", result)); } - b.put("script.disable_dynamic", false); - b.put("indices.ttl.interval", "5s"); - builder.settings(b.build()); + AddAliasMapping am = new AddAliasMapping.Builder(localIndexName, this.indexName) + .build(); - String clustername = config.get(CLUSTER_NAME); - Preconditions.checkArgument(StringUtils.isNotBlank(clustername), "Invalid cluster name: %s", clustername); - builder.clusterName(clustername); - } + ModifyAliases ma = new ModifyAliases.Builder(am) + .setParameter("master_timeout", metadataTimeout) + .setParameter("timeout", metadataTimeout) + .build(); - node = builder.client(clientOnly).data(!clientOnly).local(local).node(); - client = node.client(); - - } else { - log.debug("Configuring ES for network transport"); - ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder(); - if (config.has(CLUSTER_NAME)) { - String clustername = config.get(CLUSTER_NAME); - Preconditions.checkArgument(StringUtils.isNotBlank(clustername), "Invalid cluster name: %s", clustername); - settings.put("cluster.name", clustername); + try { + result = this.client.execute(ma); + } catch (IOException e) { + log.error("Index {} is was orphaned by alias creation failure!", localIndexName); + throw new TemporaryBackendException("ES transport error in add alias"); + } + if (!result.isSucceeded()) { + log.error("Index {} is was orphaned by alias creation failure!", localIndexName); + throw new PermanentBackendException(formatException("AddAlias", result)); + } } else { - settings.put("client.transport.ignore_cluster_name", true); - } - log.debug("Transport sniffing enabled: {}", config.get(CLIENT_SNIFF)); - settings.put("client.transport.sniff", config.get(CLIENT_SNIFF)); - settings.put("script.disable_dynamic", false); - TransportClient tc = new TransportClient(settings.build()); - int defaultPort = config.has(INDEX_PORT)?config.get(INDEX_PORT):HOST_PORT_DEFAULT; - for (String host : config.get(INDEX_HOSTS)) { - String[] hostparts = host.split(":"); - String hostname = hostparts[0]; - int hostport = defaultPort; - if (hostparts.length == 2) hostport = Integer.parseInt(hostparts[1]); - log.info("Configured remote host: {} : {}", hostname, hostport); - tc.addTransportAddress(new InetSocketTransportAddress(hostname, hostport)); + throw new PermanentBackendException(formatException("OpenIndex", result)); } - client = tc; - node = null; } - - return new ElasticSearchSetup.Connection(node, client); } + + private BackendException convert(Exception esException) { if (esException instanceof InterruptedException) { return new TemporaryBackendException("Interrupted while waiting for response", esException); @@ -389,33 +334,42 @@ private static String getDualMappingName(String key) { @Override public void register(String store, String key, KeyInformation information, BaseTransaction tx) throws BackendException { + checkForOrCreateIndex(); XContentBuilder mapping; + String source; Class dataType = information.getDataType(); Mapping map = Mapping.getMapping(information); - Preconditions.checkArgument(map==Mapping.DEFAULT || AttributeUtil.isString(dataType), - "Specified illegal mapping [%s] for data type [%s]",map,dataType); + Preconditions.checkArgument( + map == Mapping.DEFAULT || AttributeUtil.isString(dataType), + "Specified illegal mapping [%s] for data type [%s]", + map, + dataType + ); try { mapping = XContentFactory.jsonBuilder(). - startObject(). - startObject(store). - field(TTL_FIELD, new HashMap() {{ - put("enabled", true); - }}). - startObject("properties"). - startObject(key); + startObject(). + startObject(store). + field(TTL_FIELD, new HashMap() {{ + put("enabled", true); + }}). + startObject("properties"). + startObject(key); if (AttributeUtil.isString(dataType)) { - if (map==Mapping.DEFAULT) map=Mapping.TEXT; + if (map == Mapping.DEFAULT) { + map = Mapping.TEXT; + } + log.debug("Registering string type for {} with mapping {}", key, map); mapping.field("type", "string"); switch (map) { case STRING: - mapping.field("index","not_analyzed"); + mapping.field("index", "not_analyzed"); break; case TEXT: //default, do nothing - break; + break; case TEXTSTRING: mapping.endObject(); //add string mapping @@ -462,16 +416,21 @@ public void register(String store, String key, KeyInformation information, BaseT } mapping.endObject().endObject().endObject().endObject(); - + source = mapping.string(); } catch (IOException e) { throw new PermanentBackendException("Could not render json for put mapping request", e); } + PutMapping putMapping = new PutMapping.Builder(this.indexName, store, source) + .setParameter("master_timeout", this.metadataTimeout) + .setParameter("timeout", this.metadataTimeout) + .build(); + + JestResult result; try { - PutMappingResponse response = client.admin().indices().preparePutMapping(indexName). - setIgnoreConflicts(false).setType(store).setSource(mapping).execute().actionGet(); - } catch (Exception e) { - throw convert(e); + result = this.client.execute(putMapping); + } catch (IOException e) { + throw new TemporaryBackendException("ES transport error in PutMapping", e); } } @@ -558,136 +517,215 @@ private static Object convertToEsType(Object value) { @Override public void mutate(Map> mutations, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - BulkRequestBuilder brb = client.prepareBulk(); - - int bulkrequests = 0; - try { - for (Map.Entry> stores : mutations.entrySet()) { - String storename = stores.getKey(); - for (Map.Entry entry : stores.getValue().entrySet()) { - - String docid = entry.getKey(); - IndexMutation mutation = entry.getValue(); - assert mutation.isConsolidated(); - Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted())); - Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions()); - Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions()); - //Deletions first - if (mutation.hasDeletions()) { - if (mutation.isDeleted()) { - log.trace("Deleting entire document {}", docid); - brb.add(new DeleteRequest(indexName, storename, docid)); - } else { - String script = getDeletionScript(informations, storename, mutation); - brb.add(client.prepareUpdate(indexName, storename, docid).setScript(script, ScriptService.ScriptType.INLINE)); - log.trace("Adding script {}", script); + String source; + int requests = 0; + Bulk.Builder bulk = new Bulk.Builder() + .defaultIndex(this.indexName); + for (Map.Entry> stores : mutations.entrySet()) { + String store = stores.getKey(); + for (Map.Entry entry : stores.getValue().entrySet()) { + String id = entry.getKey(); + IndexMutation mutation = entry.getValue(); + assert mutation.isConsolidated(); + Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted())); + Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions()); + Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions()); + if (mutation.hasDeletions()) { + requests++; + if (mutation.isDeleted()) { + log.trace("Deleting entire document {}", id); + Delete delete = new Delete.Builder(id).type(store).build(); + bulk.addAction(delete); + } else { + try { + source = XContentFactory.jsonBuilder().startObject() + .field("lang", "groovy") + .field("script", deletionScript) + .field("params", getDeletionParams(informations, store, mutation)) + .string(); + log.trace("Adding script {}", deletionScript); + } catch (IOException e) { + throw new PermanentBackendException( + "Could not render json for mutate request", e); } - bulkrequests++; + Update update = new Update.Builder(source) + .type(store) + .id(id) + .build(); + + bulk.addAction(update); } - if (mutation.hasAdditions()) { - int ttl = mutation.determineTTL(); - - if (mutation.isNew()) { //Index - log.trace("Adding entire document {}", docid); - brb.add(new IndexRequest(indexName, storename, docid) - .source(getNewDocument(mutation.getAdditions(), informations.get(storename), ttl))); - - } else { - Preconditions.checkArgument(ttl == 0, "Elasticsearch only supports TTL on new documents [%s]", docid); - - boolean needUpsert = !mutation.hasDeletions(); - String script = getAdditionScript(informations, storename, mutation); - UpdateRequestBuilder update = client.prepareUpdate(indexName, storename, docid).setScript(script, ScriptService.ScriptType.INLINE); - if (needUpsert) { - XContentBuilder doc = getNewDocument(mutation.getAdditions(), informations.get(storename), ttl); - update.setUpsert(doc); + } + + if (mutation.hasAdditions()) { + requests++; + int ttl = mutation.determineTTL(); + if (mutation.isNew()) { + log.trace("Adding entire document {}", id); + try { + source = getNewDocument( + mutation.getAdditions(), + informations.get(store), + ttl + ).string(); + } catch (IOException e) { + throw new PermanentBackendException( + "Could not render json for mutate request", e); + } + Index index = new Index.Builder(source) + .type(store) + .id(id) + .build(); + + bulk.addAction(index); + } else { + Preconditions.checkArgument( + ttl == 0, + "Elasticsearch only supports TTL on new documents [%s]", + id + ); + try { + XContentBuilder json = XContentFactory.jsonBuilder().startObject() + .field("lang", "groovy") + .field("script", additionScript) + .field("params", getAdditionParams(informations, store, mutation)); + + if (!mutation.hasDeletions()) { + XContentBuilder doc = getNewDocument( + mutation.getAdditions(), + informations.get(store), + ttl + ); + json.field("upsert", doc); } - brb.add(update); - log.trace("Adding script {}", script); + source = json.string(); + log.trace("Adding script {}", additionScript); + } catch (IOException e) { + throw new PermanentBackendException( + "Could not render json for mutate request", e); } - bulkrequests++; - } + Update update = new Update.Builder(source) + .type(store) + .id(id) + .build(); + bulk.addAction(update); + } } } - if (bulkrequests > 0) { - BulkResponse bulkItemResponses = brb.execute().actionGet(); - if (bulkItemResponses.hasFailures()) { - boolean actualFailure = false; - for(BulkItemResponse response : bulkItemResponses.getItems()) { - //The document may have been deleted, which is OK - if(response.isFailed() && response.getFailure().getStatus() != RestStatus.NOT_FOUND) { - log.error("Failed to execute ES query {}", response.getFailureMessage()); - actualFailure = true; - } - } - if(actualFailure) { - throw new Exception(bulkItemResponses.buildFailureMessage()); + } + + if (requests > 0) { + BulkResult result; + bulk.setParameter("timeout", this.timeout); + try { + result = this.client.execute(bulk.build()); + } catch (IOException e) { + throw new TemporaryBackendException("ES transport error in bulk mutate", e); + } + if (!result.isSucceeded()) { + boolean actualFailure = false; + for (BulkResult.BulkResultItem item : result.getItems()) { + if (null != item.error && item.status != 404) { + actualFailure = true; + log.warn(String.format( + "Mutate (%s) on item %s in index %s failed with code %d and error %s", + item.operation, + item.id, + item.index, + item.status, + item.error + )); } } + if (actualFailure) { + throw new PermanentBackendException(formatException("bulk mutate", result)); + } } - } catch (Exception e) { - log.error("Failed to execute ES query {}", brb.request().timeout(), e); - throw convert(e); } } - private String getDeletionScript(KeyInformation.IndexRetriever informations, String storename, IndexMutation mutation) throws PermanentBackendException { - StringBuilder script = new StringBuilder(); + private Map getDeletionParams(KeyInformation.IndexRetriever informations, String storename, IndexMutation mutation) throws PermanentBackendException { + Map params = new HashMap(); + List singles = new ArrayList(); + params.put("singles", singles); + List> lists = new ArrayList>(); + params.put("lists", lists); + List op; for (IndexEntry deletion : mutation.getDeletions()) { KeyInformation keyInformation = informations.get(storename).get(deletion.field); - switch (keyInformation.getCardinality()) { case SINGLE: - script.append("ctx._source.remove(\"" + deletion.field + "\");"); + singles.add(deletion.field); if (hasDualStringMapping(informations.get(storename, deletion.field))) { - script.append("ctx._source.remove(\"" + getDualMappingName(deletion.field) + "\");"); + singles.add(getDualMappingName(deletion.field)); } break; case SET: case LIST: String jsValue = convertToJsType(deletion.value); - script.append("def index = ctx._source[\"" + deletion.field + "\"].indexOf(" + jsValue + "); ctx._source[\"" + deletion.field + "\"].remove(index);"); + op = new ArrayList(2); + op.add(deletion.field); + op.add(jsValue); + lists.add(op); if (hasDualStringMapping(informations.get(storename, deletion.field))) { - script.append("def index = ctx._source[\"" + getDualMappingName(deletion.field) + "\"].indexOf(" + jsValue + "); ctx._source[\"" + getDualMappingName(deletion.field) + "\"].remove(index);"); + op = new ArrayList(2); + op.add(getDualMappingName(deletion.field)); + op.add(jsValue); + lists.add(op); } break; } } - return script.toString(); + return params; } - private String getAdditionScript(KeyInformation.IndexRetriever informations, String storename, IndexMutation mutation) throws PermanentBackendException { - StringBuilder script = new StringBuilder(); + private Map getAdditionParams(KeyInformation.IndexRetriever informations, String storename, IndexMutation mutation) throws PermanentBackendException { + Map params = new HashMap(); + List> singles = new ArrayList>(); + params.put("singles", singles); + List> lists = new ArrayList>(); + params.put("lists", lists); + List op; for (IndexEntry e : mutation.getAdditions()) { KeyInformation keyInformation = informations.get(storename).get(e.field); + String jsValue = convertToJsType(e.value); switch (keyInformation.getCardinality()) { case SINGLE: - script.append("ctx._source[\"" + e.field + "\"] = " + convertToJsType(e.value) + ";"); + op = new ArrayList(2); + op.add(e.field); + op.add(jsValue); + singles.add(op); if (hasDualStringMapping(keyInformation)) { - script.append("ctx._source[\"" + getDualMappingName(e.field) + "\"] = " + convertToJsType(e.value) + ";"); + op = new ArrayList(2); + op.add(getDualMappingName(e.field)); + op.add(jsValue); + singles.add(op); } break; case SET: case LIST: - script.append("if(ctx._source[\"" + e.field + "\"] == null) {ctx._source[\"" + e.field + "\"] = []};"); - script.append("ctx._source[\"" + e.field + "\"].add(" + convertToJsType(e.value) + ");"); + op = new ArrayList(2); + op.add(e.field); + op.add(jsValue); + lists.add(op); if (hasDualStringMapping(keyInformation)) { - script.append("if(ctx._source[\"" + getDualMappingName(e.field) + "\"] == null) {ctx._source[\"" + e.field + "\"] = []};"); - script.append("ctx._source[\"" + getDualMappingName(e.field) + "\"].add(" + convertToJsType(e.value) + ");"); + op = new ArrayList(2); + op.add(getDualMappingName(e.field)); + op.add(jsValue); + lists.add(op); } break; - } - } - return script.toString(); + return params; } + private static String convertToJsType(Object value) throws PermanentBackendException { try { XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); @@ -703,43 +741,65 @@ private static String convertToJsType(Object value) throws PermanentBackendExcep } catch (IOException e) { throw new PermanentBackendException("Could not write json"); } - - } public void restore(Map>> documents, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - BulkRequestBuilder bulk = client.prepareBulk(); int requests = 0; - try { - for (Map.Entry>> stores : documents.entrySet()) { - String store = stores.getKey(); - - for (Map.Entry> entry : stores.getValue().entrySet()) { - String docID = entry.getKey(); - List content = entry.getValue(); - - if (content == null || content.size() == 0) { - // delete - if (log.isTraceEnabled()) - log.trace("Deleting entire document {}", docID); + Bulk.Builder bulk = new Bulk.Builder() + .defaultIndex(this.indexName); + + for (Map.Entry>> stores : documents.entrySet()) { + String store = stores.getKey(); + for (Map.Entry> entry : stores.getValue().entrySet()) { + String id = entry.getKey(); + List content = entry.getValue(); + if (content == null || content.size() == 0) { + if (log.isTraceEnabled()) { + log.trace("Deleting entire document {}", id); + } - bulk.add(new DeleteRequest(indexName, store, docID)); - requests++; - } else { - // Add - if (log.isTraceEnabled()) - log.trace("Adding entire document {}", docID); - bulk.add(new IndexRequest(indexName, store, docID).source(getNewDocument(content, informations.get(store), IndexMutation.determineTTL(content)))); - requests++; + Delete delete = new Delete.Builder(id).type(store).build(); + bulk.addAction(delete); + requests++; + } else { + String source; + if (log.isTraceEnabled()) { + log.trace("Adding entire document {}", id); + } + try { + source = getNewDocument( + content, + informations.get(store), + IndexMutation.determineTTL(content) + ).string(); + } catch (IOException e) { + throw new PermanentBackendException( + "Could not render json for restore request", e); } + Index index = new Index.Builder(source) + .type(store) + .id(id) + .build(); + + bulk.addAction(index); + requests++; } } + } - if (requests > 0) - bulk.execute().actionGet(); - } catch (Exception e) { - throw convert(e); + if (requests > 0) { + JestResult result; + bulk.setParameter("timeout", this.timeout); + try { + result = this.client.execute(bulk.build()); + } catch (IOException e) { + throw new TemporaryBackendException( + "ES transport exception during bulk restore", e); + } + if (!result.isSucceeded()) { + throw new PermanentBackendException(formatException("bulk restore", result)); + } } } @@ -874,41 +934,50 @@ public FilterBuilder getFilter(Condition condition, KeyInformation.StoreRetri @Override public List query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - SearchRequestBuilder srb = client.prepareSearch(indexName); - srb.setTypes(query.getStore()); - srb.setQuery(QueryBuilders.matchAllQuery()); - srb.setPostFilter(getFilter(query.getCondition(),informations.get(query.getStore()))); + String type = query.getStore(); + SearchSourceBuilder ssb = new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .postFilter(getFilter(query.getCondition(),informations.get(type))) + .from(0) + .size(query.hasLimit() ? query.getLimit() : this.maxResultsSize) + .noFields(); + if (!query.getOrder().isEmpty()) { List orders = query.getOrder(); for (int i = 0; i < orders.size(); i++) { IndexQuery.OrderEntry orderEntry = orders.get(i); - FieldSortBuilder fsb = new FieldSortBuilder(orders.get(i).getKey()) - .order(orderEntry.getOrder() == Order.ASC ? SortOrder.ASC : SortOrder.DESC); - if (useDeprecatedIgnoreUnmapped) { - fsb.ignoreUnmapped(true); - } else { - Class datatype = orderEntry.getDatatype(); - fsb.unmappedType(convertToEsDataType(datatype)); - } - srb.addSort(fsb); + String unmapped = convertToEsDataType(orderEntry.getDatatype()); + FieldSortBuilder fsb = new FieldSortBuilder(orderEntry.getKey()) + .order(orderEntry.getOrder() == Order.ASC ? SortOrder.ASC : SortOrder.DESC) + .unmappedType(unmapped); + + ssb.sort(fsb); } } - srb.setFrom(0); - if (query.hasLimit()) srb.setSize(query.getLimit()); - else srb.setSize(maxResultsSize); - srb.setNoFields(); - //srb.setExplain(true); - - SearchResponse response = srb.execute().actionGet(); - log.debug("Executed query [{}] in {} ms", query.getCondition(), response.getTookInMillis()); - SearchHits hits = response.getHits(); - if (!query.hasLimit() && hits.totalHits() >= maxResultsSize) - log.warn("Query result set truncated to first [{}] elements for query: {}", maxResultsSize, query); - List result = new ArrayList(hits.hits().length); - for (SearchHit hit : hits) { - result.add(hit.id()); + + Search search = new Search.Builder(ssb.toString()) + .setParameter("timeout", this.timeout) + .addType(type) + .addIndex(this.indexName) + .build(); + + JestResult result; + try { + result = this.client.execute(search); + } catch (IOException e) { + throw new TemporaryBackendException("ES transport error in search", e); + } + if (!result.isSucceeded()) { + throw new PermanentBackendException(formatException("search", result)); + } + + JsonArray hits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits"); + List results = new ArrayList(hits.size()); + for (JsonElement hit : hits) { + results.add(hit.getAsJsonObject().get("_id").getAsString()); } - return result; + + return results; } private String convertToEsDataType(Class datatype) { @@ -945,26 +1014,39 @@ else if (Geoshape.class.isAssignableFrom(datatype)) { @Override public Iterable> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - SearchRequestBuilder srb = client.prepareSearch(indexName); - srb.setTypes(query.getStore()); - srb.setQuery(QueryBuilders.queryStringQuery(query.getQuery())); - - srb.setFrom(query.getOffset()); - if (query.hasLimit()) srb.setSize(query.getLimit()); - else srb.setSize(maxResultsSize); - srb.setNoFields(); - //srb.setExplain(true); - - SearchResponse response = srb.execute().actionGet(); - log.debug("Executed query [{}] in {} ms", query.getQuery(), response.getTookInMillis()); - SearchHits hits = response.getHits(); - if (!query.hasLimit() && hits.totalHits() >= maxResultsSize) - log.warn("Query result set truncated to first [{}] elements for query: {}", maxResultsSize, query); - List> result = new ArrayList>(hits.hits().length); - for (SearchHit hit : hits) { - result.add(new RawQuery.Result(hit.id(),hit.getScore())); + SearchSourceBuilder ssb = new SearchSourceBuilder() + .query(QueryBuilders.queryStringQuery(query.getQuery())) + .from(query.getOffset()) + .size(query.hasLimit() ? query.getLimit() : this.maxResultsSize) + .noFields(); + + Search search = new Search.Builder(ssb.toString()) + .setParameter("timeout", this.timeout) + .addType(query.getStore()) + .addIndex(this.indexName) + .build(); + + JestResult result; + try { + result = this.client.execute(search); + } catch (IOException e) { + throw new TemporaryBackendException("ES transport error in search", e); + } + if (!result.isSucceeded()) { + throw new PermanentBackendException(formatException("search", result)); } - return result; + + JsonArray hits = result.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits"); + List> results = new ArrayList>(hits.size()); + for (JsonElement hit : hits) { + JsonObject obj = hit.getAsJsonObject(); + results.add(new RawQuery.Result( + obj.get("_id").getAsString(), + obj.get("_score").getAsDouble() + )); + } + + return results; } @Override @@ -1029,25 +1111,14 @@ public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config @Override public void close() throws BackendException { - - if (node != null && !node.isClosed()) { - node.close(); - } - client.close(); - + client.shutdownClient(); } @Override public void clearStorage() throws BackendException { try { - try { - client.admin().indices() - .delete(new DeleteIndexRequest(indexName)).actionGet(); - // We wait for one second to let ES delete the river - Thread.sleep(1000); - } catch (IndexMissingException e) { - // Index does not exist... Fine - } + Delete deleteIndex = new Delete.Builder(indexName).build(); + client.execute(deleteIndex); } catch (Exception e) { throw new PermanentBackendException("Could not delete index " + indexName, e); } finally { @@ -1055,13 +1126,6 @@ public void clearStorage() throws BackendException { } } - /** - * Exposed for testing - */ - Node getNode() { - return node; - } - private void checkExpectedClientVersion() { /* * This is enclosed in a catch block to prevent an unchecked exception diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchSetup.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchSetup.java deleted file mode 100644 index 3fdfede8ab..0000000000 --- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchSetup.java +++ /dev/null @@ -1,310 +0,0 @@ -// Copyright 2017 JanusGraph Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.janusgraph.diskstorage.es; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import org.janusgraph.diskstorage.configuration.ConfigNamespace; -import org.janusgraph.diskstorage.configuration.ConfigOption; -import org.janusgraph.diskstorage.configuration.Configuration; -import org.janusgraph.util.system.IOUtils; -import org.apache.commons.lang.StringUtils; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.node.Node; -import org.elasticsearch.node.NodeBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.lang.reflect.Array; -import java.util.List; -import java.util.Map; - -import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.*; - -/** - * Create an ES {@link org.elasticsearch.client.transport.TransportClient} or - * {@link org.elasticsearch.node.Node} from a JanusGraph - * {@link org.janusgraph.diskstorage.configuration.Configuration}. - *

- * TransportClient assumes that an ES cluster is already running. It does not attempt - * to start an embedded ES instance. It just connects to whatever hosts are given in - * {@link org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration#INDEX_HOSTS}. - *

- * Node can be configured to either behave strictly as a client or as both a client - * and ES data node. The latter is essentially a fully-fledged ES cluster node embedded in JanusGraph. - * Node can also be configured to use either network or JVM local transport. - * In practice, JVM local transport is usually only useful for testing. Most deployments - * will use the network transport. - *

- * Setting arbitrary ES options is supported with both TransportClient and Node - * via {@link org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration#INDEX_CONF_FILE}. - * When this is set, it will be opened as an ordinary file and the contents will be - * parsed as Elasticsearch settings. These settings override JanusGraph's defaults but - * options explicitly provided in JanusGraph's config file (e.g. setting an explicit value for - * {@link org.janusgraph.diskstorage.es.ElasticSearchIndex#CLIENT_ONLY} in - * JanusGraph's properties will override any value that might be in the ES settings file). - *

- * After loading the index conf file (when provided), any key-value pairs under the - * {@link org.janusgraph.diskstorage.es.ElasticSearchIndex#ES_EXTRAS_NS} namespace - * are copied into the Elasticsearch settings builder. This allows overridding arbitrary - * ES settings from within the JanusGraph properties file. Settings in the ext namespace take - * precedence over those in the index conf file. - *

- * After loading the index conf file and any key-value pairs under the ext namespace, - * JanusGraph checks for ConfigOptions defined in - * {@link org.janusgraph.diskstorage.es.ElasticSearchIndex} - * that correspond directly to ES settings and copies them into the ES settings builder. - */ -public enum ElasticSearchSetup { - - /** - * Start an ES TransportClient connected to - * {@link org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration#INDEX_HOSTS}. - */ - TRANSPORT_CLIENT { - @Override - public Connection connect(Configuration config) throws IOException { - log.debug("Configuring TransportClient"); - - ImmutableSettings.Builder settingsBuilder = settingsBuilder(config); - - if (config.has(ElasticSearchIndex.CLIENT_SNIFF)) { - String k = "client.transport.sniff"; - settingsBuilder.put(k, config.get(ElasticSearchIndex.CLIENT_SNIFF)); - log.debug("Set {}: {}", k, config.get(ElasticSearchIndex.CLIENT_SNIFF)); - } - - TransportClient tc = new TransportClient(settingsBuilder.build()); - int defaultPort = config.has(INDEX_PORT) ? config.get(INDEX_PORT) : ElasticSearchIndex.HOST_PORT_DEFAULT; - for (String host : config.get(INDEX_HOSTS)) { - String[] hostparts = host.split(":"); - String hostname = hostparts[0]; - int hostport = defaultPort; - if (hostparts.length == 2) hostport = Integer.parseInt(hostparts[1]); - log.info("Configured remote host: {} : {}", hostname, hostport); - tc.addTransportAddress(new InetSocketTransportAddress(hostname, hostport)); - } - return new Connection(null, tc); - } - }, - - /** - * Start an ES {@code Node} and use its attached {@code Client}. - */ - NODE { - @Override - public Connection connect(Configuration config) throws IOException { - - log.debug("Configuring Node Client"); - - ImmutableSettings.Builder settingsBuilder = settingsBuilder(config); - - if (config.has(ElasticSearchIndex.TTL_INTERVAL)) { - String k = "indices.ttl.interval"; - settingsBuilder.put(k, config.get(ElasticSearchIndex.TTL_INTERVAL)); - log.debug("Set {}: {}", k, config.get(ElasticSearchIndex.TTL_INTERVAL)); - } - - makeLocalDirsIfNecessary(settingsBuilder, config); - - NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder().settings(settingsBuilder.build()); - - // Apply explicit JanusGraph properties file overrides (otherwise conf-file or ES defaults apply) - if (config.has(ElasticSearchIndex.CLIENT_ONLY)) { - boolean clientOnly = config.get(ElasticSearchIndex.CLIENT_ONLY); - nodeBuilder.client(clientOnly).data(!clientOnly); - } - - if (config.has(ElasticSearchIndex.LOCAL_MODE)) - nodeBuilder.local(config.get(ElasticSearchIndex.LOCAL_MODE)); - - if (config.has(ElasticSearchIndex.LOAD_DEFAULT_NODE_SETTINGS)) - nodeBuilder.loadConfigSettings(config.get(ElasticSearchIndex.LOAD_DEFAULT_NODE_SETTINGS)); - - Node node = nodeBuilder.node(); - Client client = node.client(); - return new Connection(node, client); - } - }; - - /** - * Build and setup a new ES settings builder by consulting all JanusGraph config options - * relevant to TransportClient or Node. Options may be specific to a single client, - * but in this case they have no effect/are ignored on the other client. - *

- * This method creates a new ES ImmutableSettings.Builder, then carries out the following - * operations on that settings builder in the listed order: - * - *

    - *
  1. Enable client.transport.ignore_cluster_name in the settings builder
  2. - *
  3. If conf-file is set, open it using a FileInputStream and load its contents into the settings builder
  4. - *
  5. Apply any settings in the ext.* meta namespace
  6. - *
  7. If cluster-name is set, copy that value to cluster.name in the settings builder
  8. - *
  9. If ignore-cluster-name is set, copy that value to client.transport.ignore_cluster_name in the settings builder
  10. - *
  11. If client-sniff is set, copy that value to client.transport.sniff in the settings builder
  12. - *
  13. If ttl-interval is set, copy that volue to indices.ttl.interval in the settings builder
  14. - *
  15. Unconditionally set script.disable_dynamic to false (i.e. enable dynamic scripting)
  16. - *
- * - * This method then returns the builder. - * - * @param config a JanusGraph configuration possibly containing Elasticsearch index settings - * @return ES settings builder configured according to the {@code config} parameter - * @throws java.io.IOException if conf-file was set but could not be read - */ - private static ImmutableSettings.Builder settingsBuilder(Configuration config) throws IOException { - - ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder(); - - // Set JanusGraph defaults - settings.put("client.transport.ignore_cluster_name", true); - - // Apply overrides from ES conf file - applySettingsFromFile(settings, config, INDEX_CONF_FILE); - - // Apply ext.* overrides from JanusGraph conf file - applySettingsFromJanusGraphConf(settings, config, ElasticSearchIndex.ES_EXTRAS_NS); - - // Apply individual JanusGraph ConfigOptions that map to ES settings - - if (config.has(ElasticSearchIndex.CLUSTER_NAME)) { - String clustername = config.get(ElasticSearchIndex.CLUSTER_NAME); - Preconditions.checkArgument(StringUtils.isNotBlank(clustername), "Invalid cluster name: %s", clustername); - String k = "cluster.name"; - settings.put(k, clustername); - log.debug("Set {}: {}", k, clustername); - } - - if (config.has(ElasticSearchIndex.IGNORE_CLUSTER_NAME)) { - boolean ignoreClusterName = config.get(ElasticSearchIndex.IGNORE_CLUSTER_NAME); - String k = "client.transport.ignore_cluster_name"; - settings.put(k, ignoreClusterName); - log.debug("Set {}: {}", k, ignoreClusterName); - } - - // Force-enable dynamic scripting. This is probably only useful in Node mode. - String disableScriptsKey = "script.disable_dynamic"; - String disableScriptsVal = settings.get(disableScriptsKey); - if (null != disableScriptsVal && !"false".equals(disableScriptsVal)) { - log.warn("JanusGraph requires Elasticsearch dynamic scripting. Setting {} to false. " + - "Dynamic scripting must be allowed in the Elasticsearch cluster configuration.", - disableScriptsKey); - } - settings.put(disableScriptsKey, false); - log.debug("Set {}: {}", disableScriptsKey, false); - - return settings; - } - - static void applySettingsFromFile(ImmutableSettings.Builder settings, - Configuration config, - ConfigOption confFileOption) throws FileNotFoundException { - if (config.has(confFileOption)) { - String confFile = config.get(confFileOption); - log.debug("Loading Elasticsearch settings from file {}", confFile); - InputStream confStream = null; - try { - confStream = new FileInputStream(confFile); - settings.loadFromStream(confFile, confStream); - } finally { - IOUtils.closeQuietly(confStream); - } - } else { - log.debug("Option {} is not set; not attempting to load Elasticsearch settings from file", confFileOption); - } - } - - static void applySettingsFromJanusGraphConf(ImmutableSettings.Builder settings, - Configuration config, - ConfigNamespace rootNS) { - int keysLoaded = 0; - Map configSub = config.getSubset(rootNS); - for (Map.Entry entry : configSub.entrySet()) { - String key = entry.getKey(); - Object val = entry.getValue(); - if (null == val) continue; - if (List.class.isAssignableFrom(val.getClass())) { - // Pretty print lists using comma-separated values and no surrounding square braces for ES - List l = (List) val; - settings.put(key, Joiner.on(",").join(l)); - } else if (val.getClass().isArray()) { - // As with Lists, but now for arrays - // The Object copy[] business lets us avoid repetitive primitive array type checking and casting - Object copy[] = new Object[Array.getLength(val)]; - for (int i= 0; i < copy.length; i++) { - copy[i] = Array.get(val, i); - } - settings.put(key, Joiner.on(",").join(copy)); - } else { - // Copy anything else unmodified - settings.put(key, val.toString()); - } - log.debug("[ES ext.* cfg] Set {}: {}", key, val); - keysLoaded++; - } - log.debug("Loaded {} settings from the {} JanusGraph config namespace", keysLoaded, rootNS); - } - - - private static void makeLocalDirsIfNecessary(ImmutableSettings.Builder settingsBuilder, Configuration config) { - if (config.has(INDEX_DIRECTORY)) { - String dataDirectory = config.get(INDEX_DIRECTORY); - File f = new File(dataDirectory); - if (!f.exists()) { - log.info("Creating ES directory prefix: {}", f); - f.mkdirs(); - } - for (String sub : ElasticSearchIndex.DATA_SUBDIRS) { - String subdir = dataDirectory + File.separator + sub; - f = new File(subdir); - if (!f.exists()) { - log.info("Creating ES {} directory: {}", sub, f); - f.mkdirs(); - } - settingsBuilder.put("path." + sub, subdir); - log.debug("Set ES {} directory: {}", sub, f); - } - } - } - - private static final Logger log = LoggerFactory.getLogger(ElasticSearchSetup.class); - - public abstract Connection connect(Configuration config) throws IOException; - - public static class Connection { - - private final Node node; - private final Client client; - - public Connection(Node node, Client client) { - this.node = node; - this.client = client; - Preconditions.checkNotNull(this.client, "Unable to instantiate Elasticsearch Client object"); - // node may be null - } - - public Node getNode() { - return node; - } - - public Client getClient() { - return client; - } - } -} diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java index 4a876c8278..4ce20ab825 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java @@ -45,7 +45,6 @@ public WriteConfiguration getConfiguration() { //Add index config.set(INDEX_BACKEND,"elasticsearch",INDEX); config.set(LOCAL_MODE,true,INDEX); - config.set(CLIENT_ONLY,false,INDEX); config.set(INDEX_DIRECTORY, StorageSetup.getHomeDir("es"), INDEX); return config.getConfiguration(); diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchConfigTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchConfigTest.java index be52f90683..0681d4d904 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchConfigTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchConfigTest.java @@ -89,34 +89,6 @@ public void testJanusGraphFactoryBuilder() graph.close(); } - @Test - public void testTransportClient() throws BackendException, InterruptedException { - ElasticsearchRunner esr = new ElasticsearchRunner(".", "transportClient.yml"); - esr.start(); - ModifiableConfiguration config = GraphDatabaseConfiguration.buildGraphConfiguration(); - config.set(INTERFACE, ElasticSearchSetup.TRANSPORT_CLIENT.toString(), INDEX_NAME); - config.set(INDEX_HOSTS, new String[]{ "127.0.0.1" }, INDEX_NAME); - Configuration indexConfig = config.restrictTo(INDEX_NAME); - IndexProvider idx = new ElasticSearchIndex(indexConfig); - simpleWriteAndQuery(idx); - idx.close(); - - config = GraphDatabaseConfiguration.buildGraphConfiguration(); - config.set(INTERFACE, ElasticSearchSetup.TRANSPORT_CLIENT.toString(), INDEX_NAME); - config.set(INDEX_HOSTS, new String[]{ "10.11.12.13" }, INDEX_NAME); - indexConfig = config.restrictTo(INDEX_NAME); - Throwable failure = null; - try { - idx = new ElasticSearchIndex(indexConfig); - } catch (Throwable t) { - failure = t; - } - // idx.close(); - Assert.assertNotNull("ES client failed to throw exception on connection failure", failure); - - esr.stop(); - } - @Test public void testLocalNodeUsingExt() throws BackendException, InterruptedException { @@ -134,7 +106,6 @@ public void testLocalNodeUsingExt() throws BackendException, InterruptedExceptio ModifiableConfiguration config = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, cc, BasicConfiguration.Restriction.NONE); - config.set(INTERFACE, ElasticSearchSetup.NODE.toString(), INDEX_NAME); Configuration indexConfig = config.restrictTo(INDEX_NAME); IndexProvider idx = new ElasticSearchIndex(indexConfig); simpleWriteAndQuery(idx); @@ -157,7 +128,6 @@ public void testLocalNodeUsingExtAndIndexDirectory() throws BackendException, In ModifiableConfiguration config = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, cc, BasicConfiguration.Restriction.NONE); - config.set(INTERFACE, ElasticSearchSetup.NODE.toString(), INDEX_NAME); config.set(INDEX_DIRECTORY, baseDir, INDEX_NAME); Configuration indexConfig = config.restrictTo(INDEX_NAME); IndexProvider idx = new ElasticSearchIndex(indexConfig); @@ -175,7 +145,6 @@ public void testLocalNodeUsingYaml() throws BackendException, InterruptedExcepti assertFalse(new File(baseDir + File.separator + "data").exists()); ModifiableConfiguration config = GraphDatabaseConfiguration.buildGraphConfiguration(); - config.set(INTERFACE, ElasticSearchSetup.NODE.toString(), INDEX_NAME); config.set(INDEX_CONF_FILE, Joiner.on(File.separator).join("target", "test-classes", "es_jvmlocal.yml"), INDEX_NAME); Configuration indexConfig = config.restrictTo(INDEX_NAME); @@ -199,7 +168,6 @@ public void testNetworkNodeUsingExt() throws BackendException, InterruptedExcept ModifiableConfiguration config = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, cc, BasicConfiguration.Restriction.NONE); - config.set(INTERFACE, ElasticSearchSetup.NODE.toString(), INDEX_NAME); Configuration indexConfig = config.restrictTo(INDEX_NAME); IndexProvider idx = new ElasticSearchIndex(indexConfig); simpleWriteAndQuery(idx); @@ -208,8 +176,6 @@ public void testNetworkNodeUsingExt() throws BackendException, InterruptedExcept cc.set("index." + INDEX_NAME + ".elasticsearch.ext.discovery.zen.ping.unicast.hosts", "10.11.12.13"); config = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, cc, BasicConfiguration.Restriction.NONE); - config.set(INTERFACE, ElasticSearchSetup.NODE.toString(), INDEX_NAME); - config.set(HEALTH_REQUEST_TIMEOUT, "5s", INDEX_NAME); indexConfig = config.restrictTo(INDEX_NAME); Throwable failure = null; @@ -228,7 +194,6 @@ public void testNetworkNodeUsingYaml() throws BackendException, InterruptedExcep ElasticsearchRunner esr = new ElasticsearchRunner(".", "networkNodeUsingYaml.yml"); esr.start(); ModifiableConfiguration config = GraphDatabaseConfiguration.buildGraphConfiguration(); - config.set(INTERFACE, ElasticSearchSetup.NODE.toString(), INDEX_NAME); config.set(INDEX_CONF_FILE, Joiner.on(File.separator).join("target", "test-classes", "es_cfg_nodeclient.yml"), INDEX_NAME); Configuration indexConfig = config.restrictTo(INDEX_NAME); @@ -237,8 +202,6 @@ public void testNetworkNodeUsingYaml() throws BackendException, InterruptedExcep idx.close(); config = GraphDatabaseConfiguration.buildGraphConfiguration(); - config.set(INTERFACE, ElasticSearchSetup.NODE.toString(), INDEX_NAME); - config.set(HEALTH_REQUEST_TIMEOUT, "5s", INDEX_NAME); config.set(INDEX_CONF_FILE, Joiner.on(File.separator).join("target", "test-classes", "es_cfg_bogus_nodeclient.yml"), INDEX_NAME); indexConfig = config.restrictTo(INDEX_NAME); @@ -266,7 +229,6 @@ public void testIndexCreationOptions() throws InterruptedException, BackendExcep ModifiableConfiguration config = new ModifiableConfiguration(GraphDatabaseConfiguration.ROOT_NS, cc, BasicConfiguration.Restriction.NONE); - config.set(INTERFACE, ElasticSearchSetup.NODE.toString(), INDEX_NAME); Configuration indexConfig = config.restrictTo(INDEX_NAME); IndexProvider idx = new ElasticSearchIndex(indexConfig); simpleWriteAndQuery(idx); diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchIndexTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchIndexTest.java index acbb310c21..0dfe1039c5 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchIndexTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ElasticSearchIndexTest.java @@ -67,7 +67,6 @@ public static final Configuration getLocalESTestConfig() { final String index = "es"; ModifiableConfiguration config = GraphDatabaseConfiguration.buildGraphConfiguration(); config.set(LOCAL_MODE, true, index); - config.set(CLIENT_ONLY, false, index); config.set(TTL_INTERVAL, "5s", index); config.set(GraphDatabaseConfiguration.INDEX_DIRECTORY, StorageSetup.getHomeDir("es"), index); return config.restrictTo(index); @@ -106,7 +105,6 @@ public void testConfiguration() throws BackendException { final String index = "es"; ModifiableConfiguration config = GraphDatabaseConfiguration.buildGraphConfiguration(); config.set(LOCAL_MODE, true, index); - config.set(CLIENT_ONLY, true, index); config.set(INDEX_HOSTS, new String[] { "10.0.0.1" }, index); config.set(GraphDatabaseConfiguration.INDEX_DIRECTORY, StorageSetup.getHomeDir("es"), index); Configuration indexConfig = config.restrictTo(index); @@ -116,7 +114,6 @@ public void testConfiguration() throws BackendException { config = GraphDatabaseConfiguration.buildGraphConfiguration(); config.set(LOCAL_MODE, false, index); - config.set(CLIENT_ONLY, true, index); config.set(INDEX_HOSTS, new String[] { "10.0.0.1" }, index); config.set(GraphDatabaseConfiguration.INDEX_DIRECTORY, StorageSetup.getHomeDir("es"), index); indexConfig = config.restrictTo(index); @@ -136,7 +133,6 @@ public void testConfigurationFile() throws BackendException { final String index = "es"; ModifiableConfiguration config = GraphDatabaseConfiguration.buildGraphConfiguration(); config.set(LOCAL_MODE, true, index); - config.set(CLIENT_ONLY, true, index); config.set(INDEX_CONF_FILE, Joiner.on(File.separator).join("target", "test-classes", "es_nodename_foo.yml"), index); config.set(GraphDatabaseConfiguration.INDEX_DIRECTORY, StorageSetup.getHomeDir("es"), index); Configuration indexConfig = config.restrictTo(index); @@ -148,7 +144,6 @@ public void testConfigurationFile() throws BackendException { config = GraphDatabaseConfiguration.buildGraphConfiguration(); config.set(LOCAL_MODE, true, index); - config.set(CLIENT_ONLY, true, index); config.set(INDEX_CONF_FILE, Joiner.on(File.separator).join("target", "test-classes", "es_nodename_bar.yml"), index); config.set(GraphDatabaseConfiguration.INDEX_DIRECTORY, StorageSetup.getHomeDir("es"), index); indexConfig = config.restrictTo(index); diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ThriftElasticsearchTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ThriftElasticsearchTest.java index e55dfb5895..859cc53332 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ThriftElasticsearchTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/ThriftElasticsearchTest.java @@ -23,7 +23,6 @@ import org.junit.BeforeClass; import static org.janusgraph.CassandraStorageSetup.*; -import static org.janusgraph.diskstorage.es.ElasticSearchIndex.CLIENT_ONLY; import static org.janusgraph.diskstorage.es.ElasticSearchIndex.LOCAL_MODE; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_BACKEND; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_DIRECTORY; @@ -41,7 +40,6 @@ public WriteConfiguration getConfiguration() { //Add index config.set(INDEX_BACKEND,"elasticsearch",INDEX); config.set(LOCAL_MODE,true,INDEX); - config.set(CLIENT_ONLY,false,INDEX); config.set(INDEX_DIRECTORY, StorageSetup.getHomeDir("es"),INDEX); return config.getConfiguration(); }