Skip to content

Commit

Permalink
Merge pull request #8 from mrtamm/dev-htsget-crypt4gh
Browse files Browse the repository at this point in the history
#7 Htsget data-retrieval with encryption
  • Loading branch information
xhejtman committed May 22, 2024
2 parents 1bbbc00 + eebc4f8 commit d0399c9
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 34 deletions.
10 changes: 5 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,13 @@ func (h FTPStorage) Valid() bool {
// HTSGETStorage configures the http storage backend.
type HTSGETStorage struct {
Disabled bool
// Timeout duration for http GET calls
Timeout Duration
User string
Password string
// Actual protocol for fetching the resource (defaults to 'https')
Protocol string
// Whether Funnel should generate and send its crypt4gh public key (default: false)
SendPublicKey bool
}

// Valid validates the FTPStorage configuration.
// Valid validates the HTSGETStorage configuration.
func (h HTSGETStorage) Valid() bool {
return !h.Disabled
}
Expand Down
9 changes: 7 additions & 2 deletions config/default-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Server:
# OidcAuth:
# # URL of the OIDC service configuration:
# ServiceConfigUrl: ""
# # Client ID and secret are sent with the token introspection request
# # Client ID and secret are sent with the token introspection request
# # (Basic authentication):
# ClientId:
# ClientSecret:
Expand Down Expand Up @@ -313,7 +313,7 @@ AWSBatch:
# Kubernetes describes the configuration for the Kubernetes compute backend.
Kubernetes:
# The executor used to execute tasks. Available executors: docker, kubernetes
Executor: "docker"
Executor: "docker"
# Turn off task state reconciler. When enabled, Funnel communicates with Kubernetes
# to find tasks that are stuck in a queued state or errored and
# updates the task state accordingly.
Expand Down Expand Up @@ -392,3 +392,8 @@ FTPStorage:
Timeout: 10s
User: "anonymous"
Password: "anonymous"

HTSGETStorage:
Disabled: false
Protocol: https
SendPublicKey: false
15 changes: 10 additions & 5 deletions docs/funnel-config-examples/default-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Compute: local

# The name of the active event writer backend(s).
# Available backends: log, boltdb, badger, datastore, dynamodb, elastic, mongodb, kafka
EventWriters:
EventWriters:
- boltdb
- log

Expand Down Expand Up @@ -42,7 +42,7 @@ Server:
DisableHTTPCache: true

RPCClient:
# RPC server address
# RPC server address
ServerAddress: localhost:9090

# Credentials for Basic authentication for the server APIs using a password.
Expand All @@ -59,7 +59,7 @@ RPCClient:
# up to 1 minute
MaxRetries: 10

# The scheduler is used for the Manual compute backend.
# The scheduler is used for the Manual compute backend.
Scheduler:
# How often to run a scheduler iteration.
ScheduleRate: 1s
Expand All @@ -78,7 +78,7 @@ Node:
# -1 means there is no timeout. 0 means timeout immediately after the first task.
Timeout: -1s

# A Node will automatically try to detect what resources are available to it.
# A Node will automatically try to detect what resources are available to it.
# Defining Resources in the Node configuration overrides this behavior.
Resources:
# CPUs available.
Expand Down Expand Up @@ -149,7 +149,7 @@ Datastore:
# Optional. If possible, credentials will be automatically discovered
# from the environment.
CredentialsFile: ""

MongoDB:
# Addrs holds the addresses for the seed servers.
Addrs:
Expand Down Expand Up @@ -369,3 +369,8 @@ FTPStorage:
Timeout: 10s
User: "anonymous"
Password: "anonymous"

HTSGETStorage:
Disabled: false
Protocol: https
SendPublicKey: false
248 changes: 226 additions & 22 deletions storage/htsget.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
package storage

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"strings"

"github.com/ohsu-comp-bio/funnel/config"
)

const (
protocol = "htsget://"
protocolBearer = protocol + "bearer:"
privateKeyFile = ".private.key"
publicKeyFile = ".public.key"
)

