diff --git a/janusgraph-es/pom.xml b/janusgraph-es/pom.xml
index 4dd1bef656..2d8acf0d6d 100644
--- a/janusgraph-es/pom.xml
+++ b/janusgraph-es/pom.xml
@@ -66,7 +66,24 @@
antlr-runtime${antlr.version}
-
+
+
+ com.google.code.gson
+ gson
+ 2.2.4
+
+
+ io.searchbox
+ jest
+ 2.0.4
+
+
+ com.google.code.gson
+ gson
+
+
+
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
index fe9eb09989..321d22f551 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
@@ -14,11 +14,30 @@
package org.janusgraph.diskstorage.es;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.searchbox.client.JestResult;
+import io.searchbox.client.JestClient;
+import io.searchbox.client.JestClientFactory;
+import io.searchbox.client.config.HttpClientConfig;
+import io.searchbox.core.Bulk;
+import io.searchbox.core.BulkResult;
+import io.searchbox.core.Delete;
+import io.searchbox.core.Index;
+import io.searchbox.core.Search;
+import io.searchbox.core.Update;
+import io.searchbox.indices.CreateIndex;
+import io.searchbox.indices.OpenIndex;
+import io.searchbox.indices.aliases.AddAliasMapping;
+import io.searchbox.indices.aliases.ModifyAliases;
+import io.searchbox.indices.mapping.PutMapping;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.attribute.*;
@@ -43,32 +62,13 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.update.UpdateRequestBuilder;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.*;
import org.elasticsearch.indices.IndexMissingException;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.script.ScriptService;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
@@ -100,121 +100,148 @@ public class ElasticSearchIndex implements IndexProvider {
public static final ConfigNamespace ELASTICSEARCH_NS =
new ConfigNamespace(INDEX_NS, "elasticsearch", "Elasticsearch index configuration");
- public static final ConfigOption CLIENT_ONLY =
- new ConfigOption(ELASTICSEARCH_NS, "client-only",
- "The Elasticsearch node.client option is set to this boolean value, and the Elasticsearch node.data " +
- "option is set to the negation of this value. True creates a thin client which holds no data. False " +
- "creates a regular Elasticsearch cluster node that may store data.",
- ConfigOption.Type.GLOBAL_OFFLINE, true);
+ public static final ConfigOption DISCOVERY_FREQUENCY = new ConfigOption(
+ ELASTICSEARCH_NS,
+ "discovery-frequency",
+ "How long to wait between polling for node discovery",
+ ConfigOption.Type.MASKABLE,
+ 10l
+ );
+
+ public static final ConfigOption TTL_INTERVAL =
+ new ConfigOption(ELASTICSEARCH_NS, "ttl-interval",
+ "The period of time between runs of ES's bulit-in expired document deleter. " +
+ "This string will become the value of ES's indices.ttl.interval setting and should " +
+ "be formatted accordingly, e.g. 5s or 60s.", ConfigOption.Type.MASKABLE, "5s");
public static final ConfigOption CLUSTER_NAME =
new ConfigOption(ELASTICSEARCH_NS, "cluster-name",
"The name of the Elasticsearch cluster. This should match the \"cluster.name\" setting " +
"in the Elasticsearch nodes' configuration.", ConfigOption.Type.GLOBAL_OFFLINE, "elasticsearch");
- public static final ConfigOption LOCAL_MODE =
- new ConfigOption(ELASTICSEARCH_NS, "local-mode",
- "On the legacy config track, this option chooses between starting a TransportClient (false) or " +
- "a Node with JVM-local transport and local data (true). On the interface config track, this option " +
- "is considered by (but optional for) the Node client and ignored by the TransportClient. See the manual " +
- "for more information about ES config tracks.",
- ConfigOption.Type.GLOBAL_OFFLINE, false);
-
public static final ConfigOption CLIENT_SNIFF =
new ConfigOption(ELASTICSEARCH_NS, "sniff",
- "Whether to enable cluster sniffing. This option only applies to the TransportClient. " +
- "Enabling this option makes the TransportClient attempt to discover other cluster nodes " +
+ "Whether to enable cluster sniffing." +
+ "Enabling this option makes the client attempt to discover other cluster nodes " +
"besides those in the initial host list provided at startup.", ConfigOption.Type.MASKABLE, true);
- public static final ConfigOption INTERFACE =
- new ConfigOption<>(ELASTICSEARCH_NS, "interface",
- "Whether to connect to ES using the Node or Transport client (see the \"Talking to Elasticsearch\" " +
- "section of the ES manual for discussion of the difference). Setting this option enables the " +
- "interface config track (see manual for more information about ES config tracks).",
- ConfigOption.Type.MASKABLE, String.class, ElasticSearchSetup.TRANSPORT_CLIENT.toString(),
- disallowEmpty(String.class));
-
- public static final ConfigOption IGNORE_CLUSTER_NAME =
- new ConfigOption(ELASTICSEARCH_NS, "ignore-cluster-name",
- "Whether to bypass validation of the cluster name of connected nodes. " +
- "This option is only used on the interface configuration track (see manual for " +
- "information about ES config tracks).", ConfigOption.Type.MASKABLE, true);
+ public static final ConfigOption LOCAL_MODE =
+ new ConfigOption(ELASTICSEARCH_NS, "local-mode",
+ "Whether or not to boot a local node. Mostly for testing.",
+ ConfigOption.Type.GLOBAL_OFFLINE, false);
- public static final ConfigOption TTL_INTERVAL =
- new ConfigOption(ELASTICSEARCH_NS, "ttl-interval",
- "The period of time between runs of ES's bulit-in expired document deleter. " +
- "This string will become the value of ES's indices.ttl.interval setting and should " +
- "be formatted accordingly, e.g. 5s or 60s.", ConfigOption.Type.MASKABLE, "5s");
+ private static final ConfigOption ES_TRANSPORT_SCHEME = new ConfigOption(
+ ELASTICSEARCH_NS,
+ "transport-scheme",
+ "What scheme (http or https) to use for communications with Elasticsearch",
+ ConfigOption.Type.MASKABLE,
+ "https");
+
+ public static final ConfigOption TIMEOUT = new ConfigOption(
+ ELASTICSEARCH_NS,
+ "timeout",
+ "How long to wait (in ms) for normal operations to complete",
+ ConfigOption.Type.MASKABLE,
+ 30000
+ );
+
+ public static final ConfigOption METADATA_TIMEOUT = new ConfigOption(
+ ELASTICSEARCH_NS,
+ "metadata-timeout",
+ "How long to wait (in ms) for metadata operations to complete",
+ ConfigOption.Type.MASKABLE,
+ 30000
+ );
+
+ public static final ConfigOption SHARD_COUNT = new ConfigOption(
+ ELASTICSEARCH_NS,
+ "shard-count",
+ "How many primary shards to create per Elasticsearch index",
+ ConfigOption.Type.MASKABLE,
+ 5
+ );
+
+ public static final ConfigOption REPLICA_COUNT = new ConfigOption(
+ ELASTICSEARCH_NS,
+ "replica-count",
+ "How many replicas to create per Elasticsearch index",
+ ConfigOption.Type.MASKABLE,
+ 2
+ );
+
+ private static final String deletionScript = ""
+ + "singles.each { "
+ + " ctx._source.remove((String)it); "
+ + "}; "
+ + "lists.each { "
+ + " def index = ctx._source[(String)it[0]].indexOf(it[1]); "
+ + " ctx._source[(String)it[0]].remove(index); "
+ + "};";
+
+ private static final String additionScript = ""
+ + "singles.each { "
+ + " ctx._source[it[0]] = it[1]; "
+ + "}; "
+ + "lists.each { "
+ + " if (ctx._source[it[0]] == null) { "
+ + " ctx._source[it[0]] = []; "
+ + " }; "
+ + " ctx._source[it[0]].add(it[1]); "
+ + "};";
- public static final ConfigOption HEALTH_REQUEST_TIMEOUT =
- new ConfigOption(ELASTICSEARCH_NS, "health-request-timeout",
- "When JanusGraph initializes its ES backend, JanusGraph waits up to this duration for the " +
- "ES cluster health to reach at least yellow status. " +
- "This string should be formatted as a natural number followed by the lowercase letter " +
- "\"s\", e.g. 3s or 60s.", ConfigOption.Type.MASKABLE, "30s");
-
- public static final ConfigOption LOAD_DEFAULT_NODE_SETTINGS =
- new ConfigOption(ELASTICSEARCH_NS, "load-default-node-settings",
- "Whether ES's Node client will internally attempt to load default configuration settings " +
- "from system properties/process environment variables. Only meaningful when using the Node " +
- "client (has no effect with TransportClient).", ConfigOption.Type.MASKABLE, true);
-
- public static final ConfigOption USE_EDEPRECATED_IGNORE_UNMAPPED_OPTION =
- new ConfigOption<>(ELASTICSEARCH_NS, "use-deprecated-ignore-unmapped-option",
- "Elasticsearch versions before 1.4.0 supported the \"ignore_unmapped\" sort option. " +
- "In 1.4.0, it was deprecated by the new \"unmapped_type\" sort option. This configuration" +
- "setting controls which ES option JanusGraph uses: false for the newer \"unmapped_type\"," +
- "true for the older \"ignore_unmapped\".", ConfigOption.Type.MASKABLE, false);
-
- public static final ConfigNamespace ES_EXTRAS_NS =
- new ConfigNamespace(ELASTICSEARCH_NS, "ext", "Overrides for arbitrary elasticsearch.yaml settings", true);
-
- public static final ConfigNamespace ES_CREATE_NS =
- new ConfigNamespace(ELASTICSEARCH_NS, "create", "Settings related to index creation");
-
- public static final ConfigOption CREATE_SLEEP =
- new ConfigOption(ES_CREATE_NS, "sleep",
- "How long to sleep, in milliseconds, between the successful completion of a (blocking) index " +
- "creation request and the first use of that index. This only applies when creating an index in ES, " +
- "which typically only happens the first time JanusGraph is started on top of ES. If the index JanusGraph is " +
- "configured to use already exists, then this setting has no effect.", ConfigOption.Type.MASKABLE, 200L);
-
- public static final ConfigNamespace ES_CREATE_EXTRAS_NS =
- new ConfigNamespace(ES_CREATE_NS, "ext", "Overrides for arbitrary settings applied at index creation", true);
private static final IndexFeatures ES_FEATURES = new IndexFeatures.Builder().supportsDocumentTTL()
.setDefaultStringMapping(Mapping.TEXT).supportedStringMappings(Mapping.TEXT, Mapping.TEXTSTRING, Mapping.STRING).setWildcardField("_all").supportsCardinality(Cardinality.SINGLE).supportsCardinality(Cardinality.LIST).supportsCardinality(Cardinality.SET).supportsNanoseconds().build();
- public static final int HOST_PORT_DEFAULT = 9300;
- private final Node node;
- private final Client client;
+ public static final int HOST_PORT_DEFAULT = 9200;
+
private final String indexName;
private final int maxResultsSize;
- private final boolean useDeprecatedIgnoreUnmapped;
+ private final Configuration configuration;
+ private final JestClient client;
+ private final String metadataTimeout;
+ private final String timeout;
public ElasticSearchIndex(Configuration config) {
- indexName = config.get(INDEX_NAME);
- useDeprecatedIgnoreUnmapped = config.get(USE_EDEPRECATED_IGNORE_UNMAPPED_OPTION);
+ this.configuration = config;
+ this.metadataTimeout = config.get(METADATA_TIMEOUT).toString() + "ms";
+ this.timeout = config.get(TIMEOUT).toString() + "ms";
+ this.indexName = config.get(INDEX_NAME);
+ this.maxResultsSize = config.get(INDEX_MAX_RESULT_SET_SIZE);
+ this.client = createClient(config);
+ }
- checkExpectedClientVersion();
- final ElasticSearchSetup.Connection c;
- if (!config.has(INTERFACE)) {
- c = legacyConfiguration(config);
- } else {
- c = interfaceConfiguration(config);
+ private JestClient createClient(Configuration config) {
+ JestClient client;
+ final String scheme = config.get(ES_TRANSPORT_SCHEME);
+ final Integer port = config.has(INDEX_PORT) ? config.get(INDEX_PORT) : HOST_PORT_DEFAULT;
+ final List serverUris = new ArrayList();
+ String clientKey = "";
+ for (String host : config.get(INDEX_HOSTS)) {
+ String uri = scheme + "://" + host + ":" + port.toString();
+ serverUris.add(uri);
+ clientKey += uri;
}
- node = c.getNode();
- client = c.getClient();
-
- maxResultsSize = config.get(INDEX_MAX_RESULT_SET_SIZE);
- log.debug("Configured ES query result set max size to {}", maxResultsSize);
-
- client.admin().cluster().prepareHealth().setTimeout(config.get(HEALTH_REQUEST_TIMEOUT))
- .setWaitForYellowStatus().execute().actionGet();
+ JestClientFactory factory = new JestClientFactory();
+ HttpClientConfig clientConfig = new HttpClientConfig.Builder(serverUris)
+ .multiThreaded(true)
+ .discoveryEnabled(config.has(CLIENT_SNIFF) && config.get(CLIENT_SNIFF))
+ .discoveryFrequency(config.get(DISCOVERY_FREQUENCY), TimeUnit.SECONDS)
+ .readTimeout(Math.max(config.get(TIMEOUT), config.get(METADATA_TIMEOUT)))
+ .build();
+ factory.setHttpClientConfig(clientConfig);
+ return factory.getObject();
+ }
- checkForOrCreateIndex(config);
+ private static String formatException(String command, JestResult result) {
+ return String.format(
+ "Unknown ES error in %s. Status code is %d; message is %s",
+ command,
+ result.getResponseCode(),
+ result.getErrorMessage()
+ );
}
/**
@@ -228,153 +255,71 @@ public ElasticSearchIndex(Configuration config) {
* @param config the config for this ElasticSearchIndex
* @throws java.lang.IllegalArgumentException if the index could not be created
*/
- private void checkForOrCreateIndex(Configuration config) {
- Preconditions.checkState(null != client);
-
- //Create index if it does not already exist
- IndicesExistsResponse response = client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet();
- if (!response.isExists()) {
-
- ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder();
-
- ElasticSearchSetup.applySettingsFromJanusGraphConf(settings, config, ES_CREATE_EXTRAS_NS);
-
- CreateIndexResponse create = client.admin().indices().prepareCreate(indexName)
- .setSettings(settings.build()).execute().actionGet();
- try {
- final long sleep = config.get(CREATE_SLEEP);
- log.debug("Sleeping {} ms after {} index creation returned from actionGet()", sleep, indexName);
- Thread.sleep(sleep);
- } catch (InterruptedException e) {
- throw new JanusGraphException("Interrupted while waiting for index to settle in", e);
- }
- if (!create.isAcknowledged()) throw new IllegalArgumentException("Could not create index: " + indexName);
- }
- }
-
-
- /**
- * Configure ElasticSearchIndex's ES client according to semantics introduced in
- * 0.5.1. Allows greater flexibility than the previous config semantics. See
- * {@link org.janusgraph.diskstorage.es.ElasticSearchSetup} for more
- * information.
- *
- * This is activated by setting an explicit value for {@link #INTERFACE} in
- * the JanusGraph configuration.
- *
- * @see #legacyConfiguration(org.janusgraph.diskstorage.configuration.Configuration)
- * @param config a config passed to ElasticSearchIndex's constructor
- * @return a node and client object open and ready for use
- */
- private ElasticSearchSetup.Connection interfaceConfiguration(Configuration config) {
- ElasticSearchSetup clientMode = ConfigOption.getEnumValue(config.get(INTERFACE), ElasticSearchSetup.class);
+ private void checkForOrCreateIndex() throws BackendException {
+ JestResult result;
+ OpenIndex oi = new OpenIndex.Builder(indexName)
+ .setParameter("master_timeout", this.metadataTimeout)
+ .setParameter("timeout", this.metadataTimeout)
+ .build();
try {
- return clientMode.connect(config);
+ result = this.client.execute(oi);
} catch (IOException e) {
- throw new JanusGraphException(e);
+ throw new TemporaryBackendException("ES transport error in OpenIndex");
}
- }
+ if (!result.isSucceeded()) {
+ if (result.getResponseCode() == 404) {
+ /*
+ * Use aliases for operational support; actual index name is
+ * suffixed with the index creation time.
+ */
+ Long now = System.currentTimeMillis();
+ String localIndexName = this.indexName + ":" + now.toString();
+ Map sb = new HashMap();
+ sb.put("number_of_shards", this.configuration.get(SHARD_COUNT));
+ sb.put("number_of_replicas", this.configuration.get(REPLICA_COUNT));
+
+ CreateIndex ci = new CreateIndex.Builder(localIndexName)
+ .settings(sb)
+ .setParameter("master_timeout", metadataTimeout)
+ .setParameter("timeout", metadataTimeout)
+ .build();
- /**
- * Configure ElasticSearchIndex's ES client according to 0.4.x - 0.5.0 semantics.
- * This checks local-mode first. If local-mode is true, then it creates a Node that
- * uses JVM local transport and can't talk over the network. If local-mode is
- * false, then it creates a TransportClient that can talk over the network and
- * uses {@link org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration#INDEX_HOSTS}
- * as the server addresses. Note that this configuration method
- * does not allow creating a Node that talks over the network.
- *
- * This is activated by not setting an explicit value for {@link #INTERFACE} in the
- * JanusGraph configuration.
- *
- * @see #interfaceConfiguration(org.janusgraph.diskstorage.configuration.Configuration)
- * @param config a config passed to ElasticSearchIndex's constructor
- * @return a node and client object open and ready for use
- */
- private ElasticSearchSetup.Connection legacyConfiguration(Configuration config) {
- Node node;
- Client client;
-
- if (config.get(LOCAL_MODE)) {
-
- log.debug("Configuring ES for JVM local transport");
-
- boolean clientOnly = config.get(CLIENT_ONLY);
- boolean local = config.get(LOCAL_MODE);
-
- NodeBuilder builder = NodeBuilder.nodeBuilder();
- Preconditions.checkArgument(config.has(INDEX_CONF_FILE) || config.has(INDEX_DIRECTORY),
- "Must either configure configuration file or base directory");
- if (config.has(INDEX_CONF_FILE)) {
- String configFile = config.get(INDEX_CONF_FILE);
- ImmutableSettings.Builder sb = ImmutableSettings.settingsBuilder();
- log.debug("Configuring ES from YML file [{}]", configFile);
- FileInputStream fis = null;
try {
- fis = new FileInputStream(configFile);
- sb.loadFromStream(configFile, fis);
- builder.settings(sb.build());
- } catch (FileNotFoundException e) {
- throw new JanusGraphException(e);
- } finally {
- IOUtils.closeQuietly(fis);
+ result = this.client.execute(ci);
+ } catch (IOException e) {
+ throw new TemporaryBackendException("ES transport error in CreateIndex");
}
- } else {
- String dataDirectory = config.get(INDEX_DIRECTORY);
- log.debug("Configuring ES with data directory [{}]", dataDirectory);
- File f = new File(dataDirectory);
- if (!f.exists()) f.mkdirs();
- ImmutableSettings.Builder b = ImmutableSettings.settingsBuilder();
- for (String sub : DATA_SUBDIRS) {
- String subdir = dataDirectory + File.separator + sub;
- f = new File(subdir);
- if (!f.exists()) f.mkdirs();
- b.put("path." + sub, subdir);
+ if (!result.isSucceeded()) {
+ throw new PermanentBackendException(formatException("CreateIndex", result));
}
- b.put("script.disable_dynamic", false);
- b.put("indices.ttl.interval", "5s");
- builder.settings(b.build());
+ AddAliasMapping am = new AddAliasMapping.Builder(localIndexName, this.indexName)
+ .build();
- String clustername = config.get(CLUSTER_NAME);
- Preconditions.checkArgument(StringUtils.isNotBlank(clustername), "Invalid cluster name: %s", clustername);
- builder.clusterName(clustername);
- }
+ ModifyAliases ma = new ModifyAliases.Builder(am)
+ .setParameter("master_timeout", metadataTimeout)
+ .setParameter("timeout", metadataTimeout)
+ .build();
- node = builder.client(clientOnly).data(!clientOnly).local(local).node();
- client = node.client();
-
- } else {
- log.debug("Configuring ES for network transport");
- ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder();
- if (config.has(CLUSTER_NAME)) {
- String clustername = config.get(CLUSTER_NAME);
- Preconditions.checkArgument(StringUtils.isNotBlank(clustername), "Invalid cluster name: %s", clustername);
- settings.put("cluster.name", clustername);
+ try {
+ result = this.client.execute(ma);
+ } catch (IOException e) {
+ log.error("Index {} is was orphaned by alias creation failure!", localIndexName);
+ throw new TemporaryBackendException("ES transport error in add alias");
+ }
+ if (!result.isSucceeded()) {
+ log.error("Index {} is was orphaned by alias creation failure!", localIndexName);
+ throw new PermanentBackendException(formatException("AddAlias", result));
+ }
} else {
- settings.put("client.transport.ignore_cluster_name", true);
- }
- log.debug("Transport sniffing enabled: {}", config.get(CLIENT_SNIFF));
- settings.put("client.transport.sniff", config.get(CLIENT_SNIFF));
- settings.put("script.disable_dynamic", false);
- TransportClient tc = new TransportClient(settings.build());
- int defaultPort = config.has(INDEX_PORT)?config.get(INDEX_PORT):HOST_PORT_DEFAULT;
- for (String host : config.get(INDEX_HOSTS)) {
- String[] hostparts = host.split(":");
- String hostname = hostparts[0];
- int hostport = defaultPort;
- if (hostparts.length == 2) hostport = Integer.parseInt(hostparts[1]);
- log.info("Configured remote host: {} : {}", hostname, hostport);
- tc.addTransportAddress(new InetSocketTransportAddress(hostname, hostport));
+ throw new PermanentBackendException(formatException("OpenIndex", result));
}
- client = tc;
- node = null;
}
-
- return new ElasticSearchSetup.Connection(node, client);
}
+
+
private BackendException convert(Exception esException) {
if (esException instanceof InterruptedException) {
return new TemporaryBackendException("Interrupted while waiting for response", esException);
@@ -389,33 +334,42 @@ private static String getDualMappingName(String key) {
@Override
public void register(String store, String key, KeyInformation information, BaseTransaction tx) throws BackendException {
+ checkForOrCreateIndex();
XContentBuilder mapping;
+ String source;
Class> dataType = information.getDataType();
Mapping map = Mapping.getMapping(information);
- Preconditions.checkArgument(map==Mapping.DEFAULT || AttributeUtil.isString(dataType),
- "Specified illegal mapping [%s] for data type [%s]",map,dataType);
+ Preconditions.checkArgument(
+ map == Mapping.DEFAULT || AttributeUtil.isString(dataType),
+ "Specified illegal mapping [%s] for data type [%s]",
+ map,
+ dataType
+ );
try {
mapping = XContentFactory.jsonBuilder().
- startObject().
- startObject(store).
- field(TTL_FIELD, new HashMap() {{
- put("enabled", true);
- }}).
- startObject("properties").
- startObject(key);
+ startObject().
+ startObject(store).
+ field(TTL_FIELD, new HashMap() {{
+ put("enabled", true);
+ }}).
+ startObject("properties").
+ startObject(key);
if (AttributeUtil.isString(dataType)) {
- if (map==Mapping.DEFAULT) map=Mapping.TEXT;
+ if (map == Mapping.DEFAULT) {
+ map = Mapping.TEXT;
+ }
+
log.debug("Registering string type for {} with mapping {}", key, map);
mapping.field("type", "string");
switch (map) {
case STRING:
- mapping.field("index","not_analyzed");
+ mapping.field("index", "not_analyzed");
break;
case TEXT:
//default, do nothing
- break;
+ break;
case TEXTSTRING:
mapping.endObject();
//add string mapping
@@ -462,16 +416,21 @@ public void register(String store, String key, KeyInformation information, BaseT
}
mapping.endObject().endObject().endObject().endObject();
-
+ source = mapping.string();
} catch (IOException e) {
throw new PermanentBackendException("Could not render json for put mapping request", e);
}
+ PutMapping putMapping = new PutMapping.Builder(this.indexName, store, source)
+ .setParameter("master_timeout", this.metadataTimeout)
+ .setParameter("timeout", this.metadataTimeout)
+ .build();
+
+ JestResult result;
try {
- PutMappingResponse response = client.admin().indices().preparePutMapping(indexName).
- setIgnoreConflicts(false).setType(store).setSource(mapping).execute().actionGet();
- } catch (Exception e) {
- throw convert(e);
+ result = this.client.execute(putMapping);
+ } catch (IOException e) {
+ throw new TemporaryBackendException("ES transport error in PutMapping", e);
}
}
@@ -558,136 +517,215 @@ private static Object convertToEsType(Object value) {
@Override
public void mutate(Map> mutations, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
- BulkRequestBuilder brb = client.prepareBulk();
-
- int bulkrequests = 0;
- try {
- for (Map.Entry> stores : mutations.entrySet()) {
- String storename = stores.getKey();
- for (Map.Entry entry : stores.getValue().entrySet()) {
-
- String docid = entry.getKey();
- IndexMutation mutation = entry.getValue();
- assert mutation.isConsolidated();
- Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted()));
- Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions());
- Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions());
- //Deletions first
- if (mutation.hasDeletions()) {
- if (mutation.isDeleted()) {
- log.trace("Deleting entire document {}", docid);
- brb.add(new DeleteRequest(indexName, storename, docid));
- } else {
- String script = getDeletionScript(informations, storename, mutation);
- brb.add(client.prepareUpdate(indexName, storename, docid).setScript(script, ScriptService.ScriptType.INLINE));
- log.trace("Adding script {}", script);
+ String source;
+ int requests = 0;
+ Bulk.Builder bulk = new Bulk.Builder()
+ .defaultIndex(this.indexName);
+ for (Map.Entry> stores : mutations.entrySet()) {
+ String store = stores.getKey();
+ for (Map.Entry entry : stores.getValue().entrySet()) {
+ String id = entry.getKey();
+ IndexMutation mutation = entry.getValue();
+ assert mutation.isConsolidated();
+ Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted()));
+ Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions());
+ Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions());
+ if (mutation.hasDeletions()) {
+ requests++;
+ if (mutation.isDeleted()) {
+ log.trace("Deleting entire document {}", id);
+ Delete delete = new Delete.Builder(id).type(store).build();
+ bulk.addAction(delete);
+ } else {
+ try {
+ source = XContentFactory.jsonBuilder().startObject()
+ .field("lang", "groovy")
+ .field("script", deletionScript)
+ .field("params", getDeletionParams(informations, store, mutation))
+ .string();
+ log.trace("Adding script {}", deletionScript);
+ } catch (IOException e) {
+ throw new PermanentBackendException(
+ "Could not render json for mutate request", e);
}
- bulkrequests++;
+ Update update = new Update.Builder(source)
+ .type(store)
+ .id(id)
+ .build();
+
+ bulk.addAction(update);
}
- if (mutation.hasAdditions()) {
- int ttl = mutation.determineTTL();
-
- if (mutation.isNew()) { //Index
- log.trace("Adding entire document {}", docid);
- brb.add(new IndexRequest(indexName, storename, docid)
- .source(getNewDocument(mutation.getAdditions(), informations.get(storename), ttl)));
-
- } else {
- Preconditions.checkArgument(ttl == 0, "Elasticsearch only supports TTL on new documents [%s]", docid);
-
- boolean needUpsert = !mutation.hasDeletions();
- String script = getAdditionScript(informations, storename, mutation);
- UpdateRequestBuilder update = client.prepareUpdate(indexName, storename, docid).setScript(script, ScriptService.ScriptType.INLINE);
- if (needUpsert) {
- XContentBuilder doc = getNewDocument(mutation.getAdditions(), informations.get(storename), ttl);
- update.setUpsert(doc);
+ }
+
+ if (mutation.hasAdditions()) {
+ requests++;
+ int ttl = mutation.determineTTL();
+ if (mutation.isNew()) {
+ log.trace("Adding entire document {}", id);
+ try {
+ source = getNewDocument(
+ mutation.getAdditions(),
+ informations.get(store),
+ ttl
+ ).string();
+ } catch (IOException e) {
+ throw new PermanentBackendException(
+ "Could not render json for mutate request", e);
+ }
+ Index index = new Index.Builder(source)
+ .type(store)
+ .id(id)
+ .build();
+
+ bulk.addAction(index);
+ } else {
+ Preconditions.checkArgument(
+ ttl == 0,
+ "Elasticsearch only supports TTL on new documents [%s]",
+ id
+ );
+ try {
+ XContentBuilder json = XContentFactory.jsonBuilder().startObject()
+ .field("lang", "groovy")
+ .field("script", additionScript)
+ .field("params", getAdditionParams(informations, store, mutation));
+
+ if (!mutation.hasDeletions()) {
+ XContentBuilder doc = getNewDocument(
+ mutation.getAdditions(),
+ informations.get(store),
+ ttl
+ );
+ json.field("upsert", doc);
}
- brb.add(update);
- log.trace("Adding script {}", script);
+ source = json.string();
+ log.trace("Adding script {}", additionScript);
+ } catch (IOException e) {
+ throw new PermanentBackendException(
+ "Could not render json for mutate request", e);
}
- bulkrequests++;
- }
+ Update update = new Update.Builder(source)
+ .type(store)
+ .id(id)
+ .build();
+ bulk.addAction(update);
+ }
}
}
- if (bulkrequests > 0) {
- BulkResponse bulkItemResponses = brb.execute().actionGet();
- if (bulkItemResponses.hasFailures()) {
- boolean actualFailure = false;
- for(BulkItemResponse response : bulkItemResponses.getItems()) {
- //The document may have been deleted, which is OK
- if(response.isFailed() && response.getFailure().getStatus() != RestStatus.NOT_FOUND) {
- log.error("Failed to execute ES query {}", response.getFailureMessage());
- actualFailure = true;
- }
- }
- if(actualFailure) {
- throw new Exception(bulkItemResponses.buildFailureMessage());
+ }
+
+ if (requests > 0) {
+ BulkResult result;
+ bulk.setParameter("timeout", this.timeout);
+ try {
+ result = this.client.execute(bulk.build());
+ } catch (IOException e) {
+ throw new TemporaryBackendException("ES transport error in bulk mutate", e);
+ }
+ if (!result.isSucceeded()) {
+ boolean actualFailure = false;
+ for (BulkResult.BulkResultItem item : result.getItems()) {
+ if (null != item.error && item.status != 404) {
+ actualFailure = true;
+ log.warn(String.format(
+ "Mutate (%s) on item %s in index %s failed with code %d and error %s",
+ item.operation,
+ item.id,
+ item.index,
+ item.status,
+ item.error
+ ));
}
}
+ if (actualFailure) {
+ throw new PermanentBackendException(formatException("bulk mutate", result));
+ }
}
- } catch (Exception e) {
- log.error("Failed to execute ES query {}", brb.request().timeout(), e);
- throw convert(e);
}
}
- private String getDeletionScript(KeyInformation.IndexRetriever informations, String storename, IndexMutation mutation) throws PermanentBackendException {
- StringBuilder script = new StringBuilder();
+ private Map getDeletionParams(KeyInformation.IndexRetriever informations, String storename, IndexMutation mutation) throws PermanentBackendException {
+ Map params = new HashMap();
+ List singles = new ArrayList();
+ params.put("singles", singles);
+ List> lists = new ArrayList>();
+ params.put("lists", lists);
+ List