-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(cache, gcloud): new cache interface, starting into gcloud cache
- Loading branch information
Showing
5 changed files
with
591 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package cafs | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/ipfs/go-datastore" | ||
) | ||
|
||
// Cache is the interface for wrapping a cafs, cache behaviour will vary from case-to-case | ||
type Cache interface { | ||
// Caches must fully implement filestores, this should always delegate operations | ||
// to the underlying store at some point in each Filestore method call | ||
Filestore | ||
// Filestore must return the underlying filestore | ||
Filestore() Filestore | ||
// Cache explicitly adds a key to the Cache. it should be retrievable | ||
// by the underlying store | ||
Cache(key datastore.Key) error | ||
// Uncache explicitly removes a key from the cache | ||
Uncache(key datastore.Key) error | ||
} | ||
|
||
// TTECache is a cache that allows storing with an expiry | ||
type TTECache interface { | ||
// TTECache implements the cache interface | ||
Cache | ||
// CacheTTE caches a key with a time-to-expiry | ||
CacheTTE(key datastore.Key, tte time.Duration) error | ||
} | ||
|
||
// NewCacheFunc is a generic function for creating a cache | ||
// More sophisticated functions that configure caches & take lots of implementation-specific | ||
// details can return a NewCacheFunc instead of the cache itself to conform to this signature | ||
type NewCacheFunc func(fs Filestore) Cache |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
package gcloud | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
|
||
"cloud.google.com/go/storage" | ||
"github.com/ipfs/go-datastore" | ||
"github.com/qri-io/cafs" | ||
"github.com/qri-io/cafs/memfs" | ||
) | ||
|
||
// CacheCfg configures cache behaviour | ||
type CacheCfg struct { | ||
// Setting Full == true will cache everything that is Put() into the store, | ||
// and check the cache on all Get()/Has() requests (falling back to the store) | ||
Full bool | ||
// Async will skip waiting for writes to cache | ||
AsyncWrite bool | ||
// Set the kind of key to read & write from the store. cannot be empty | ||
BucketName string | ||
|
||
// MaxFileSize specifies how large a byte slice can be before it | ||
// TODO - not yet implemented | ||
// MaxFileSize int64 | ||
} | ||
|
||
// Cache implements the cafs.Cache interface using a google cloud datastore | ||
type Cache struct { | ||
ctx context.Context | ||
fs cafs.Filestore | ||
client *storage.Client | ||
cfg *CacheCfg | ||
} | ||
|
||
// NewCache creates a function that can create new cache from a filestore | ||
func NewCacheFunc(ctx context.Context, cli *storage.Client, opts ...func(*CacheCfg)) cafs.NewCacheFunc { | ||
cfg := &CacheCfg{} | ||
for _, opt := range opts { | ||
opt(cfg) | ||
} | ||
|
||
return cafs.NewCacheFunc(func(fs cafs.Filestore) cafs.Cache { | ||
return Cache{ | ||
ctx: ctx, | ||
fs: fs, | ||
client: cli, | ||
cfg: cfg, | ||
} | ||
}) | ||
} | ||
|
||
// Put places a file in the store | ||
func (c Cache) Put(file cafs.File, pin bool) (key datastore.Key, err error) { | ||
key, err = c.fs.Put(file, pin) | ||
if err != nil { | ||
return | ||
} | ||
|
||
if c.shouldCache(file) { | ||
if c.cfg.AsyncWrite { | ||
go func() { | ||
err := c.putCache(key) | ||
if err != nil { | ||
fmt.Errorf("error placing in cache: %s", err.Error()) | ||
} | ||
}() | ||
} | ||
err = c.putCache(key) | ||
} | ||
|
||
return key, err | ||
} | ||
|
||
// Get | ||
func (c Cache) Get(key datastore.Key) (file cafs.File, err error) { | ||
return c.fs.Get(key) | ||
} | ||
|
||
// Has checks for the presence of a key | ||
func (c Cache) Has(key datastore.Key) (bool, error) { | ||
return c.fs.Has(key) | ||
} | ||
|
||
// Delete removes a key from the store, possibly affecting the cache | ||
func (c Cache) Delete(key datastore.Key) error { | ||
return c.fs.Delete(key) | ||
} | ||
|
||
// PathPrefix returns the prefix of the underlying store | ||
func (c Cache) PathPrefix() string { | ||
return c.fs.PathPrefix() | ||
} | ||
|
||
// NewAdder proxies the store NewAdder method | ||
func (c Cache) NewAdder(pin, wrap bool) (cafs.Adder, error) { | ||
return c.fs.NewAdder(pin, wrap) | ||
// fsAdder, err := c.fs.NewAdder(pin, wrap) | ||
// if err != nil { | ||
// return nil, err | ||
// } | ||
|
||
// adder := &adder{ | ||
// cache: &c, | ||
// adder: fsAdder, | ||
// out: make(chan cafs.AddedFile, 9), | ||
// } | ||
|
||
// go func() { | ||
// for af := range fsAdder.Added() { | ||
// adder.out <- af | ||
// } | ||
// }() | ||
|
||
// return adder | ||
} | ||
|
||
// Adder wraps a coreunix adder to conform to the cafs adder interface | ||
// type adder struct { | ||
// cache *Cache | ||
// adder cafs.Adder | ||
// out chan cafs.AddedFile | ||
// } | ||
|
||
// func (a *adder) AddFile(f cafs.File) error { | ||
// if a.cache.shouldCache(f) { | ||
// pr, pw := io.Pipe() | ||
// r := io.TeeReader(f, pw) | ||
// } | ||
|
||
// // a.adder.AddFile(f cafs.File) | ||
// // // path, err := a.z.Put(f, a.pin) | ||
// // if err != nil { | ||
// // fmt.Errorf("error putting file in mapstore: %s", err.Error()) | ||
// // return err | ||
// // } | ||
// // a.out <- cafs.AddedFile{ | ||
// // Path: path, | ||
// // Name: f.FileName(), | ||
// // Bytes: 0, | ||
// // Hash: path.String(), | ||
// // } | ||
// return nil | ||
// } | ||
|
||
// func (a *adder) Added() chan cafs.AddedFile { | ||
// return a.out | ||
// } | ||
|
||
// func (a *adder) Close() error { | ||
// close(a.out) | ||
// return nil | ||
// } | ||
|
||
// Filestore returns | ||
func (c Cache) Filestore() cafs.Filestore { | ||
return c.fs | ||
} | ||
|
||
// Cache | ||
func (c Cache) Cache(key datastore.Key) error { | ||
return c.putCache(key) | ||
} | ||
|
||
// Uncache removes a cache from the store | ||
func (c Cache) Uncache(key datastore.Key) error { | ||
return c.deleteCache(key) | ||
} | ||
|
||
func (c Cache) shouldCache(file cafs.File) bool { | ||
return c.cfg.Full | ||
} | ||
|
||
// | ||
func (c Cache) putCache(key datastore.Key) (err error) { | ||
file, err := c.fs.Get(key) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
w := c.object(key).NewWriter(c.ctx) | ||
_, err = io.Copy(w, file) | ||
return | ||
} | ||
|
||
// | ||
func (c Cache) getCache(key datastore.Key) (file cafs.File, err error) { | ||
obj := c.object(key) | ||
r, err := obj.NewReader(c.ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return memfs.NewMemfileReader(key.BaseNamespace(), r), err | ||
} | ||
|
||
func (c Cache) deleteCache(key datastore.Key) error { | ||
return c.object(key).Delete(c.ctx) | ||
} | ||
|
||
func (c Cache) object(key datastore.Key) *storage.ObjectHandle { | ||
return c.bucket().Object(key.String()) | ||
} | ||
|
||
// | ||
func (c Cache) bucket() *storage.BucketHandle { | ||
return c.client.Bucket(c.cfg.BucketName) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package gcloud | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"cloud.google.com/go/storage" | ||
"github.com/qri-io/cafs/test" | ||
"google.golang.org/api/option" | ||
) | ||
|
||
func TestCache(t *testing.T) { | ||
ctx := context.Background() | ||
cli, err := storage.NewClient(ctx, option.WithoutAuthentication()) | ||
if err != nil { | ||
t.Errorf("error creating client: %s", err.Error()) | ||
return | ||
} | ||
|
||
cf := NewCacheFunc(ctx, cli, func(c *CacheCfg) { | ||
c.Full = true | ||
c.BucketName = "qri_tests" | ||
}) | ||
|
||
test.RunCacheTests(cf, t) | ||
} |
Oops, something went wrong.