Skip to content

Commit

Permalink
Update azure blob go sdk and rewrite most of azure storage impl
Browse files Browse the repository at this point in the history
  • Loading branch information
chuan committed Mar 1, 2021
1 parent f197749 commit e3ead07
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 94 deletions.
232 changes: 171 additions & 61 deletions azure/public.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
package azure

import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"time"

azure "github.com/Azure/azure-storage-go"
"github.com/smira/aptly/aptly"
"github.com/smira/aptly/files"
"github.com/smira/aptly/utils"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/files"
"github.com/aptly-dev/aptly/utils"
"github.com/pkg/errors"
)

// PublishedStorage abstract file system with published files (actually hosted on Azure)
type PublishedStorage struct {
wasb azure.BlobStorageClient
container string
container azblob.ContainerURL
prefix string
pathCache map[string]string
}
Expand All @@ -27,13 +30,22 @@ var (

// NewPublishedStorage creates published storage from Azure storage credentials
func NewPublishedStorage(accountName, accountKey, container, prefix string) (*PublishedStorage, error) {
client, err := azure.NewBasicClient(accountName, accountKey)
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return nil, err
}

containerUrl, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, container))
if err != nil {
return nil, err
}

result := &PublishedStorage{
wasb: client.GetBlobService(),
container: container,
container: azblob.NewContainerURL(*containerUrl, azblob.NewPipeline(credential, azblob.PipelineOptions{})),
prefix: prefix,
}
return result, err

return result, nil
}

// String
Expand All @@ -60,27 +72,24 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err
defer source.Close()

path = filepath.Join(storage.prefix, path)
err = storage.wasb.PutAppendBlob(storage.container, path, nil)
if err != nil {
return fmt.Errorf("error create blob for %s in %s: %s", sourceFilename, storage, err)
}

data := make([]byte, azure.MaxBlobBlockSize)
for {
count, err := source.Read(data)
if count == 0 && err == io.EOF {
break
} else if err != nil {
return err
} else {
err = storage.wasb.AppendBlock(storage.container, path, data[:count], nil)
if err != nil {
return fmt.Errorf("error uploading %s to %s: %s", sourceFilename, storage, err)
}
}
blob := storage.container.NewBlockBlobURL(path)

uploadOptions := azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16}

_, err = azblob.UploadFileToBlockBlob(
context.Background(),
source,
blob,
uploadOptions)

if err != nil {
err = errors.Wrap(err, fmt.Sprintf("error uploading %s to %s", sourceFilename, storage))
}

return nil
return err
}

// RemoveDirs removes directory structure under public path
Expand All @@ -91,22 +100,23 @@ func (storage *PublishedStorage) RemoveDirs(path string, progress aptly.Progress
}

for _, filename := range filelist {
_, err := storage.wasb.DeleteBlobIfExists(storage.container,
filepath.Join(storage.prefix, path, filename), nil)
blob := storage.container.NewBlobURL(filepath.Join(storage.prefix, path, filename))
_, err := blob.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
if err != nil {
return fmt.Errorf("error deleting path %s from %s: %s", filename, storage, err)
err = errors.Wrap(err, fmt.Sprintf("error deleting path %s from %s: %s", filename, storage, err))
}
}
return nil
return err
}

// Remove removes single file under public path
func (storage *PublishedStorage) Remove(path string) error {
_, err := storage.wasb.DeleteBlobIfExists(storage.container, path, nil)
blob := storage.container.NewBlobURL(path)
_, err := blob.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
if err != nil {
return fmt.Errorf("error deleting %s from %s: %s", path, storage, err)
err = errors.Wrap(err, fmt.Sprintf("error deleting %s from %s: %s", path, storage, err))
}
return nil
return err
}

