diff --git a/config/config.go b/config/config.go index b42fecc2..75662b2d 100644 --- a/config/config.go +++ b/config/config.go @@ -393,13 +393,13 @@ func (h FTPStorage) Valid() bool { // HTSGETStorage configures the http storage backend. type HTSGETStorage struct { Disabled bool - // Timeout duration for http GET calls - Timeout Duration - User string - Password string + // Actual protocol for fetching the resource (defaults to 'https') + Protocol string + // Whether Funnel should generate and send its crypt4gh public key (default: false) + SendPublicKey bool } -// Valid validates the FTPStorage configuration. +// Valid validates the HTSGETStorage configuration. func (h HTSGETStorage) Valid() bool { return !h.Disabled } diff --git a/config/default-config.yaml b/config/default-config.yaml index 82ef1ee3..7ac3cc13 100644 --- a/config/default-config.yaml +++ b/config/default-config.yaml @@ -42,7 +42,7 @@ Server: # OidcAuth: # # URL of the OIDC service configuration: # ServiceConfigUrl: "" - # # Client ID and secret are sent with the token introspection request + # # Client ID and secret are sent with the token introspection request # # (Basic authentication): # ClientId: # ClientSecret: @@ -313,7 +313,7 @@ AWSBatch: # Kubernetes describes the configuration for the Kubernetes compute backend. Kubernetes: # The executor used to execute tasks. Available executors: docker, kubernetes - Executor: "docker" + Executor: "docker" # Turn off task state reconciler. When enabled, Funnel communicates with Kubernetes # to find tasks that are stuck in a queued state or errored and # updates the task state accordingly. 
@@ -392,3 +392,8 @@ FTPStorage: Timeout: 10s User: "anonymous" Password: "anonymous" + +HTSGETStorage: + Disabled: false + Protocol: https + SendPublicKey: false diff --git a/docs/funnel-config-examples/default-config.yaml b/docs/funnel-config-examples/default-config.yaml index cafe5e51..5bcc50ea 100644 --- a/docs/funnel-config-examples/default-config.yaml +++ b/docs/funnel-config-examples/default-config.yaml @@ -8,7 +8,7 @@ Compute: local # The name of the active event writer backend(s). # Available backends: log, boltdb, badger, datastore, dynamodb, elastic, mongodb, kafka -EventWriters: +EventWriters: - boltdb - log @@ -42,7 +42,7 @@ Server: DisableHTTPCache: true RPCClient: - # RPC server address + # RPC server address ServerAddress: localhost:9090 # Credentials for Basic authentication for the server APIs using a password. @@ -59,7 +59,7 @@ RPCClient: # up to 1 minute MaxRetries: 10 -# The scheduler is used for the Manual compute backend. +# The scheduler is used for the Manual compute backend. Scheduler: # How often to run a scheduler iteration. ScheduleRate: 1s @@ -78,7 +78,7 @@ Node: # -1 means there is no timeout. 0 means timeout immediately after the first task. Timeout: -1s - # A Node will automatically try to detect what resources are available to it. + # A Node will automatically try to detect what resources are available to it. # Defining Resources in the Node configuration overrides this behavior. Resources: # CPUs available. @@ -149,7 +149,7 @@ Datastore: # Optional. If possible, credentials will be automatically discovered # from the environment. CredentialsFile: "" - + MongoDB: # Addrs holds the addresses for the seed servers. 
Addrs: @@ -369,3 +369,8 @@ FTPStorage: Timeout: 10s User: "anonymous" Password: "anonymous" + +HTSGETStorage: + Disabled: false + Protocol: https + SendPublicKey: false diff --git a/storage/htsget.go b/storage/htsget.go index 3222308e..fa1706c3 100644 --- a/storage/htsget.go +++ b/storage/htsget.go @@ -1,9 +1,12 @@ package storage import ( + "bufio" "bytes" "context" + "encoding/json" "fmt" + "io" "os" "os/exec" "strings" @@ -11,7 +14,25 @@ import ( "github.com/ohsu-comp-bio/funnel/config" ) +const ( + protocol = "htsget://" + protocolBearer = protocol + "bearer:" + privateKeyFile = ".private.key" + publicKeyFile = ".public.key" +) + // HTSGET provides read access to public URLs. +// +// Note that it relies on the following programs being installed and available in +// the system PATH: +// +// - "htsget" (client implementation of the protocol) +// - "crypt4gh" (to support "*.c4gh" encrypted resources) +// - "crypt4gh-keygen" (to generate private and public keys) +// +// For more info about the programs: +// - https://htsget.readthedocs.io/en/latest/ +// - https://crypt4gh.readthedocs.io/en/latest/ type HTSGET struct { conf config.HTSGETStorage } @@ -41,32 +62,20 @@ func (b *HTSGET) Put(ctx context.Context, url, path string) (*Object, error) { } // Get copies a file from a given URL to the host path. +// +// If the configuration specifies sending a public key, the received content will +// also be decrypted locally before being written to the file. 
func (b *HTSGET) Get(ctx context.Context, url, path string) (*Object, error) { - var stdout bytes.Buffer - var stderr bytes.Buffer - var cmd *exec.Cmd - if strings.HasPrefix(url, "htsget://bearer:") { - _bearer_start := len("htsget://bearer:") - _bearer_stop := strings.Index(url, "@") - if _bearer_stop < 1 { - return nil, fmt.Errorf("Bearer token not terminated by @") - } - bearer := url[_bearer_start:_bearer_stop] - url = "htsget://" + url[_bearer_stop+1:] - cmd = exec.Command("htsget", "--bearer-token", bearer, strings.Replace(url, "htsget://", "https://", 1), "--output", path) - } else { - cmd = exec.Command("htsget", strings.Replace(url, "htsget://", "https://", 1), "--output", path) - } - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - if err != nil { - return nil, fmt.Errorf("Error running htsget: %v %s %s", err, stdout.String(), stderr.String()) - } + htsgetArgs := htsgetArgs(url, b.conf.Protocol, b.conf.SendPublicKey) + cmd1, cmd2 := htsgetCmds(htsgetArgs, b.conf.SendPublicKey) + cmdPipe(cmd1, cmd2, path) + + // Check that the destination file exists: info, err := os.Stat(path) if err != nil { return nil, err } + return &Object{ URL: url, Size: info.Size(), @@ -92,8 +101,203 @@ func (b *HTSGET) UnsupportedOperations(url string) UnsupportedOperations { } func (b *HTSGET) supportsPrefix(url string) error { - if !strings.HasPrefix(url, "htsget://") { + if !strings.HasPrefix(url, protocol) { return &ErrUnsupportedProtocol{"htsgetStorage"} } return nil } + +func htsgetUrl(url, useProtocol string) (updatedUrl string, token string) { + if useProtocol == "" { + useProtocol = "https" + } + useProtocol += "://" + updatedUrl = strings.Replace(url, protocol, useProtocol, 1) + + // Optional info: parse the "token" from "htsget://bearer:token@host..." 
+ if strings.HasPrefix(url, protocolBearer) { + bearerStart := len(protocolBearer) + bearerStop := strings.Index(url, "@") + + if bearerStop > bearerStart { + updatedUrl = useProtocol + url[bearerStop+1:] + token = url[bearerStart:bearerStop] + } + } + + return +} + +func htsgetHeader() string { + ensureKeyFiles() + + file, err := os.Open(publicKeyFile) + if err != nil { + fmt.Println("Could not read", publicKeyFile, "file, which should exist:", err) + panic(1) + } + + publicKey := "" + scanner := bufio.NewScanner(file) + if scanner.Scan() { // Skip one header line. + if scanner.Scan() { + publicKey = scanner.Text() // The key is on the second line. + } + } + file.Close() + + // HTTP headers to be encoded as JSON: + headers := make(map[string]string) + + if publicKey == "" { + fmt.Println("[WARN] Could not read public key (second line) from", publicKeyFile, "file.") + } else { + headers["client-public-key"] = publicKey + } + + headersJson, err := json.Marshal(&headers) + if err != nil { + fmt.Println("Failed to format JSON-header for passing client-public-key:", err) + panic(1) + } + + return string(headersJson) +} + +func htsgetArgs(url, useProtocol string, decrypt bool) []string { + httpsUrl, token := htsgetUrl(url, useProtocol) + cmdArgs := make([]string, 0) + + if len(token) > 0 { + cmdArgs = append(cmdArgs, "--bearer-token", token) + } + + if decrypt { + cmdArgs = append(cmdArgs, "--headers", htsgetHeader()) + } + + cmdArgs = append(cmdArgs, httpsUrl) + return cmdArgs +} + +func htsgetCmds(htsgetArgs []string, decrypt bool) (cmd1, cmd2 *exec.Cmd) { + cmd1 = exec.Command("htsget", htsgetArgs...) 
+ + if decrypt { + cmd2 = exec.Command("crypt4gh", "decrypt", "--sk", privateKeyFile) + } else { + cmd2 = exec.Command("cat") + } + + return +} + +func ensureKeyFiles() { + files := []string{publicKeyFile, privateKeyFile} + filesExist := true + + for i := range files { + if file, err := os.Open(files[i]); err == nil { + file.Close() + } else { + filesExist = false + break + } + } + + if !filesExist { + err := runCmd("crypt4gh-keygen", "-f", "--nocrypt", + "--sk", privateKeyFile, "--pk", publicKeyFile) + if err != nil { + fmt.Println("Could not generate crypt4gh key-files:", err) + panic(1) + } else { + fmt.Println("[INFO] Generated crypt4gh key-pair.") + } + } +} + +func runCmd(commandName string, commandArgs ...string) error { + cmd := exec.Command(commandName, commandArgs...) + + var stdout bytes.Buffer + var stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err := cmd.Run() + if err != nil { + err = fmt.Errorf("Error running command %s: %v\nSTDOUT: %s\nSTDERR: %s", + commandName, err, stdout.String(), stderr.String()) + } + return err +} + +func cmdFailed(cmd *exec.Cmd, stderr *bytes.Buffer) bool { + fmt.Println("Waiting for ", cmd.Path) + if err := cmd.Wait(); err != nil { + fmt.Printf("[ERROR] `%s` command failed: %v\n", cmd.Path, err) + if stderr.Len() > 0 { + fmt.Println("Output from STDERR:") + fmt.Print(stderr.String()) + } + return true + } else { + fmt.Println("Waiting done ") + return false + } +} + +func cmdPipe(cmd1, cmd2 *exec.Cmd, destFilePath string) { + fw, err := os.Create(destFilePath) + if err != nil { + fmt.Println("[ERROR] Failed to create file for saving content:", destFilePath, err) + return + } + defer fw.Close() + + // Output from cmd1 goes to cmd2, and output from cmd2 goes to the file. 
+ stderr1 := new(bytes.Buffer) + stderr2 := new(bytes.Buffer) + r, w := io.Pipe() + + if err != nil { + fmt.Printf("[ERROR] failed to create OS pipe: %v", err) + return + } + + cmd1.Stdout = w + cmd1.Stderr = stderr1 + + cmd2.Stdin = r + cmd2.Stdout = fw + cmd2.Stderr = stderr2 + + if err := cmd1.Start(); err != nil { + fmt.Printf("[ERROR] failed to run `%s` command: %v", cmd1.Path, err) + return + } + + if err := cmd2.Start(); err != nil { + fmt.Printf("[ERROR] failed to run `%s` command: %v", cmd2.Path, err) + return + } + + fmt.Println("cmd1:", cmd1.String()) + fmt.Println("cmd2:", cmd2.String()) + fmt.Println("dest:", destFilePath) + + if cmdFailed(cmd1, stderr1) { + fw.Close() + os.Remove(destFilePath) + } + + w.Close() + + if cmdFailed(cmd2, stderr2) { + fw.Close() + os.Remove(destFilePath) + } + + r.Close() +} diff --git a/website/content/docs/storage/htsget.md b/website/content/docs/storage/htsget.md new file mode 100644 index 00000000..e64913aa --- /dev/null +++ b/website/content/docs/storage/htsget.md @@ -0,0 +1,71 @@ +--- +title: Htsget Storage +menu: + main: + parent: Storage +--- + +# Htsget Storage + +Funnel supports content-retrieval using an [Htsget][spec]-compatible API, if the +host environment has [htsget][htsget-client] and [crypt4gh][crypt4gh] +(including `crypt4gh-keygen`) software installed. +(These programs are not part of Funnel itself.) + +Htsget is a protocol that enables downloading only specific parts of genomic +data (reads/variants). The first HTTP query receives a JSON document that describes the next +HTTP requests for fetching the parts. Finally, the parts need to be concatenated +(in the order they were specified) into a single valid file (e.g. VCF or BAM). +Note that the htsget storage supports only retrieval and not storing the data! + +The task input file URL needs to specify `htsget` as the protocol. Funnel +replaces it with the protocol specified in the configuration (default is +`https`).
+ +If the service expects a `Bearer` token, it can be specified in the URL. +For example: `htsget://bearer:your-token-here@fakedomain.com/...`. +Here the `bearer:` prefix is the required syntax that causes the `your-token-here` +value to be sent to the htsget-service as a header value: +`Authorization: Bearer your-token-here`. + +If the htsget-service expects the client (Funnel) to send its public key +(crypt4gh), the `SendPublicKey` option must be set to `true` in the +configuration. In this scenario, Funnel will generate a local key-pair and +send its public key in the `client-public-key` header value. The htsget-service is +expected to send the content encrypted with the public key, and Funnel will +decrypt the data locally using `crypt4gh`. + +```yaml +HTSGETStorage: + Disabled: false + Protocol: https + SendPublicKey: false +``` + +### Example task +``` +{ + "name": "Hello world", + "inputs": [{ + "url": "htsget://fakedomain.com/variants/genome2341?referenceName=1&start=10000&end=20000", + "path": "/inputs/genome.vcf.gz" + }], + "outputs": [{ + "url": "file:///path/to/funnel-data/output.txt", + "path": "/outputs/out.txt" + }], + "executors": [{ + "image": "alpine", + "command": [ + "sh", + "-c", + "zcat /inputs/genome.vcf.gz | wc -l" + ], + "stdout": "/outputs/out.txt" + }] +} +``` + +[spec]: https://samtools.github.io/hts-specs/htsget.html +[htsget-client]: https://htsget.readthedocs.io/en/latest/ +[crypt4gh]: https://crypt4gh.readthedocs.io/en/latest/