// HTSGET provides read access to public URLs.
//
// Note that it relies on following programs to be installed and available in
// the system PATH:
//
// - "htsget" (client implementation of the protocol)
// - "crypt4gh" (to support "*.c4gh" encrypted resources)
// - "crypt4gh-keygen" (to generate private and public keys)
//
// For more info about the programs:
// - https://htsget.readthedocs.io/en/latest/
// - https://crypt4gh.readthedocs.io/en/latest/
type HTSGET struct {
conf config.HTSGETStorage
}
Expand Down Expand Up @@ -41,32 +62,20 @@ func (b *HTSGET) Put(ctx context.Context, url, path string) (*Object, error) {
}

// Get copies a file from a given URL to the host path.
//
// If configuration specifies sending a public key, the received content will
// be also decrypted locally before writing to the file.
func (b *HTSGET) Get(ctx context.Context, url, path string) (*Object, error) {
var stdout bytes.Buffer
var stderr bytes.Buffer
var cmd *exec.Cmd
if strings.HasPrefix(url, "htsget://bearer:") {
_bearer_start := len("htsget://bearer:")
_bearer_stop := strings.Index(url, "@")
if _bearer_stop < 1 {
return nil, fmt.Errorf("Bearer token not terminated by @")
}
bearer := url[_bearer_start:_bearer_stop]
url = "htsget://" + url[_bearer_stop+1:]
cmd = exec.Command("htsget", "--bearer-token", bearer, strings.Replace(url, "htsget://", "https://", 1), "--output", path)
} else {
cmd = exec.Command("htsget", strings.Replace(url, "htsget://", "https://", 1), "--output", path)
}
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err := cmd.Run()
if err != nil {
return nil, fmt.Errorf("Error running htsget: %v %s %s", err, stdout.String(), stderr.String())
}
htsgetArgs := htsgetArgs(url, b.conf.Protocol, b.conf.SendPublicKey)
cmd1, cmd2 := htsgetCmds(htsgetArgs, b.conf.SendPublicKey)
cmdPipe(cmd1, cmd2, path)

// Check that the destination file exists:
info, err := os.Stat(path)
if err != nil {
return nil, err
}

return &Object{
URL: url,
Size: info.Size(),
Expand All @@ -92,8 +101,203 @@ func (b *HTSGET) UnsupportedOperations(url string) UnsupportedOperations {
}

func (b *HTSGET) supportsPrefix(url string) error {
if !strings.HasPrefix(url, "htsget://") {
if !strings.HasPrefix(url, protocol) {
return &ErrUnsupportedProtocol{"htsgetStorage"}
}
return nil
}

func htsgetUrl(url, useProtocol string) (updatedUrl string, token string) {
if useProtocol == "" {
useProtocol = "https"
}
useProtocol += "://"
updatedUrl = strings.Replace(url, protocol, useProtocol, 1)

// Optional info: parse the "token" from "htsget://bearer:token@host..."
if strings.HasPrefix(url, protocolBearer) {
bearerStart := len(protocolBearer)
bearerStop := strings.Index(url, "@")

if bearerStop > bearerStart {
updatedUrl = useProtocol + url[bearerStop+1:]
token = url[bearerStart:bearerStop]
}
}

return
}

func htsgetHeader() string {
ensureKeyFiles()

file, err := os.Open(publicKeyFile)
if err != nil {
fmt.Println("Could not read", publicKeyFile, "file, which should exist:", err)
panic(1)
}

publicKey := ""
scanner := bufio.NewScanner(file)
if scanner.Scan() { // Skip one header line.
if scanner.Scan() {
publicKey = scanner.Text() // The key is on the second line.
}
}
file.Close()

// HTTP headers to be encoded as JSON:
headers := make(map[string]string)

if publicKey == "" {
fmt.Println("[WARN] Could not read public key (second line) from", publicKeyFile, "file.")
} else {
headers["client-public-key"] = publicKey
}

headersJson, err := json.Marshal(&headers)
if err != nil {
fmt.Println("Failed to format JSON-header for passing client-public-key:", err)
panic(1)
}

return string(headersJson)
}

func htsgetArgs(url, useProtocol string, decrypt bool) []string {
httpsUrl, token := htsgetUrl(url, useProtocol)
cmdArgs := make([]string, 0)

if len(token) > 0 {
cmdArgs = append(cmdArgs, "--bearer-token", token)
}

if decrypt {
cmdArgs = append(cmdArgs, "--headers", htsgetHeader())
}

cmdArgs = append(cmdArgs, httpsUrl)
return cmdArgs
}

func htsgetCmds(htsgetArgs []string, decrypt bool) (cmd1, cmd2 *exec.Cmd) {
cmd1 = exec.Command("htsget", htsgetArgs...)

if decrypt {
cmd2 = exec.Command("crypt4gh", "decrypt", "--sk", privateKeyFile)
} else {
cmd2 = exec.Command("cat")
}

return
}

func ensureKeyFiles() {
files := []string{publicKeyFile, privateKeyFile}
filesExist := true

for i := range files {
if file, err := os.Open(files[i]); err == nil {
file.Close()
} else {
filesExist = false
break
}
}

if !filesExist {
err := runCmd("crypt4gh-keygen", "-f", "--nocrypt",
"--sk", privateKeyFile, "--pk", publicKeyFile)
if err != nil {
fmt.Println("Could not generate crypt4gh key-files:", err)
panic(1)
} else {
fmt.Println("[INFO] Generated crypt4gh key-pair.")
}
}
}

func runCmd(commandName string, commandArgs ...string) error {
cmd := exec.Command(commandName, commandArgs...)

var stdout bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr

err := cmd.Run()
if err != nil {
err = fmt.Errorf("Error running command %s: %v\nSTDOUT: %s\nSTDERR: %s",
commandName, err, stdout.String(), stderr.String())
}
return err
}

func cmdFailed(cmd *exec.Cmd, stderr *bytes.Buffer) bool {
fmt.Println("Waiting for ", cmd.Path)
if err := cmd.Wait(); err != nil {
fmt.Printf("[ERROR] `%s` command failed: %v\n", cmd.Path, err)
if stderr.Len() > 0 {
fmt.Println("Output from STDERR:")
fmt.Print(stderr.String())
}
return true
} else {
fmt.Println("Waiting done ")
return false
}
}

func cmdPipe(cmd1, cmd2 *exec.Cmd, destFilePath string) {
fw, err := os.Create(destFilePath)
if err != nil {
fmt.Println("[ERROR] Failed to create file for saving content:", destFilePath, err)
return
}
defer fw.Close()

// Output from cmd1 goes to cmd2, and output from cmd2 goes to the file.
stderr1 := new(bytes.Buffer)
stderr2 := new(bytes.Buffer)
r, w := io.Pipe()

if err != nil {
fmt.Printf("[ERROR] failed to create OS pipe: %v", err)
return
}

cmd1.Stdout = w
cmd1.Stderr = stderr1

cmd2.Stdin = r
cmd2.Stdout = fw
cmd2.Stderr = stderr2

if err := cmd1.Start(); err != nil {
fmt.Printf("[ERROR] failed to run `%s` command: %v", cmd1.Path, err)
return
}

if err := cmd2.Start(); err != nil {
fmt.Printf("[ERROR] failed to run `%s` command: %v", cmd2.Path, err)
return
}

fmt.Println("cmd1:", cmd1.String())
fmt.Println("cmd2:", cmd2.String())
fmt.Println("dest:", destFilePath)

if cmdFailed(cmd1, stderr1) {
fw.Close()
os.Remove(destFilePath)
}

w.Close()

if cmdFailed(cmd2, stderr2) {
fw.Close()
os.Remove(destFilePath)
}

r.Close()
}
Loading

0 comments on commit d0399c9

Please sign in to comment.