Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for transforms #284

Merged
merged 27 commits into from
Mar 19, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0858720
initial commit
Cristigeo Mar 6, 2023
8985ad1
support for Update
Cristigeo Mar 6, 2023
0c72f4a
adding missing properties for transform
Cristigeo Mar 6, 2023
91e5a0b
misc corrections
Cristigeo Mar 6, 2023
abb261b
some documentation
Cristigeo Mar 7, 2023
2f32dc8
support for transform start/stop
Cristigeo Mar 7, 2023
df3201b
addiong options on the client calls for transform start/stop
Cristigeo Mar 7, 2023
7840c93
initial commit
Cristigeo Mar 6, 2023
de26951
support for Update
Cristigeo Mar 6, 2023
db04052
adding missing properties for transform
Cristigeo Mar 6, 2023
b9d0a87
misc corrections
Cristigeo Mar 6, 2023
a19faaf
some documentation
Cristigeo Mar 7, 2023
8c2fb9a
support for transform start/stop
Cristigeo Mar 7, 2023
603e586
addiong options on the client calls for transform start/stop
Cristigeo Mar 7, 2023
b17dcae
Merge branch '49-support-for-transforms' of https://github.com/Cristi…
Cristigeo Mar 9, 2023
1d9aa97
code updates based on feedback
Cristigeo Mar 9, 2023
2f3b5e8
updated documentation (added md template)
Cristigeo Mar 9, 2023
e497cd9
timeout query param is only available from v7.17.0
Cristigeo Mar 9, 2023
c040977
fixed the messed up md file
Cristigeo Mar 9, 2023
8fc867c
settings promoted to individual arguments; drift detection for updata…
Cristigeo Mar 11, 2023
1b5db02
check versions before using features
Cristigeo Mar 12, 2023
a00822c
updated doc
Cristigeo Mar 12, 2023
4f4eb56
support for terraform import
Cristigeo Mar 12, 2023
e2ebb79
minor corrections/updates
Cristigeo Mar 14, 2023
b900984
corrected min version for align_checkpoints
Cristigeo Mar 14, 2023
0bedc94
updates on acc tests
Cristigeo Mar 14, 2023
3f447ea
removed some defaults; updated docs; updated acc test
Cristigeo Mar 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions docs/resources/elasticsearch_transform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: 'elasticstack_elasticsearch_transform Resource - terraform-provider-elasticstack'
subcategory: ''
Cristigeo marked this conversation as resolved.
Show resolved Hide resolved
description: |-
Manages Elasticsearch transforms. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/transforms.html
---

# elasticstack_elasticsearch_transform (Resource)

Manages Elasticsearch transforms. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/transforms.html

## Example Usage

```terraform
resource "elasticstack_elasticsearch_transform" "transform_with_pivot" {
name = "transform-pivot"
description = "A meaningful description"

source {
indices = ["name_or_pattern_for_input_index"]
}

destination {
index = "destination_index_for_transform"
}

pivot = jsonencode({
"group_by" : {
"customer_id" : {
"terms" : {
"field" : "customer_id",
"missing_bucket" : true
}
}
},
"aggregations" : {
"max_price" : {
"max" : {
"field" : "taxful_total_price"
}
}
}
})

frequency = "5m"

retention_policy {
time {
field = "order_date"
max_age = "30d"
}
}

sync {
time {
field = "order_date"
delay = "10s"
}
}

enabled = false

defer_validation = false
}
```

<!-- schema generated by tfplugindocs -->

## Schema

### Required

