Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#7 Htsget data-retrieval with encryption #8

Merged
merged 3 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,13 @@ func (h FTPStorage) Valid() bool {
// HTSGETStorage configures the http storage backend.
type HTSGETStorage struct {
Disabled bool
// Timeout duration for http GET calls
Timeout Duration
User string
Password string
// Actual protocol for fetching the resource (defaults to 'https')
Protocol string
// Whether Funnel should generate and send its crypt4gh public key (default: false)
SendPublicKey bool
}

// Valid validates the FTPStorage configuration.
// Valid validates the HTSGETStorage configuration.
func (h HTSGETStorage) Valid() bool {
return !h.Disabled
}
Expand Down
9 changes: 7 additions & 2 deletions config/default-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Server:
# OidcAuth:
# # URL of the OIDC service configuration:
# ServiceConfigUrl: ""
# # Client ID and secret are sent with the token introspection request
# # Client ID and secret are sent with the token introspection request
# # (Basic authentication):
# ClientId:
# ClientSecret:
Expand Down Expand Up @@ -313,7 +313,7 @@ AWSBatch:
# Kubernetes describes the configuration for the Kubernetes compute backend.
Kubernetes:
# The executor used to execute tasks. Available executors: docker, kubernetes
Executor: "docker"
Executor: "docker"
# Turn off task state reconciler. When enabled, Funnel communicates with Kubernetes
# to find tasks that are stuck in a queued state or errored and
# updates the task state accordingly.
Expand Down Expand Up @@ -392,3 +392,8 @@ FTPStorage:
Timeout: 10s
User: "anonymous"
Password: "anonymous"

HTSGETStorage:
Disabled: false
Protocol: https
SendPublicKey: false
15 changes: 10 additions & 5 deletions docs/funnel-config-examples/default-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Compute: local

# The name of the active event writer backend(s).
# Available backends: log, boltdb, badger, datastore, dynamodb, elastic, mongodb, kafka
EventWriters:
EventWriters:
- boltdb
- log

Expand Down Expand Up @@ -42,7 +42,7 @@ Server:
DisableHTTPCache: true

RPCClient:
# RPC server address
# RPC server address
ServerAddress: localhost:9090

# Credentials for Basic authentication for the server APIs using a password.
Expand All @@ -59,7 +59,7 @@ RPCClient:
# up to 1 minute
MaxRetries: 10

# The scheduler is used for the Manual compute backend.
# The scheduler is used for the Manual compute backend.
Scheduler:
# How often to run a scheduler iteration.
ScheduleRate: 1s
Expand All @@ -78,7 +78,7 @@ Node:
# -1 means there is no timeout. 0 means timeout immediately after the first task.
Timeout: -1s

# A Node will automatically try to detect what resources are available to it.
# A Node will automatically try to detect what resources are available to it.
# Defining Resources in the Node configuration overrides this behavior.
Resources:
# CPUs available.
Expand Down Expand Up @@ -149,7 +149,7 @@ Datastore:
# Optional. If possible, credentials will be automatically discovered
# from the environment.
CredentialsFile: ""

MongoDB:
# Addrs holds the addresses for the seed servers.
Addrs:
Expand Down Expand Up @@ -369,3 +369,8 @@ FTPStorage:
Timeout: 10s
User: "anonymous"
Password: "anonymous"

HTSGETStorage:
Disabled: false
Protocol: https
SendPublicKey: false
248 changes: 226 additions & 22 deletions storage/htsget.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
package storage

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"strings"

"github.com/ohsu-comp-bio/funnel/config"
)

const (
protocol = "htsget://"
protocolBearer = protocol + "bearer:"
privateKeyFile = ".private.key"
publicKeyFile = ".public.key"
)

// HTSGET provides read access to public URLs.
//
// Note that it relies on following programs to be installed and available in
// the system PATH:
//
// - "htsget" (client implementation of the protocol)
// - "crypt4gh" (to support "*.c4gh" encrypted resources)
// - "crypt4gh-keygen" (to generate private and public keys)
//
// For more info about the programs:
// - https://htsget.readthedocs.io/en/latest/
// - https://crypt4gh.readthedocs.io/en/latest/
type HTSGET struct {
conf config.HTSGETStorage
}
Expand Down Expand Up @@ -41,32 +62,20 @@ func (b *HTSGET) Put(ctx context.Context, url, path string) (*Object, error) {
}

// Get copies a file from a given URL to the host path.
//
// If configuration specifies sending a public key, the received content will
// be also decrypted locally before writing to the file.
func (b *HTSGET) Get(ctx context.Context, url, path string) (*Object, error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
var cmd *exec.Cmd
if strings.HasPrefix(url, "htsget://bearer:") {
_bearer_start := len("htsget://bearer:")
_bearer_stop := strings.Index(url, "@")
if _bearer_stop < 1 {
return nil, fmt.Errorf("Bearer token not terminated by @")
}
bearer := url[_bearer_start:_bearer_stop]
url = "htsget://" + url[_bearer_stop+1:]
cmd = exec.Command("htsget", "--bearer-token", bearer, strings.Replace(url, "htsget://", "https://", 1), "--output", path)
} else {
cmd = exec.Command("htsget", strings.Replace(url, "htsget://", "https://", 1), "--output", path)
}
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
return nil, fmt.Errorf("Error running htsget: %v %s %s", err, stdout.String(), stderr.String())
}
htsgetArgs := htsgetArgs(url, b.conf.Protocol, b.conf.SendPublicKey)
cmd1, cmd2 := htsgetCmds(htsgetArgs, b.conf.SendPublicKey)
cmdPipe(cmd1, cmd2, path)

