Skip to content

Commit

Permalink
Provider 2.x support
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <pgodithi@amazon.com>
  • Loading branch information
prudhvigodithi committed Aug 3, 2023
1 parent 7555b94 commit 240d9f8
Show file tree
Hide file tree
Showing 44 changed files with 712 additions and 2,397 deletions.
18 changes: 8 additions & 10 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,20 @@ jobs:
go-version: [1.18.x]
os: [ubuntu-latest]
os-major-version:
- "1-opensearch"
- "2-opensearch"
include:
- os-major-version: "1-opensearch"
version: 1.1.0
- os-major-version: "2-opensearch"
version: 2.3.0
oss-image: "opensearchproject/opensearch"
OS_OPENDISTRO_IMAGE: "opensearchproject/opensearch:1.1.0"
OS_DASHBOARD_IMAGE: "opensearchproject/opensearch-dashboards:1.1.0"
OS_IMAGE: "opensearchproject/opensearch:2.3.0"
OS_DASHBOARD_IMAGE: "opensearchproject/opensearch-dashboards:2.3.0"
OPENSEARCH_PREFIX: "plugins.security"
OSS_ENV_VAR: "plugins.security.disabled=true"
needs: [lint]
name: Test against OS ${{ matrix.os-major-version }} on ${{ matrix.go-version }}/${{ matrix.os }}
runs-on: ${{ matrix.os }}
env:
OS_OPENDISTRO_IMAGE: "${{matrix.OS_OPENDISTRO_IMAGE}}"
OS_IMAGE: "${{matrix.OS_IMAGE}}"
OS_DASHBOARD_IMAGE: "${{matrix.OS_DASHBOARD_IMAGE}}"
OPENSEARCH_PREFIX: "${{matrix.OPENSEARCH_PREFIX}}"
OSS_ENV_VAR: "${{matrix.OSS_ENV_VAR}}"
Expand Down Expand Up @@ -88,7 +88,6 @@ jobs:
- name: Run check scripts
run: |
./script/test-mod-tidy
./script/test-terraform-fmt
- name: Wait for Opensearch
# ensure that OS has come up and is available
run: |
Expand All @@ -97,9 +96,8 @@ jobs:
# - OpenDistro lazily initializes its indexes, see
# https://github.com/opendistro-for-elasticsearch/alerting/issues/60
run: |
if [ -n "$OS_OPENDISTRO_IMAGE" ]; then
./script/wait-for-endpoint --timeout=120 http://admin:admin@localhost:9220
curl -s -v -X POST -H 'Content-type: application/json' -d '{"name":"_warmup","type":"slack","slack":{"url": "http://www.example.com"}}' http://admin:admin@localhost:9220/_opendistro/_alerting/destinations
if [ -n "$OS_IMAGE" ]; then
./script/wait-for-endpoint --timeout=120 http://admin:admin@localhost:9200
fi
- name: Dump docker logs on failure
if: failure()
Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ This provider will target compatibility with major versions of Opensearch, each

| Opensearch version | Supported |
| ----------- | --------- |
| 1.x | :white_check_mark: |
| 2.x | :x: |
| 2.x | :white_check_mark: |


## Version and Branching
Expand Down
24 changes: 0 additions & 24 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,6 @@ services:
hard: -1
ports:
- 9200:9200
opendistro:
image: ${OS_OPENDISTRO_IMAGE:-rwgrim/docker-noop}
hostname: opensearch-opendistro
container_name: opensearch-opendistro
environment:
- cluster.name=opendistro
- bootstrap.memory_lock=true
- discovery.type=single-node
- path.repo=/tmp
- ${OPENSEARCH_PREFIX:-opendistro_security}.ssl.http.enabled=false
- http.port=9220
- network.publish_host=127.0.0.1
- logger.org.opensearch=warn
- "OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g"
ulimits:
nproc: 65536
nofile:
soft: 65536
hard: 65536
memlock:
soft: -1
hard: -1
ports:
- 9220:9220
networks:
opensearch:
driver: bridge
48 changes: 5 additions & 43 deletions provider/data_source_opensearch_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package provider
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"

"github.com/olivere/elastic/uritemplates"
elastic7 "github.com/olivere/elastic/v7"
elastic6 "gopkg.in/olivere/elastic.v6"
)

