etcd: progress on etcd

Open() Create() and Delete() are there'ish.
But get/set metadata is not quite right yet.
And it seems that _viewing_ an uploaded object is not working.

But all of the index fetching is not there yet.

Signed-off-by: Vincent Batts <vbatts@hashbangbash.com>
This commit is contained in:
Vincent Batts 2017-04-07 16:01:54 -04:00
parent b2a0ac7a3b
commit 56ccafefa9
Signed by: vbatts
GPG Key ID: 10937E57733F1362
6 changed files with 195 additions and 10 deletions

View File

@ -15,6 +15,7 @@ type Config struct {
MongoDbName string // mongoDB db name, if different than 'filesrv' (server)
MongoUsername string // mongoDB username, if any (server)
MongoPassword string // mongoDB password, if any (server)
EtcdEndpoints string
RemoteHost string // imgsrv server to push files to (client)

82
dbutil/etcd/file.go Normal file
View File

@ -0,0 +1,82 @@
package etcd
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"github.com/vbatts/imgsrv/hash"
"github.com/vbatts/imgsrv/types"
)
// eFile is for wrapping files and satisfy the dbutil.File interface
type eFile struct {
h *eHandle
fh *os.File
info types.File
hasWritten bool
}
func (f *eFile) Read(p []byte) (n int, err error) {
if f.fh != nil {
return f.fh.Read(p)
}
return -1, errors.New("no file to read from")
}
func (f *eFile) Write(p []byte) (n int, err error) {
if f.fh != nil {
f.hasWritten = true
return f.fh.Write(p)
}
return -1, errors.New("no file to write to")
}
func (f *eFile) Close() error {
if f.fh != nil {
if f.hasWritten {
f.fh.Sync()
f.fh.Seek(0, 0)
buf, err := ioutil.ReadAll(f.fh)
if err != nil {
return err
}
f.info.Md5 = fmt.Sprintf("%x", hash.GetMd5FromBytes(buf))
_, err = f.h.kapi.Set(context.Background(), objPrefix+f.info.Md5, base64.StdEncoding.EncodeToString(buf), nil)
if err != nil {
return err
}
buf, err = json.Marshal(f.info)
if err != nil {
return err
}
_, err = f.h.kapi.Set(context.Background(), filesPrefix+f.info.Filename, string(buf), nil)
if err != nil {
return err
}
if _, err := f.h.refCountAdd(f.info.Md5, 1); err != nil {
return err
}
}
if err := f.fh.Close(); err != nil {
os.Remove(f.fh.Name())
return err
}
return os.Remove(f.fh.Name())
}
return nil
}
func (f *eFile) GetMeta(result interface{}) (err error) {
result = &f.info.Metadata
return nil
}
func (f *eFile) SetMeta(metadata interface{}) {
f.info.Metadata = *metadata.(*types.Info)
f.hasWritten = true
}

View File

