Skip to content

Commit

Permalink
feat: migrate output azure_blob_storage to azure-sdk-for-go/storage/a…
Browse files Browse the repository at this point in the history
…zblob (#1901)

* feat: migrate output azure_blob_storage to azure-sdk-for-go/storage/azblob

Signed-off-by: eduardodbr <eduardodbr@hotmail.com>

* fix: make deps

Signed-off-by: eduardodbr <eduardodbr@hotmail.com>

* tests: add azure integration tests

Signed-off-by: eduardodbr <eduardodbr@hotmail.com>

* fix: use WriteBatch context

* Port over changes from #1928

Also fixed a few issues:
- Documented when `storage_connection_string` doesn't have `AccountName` in it and added linting rules for it
- Ensure that the `blob_storage_append` integration test creates new containers

Signed-off-by: Mihai Todor <mihaitodor@optum.com>

---------

Signed-off-by: eduardodbr <eduardodbr@hotmail.com>
Signed-off-by: Mihai Todor <mihaitodor@optum.com>
Co-authored-by: Mihai Todor <mihaitodor@optum.com>
  • Loading branch information
eduardodbr and mihaitodor committed Jun 4, 2023
1 parent 9766281 commit 3e133d0
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 210 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/Azure/go-amqp v0.17.0
github.com/Azure/go-autorest/autorest v0.11.23
github.com/ClickHouse/clickhouse-go/v2 v2.6.1
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.4.0
github.com/Jeffail/gabs/v2 v2.7.0
Expand Down Expand Up @@ -150,6 +149,7 @@ require (
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.23 // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.18 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
Expand Down
49 changes: 16 additions & 33 deletions internal/impl/azure/input_blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"

"github.com/benthosdev/benthos/v4/internal/bundle"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/benthosdev/benthos/v4/internal/component/input/processors"
"github.com/benthosdev/benthos/v4/internal/component/metrics"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/impl/azure/shared"
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/internal/message"
)
Expand All @@ -40,7 +40,16 @@ func init() {
Downloads objects within an Azure Blob Storage container, optionally filtered by
a prefix.`,
Description: `
Downloads objects within an Azure Blob Storage container, optionally filtered by a prefix.
Supports multiple authentication methods but only one of the following is required:
- ` + "`storage_connection_string`" + `
- ` + "`storage_account` and `storage_access_key`" + `
- ` + "`storage_account` and `storage_sas_token`" + `
- ` + "`storage_account` to access via [DefaultAzureCredential](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#DefaultAzureCredential)" + `
If multiple are set then the ` + "`storage_connection_string`" + ` is given priority.
If the ` + "`storage_connection_string`" + ` does not contain the ` + "`AccountName`" + ` parameter, please specify it in the
` + "`storage_account`" + ` field.
## Downloading Large Files
Expand Down Expand Up @@ -84,7 +93,9 @@ You can access these metadata fields using [function interpolation](/docs/config
docs.FieldString("prefix", "An optional path prefix, if set only objects with the prefix are consumed."),
codec.ReaderDocs,
docs.FieldBool("delete_objects", "Whether to delete downloaded objects from the blob once they are processed.").Advanced(),
).ChildDefaultAndTypesFromStruct(input.NewAzureBlobStorageConfig()),
).LinterBlobl(
`root = if this.storage_connection_string != "" && !this.storage_connection_string.contains("AccountName=") && this.storage_account == "" { [ "storage_account must be set if storage_connection_string does not contain the \"AccountName\" parameter" ] }`).
ChildDefaultAndTypesFromStruct(input.NewAzureBlobStorageConfig()),
Categories: []string{
"Services",
"Azure",
Expand Down Expand Up @@ -245,37 +256,9 @@ func newAzureBlobStorage(conf input.AzureBlobStorageConfig, log log.Modular, sta
return nil, errors.New("invalid azure storage account credentials")
}

var client *azblob.Client
var err error
if len(conf.StorageConnectionString) > 0 {
connStr := conf.StorageConnectionString
if strings.Contains(conf.StorageConnectionString, "UseDevelopmentStorage=true;") {
// This conn string is necessary to work with azurite
// The new SDK no longer provides NewEmulatorClient(), so the connStr was copied from
// https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#http-connection-strings
connStr = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
}
client, err = azblob.NewClientFromConnectionString(connStr, nil)
} else if len(conf.StorageAccessKey) > 0 {
cred, credErr := azblob.NewSharedKeyCredential(conf.StorageAccount, conf.StorageAccessKey)
if credErr != nil {
return nil, fmt.Errorf("error creating shared key credential: %w", credErr)
}
serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net", conf.StorageAccount)
client, err = azblob.NewClientWithSharedKeyCredential(serviceURL, cred, nil)
} else if len(conf.StorageSASToken) > 0 {
serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/%s", conf.StorageAccount, conf.StorageSASToken)
client, err = azblob.NewClientWithNoCredential(serviceURL, nil)
} else {
cred, credErr := azidentity.NewDefaultAzureCredential(nil)
if credErr != nil {
return nil, fmt.Errorf("error getting default azure credentials: %v", credErr)
}
serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net", conf.StorageAccount)
client, err = azblob.NewClient(serviceURL, cred, nil)
}
client, err := shared.GetBlobStorageClient(conf.StorageConnectionString, conf.StorageAccount, conf.StorageAccessKey, conf.StorageSASToken)
if err != nil {
return nil, fmt.Errorf("invalid azure storage account credentials: %w", err)
return nil, fmt.Errorf("failed to get storage client: %v", err)
}

var objectScannerCtor codec.ReaderConstructor
Expand Down
167 changes: 65 additions & 102 deletions internal/impl/azure/integration_test.go
Original file line number Diff line number Diff line change
@@ -1,71 +1,23 @@
package azure

import (
"fmt"
"io"
"net/http"
"context"
"os"
"strings"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/benthosdev/benthos/v4/internal/impl/azure/shared"
"github.com/benthosdev/benthos/v4/internal/integration"
_ "github.com/benthosdev/benthos/v4/public/components/pure"
)

// AzuriteTransport is an http.RoundTripper that forwards requests through
// http.DefaultTransport, redirecting those addressed to 127.0.0.1:10000 to
// Host and patching container listing responses so that azure-sdk-for-go can
// deserialise them.
type AzuriteTransport struct {
	// Host is the "host:port" address that replaces 127.0.0.1:10000.
	Host string
}

// RoundTrip implements http.RoundTripper.
//
// Requests whose URL host is 127.0.0.1:10000 are sent to t.Host instead. The
// caller's request is left untouched, as the http.RoundTripper contract
// requires; the rewrite is applied to a clone. For container listing calls
// (`comp=list` and `restype=container` in the URL) every `<Snapshot/>` node is
// stripped from the response body and ContentLength is adjusted accordingly.
//
// It returns the (possibly patched) response, or an error if the underlying
// transport fails or the listing response body cannot be read. In the latter
// case the response body is closed and a nil response is returned.
func (t AzuriteTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	// Intercept all requests made to 127.0.0.1:10000 and substitute the port
	// with the actual one that dockertest allocates for the Azurite container.
	// azure-sdk-for-go doesn't let us change this port when adding
	// `UseDevelopmentStorage=true;` to the connection string and using the
	// default credentials. If we use custom credentials (see the
	// `AZURITE_ACCOUNTS` env var) and don't pass `UseDevelopmentStorage=true;`
	// in the connection string, then azure-sdk-for-go will try to reach a
	// custom domain instead of localhost, so we'd have to use a similar hack
	// to point the request to localhost instead.
	if req.URL.Host == "127.0.0.1:10000" {
		// A RoundTripper must not modify the request it is given, so the host
		// is rewritten on a deep copy.
		req = req.Clone(req.Context())
		req.URL.Host = t.Host
	}

	resp, err := http.DefaultTransport.RoundTrip(req)
	if err != nil {
		return resp, err
	}

	reqURL := req.URL.String()

	// Ugly hack: Detect API calls to storage.Container.ListBlobs and delete the
	// empty `<Snapshot/>` node from the XML response because azure-sdk-for-go
	// fails to deserialise an empty string to a valid timestamp.
	// Details here: https://github.com/Azure/Azurite/issues/663
	if strings.Contains(reqURL, "comp=list") &&
		strings.Contains(reqURL, "restype=container") {
		bodyBytes, readErr := io.ReadAll(resp.Body)
		// The original body is replaced below (or dropped on error), so it
		// must be closed here to release the underlying connection.
		_ = resp.Body.Close()
		if readErr != nil {
			// http.Client ignores a response that accompanies an error, so
			// return nil rather than a half-consumed response.
			return nil, fmt.Errorf("failed to read response body: %w", readErr)
		}
		newBody := strings.ReplaceAll(string(bodyBytes), "<Snapshot/>", "")
		resp.Body = io.NopCloser(strings.NewReader(newBody))
		resp.ContentLength = int64(len(newBody))
	}

	return resp, nil
}

func TestIntegrationAzure(t *testing.T) {
integration.CheckSkip(t)
// Don't run this test by default, because it messes around with the
// http.DefaultClient
t.Skip()

t.Parallel()

pool, err := dockertest.NewPool("")
Expand All @@ -78,10 +30,8 @@ func TestIntegrationAzure(t *testing.T) {

resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "mcr.microsoft.com/azure-storage/azurite",
Tag: "3.9.0",
// Expose Azurite ports in the random port range, so we don't clash with
// other apps.
ExposedPorts: []string{"10000/tcp", "10001/tcp"},
// Expose blob, queue and table service ports
ExposedPorts: []string{"10000/tcp", "10001/tcp", "10002/tcp"},
})
require.NoError(t, err)
t.Cleanup(func() {
Expand All @@ -90,26 +40,24 @@ func TestIntegrationAzure(t *testing.T) {

_ = resource.Expire(900)

blobServicePort := resource.GetPort("10000/tcp")
origDefaultClientTransport := http.DefaultClient.Transport
http.DefaultClient.Transport = AzuriteTransport{Host: "localhost:" + blobServicePort}
t.Cleanup(func() {
http.DefaultClient.Transport = origDefaultClientTransport
})
connString := shared.GetEmulatorConnectionString(resource.GetPort("10000/tcp"), resource.GetPort("10001/tcp"), resource.GetPort("10002/tcp"))

// Wait for Azurite to properly start up
// Copied from https://github.com/mfamador/data-webhooks/blob/2dca9b0fa36bcbadf38884fb1a2e8a3614e6135e/lib/docker_containers.go#L225-L236
// Wait for Azurite to start up
err = pool.Retry(func() error {
client, eerr := storage.NewEmulatorClient()
if eerr != nil {
return eerr
client, err := azblob.NewClientFromConnectionString(connString, nil)
if err != nil {
return err

}
s := client.GetBlobService()
c := s.GetContainerReference("cont")
if _, err = c.Exists(); err != nil {

ctx, done := context.WithTimeout(context.Background(), 1*time.Second)
defer done()

if _, err = client.NewListContainersPager(nil).NextPage(ctx); err != nil {
return err
}
return nil

})
require.NoError(t, err, "Failed to start Azurite")

Expand All @@ -124,13 +72,13 @@ output:
max_in_flight: 1
path: $VAR2/${!count("$ID")}.txt
public_access_level: PRIVATE
storage_connection_string: "UseDevelopmentStorage=true;"
storage_connection_string: $VAR3
input:
azure_blob_storage:
container: $VAR1-$ID
prefix: $VAR2
storage_connection_string: "UseDevelopmentStorage=true;"
storage_connection_string: $VAR3
`
integration.StreamTests(
integration.StreamTestOpenCloseIsolated(),
Expand All @@ -139,56 +87,71 @@ input:
t, template,
integration.StreamTestOptVarOne(dummyContainer),
integration.StreamTestOptVarTwo(dummyPrefix),
integration.StreamTestOptVarThree(connString),
)
})

// TODO: Re-enable this after https://github.com/Azure/Azurite/issues/682 is fixed
// t.Run("blob_storage_append", func(t *testing.T) {
// template := `
// output:
// azure_blob_storage:
// blob_type: APPEND
// container: $VAR1
// max_in_flight: 1
// path: $VAR2/data.txt
// public_access_level: PRIVATE
// storage_connection_string: "UseDevelopmentStorage=true;"

// input:
// azure_blob_storage:
// container: $VAR1
// prefix: $VAR2/data.txt
// storage_connection_string: "UseDevelopmentStorage=true;"
// `
// integration.StreamTests(
// integration.StreamTestOpenCloseIsolated(),
// ).Run(
// t, template,
// integration.StreamTestOptVarOne(dummyContainer),
// integration.StreamTestOptVarTwo(dummyPrefix),
// )
// })
t.Run("blob_storage_append", func(t *testing.T) {
template := `
output:
broker:
pattern: fan_out_sequential
outputs:
- azure_blob_storage:
blob_type: APPEND
container: $VAR1-$ID
max_in_flight: 1
path: $VAR2/data.txt
public_access_level: PRIVATE
storage_connection_string: $VAR3
- azure_blob_storage:
blob_type: APPEND
container: $VAR1-$ID
max_in_flight: 1
path: $VAR2/data.txt
public_access_level: PRIVATE
storage_connection_string: $VAR3
input:
azure_blob_storage:
container: $VAR1-$ID
prefix: $VAR2/data.txt
storage_connection_string: $VAR3
processors:
- mapping: |
root = if content() == "hello worldhello world" { "hello world" } else { "" }
`
integration.StreamTests(
integration.StreamTestOpenCloseIsolated(),
).Run(
t, template,
integration.StreamTestOptVarOne(dummyContainer),
integration.StreamTestOptVarTwo(dummyPrefix),
integration.StreamTestOptVarThree(connString),
)
})

os.Setenv("AZURITE_QUEUE_ENDPOINT_PORT", resource.GetPort("10001/tcp"))
dummyQueue := "foo"
t.Run("queue_storage", func(t *testing.T) {
template := `
output:
azure_queue_storage:
queue_name: $VAR1$ID
storage_connection_string: "UseDevelopmentStorage=true;"
queue_name: $VAR1$ID
storage_connection_string: $VAR2
input:
azure_queue_storage:
queue_name: $VAR1$ID
storage_connection_string: "UseDevelopmentStorage=true;"
queue_name: $VAR1$ID
storage_connection_string: $VAR2
`
integration.StreamTests(
integration.StreamTestOpenCloseIsolated(),
integration.StreamTestStreamIsolated(10),
).Run(
t, template,
integration.StreamTestOptVarOne(dummyQueue),
integration.StreamTestOptVarTwo("UseDevelopmentStorage=true;"),
)
})
}
Loading

0 comments on commit 3e133d0

Please sign in to comment.