const DESTINATION_NAME_FIELD = "destination.name.keyword"
Expand Down Expand Up @@ -43,25 +41,13 @@ func dataSourceOpensearchOpenDistroDestinationRead(d *schema.ResourceData, m int
var id string
var destination map[string]interface{}
var err error
esClient, err := getClient(m.(*ProviderConf))
osClient, err := getClient(m.(*ProviderConf))
if err != nil {
return err
}
switch client := esClient.(type) {
case *elastic7.Client:
// See https://github.com/opendistro-for-elasticsearch/alerting/issues/70,
// no tags or API endpoint for searching destination. In ODFE >= 1.11.0,
// the index has become a "system index", so it cannot be searched:
// https://opendistro.github.io/for-elasticsearch-docs/docs/alerting/settings/#alerting-indices
// instead we paginate through all destinations to find the first name match :|
id, destination, err = destinationOpenSearch7GetAll(client, destinationName)
if err != nil {
id, destination, err = destinationOpenSearch7Search(client, DESTINATION_INDEX, destinationName)
}
case *elastic6.Client:
id, destination, err = destinationOpenSearch6Search(client, DESTINATION_INDEX, destinationName)
default:
err = errors.New("destination resource not implemented prior to v6")
id, destination, err = destinationOpenSearch7GetAll(osClient, destinationName)
if err != nil {
id, destination, err = destinationOpenSearch7Search(osClient, DESTINATION_INDEX, destinationName)
}

if err != nil {
Expand Down Expand Up @@ -112,36 +98,12 @@ func destinationOpenSearch7Search(client *elastic7.Client, index string, name st
}
}

func destinationOpenSearch6Search(client *elastic6.Client, index string, name string) (string, map[string]interface{}, error) {
termQuery := elastic6.NewTermQuery(DESTINATION_NAME_FIELD, name)
result, err := client.Search().
Index(index).
Query(termQuery).
Do(context.TODO())

destination := make(map[string]interface{})
if err != nil {
return "", destination, err
}
if result.TotalHits() == 1 {
if err := json.Unmarshal(*result.Hits.Hits[0].Source, &destination); err != nil {
return "", destination, fmt.Errorf("error unmarshalling destination body: %+v", err)
}

return result.Hits.Hits[0].Id, destination["destination"].(map[string]interface{}), nil
} else if result.TotalHits() < 1 {
return "", destination, err
} else {
return "", destination, fmt.Errorf("1 result expected, found %d.", result.TotalHits())
}
}

func destinationOpenSearch7GetAll(client *elastic7.Client, name string) (string, map[string]interface{}, error) {
offset := 0
pageSize := 1000
destination := make(map[string]interface{})
for {
path, err := uritemplates.Expand("/_opendistro/_alerting/destinations?startIndex={startIndex}&size={size}", map[string]string{
path, err := uritemplates.Expand("/_plugins/_alerting/destinations?startIndex={startIndex}&size={size}", map[string]string{
"startIndex": fmt.Sprint(offset),
"size": fmt.Sprint(pageSize),
})
Expand Down
22 changes: 4 additions & 18 deletions provider/data_source_opensearch_host.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package provider

import (
"errors"
"reflect"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"

elastic7 "github.com/olivere/elastic/v7"
elastic6 "gopkg.in/olivere/elastic.v6"
)

func dataSourceOpensearchHost() *schema.Resource {
Expand Down Expand Up @@ -36,25 +32,15 @@ func dataSourceOpensearchHostRead(d *schema.ResourceData, m interface{}) error {
// it's using. Presumably the URLS would be available where the client is
// intantiated, but in terraform, that's not always practicable.
var err error
esClient, err := getClient(m.(*ProviderConf))
osClient, err := getClient(m.(*ProviderConf))
if err != nil {
return err
}

var url string
switch client := esClient.(type) {
case *elastic7.Client:
urls := reflect.ValueOf(client).Elem().FieldByName("urls")
if urls.Len() > 0 {
url = urls.Index(0).String()
}
case *elastic6.Client:
urls := reflect.ValueOf(client).Elem().FieldByName("urls")
if urls.Len() > 0 {
url = urls.Index(0).String()
}
default:
return errors.New("this version of OpenSearch is not supported")
urls := reflect.ValueOf(osClient).Elem().FieldByName("urls")
if urls.Len() > 0 {
url = urls.Index(0).String()
}
d.SetId(url)
err = d.Set("url", url)
Expand Down
83 changes: 7 additions & 76 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"

elastic7 "github.com/olivere/elastic/v7"
elastic6 "gopkg.in/olivere/elastic.v6"
)

type ServerFlavor int64
Expand All @@ -35,6 +34,7 @@ type ServerFlavor int64
const (
Unknown ServerFlavor = iota
OpenSearch
Default = 2
)

var awsUrlRegexp = regexp.MustCompile(`([a-z0-9-]+).es.amazonaws.com$`)
Expand Down Expand Up @@ -283,7 +283,7 @@ func providerConfigure(c context.Context, d *schema.ResourceData) (interface{},
}, nil
}

func getClient(conf *ProviderConf) (interface{}, error) {
func getClient(conf *ProviderConf) (*elastic7.Client, error) {
opts := []elastic7.ClientOptionFunc{
elastic7.SetURL(conf.rawUrl),
elastic7.SetScheme(conf.parsedUrl.Scheme),
Expand Down Expand Up @@ -352,7 +352,6 @@ func getClient(conf *ProviderConf) (interface{}, error) {
opts = append(opts, elastic7.SetErrorLog(errorLogger))
}

var relevantClient interface{}
client, err := elastic7.NewClient(opts...)
if err != nil {
if errors.Is(err, elastic7.ErrNoClient) {
Expand All @@ -361,7 +360,6 @@ func getClient(conf *ProviderConf) (interface{}, error) {
}
return nil, err
}
relevantClient = client

// Use the v7 client to ping the cluster to determine the version if one was not provided
if conf.osVersion == "" {
Expand All @@ -387,82 +385,15 @@ func getClient(conf *ProviderConf) (interface{}, error) {
log.Printf("[INFO] OS version %+v", info.Version)
switch info.Version.BuildFlavor {
case "default":
conf.flavor = OpenSearch
}
}

if conf.osVersion < "7.0.0" && conf.osVersion >= "6.0.0" {
log.Printf("[INFO] Using ES 6")
opts := []elastic6.ClientOptionFunc{
elastic6.SetURL(conf.rawUrl),
elastic6.SetScheme(conf.parsedUrl.Scheme),
elastic6.SetSniff(conf.sniffing),
elastic6.SetHealthcheck(conf.healthchecking),
}

if conf.parsedUrl.User.Username() != "" {
p, _ := conf.parsedUrl.User.Password()
opts = append(opts, elastic6.SetBasicAuth(conf.parsedUrl.User.Username(), p))
}
if conf.username != "" && conf.password != "" {
opts = append(opts, elastic6.SetBasicAuth(conf.username, conf.password))
}

if m := awsUrlRegexp.FindStringSubmatch(conf.parsedUrl.Hostname()); m != nil && conf.signAWSRequests {
log.Printf("[INFO] Using AWS: %+v", m[1])
client, err := awsHttpClient(m[1], conf, map[string]string{})
if err != nil {
return nil, err
}
opts = append(opts, elastic6.SetHttpClient(client), elastic6.SetSniff(false))
} else if awsRegion := conf.awsRegion; conf.awsRegion != "" && conf.signAWSRequests {
log.Printf("[INFO] Using AWS: %+v", conf.awsRegion)
client, err := awsHttpClient(awsRegion, conf, map[string]string{})
if err != nil {
return nil, err
}
opts = append(opts, elastic6.SetHttpClient(client), elastic6.SetSniff(false))
} else if conf.insecure || conf.cacertFile != "" {
opts = append(opts, elastic6.SetHttpClient(tlsHttpClient(conf, map[string]string{})), elastic6.SetSniff(false))
} else if conf.token != "" {
opts = append(opts, elastic6.SetHttpClient(tokenHttpClient(conf, map[string]string{})), elastic6.SetSniff(false))
} else {
opts = append(opts, elastic6.SetHttpClient(defaultHttpClient(conf, map[string]string{})))
}

switch logProviderLevel {
case "TRACE":
traceLogger := esLogger.StandardLogger(&hclog.StandardLoggerOptions{
ForceLevel: hclog.LevelFromString("TRACE"),
})
opts = append(opts, elastic6.SetTraceLog(traceLogger))
fallthrough
case "INFO":
infoLogger := esLogger.StandardLogger(&hclog.StandardLoggerOptions{
ForceLevel: hclog.LevelFromString("INFO"),
})
opts = append(opts, elastic6.SetInfoLog(infoLogger))
fallthrough
conf.flavor = Unknown
default:
errorLogger := esLogger.StandardLogger(&hclog.StandardLoggerOptions{
ForceLevel: hclog.LevelFromString("ERROR"),
})
opts = append(opts, elastic6.SetErrorLog(errorLogger))
}

relevantClient, err = elastic6.NewClient(opts...)
if err != nil {
return nil, err
conf.flavor = OpenSearch
}
} else if conf.flavor == Unknown && conf.osVersion < "2.0.0" && conf.osVersion >= "1.0.0" {
// Version 1.x of OpenSearch very likely. Nothing to do since it's API
// compatible with 7.x of ES. If elastic client library supports detecting
// flavor, update to Opensearch.
} else if conf.osVersion < "6.0.0" {
return nil, fmt.Errorf("opensearch version %s is older than 6.0.0 and is not supported, flavor: %v.", conf.osVersion, conf.flavor)
} else if conf.flavor == Unknown || conf.osVersion < "1.0.0" {
return nil, fmt.Errorf("opensearch version %s is older than 1.0.0 and is not supported, flavor: %v.", conf.osVersion, conf.flavor)
}

return relevantClient, nil
return client, nil
}

func assumeRoleCredentials(region, roleARN, roleExternalID, profile string) *awscredentials.Credentials {
Expand Down
Loading

0 comments on commit 240d9f8

Please sign in to comment.