// Check that the destination file exists:
info, err := os.Stat(path)
if err != nil {
return nil, err
}

return &Object{
URL: url,
Size: info.Size(),
Expand All @@ -92,8 +101,203 @@ func (b *HTSGET) UnsupportedOperations(url string) UnsupportedOperations {
}

func (b *HTSGET) supportsPrefix(url string) error {
if !strings.HasPrefix(url, "htsget://") {
if !strings.HasPrefix(url, protocol) {
return &ErrUnsupportedProtocol{"htsgetStorage"}
}
return nil
}

func htsgetUrl(url, useProtocol string) (updatedUrl string, token string) {
if useProtocol == "" {
useProtocol = "https"
}
useProtocol += "://"
updatedUrl = strings.Replace(url, protocol, useProtocol, 1)

// Optional info: parse the "token" from "htsget://bearer:token@host..."
if strings.HasPrefix(url, protocolBearer) {
bearerStart := len(protocolBearer)
bearerStop := strings.Index(url, "@")

if bearerStop > bearerStart {
updatedUrl = useProtocol + url[bearerStop+1:]
token = url[bearerStart:bearerStop]
}
}

return
}

func htsgetHeader() string {
ensureKeyFiles()

file, err := os.Open(publicKeyFile)
if err != nil {
fmt.Println("Could not read", publicKeyFile, "file, which should exist:", err)
panic(1)
}

publicKey := ""
scanner := bufio.NewScanner(file)
if scanner.Scan() { // Skip one header line.
if scanner.Scan() {
publicKey = scanner.Text() // The key is on the second line.
}
}
file.Close()

// HTTP headers to be encoded as JSON:
headers := make(map[string]string)

if publicKey == "" {
fmt.Println("[WARN] Could not read public key (second line) from", publicKeyFile, "file.")
} else {
headers["client-public-key"] = publicKey
}

headersJson, err := json.Marshal(&headers)
if err != nil {
fmt.Println("Failed to format JSON-header for passing client-public-key:", err)
panic(1)
}

return string(headersJson)
}

func htsgetArgs(url, useProtocol string, decrypt bool) []string {
httpsUrl, token := htsgetUrl(url, useProtocol)
cmdArgs := make([]string, 0)

if len(token) > 0 {
cmdArgs = append(cmdArgs, "--bearer-token", token)
}

if decrypt {
cmdArgs = append(cmdArgs, "--headers", htsgetHeader())
}

cmdArgs = append(cmdArgs, httpsUrl)
return cmdArgs
}

func htsgetCmds(htsgetArgs []string, decrypt bool) (cmd1, cmd2 *exec.Cmd) {
cmd1 = exec.Command("htsget", htsgetArgs...)

if decrypt {
cmd2 = exec.Command("crypt4gh", "decrypt", "--sk", privateKeyFile)
} else {
cmd2 = exec.Command("cat")
}

return
}

func ensureKeyFiles() {
files := []string{publicKeyFile, privateKeyFile}
filesExist := true

for i := range files {
if file, err := os.Open(files[i]); err == nil {
file.Close()
} else {
filesExist = false
break
}
}

if !filesExist {
err := runCmd("crypt4gh-keygen", "-f", "--nocrypt",
"--sk", privateKeyFile, "--pk", publicKeyFile)
if err != nil {
fmt.Println("Could not generate crypt4gh key-files:", err)
panic(1)
} else {
fmt.Println("[INFO] Generated crypt4gh key-pair.")
}
}
}

func runCmd(commandName string, commandArgs ...string) error {
cmd := exec.Command(commandName, commandArgs...)

var stdout bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr

err := cmd.Run()
if err != nil {
err = fmt.Errorf("Error running command %s: %v\nSTDOUT: %s\nSTDERR: %s",
commandName, err, stdout.String(), stderr.String())
}
return err
}

func cmdFailed(cmd *exec.Cmd, stderr *bytes.Buffer) bool {
fmt.Println("Waiting for ", cmd.Path)
if err := cmd.Wait(); err != nil {
fmt.Printf("[ERROR] `%s` command failed: %v\n", cmd.Path, err)
if stderr.Len() > 0 {
fmt.Println("Output from STDERR:")
fmt.Print(stderr.String())
}
return true
} else {
fmt.Println("Waiting done ")
return false
}
}

func cmdPipe(cmd1, cmd2 *exec.Cmd, destFilePath string) {
fw, err := os.Create(destFilePath)
if err != nil {
fmt.Println("[ERROR] Failed to create file for saving content:", destFilePath, err)
return
}
defer fw.Close()

// Output from cmd1 goes to cmd2, and output from cmd2 goes to the file.
stderr1 := new(bytes.Buffer)
stderr2 := new(bytes.Buffer)
r, w := io.Pipe()

if err != nil {
fmt.Printf("[ERROR] failed to create OS pipe: %v", err)
return
}

cmd1.Stdout = w
cmd1.Stderr = stderr1

cmd2.Stdin = r
cmd2.Stdout = fw
cmd2.Stderr = stderr2

if err := cmd1.Start(); err != nil {
fmt.Printf("[ERROR] failed to run `%s` command: %v", cmd1.Path, err)
return
}

if err := cmd2.Start(); err != nil {
fmt.Printf("[ERROR] failed to run `%s` command: %v", cmd2.Path, err)
return
}

fmt.Println("cmd1:", cmd1.String())
fmt.Println("cmd2:", cmd2.String())
fmt.Println("dest:", destFilePath)

if cmdFailed(cmd1, stderr1) {
fw.Close()
os.Remove(destFilePath)
}

w.Close()

if cmdFailed(cmd2, stderr2) {
fw.Close()
os.Remove(destFilePath)
}

r.Close()
}
Loading