// LinkFromPool links package file from pool to dist's pool location
Expand All @@ -116,7 +126,7 @@ func (storage *PublishedStorage) Remove(path string) error {
// sourcePath is filepath to package file in package pool
//
// LinkFromPool returns relative path for the published file to be included in package index
func (storage *PublishedStorage) LinkFromPool(publishedDirectory string, sourcePool aptly.PackagePool,
func (storage *PublishedStorage) LinkFromPool(publishedDirectory, fileName string, sourcePool aptly.PackagePool,
sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error {

_ = sourcePool.(*files.PackagePool)
Expand Down Expand Up @@ -160,37 +170,30 @@ func (storage *PublishedStorage) LinkFromPool(publishedDirectory string, sourceP
}

func (storage *PublishedStorage) internalFilelist(prefix string) (paths []string, md5s []string, err error) {
const delimiter = "/"
paths = make([]string, 0, 1024)
md5s = make([]string, 0, 1024)
prefix = filepath.Join(storage.prefix, prefix)
if prefix != "" {
prefix += "/"
prefix += delimiter
}

marker := ""
for {
params := azure.ListBlobsParameters{
Prefix: prefix,
MaxResults: 5000,
Marker: marker,
}

cnt := storage.wasb.GetContainerReference(storage.container)
resp, err := cnt.ListBlobs(params)

for marker := (azblob.Marker{}); marker.NotDone(); {
listBlob, err := storage.container.ListBlobsHierarchySegment(
context.Background(), marker, delimiter, azblob.ListBlobsSegmentOptions{})
if err != nil {
return nil, nil, fmt.Errorf("error listing under prefix %s in %s: %s", prefix, storage, err)
}

for _, blob := range resp.Blobs {
paths = append(paths, blob.Name[len(prefix):])
md5s = append(md5s, blob.Properties.ContentMD5)
}
marker = listBlob.NextMarker

if len(resp.NextMarker) > 0 {
marker = resp.NextMarker
} else {
break
for _, blob := range listBlob.Segment.BlobItems {
if prefix == "" {
paths = append(paths, blob.Name)
} else {
paths = append(paths, blob.Name[len(prefix):])
}
md5s = append(md5s, fmt.Sprintf("%x", blob.Properties.ContentMD5))
}
}

Expand All @@ -203,12 +206,119 @@ func (storage *PublishedStorage) Filelist(prefix string) ([]string, error) {
return paths, err
}

// Internal copy or move implementation
func (storage *PublishedStorage) internalCopyOrMoveBlob(src, dst string, metadata azblob.Metadata, move bool) error {
const leaseDuration = 30

dstBlobUrl := storage.container.NewBlobURL(filepath.Join(storage.prefix, dst))
leaseResp, err := dstBlobUrl.AcquireLease(context.Background(), "", leaseDuration, azblob.ModifiedAccessConditions{})
if err != nil || leaseResp.StatusCode() != http.StatusCreated {
return fmt.Errorf("error acquiring lease on destination blob %s", dstBlobUrl)
}
defer dstBlobUrl.BreakLease(context.Background(), azblob.LeaseBreakNaturally, azblob.ModifiedAccessConditions{})

dstBlobLeaseId := leaseResp.LeaseID()

srcBlobUrl := storage.container.NewBlobURL(filepath.Join(storage.prefix, src))
leaseResp, err = srcBlobUrl.AcquireLease(context.Background(), "", leaseDuration, azblob.ModifiedAccessConditions{})
if err != nil || leaseResp.StatusCode() != http.StatusCreated {
return fmt.Errorf("error acquiring lease on source blob %s", srcBlobUrl)
}
defer srcBlobUrl.BreakLease(context.Background(), azblob.LeaseBreakNaturally, azblob.ModifiedAccessConditions{})

srcBlobLeaseId := leaseResp.LeaseID()

copyResp, err := dstBlobUrl.StartCopyFromURL(
context.Background(),
srcBlobUrl.URL(),
metadata,
azblob.ModifiedAccessConditions{},
azblob.BlobAccessConditions{
LeaseAccessConditions: azblob.LeaseAccessConditions{LeaseID: dstBlobLeaseId},
},
azblob.DefaultAccessTier,
nil)
if err != nil {
return fmt.Errorf("error copying %s -> %s in %s: %s", src, dst, storage, err)
}

copyStatus := copyResp.CopyStatus()
for {
if copyStatus == azblob.CopyStatusSuccess {
if move {
_, err = srcBlobUrl.Delete(
context.Background(),
azblob.DeleteSnapshotsOptionNone,
azblob.BlobAccessConditions{
LeaseAccessConditions: azblob.LeaseAccessConditions{LeaseID: srcBlobLeaseId},
})
return err
} else {
return nil
}
} else if copyStatus == azblob.CopyStatusPending {
time.Sleep(1 * time.Second)
blobPropsResp, err := dstBlobUrl.GetProperties(
context.Background(),
azblob.BlobAccessConditions{LeaseAccessConditions: azblob.LeaseAccessConditions{LeaseID: srcBlobLeaseId}},
azblob.ClientProvidedKeyOptions{})
if err != nil {
return fmt.Errorf("error getting destination blob properties %s", dstBlobUrl)
}
copyStatus = blobPropsResp.CopyStatus()

_, err = dstBlobUrl.RenewLease(context.Background(), dstBlobLeaseId, azblob.ModifiedAccessConditions{})
if err != nil {
return fmt.Errorf("error renewing destination blob lease %s", dstBlobUrl)
}
_, err = srcBlobUrl.RenewLease(context.Background(), srcBlobLeaseId, azblob.ModifiedAccessConditions{})
if err != nil {
return fmt.Errorf("error renewing source blob lease %s", srcBlobUrl)
}
}
return fmt.Errorf("error copying %s -> %s in %s: %s", dst, src, storage, copyStatus)
}
}

// RenameFile renames (moves) file
func (storage *PublishedStorage) RenameFile(oldName, newName string) error {
sourceBlobURL := storage.wasb.GetBlobURL(storage.container, filepath.Join(storage.prefix, oldName))
err := storage.wasb.CopyBlob(storage.container, filepath.Join(storage.prefix, newName), sourceBlobURL)
return storage.internalCopyOrMoveBlob(oldName, newName, nil, true)
}

// SymLink creates a copy of src file and adds link information as meta data
func (storage *PublishedStorage) SymLink(src string, dst string) error {
return storage.internalCopyOrMoveBlob(src, dst, azblob.Metadata{"SymLink": src}, false)
}

// HardLink using symlink functionality as hard links do not exist
func (storage *PublishedStorage) HardLink(src string, dst string) error {
return storage.SymLink(src, dst)
}

// FileExists returns true if path exists
func (storage *PublishedStorage) FileExists(path string) (bool, error) {
blob := storage.container.NewBlobURL(filepath.Join(storage.prefix, path))
resp, err := blob.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
if err != nil {
return false, err
} else if resp.StatusCode() == http.StatusNotFound {
return false, nil
} else if resp.StatusCode() == http.StatusOK {
return true, nil
} else {
return false, fmt.Errorf("error checking if blob %s exists %d", blob, resp.StatusCode())
}
}

// ReadLink returns the symbolic link pointed to by path.
// This simply reads text file created with SymLink
func (storage *PublishedStorage) ReadLink(path string) (string, error) {
blob := storage.container.NewBlobURL(filepath.Join(storage.prefix, path))
resp, err := blob.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
if err != nil {
return fmt.Errorf("error copying %s -> %s in %s: %s", oldName, newName, storage, err)
return "", err
} else if resp.StatusCode() != http.StatusOK {
return "", fmt.Errorf("error checking if blob %s exists %d", blob, resp.StatusCode())
}
return storage.Remove(oldName)
return resp.NewMetadata()["SymLink"], nil
}
Loading

0 comments on commit e3ead07

Please sign in to comment.