- `destination` (Block List, Min: 1, Max: 1) The destination for the transform. (see [below for nested schema](#nestedblock--destination))
- `name` (String) Name of the transform you wish to create.
- `source` (Block List, Min: 1, Max: 1) The source of the data for the transform. (see [below for nested schema](#nestedblock--source))

### Optional

- `defer_validation` (Boolean) When true, deferrable validations are not run upon creation, but rather when the transform is started. This behavior may be desired if the source index does not exist until after the transform is created.
- `description` (String) Free text description of the transform.
- `elasticsearch_connection` (Block List, Max: 1, Deprecated) Elasticsearch connection configuration block. This property will be removed in a future provider version. Configure the Elasticsearch connection via the provider configuration instead. (see [below for nested schema](#nestedblock--elasticsearch_connection))
- `enabled` (Boolean) Controls wether the transform is started or stopped. Default is `false` (stopped).
- `frequency` (String) The interval between checks for changes in the source indices when the transform is running continuously. Defaults to `1m`.
- `latest` (String) The latest method transforms the data by finding the latest document for each unique key. JSON definition expected. Either 'pivot' or 'latest' must be present.
- `metadata` (String) Defines optional transform metadata.
- `pivot` (String) The pivot method transforms the data by aggregating and grouping it. JSON definition expected. Either 'pivot' or 'latest' must be present.
- `retention_policy` (Block List, Max: 1) Defines a retention policy for the transform. (see [below for nested schema](#nestedblock--retention_policy))
- `settings` (Block List, Max: 1) Defines optional transform settings. (see [below for nested schema](#nestedblock--settings))
- `sync` (Block List, Max: 1) Defines the properties transforms require to run continuously. (see [below for nested schema](#nestedblock--sync))
- `timeout` (String) Period to wait for a response from Elastisearch when performing any management operation. If no response is received before the timeout expires, the operation fails and returns an error. Defaults to `30s`.

### Read-Only

- `id` (String) Internal identifier of the resource

<a id="nestedblock--destination"></a>

### Nested Schema for `destination`

Required:

- `index` (String) The destination index for the transform.

Optional:

- `pipeline` (String) The unique identifier for an ingest pipeline.

<a id="nestedblock--source"></a>

### Nested Schema for `source`

Required:

- `indices` (List of String) The source indices for the transform.

Optional:

- `query` (String) A query clause that retrieves a subset of data from the source index.
- `runtime_mappings` (String) Definitions of search-time runtime fields that can be used by the transform.

<a id="nestedblock--elasticsearch_connection"></a>

### Nested Schema for `elasticsearch_connection`

Optional:

- `api_key` (String, Sensitive) API Key to use for authentication to Elasticsearch
- `ca_data` (String) PEM-encoded custom Certificate Authority certificate
- `ca_file` (String) Path to a custom Certificate Authority certificate
- `cert_data` (String) PEM encoded certificate for client auth
- `cert_file` (String) Path to a file containing the PEM encoded certificate for client auth
- `endpoints` (List of String, Sensitive) A list of endpoints where the terraform provider will point to, this must include the http(s) schema and port number.
- `insecure` (Boolean) Disable TLS certificate validation
- `key_data` (String, Sensitive) PEM encoded private key for client auth
- `key_file` (String) Path to a file containing the PEM encoded private key for client auth
- `password` (String, Sensitive) Password to use for API authentication to Elasticsearch.
- `username` (String) Username to use for API authentication to Elasticsearch.

<a id="nestedblock--retention_policy"></a>

### Nested Schema for `retention_policy`

Required:

- `time` (Block List, Min: 1, Max: 1) Specifies that the transform uses a time field to set the retention policy. (see [below for nested schema](#nestedblock--retention_policy--time))

<a id="nestedblock--retention_policy--time"></a>

### Nested Schema for `retention_policy.time`

Required:

- `field` (String) The date field that is used to calculate the age of the document.
- `max_age` (String) Specifies the maximum age of a document in the destination index.

<a id="nestedblock--settings"></a>

### Nested Schema for `settings`

Optional:

- `align_checkpoints` (Boolean) Specifies whether the transform checkpoint ranges should be optimized for performance. Default value is true.
- `dates_as_epoch_millis` (Boolean) Defines if dates in the output should be written as ISO formatted string (default) or as millis since epoch.
- `deduce_mappings` (Boolean) Specifies whether the transform should deduce the destination index mappings from the transform config. The default value is true
- `docs_per_second` (Number) Specifies a limit on the number of input documents per second. Default value is null, which disables throttling.
- `max_page_search_size` (Number) Defines the initial page size to use for the composite aggregation for each checkpoint. The default value is 500.
- `num_failure_retries` (Number) Defines the number of retries on a recoverable failure before the transform task is marked as failed. The default value is the cluster-level setting num_transform_failure_retries.
- `unattended` (Boolean) In unattended mode, the transform retries indefinitely in case of an error which means the transform never fails. Defaults to false.

<a id="nestedblock--sync"></a>

### Nested Schema for `sync`

Required:

- `time` (Block List, Min: 1, Max: 1) Specifies that the transform uses a time field to synchronize the source and destination indices. (see [below for nested schema](#nestedblock--sync--time))

<a id="nestedblock--sync--time"></a>

### Nested Schema for `sync.time`

Required:

- `field` (String) The date field that is used to identify new documents in the source.

Optional:

- `delay` (String) The time delay between the current time and the latest input data time. The default value is 60s.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
resource "elasticstack_elasticsearch_transform" "transform_with_pivot" {
name = "transform-pivot"
description = "A meaningful description"

source {
indices = ["name_or_pattern_for_input_index"]
}

destination {
index = "destination_index_for_transform"
}

pivot = jsonencode({
"group_by" : {
"customer_id" : {
"terms" : {
"field" : "customer_id",
"missing_bucket" : true
}
}
},
"aggregations" : {
"max_price" : {
"max" : {
"field" : "taxful_total_price"
}
}
}
})

frequency = "5m"

retention_policy {
time {
field = "order_date"
max_age = "30d"
}
}

sync {
time {
field = "order_date"
delay = "10s"
}
}

enabled = false

defer_validation = false
}
155 changes: 155 additions & 0 deletions internal/clients/elasticsearch/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,3 +544,158 @@ func DeleteIngestPipeline(ctx context.Context, apiClient *clients.ApiClient, nam
}
return diags
}

func PutTransform(ctx context.Context, apiClient *clients.ApiClient, transform *models.Transform, params *models.PutTransformParams) diag.Diagnostics {
fmt.Println("entering PutTransform for", transform.Name)
Cristigeo marked this conversation as resolved.
Show resolved Hide resolved
var diags diag.Diagnostics
transformBytes, err := json.Marshal(transform)
if err != nil {
return diag.FromErr(err)
}

fmt.Printf("%s\n", transformBytes)

esClient, err := apiClient.GetESClient()
if err != nil {
return diag.FromErr(err)
}

putOptions := []func(*esapi.TransformPutTransformRequest){
esClient.TransformPutTransform.WithContext(ctx),
esClient.TransformPutTransform.WithDeferValidation(params.DeferValidation),
esClient.TransformPutTransform.WithTimeout(params.Timeout),
}

res, err := esClient.TransformPutTransform(bytes.NewReader(transformBytes), transform.Name, putOptions...)
if err != nil {
return diag.FromErr(err)
}

defer res.Body.Close()
if diags := utils.CheckError(res, fmt.Sprintf("Unable to create transform: %s", transform.Name)); diags.HasError() {
return diags
}

if params.Enabled {
startOptions := []func(*esapi.TransformStartTransformRequest){
esClient.TransformStartTransform.WithContext(ctx),
esClient.TransformStartTransform.WithTimeout(params.Timeout),
}
_, err := esClient.TransformStartTransform(transform.Name, startOptions...)
Cristigeo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return diag.FromErr(err)
}
}

return diags
}

func GetTransform(ctx context.Context, apiClient *clients.ApiClient, name *string) (*models.Transform, diag.Diagnostics) {
fmt.Println("entering GetTransform for", *name)
var diags diag.Diagnostics
esClient, err := apiClient.GetESClient()
if err != nil {
return nil, diag.FromErr(err)
}
req := esClient.TransformGetTransform.WithTransformID(*name)
res, err := esClient.TransformGetTransform(req, esClient.TransformGetTransform.WithContext(ctx))
if err != nil {
return nil, diag.FromErr(err)
}
defer res.Body.Close()
if res.StatusCode == http.StatusNotFound {
return nil, nil
}
if diags := utils.CheckError(res, fmt.Sprintf("Unable to get requested transform: %s", *name)); diags.HasError() {
return nil, diags
}

transformsResponse := models.GetTransformResponse{}
if err := json.NewDecoder(res.Body).Decode(&transformsResponse); err != nil {
return nil, diag.FromErr(err)
}

for _, t := range transformsResponse.Transforms {
if t.Id == *name {
t.Name = *name
return &t, diags
}
}

return nil, diags
}

func UpdateTransform(ctx context.Context, apiClient *clients.ApiClient, transform *models.Transform, params *models.UpdateTransformParams) diag.Diagnostics {
fmt.Println("entering UpdateTransform with Enabled", params.Enabled)
var diags diag.Diagnostics
transformBytes, err := json.Marshal(transform)
if err != nil {
return diag.FromErr(err)
}

fmt.Printf("%s\n", transformBytes)

esClient, err := apiClient.GetESClient()
if err != nil {
return diag.FromErr(err)
}

updateOptions := []func(*esapi.TransformUpdateTransformRequest){
esClient.TransformUpdateTransform.WithContext(ctx),
esClient.TransformUpdateTransform.WithDeferValidation(params.DeferValidation),
esClient.TransformUpdateTransform.WithTimeout(params.Timeout),
}

res, err := esClient.TransformUpdateTransform(bytes.NewReader(transformBytes), transform.Name, updateOptions...)
if err != nil {
return diag.FromErr(err)
}

defer res.Body.Close()
if diags := utils.CheckError(res, fmt.Sprintf("Unable to update transform: %s", transform.Name)); diags.HasError() {
return diags
}

if params.Enabled {
startOptions := []func(*esapi.TransformStartTransformRequest){
esClient.TransformStartTransform.WithContext(ctx),
esClient.TransformStartTransform.WithTimeout(params.Timeout),
}
_, err := esClient.TransformStartTransform(transform.Name, startOptions...)
Cristigeo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return diag.FromErr(err)
}
} else {
stopOptions := []func(*esapi.TransformStopTransformRequest){
esClient.TransformStopTransform.WithContext(ctx),
esClient.TransformStopTransform.WithTimeout(params.Timeout),
}
_, err := esClient.TransformStopTransform(transform.Name, stopOptions...)
Cristigeo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return diag.FromErr(err)
}
}

return diags
}

func DeleteTransform(ctx context.Context, apiClient *clients.ApiClient, name *string) diag.Diagnostics {
fmt.Println("entering DeleteTransform for", *name)
var diags diag.Diagnostics

esClient, err := apiClient.GetESClient()
if err != nil {
return diag.FromErr(err)
}

res, err := esClient.TransformDeleteTransform(*name, esClient.TransformDeleteTransform.WithForce(true), esClient.TransformDeleteTransform.WithContext(ctx))
if err != nil {
return diag.FromErr(err)
}
defer res.Body.Close()
if diags := utils.CheckError(res, fmt.Sprintf("Unable to delete the transform: %s", *name)); diags.HasError() {
return diags
}

return diags
}
Loading