@ -1,18 +1,29 @@
package etcd
import (
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"strconv"
"time"
"github.com/coreos/etcd/client"
"github.com/vbatts/imgsrv/dbutil"
"github.com/vbatts/imgsrv/types"
"golang.org/x/net/context"
)
func init() {
dbutil.Handles["etcd"] = &eHandle{}
}
var (
filesPrefix = "/files/"
objPrefix = "/obj/"
refPrefix = "/refcount/"
)
type dbConfig struct {
Endpoints []string
}
@ -42,7 +53,6 @@ func (e *eHandle) Init(config []byte, err error) error {
return err
}
e.kapi = client.NewKeysAPI(e.c)
// This is going to require a wild helper to
return nil
}
@ -51,15 +61,98 @@ func (e *eHandle) Close() error {
}
func (e *eHandle) Open(filename string) (dbutil.File, error) {
return nil, nil
// This is going to require a wild helper to stash read the file contents
// from the store, perhaps base64 encoded blob at /obj/md5/<blob>, then at
// /files/<filename> it is a marshalled object with the md5 sum reference
// plus additional metadata for the file (i.e. types.File and types.Info)
resp, err := e.kapi.Get(context.Background(), filesPrefix+filename, nil)
if err != nil {
return nil, err
}
// unmarshal the data
fi := types.File{}
if err := json.Unmarshal([]byte(resp.Node.Value), &fi); err != nil {
return nil, err
}
// then get the object blob
resp, err = e.kapi.Get(context.Background(), objPrefix+fi.Md5, nil)
if err != nil {
return nil, err
}
decoded, err := base64.StdEncoding.DecodeString(resp.Node.Value)
if err != nil {
return nil, err
}
// then write obj to this file
fh, err := ioutil.TempFile("", "imgsrv."+filename)
if err != nil {
return nil, err
}
if _, err := fh.Write(decoded); err != nil {
return nil, err
}
fh.Sync()
fh.Seek(0, 0)
return &eFile{h: e, fh: fh, info: fi}, nil
}
func (e *eHandle) Create(filename string) (dbutil.File, error) {
return nil, nil
// This is will have some similarities to Open(), but will have to buffer the
// file (bytes.Buffer or ioutil.TempFile). This will have to be in a
// goroutine that does a checksum and pushes it to the backend on .Close() of
// the returned File. :-\
fh, err := ioutil.TempFile("", "imgsrv."+filename)
if err != nil {
return nil, err
}
now := time.Now()
return &eFile{h: e, fh: fh, info: types.File{Filename: filename, UploadDate: now, Metadata: types.Info{TimeStamp: now}}}, nil
}
func (e *eHandle) Remove(filename string) error {
// Perhaps a little tricky, since you can remove the /files/<filename>, but
// the /obj/md5/<blob> needs a ref counter, so that blob can be ejected when
// it has no refs.
resp, err := e.kapi.Get(context.Background(), filesPrefix+filename, nil)
if err != nil {
return err
}
fi := types.File{}
if err := json.Unmarshal([]byte(resp.Node.Value), &fi); err != nil {
return err
}
if _, err := e.kapi.Delete(context.Background(), filesPrefix+filename, nil); err != nil {
return err
}
i, err := e.refCountAdd(fi.Md5, -1)
if err != nil {
return err
}
if i < 1 {
if _, err := e.kapi.Delete(context.Background(), objPrefix+fi.Md5, nil); err != nil {
return err
}
}
return nil
}
// intended for ref counting md5 objects. Add a negative number to decrement
func (e *eHandle) refCountAdd(refname string, i int) (int, error) {
resp, err := e.kapi.Get(context.Background(), refPrefix+refname, nil)
if err != nil {
return -1, err
}
count, err := strconv.Atoi(resp.Node.Value)
if err != nil {
return -1, err
}
_, err = e.kapi.Set(context.Background(), refPrefix+refname, fmt.Sprintf("%d", count+i), nil)
return count + i, err
}
func (e *eHandle) HasFileByFilename(filename string) (exists bool, err error) {
return false, nil
}

View File

@ -31,6 +31,7 @@ var (
MongoDbName: "filesrv",
MongoUsername: "",
MongoPassword: "",
EtcdEndpoints: "http://127.0.0.1:2379",
RemoteHost: "",
}
@ -148,6 +149,13 @@ func init() {
DefaultConfig.MongoPassword,
"Mongo password to auth with (if needed) ('mongopassword' in the config)")
/* etcd settings */
flag.StringVar(&DefaultConfig.EtcdEndpoints,
"etcds-endpoints",
DefaultConfig.EtcdEndpoints,
"Etcd endpoint to use for the 'etcd' dbhandler")
/* Client-side */
flag.StringVar(&FetchUrl,
"fetch",

View File

@ -52,6 +52,12 @@ func runServer(c *config.Config) {
serverConfig.MongoPassword,
serverConfig.MongoDbName,
}
} else if serverConfig.DbHandler == "etcd" {
duConfig = struct {
Endpoints []string
}{
strings.Split(serverConfig.EtcdEndpoints, ","),
}
}
if err := du.Init(json.Marshal(duConfig)); err != nil {
@ -228,7 +234,6 @@ func routeFilesPOST(w http.ResponseWriter, r *http.Request) {
var filename string
info := types.Info{
Ip: r.RemoteAddr,
Random: hash.Rand64(),
TimeStamp: time.Now(),
}
@ -634,7 +639,6 @@ func routeGetFromUrl(w http.ResponseWriter, r *http.Request) {
info = types.Info{
Ip: r.RemoteAddr,
Random: hash.Rand64(),
TimeStamp: time.Now(),
}
log.Println(info)
@ -731,7 +735,6 @@ func routeUpload(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
info := types.Info{
Ip: r.RemoteAddr,
Random: hash.Rand64(),
TimeStamp: time.Now(),
}

View File

@ -8,16 +8,14 @@ import (
)
type Info struct {
Keywords []string // tags
Ip string // who uploaded it
Random int64
Keywords []string // tags
Ip string // who uploaded it
TimeStamp time.Time "timestamp,omitempty"
}
type File struct {
Metadata Info ",omitempty"
Md5 string
ChunkSize int
UploadDate time.Time
Length uint64
Filename string ",omitempty"