forked from mstein/elasticsearch-grails-plugin
-
Notifications
You must be signed in to change notification settings - Fork 83
/
ClientNodeFactoryBean.groovy
250 lines (219 loc) · 11.2 KB
/
ClientNodeFactoryBean.groovy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
/*
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package grails.plugins.elasticsearch
import org.elasticsearch.Version
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.mapper.attachments.MapperAttachmentsPlugin
import org.elasticsearch.node.Node
import org.elasticsearch.plugins.Plugin
import org.elasticsearch.transport.client.PreBuiltTransportClient
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.FactoryBean
import org.springframework.core.io.Resource
import org.springframework.core.io.support.PathMatchingResourcePatternResolver
import org.elasticsearch.node.InternalSettingsPreparer
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
class ClientNodeFactoryBean implements FactoryBean {
static final SUPPORTED_MODES = ['local', 'transport', 'node', 'dataNode']
private static final Logger LOG = LoggerFactory.getLogger(this)
ElasticSearchContextHolder elasticSearchContextHolder
def node
Object getObject() {
// Retrieve client mode, default is "node"
String clientMode = elasticSearchContextHolder.config.client.mode ?: 'node'
if (!(clientMode in SUPPORTED_MODES)) {
throw new IllegalArgumentException("Invalid client mode, expected values were ${SUPPORTED_MODES}.")
}
Settings.Builder settings = Settings.builder()
def configFile = elasticSearchContextHolder.config.bootstrap.config.file
if (configFile) {
LOG.info "Looking for bootstrap configuration file at: $configFile"
Resource resource = new PathMatchingResourcePatternResolver().getResource(configFile)
settings = settings.loadFromStream(configFile, resource.inputStream)
}
def transportClient
// Cluster name
if (elasticSearchContextHolder.config.cluster.name) {
settings.put('cluster.name', elasticSearchContextHolder.config.cluster.name)
}
// Path to the data folder of ES
def dataPath = elasticSearchContextHolder.config.path.data
if (dataPath) {
settings.put('path.data', dataPath as String)
LOG.info "Using ElasticSearch data path: ${dataPath}"
}
// Configure the client based on the client mode
switch (clientMode) {
case 'transport':
def transportSettingsBuilder = Settings.builder()
def transportSettingsFile = elasticSearchContextHolder.config.bootstrap.transportSettings.file
if (transportSettingsFile) {
Resource resource = new PathMatchingResourcePatternResolver().getResource(transportSettingsFile)
transportSettingsBuilder.loadFromStream(transportSettingsFile, resource.inputStream)
}
// Use the "sniff" feature of transport client ?
if (elasticSearchContextHolder.config.client.transport.sniff) {
transportSettingsBuilder.put("client.transport.sniff", false)
}
if (elasticSearchContextHolder.config.cluster.name) {
transportSettingsBuilder.put('cluster.name', elasticSearchContextHolder.config.cluster.name.toString())
}
def transportSettings = transportSettingsBuilder.build()
transportClient = new PreBuiltTransportClient(transportSettings, Collections.emptyList());
boolean ip4Enabled = elasticSearchContextHolder.config.shield.ip4Enabled ?: true
boolean ip6Enabled = elasticSearchContextHolder.config.shield.ip6Enabled ?: false
try {
def shield = Class.forName("org.elasticsearch.shield.ShieldPlugin")
transportClient = new PreBuiltTransportClient(transportSettings, Collections.singletonList(shield));
LOG.info("Shield Enabled")
} catch (ClassNotFoundException e) {
transportClient = new PreBuiltTransportClient(transportSettings, Collections.emptyList());
}
// Configure transport addresses
if (!elasticSearchContextHolder.config.client.hosts) {
transportClient.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress('localhost', 9300)))
} else {
elasticSearchContextHolder.config.client.hosts.each {
try {
for (InetAddress address : InetAddress.getAllByName(it.host)) {
if ((ip6Enabled && address instanceof Inet6Address) || (ip4Enabled && address instanceof Inet4Address)) {
LOG.info("Adding host: ${address}:${it.port}")
transportClient.addTransportAddress(new InetSocketTransportAddress(address, it.port));
}
}
} catch (UnknownHostException e) {
LOG.error("Unable to get the host", e.getMessage());
}
}
}
break
case 'local':
// Determines how the data is stored (on disk, in memory, ...)
def storeType = elasticSearchContextHolder.config.index.store.type
if (storeType) {
settings.put('index.store.type', storeType as String)
LOG.debug "Local ElasticSearch client with store type of ${storeType} configured."
} else {
LOG.debug "Local ElasticSearch client with default store type configured."
}
def gatewayType = elasticSearchContextHolder.config.gateway.type
if (gatewayType) {
settings.put('gateway.type', gatewayType as String)
LOG.debug "Local ElasticSearch client with gateway type of ${gatewayType} configured."
} else {
LOG.debug "Local ElasticSearch client with default gateway type configured."
}
def queryParsers = elasticSearchContextHolder.config.index.queryparser
if (queryParsers) {
queryParsers.each { type, clz ->
settings.put("index.queryparser.types.${type}".toString(), clz)
}
}
def pluginsDirectory = elasticSearchContextHolder.config.path.plugins
if (pluginsDirectory) {
settings.put('path.plugins', new File(pluginsDirectory as String).absolutePath)
}
// Path to the config folder of ES
def confDirectory = elasticSearchContextHolder.config.path.conf
if (confDirectory) {
settings.put('path.conf', confDirectory as String)
}
def tmpDirectory = tmpDirectory()
LOG.info "Setting embedded ElasticSearch tmp dir to ${tmpDirectory}"
settings.put("path.home", tmpDirectory)
//settings.put("node.local", true)
settings.put("transport.type", "local")
settings.put("http.enabled", false)
break
case 'dataNode':
def storeType = elasticSearchContextHolder.config.index.store.type
if (storeType) {
settings.put('index.store.type', storeType as String)
LOG.debug "DataNode ElasticSearch client with store type of ${storeType} configured."
} else {
LOG.debug "DataNode ElasticSearch client with default store type configured."
}
def queryParsers = elasticSearchContextHolder.config.index.queryparser
if (queryParsers) {
queryParsers.each { type, clz ->
settings.put("index.queryparser.types.${type}".toString(), clz)
}
}
if (elasticSearchContextHolder.config.discovery.zen.ping.unicast.hosts) {
settings.put("discovery.zen.ping.unicast.hosts", elasticSearchContextHolder.config.discovery.zen.ping.unicast.hosts)
}
settings.put("node.client", false)
settings.put("node.data", true)
break
case 'node':
default:
settings.put("node.client", true)
break
}
if (transportClient) {
return transportClient
}
//Inject http settings...
if (elasticSearchContextHolder.config.http) {
flattenMap(elasticSearchContextHolder.config.http).each { p ->
settings.put("http.${p.key}", p.value as String)
}
}
// Avoiding this:
node = new PluginEnabledNode(settings, MapperAttachmentsPlugin)
node.start()
def client = node.client()
// Wait for the cluster to become alive.
// LOG.info "Waiting for ElasticSearch GREEN status."
// client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet()
return client
}
//From http://groovy.329449.n5.nabble.com/Flatten-Map-using-closure-td364360.html
def flattenMap(map) {
[:].putAll(map.entrySet().flatten {
it.value instanceof Map ? it.value.collect { k, v -> new MapEntry(it.key + '.' + k, v) } : it
})
}
Class getObjectType() {
return org.elasticsearch.client.Client
}
boolean isSingleton() {
return true
}
def shutdown() {
if (elasticSearchContextHolder.config.client.mode == 'local' || elasticSearchContextHolder.config.client.mode == 'dataNode' && node) {
LOG.info "Stopping embedded ElasticSearch."
node.close() // close() seems to be more appropriate than stop()
}
}
private String tmpDirectory() {
String baseDirectory = System.getProperty("java.io.tmpdir") ?: '/tmp'
Path path = Files.createTempDirectory(Paths.get(baseDirectory), 'elastic-data-' + new Date().time)
File file = path.toFile()
file.deleteOnExit()
return file.absolutePath
}
private static class PluginEnabledNode extends Node {
PluginEnabledNode(Settings.Builder settings, Class<? extends Plugin> ... plugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings.build(), null), plugins as List<Plugin>)
}
}
}