Merge pull request #443 from gierschv/driver-rados

Storage Driver: Ceph Object Storage (RADOS)
This commit is contained in:
Stephen Day 2015-05-29 18:10:49 -07:00
commit 318af0b1ce
17 changed files with 2517 additions and 4 deletions

View file

@ -0,0 +1,300 @@
package rados
// #cgo LDFLAGS: -lrados
// #include <stdlib.h>
// #include <rados/librados.h>
import "C"
import "unsafe"
import "bytes"
// ClusterStat represents Ceph cluster statistics.
type ClusterStat struct {
Kb uint64
Kb_used uint64
Kb_avail uint64
Num_objects uint64
}
// Conn is a connection handle to a Ceph cluster.
type Conn struct {
cluster C.rados_t
}
// PingMonitor sends a ping to a monitor and returns the reply.
func (c *Conn) PingMonitor(id string) (string, error) {
c_id := C.CString(id)
defer C.free(unsafe.Pointer(c_id))
var strlen C.size_t
var strout *C.char
ret := C.rados_ping_monitor(c.cluster, c_id, &strout, &strlen)
defer C.rados_buffer_free(strout)
if ret == 0 {
reply := C.GoStringN(strout, (C.int)(strlen))
return reply, nil
} else {
return "", RadosError(int(ret))
}
}
// Connect establishes a connection to a RADOS cluster. It returns an error,
// if any.
func (c *Conn) Connect() error {
ret := C.rados_connect(c.cluster)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Shutdown disconnects from the cluster.
func (c *Conn) Shutdown() {
C.rados_shutdown(c.cluster)
}
// ReadConfigFile configures the connection using a Ceph configuration file.
func (c *Conn) ReadConfigFile(path string) error {
c_path := C.CString(path)
defer C.free(unsafe.Pointer(c_path))
ret := C.rados_conf_read_file(c.cluster, c_path)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// ReadDefaultConfigFile configures the connection using a Ceph configuration
// file located at default locations.
func (c *Conn) ReadDefaultConfigFile() error {
ret := C.rados_conf_read_file(c.cluster, nil)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
func (c *Conn) OpenIOContext(pool string) (*IOContext, error) {
c_pool := C.CString(pool)
defer C.free(unsafe.Pointer(c_pool))
ioctx := &IOContext{}
ret := C.rados_ioctx_create(c.cluster, c_pool, &ioctx.ioctx)
if ret == 0 {
return ioctx, nil
} else {
return nil, RadosError(int(ret))
}
}
// ListPools returns the names of all existing pools.
func (c *Conn) ListPools() (names []string, err error) {
buf := make([]byte, 4096)
for {
ret := int(C.rados_pool_list(c.cluster,
(*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
if ret < 0 {
return nil, RadosError(int(ret))
}
if ret > len(buf) {
buf = make([]byte, ret)
continue
}
tmp := bytes.SplitAfter(buf[:ret-1], []byte{0})
for _, s := range tmp {
if len(s) > 0 {
name := C.GoString((*C.char)(unsafe.Pointer(&s[0])))
names = append(names, name)
}
}
return names, nil
}
}
// SetConfigOption sets the value of the configuration option identified by
// the given name.
func (c *Conn) SetConfigOption(option, value string) error {
c_opt, c_val := C.CString(option), C.CString(value)
defer C.free(unsafe.Pointer(c_opt))
defer C.free(unsafe.Pointer(c_val))
ret := C.rados_conf_set(c.cluster, c_opt, c_val)
if ret < 0 {
return RadosError(int(ret))
} else {
return nil
}
}
// GetConfigOption returns the value of the Ceph configuration option
// identified by the given name.
func (c *Conn) GetConfigOption(name string) (value string, err error) {
buf := make([]byte, 4096)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_name))
ret := int(C.rados_conf_get(c.cluster, c_name,
(*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
// FIXME: ret may be -ENAMETOOLONG if the buffer is not large enough. We
// can handle this case, but we need a reliable way to test for
// -ENAMETOOLONG constant. Will the syscall/Errno stuff in Go help?
if ret == 0 {
value = C.GoString((*C.char)(unsafe.Pointer(&buf[0])))
return value, nil
} else {
return "", RadosError(ret)
}
}
// WaitForLatestOSDMap blocks the caller until the latest OSD map has been
// retrieved.
func (c *Conn) WaitForLatestOSDMap() error {
ret := C.rados_wait_for_latest_osdmap(c.cluster)
if ret < 0 {
return RadosError(int(ret))
} else {
return nil
}
}
// GetClusterStat returns statistics about the cluster associated with the
// connection.
func (c *Conn) GetClusterStats() (stat ClusterStat, err error) {
c_stat := C.struct_rados_cluster_stat_t{}
ret := C.rados_cluster_stat(c.cluster, &c_stat)
if ret < 0 {
return ClusterStat{}, RadosError(int(ret))
} else {
return ClusterStat{
Kb: uint64(c_stat.kb),
Kb_used: uint64(c_stat.kb_used),
Kb_avail: uint64(c_stat.kb_avail),
Num_objects: uint64(c_stat.num_objects),
}, nil
}
}
// ParseCmdLineArgs configures the connection from command line arguments.
func (c *Conn) ParseCmdLineArgs(args []string) error {
// add an empty element 0 -- Ceph treats the array as the actual contents
// of argv and skips the first element (the executable name)
argc := C.int(len(args) + 1)
argv := make([]*C.char, argc)
// make the first element a string just in case it is ever examined
argv[0] = C.CString("placeholder")
defer C.free(unsafe.Pointer(argv[0]))
for i, arg := range args {
argv[i+1] = C.CString(arg)
defer C.free(unsafe.Pointer(argv[i+1]))
}
ret := C.rados_conf_parse_argv(c.cluster, argc, &argv[0])
if ret < 0 {
return RadosError(int(ret))
} else {
return nil
}
}
// ParseDefaultConfigEnv configures the connection from the default Ceph
// environment variable(s).
func (c *Conn) ParseDefaultConfigEnv() error {
ret := C.rados_conf_parse_env(c.cluster, nil)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// GetFSID returns the fsid of the cluster as a hexadecimal string. The fsid
// is a unique identifier of an entire Ceph cluster.
func (c *Conn) GetFSID() (fsid string, err error) {
buf := make([]byte, 37)
ret := int(C.rados_cluster_fsid(c.cluster,
(*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
// FIXME: the success case isn't documented correctly in librados.h
if ret == 36 {
fsid = C.GoString((*C.char)(unsafe.Pointer(&buf[0])))
return fsid, nil
} else {
return "", RadosError(int(ret))
}
}
// GetInstanceID returns a globally unique identifier for the cluster
// connection instance.
func (c *Conn) GetInstanceID() uint64 {
// FIXME: are there any error cases for this?
return uint64(C.rados_get_instance_id(c.cluster))
}
// MakePool creates a new pool with default settings.
func (c *Conn) MakePool(name string) error {
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_name))
ret := int(C.rados_pool_create(c.cluster, c_name))
if ret == 0 {
return nil
} else {
return RadosError(ret)
}
}
// DeletePool deletes a pool and all the data inside the pool.
func (c *Conn) DeletePool(name string) error {
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_name))
ret := int(C.rados_pool_delete(c.cluster, c_name))
if ret == 0 {
return nil
} else {
return RadosError(ret)
}
}
// MonCommand sends a command to one of the monitors
func (c *Conn) MonCommand(args []byte) (buffer []byte, info string, err error) {
argv := make([]*C.char, len(args))
for i, _ := range args {
argv[i] = (*C.char)(unsafe.Pointer(&args[i]))
}
var (
outs, outbuf *C.char
outslen, outbuflen C.size_t
)
inbuf := C.CString("")
defer C.free(unsafe.Pointer(inbuf))
ret := C.rados_mon_command(c.cluster,
&argv[0], C.size_t(len(args)),
inbuf, // bulk input (e.g. crush map)
C.size_t(0), // length inbuf
&outbuf, // buffer
&outbuflen, // buffer length
&outs, // status string
&outslen)
if outslen > 0 {
info = C.GoStringN(outs, C.int(outslen))
C.free(unsafe.Pointer(outs))
}
if outbuflen > 0 {
buffer = C.GoBytes(unsafe.Pointer(outbuf), C.int(outbuflen))
C.free(unsafe.Pointer(outbuf))
}
if ret != 0 {
err = RadosError(int(ret))
return nil, info, err
}
return
}

View file

@ -0,0 +1,4 @@
/*
Set of wrappers around librados API.
*/
package rados

View file

@ -0,0 +1,547 @@
package rados
// #cgo LDFLAGS: -lrados
// #include <stdlib.h>
// #include <rados/librados.h>
import "C"
import "unsafe"
import "time"
// PoolStat represents Ceph pool statistics.
type PoolStat struct {
// space used in bytes
Num_bytes uint64
// space used in KB
Num_kb uint64
// number of objects in the pool
Num_objects uint64
// number of clones of objects
Num_object_clones uint64
// num_objects * num_replicas
Num_object_copies uint64
Num_objects_missing_on_primary uint64
// number of objects found on no OSDs
Num_objects_unfound uint64
// number of objects replicated fewer times than they should be
// (but found on at least one OSD)
Num_objects_degraded uint64
Num_rd uint64
Num_rd_kb uint64
Num_wr uint64
Num_wr_kb uint64
}
// ObjectStat represents an object stat information
type ObjectStat struct {
// current length in bytes
Size uint64
// last modification time
ModTime time.Time
}
// IOContext represents a context for performing I/O within a pool.
type IOContext struct {
ioctx C.rados_ioctx_t
}
// Pointer returns a uintptr representation of the IOContext.
func (ioctx *IOContext) Pointer() uintptr {
return uintptr(ioctx.ioctx)
}
// Write writes len(data) bytes to the object with key oid starting at byte
// offset offset. It returns an error, if any.
func (ioctx *IOContext) Write(oid string, data []byte, offset uint64) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_write(ioctx.ioctx, c_oid,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)),
(C.uint64_t)(offset))
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Read reads up to len(data) bytes from the object with key oid starting at byte
// offset offset. It returns the number of bytes read and an error, if any.
func (ioctx *IOContext) Read(oid string, data []byte, offset uint64) (int, error) {
if len(data) == 0 {
return 0, nil
}
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_read(
ioctx.ioctx,
c_oid,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)),
(C.uint64_t)(offset))
if ret >= 0 {
return int(ret), nil
} else {
return 0, RadosError(int(ret))
}
}
// Delete deletes the object with key oid. It returns an error, if any.
func (ioctx *IOContext) Delete(oid string) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_remove(ioctx.ioctx, c_oid)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Truncate resizes the object with key oid to size size. If the operation
// enlarges the object, the new area is logically filled with zeroes. If the
// operation shrinks the object, the excess data is removed. It returns an
// error, if any.
func (ioctx *IOContext) Truncate(oid string, size uint64) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
ret := C.rados_trunc(ioctx.ioctx, c_oid, (C.uint64_t)(size))
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Destroy informs librados that the I/O context is no longer in use.
// Resources associated with the context may not be freed immediately, and the
// context should not be used again after calling this method.
func (ioctx *IOContext) Destroy() {
C.rados_ioctx_destroy(ioctx.ioctx)
}
// Stat returns a set of statistics about the pool associated with this I/O
// context.
func (ioctx *IOContext) GetPoolStats() (stat PoolStat, err error) {
c_stat := C.struct_rados_pool_stat_t{}
ret := C.rados_ioctx_pool_stat(ioctx.ioctx, &c_stat)
if ret < 0 {
return PoolStat{}, RadosError(int(ret))
} else {
return PoolStat{
Num_bytes: uint64(c_stat.num_bytes),
Num_kb: uint64(c_stat.num_kb),
Num_objects: uint64(c_stat.num_objects),
Num_object_clones: uint64(c_stat.num_object_clones),
Num_object_copies: uint64(c_stat.num_object_copies),
Num_objects_missing_on_primary: uint64(c_stat.num_objects_missing_on_primary),
Num_objects_unfound: uint64(c_stat.num_objects_unfound),
Num_objects_degraded: uint64(c_stat.num_objects_degraded),
Num_rd: uint64(c_stat.num_rd),
Num_rd_kb: uint64(c_stat.num_rd_kb),
Num_wr: uint64(c_stat.num_wr),
Num_wr_kb: uint64(c_stat.num_wr_kb),
}, nil
}
}
// GetPoolName returns the name of the pool associated with the I/O context.
func (ioctx *IOContext) GetPoolName() (name string, err error) {
buf := make([]byte, 128)
for {
ret := C.rados_ioctx_get_pool_name(ioctx.ioctx,
(*C.char)(unsafe.Pointer(&buf[0])), C.unsigned(len(buf)))
if ret == -34 { // FIXME
buf = make([]byte, len(buf)*2)
continue
} else if ret < 0 {
return "", RadosError(ret)
}
name = C.GoStringN((*C.char)(unsafe.Pointer(&buf[0])), ret)
return name, nil
}
}
// ObjectListFunc is the type of the function called for each object visited
// by ListObjects.
type ObjectListFunc func(oid string)
// ListObjects lists all of the objects in the pool associated with the I/O
// context, and called the provided listFn function for each object, passing
// to the function the name of the object.
func (ioctx *IOContext) ListObjects(listFn ObjectListFunc) error {
var ctx C.rados_list_ctx_t
ret := C.rados_objects_list_open(ioctx.ioctx, &ctx)
if ret < 0 {
return RadosError(ret)
}
defer func() { C.rados_objects_list_close(ctx) }()
for {
var c_entry *C.char
ret := C.rados_objects_list_next(ctx, &c_entry, nil)
if ret == -2 { // FIXME
return nil
} else if ret < 0 {
return RadosError(ret)
}
listFn(C.GoString(c_entry))
}
panic("invalid state")
}
// Stat returns the size of the object and its last modification time
func (ioctx *IOContext) Stat(object string) (stat ObjectStat, err error) {
var c_psize C.uint64_t
var c_pmtime C.time_t
c_object := C.CString(object)
defer C.free(unsafe.Pointer(c_object))
ret := C.rados_stat(
ioctx.ioctx,
c_object,
&c_psize,
&c_pmtime)
if ret < 0 {
return ObjectStat{}, RadosError(int(ret))
} else {
return ObjectStat{
Size: uint64(c_psize),
ModTime: time.Unix(int64(c_pmtime), 0),
}, nil
}
}
// GetXattr gets an xattr with key `name`, it returns the length of
// the key read or an error if not successful
func (ioctx *IOContext) GetXattr(object string, name string, data []byte) (int, error) {
c_object := C.CString(object)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_object))
defer C.free(unsafe.Pointer(c_name))
ret := C.rados_getxattr(
ioctx.ioctx,
c_object,
c_name,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)))
if ret >= 0 {
return int(ret), nil
} else {
return 0, RadosError(int(ret))
}
}
// Sets an xattr for an object with key `name` with value as `data`
func (ioctx *IOContext) SetXattr(object string, name string, data []byte) error {
c_object := C.CString(object)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_object))
defer C.free(unsafe.Pointer(c_name))
ret := C.rados_setxattr(
ioctx.ioctx,
c_object,
c_name,
(*C.char)(unsafe.Pointer(&data[0])),
(C.size_t)(len(data)))
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// function that lists all the xattrs for an object, since xattrs are
// a k-v pair, this function returns a map of k-v pairs on
// success, error code on failure
func (ioctx *IOContext) ListXattrs(oid string) (map[string][]byte, error) {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
var it C.rados_xattrs_iter_t
ret := C.rados_getxattrs(ioctx.ioctx, c_oid, &it)
if ret < 0 {
return nil, RadosError(ret)
}
defer func() { C.rados_getxattrs_end(it) }()
m := make(map[string][]byte)
for {
var c_name, c_val *C.char
var c_len C.size_t
defer C.free(unsafe.Pointer(c_name))
defer C.free(unsafe.Pointer(c_val))
ret := C.rados_getxattrs_next(it, &c_name, &c_val, &c_len)
if ret < 0 {
return nil, RadosError(int(ret))
}
// rados api returns a null name,val & 0-length upon
// end of iteration
if c_name == nil {
return m, nil // stop iteration
}
m[C.GoString(c_name)] = C.GoBytes(unsafe.Pointer(c_val), (C.int)(c_len))
}
}
// Remove an xattr with key `name` from object `oid`
func (ioctx *IOContext) RmXattr(oid string, name string) error {
c_oid := C.CString(oid)
c_name := C.CString(name)
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_name))
ret := C.rados_rmxattr(
ioctx.ioctx,
c_oid,
c_name)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Append the map `pairs` to the omap `oid`
func (ioctx *IOContext) SetOmap(oid string, pairs map[string][]byte) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
var s C.size_t
var c *C.char
ptrSize := unsafe.Sizeof(c)
c_keys := C.malloc(C.size_t(len(pairs)) * C.size_t(ptrSize))
c_values := C.malloc(C.size_t(len(pairs)) * C.size_t(ptrSize))
c_lengths := C.malloc(C.size_t(len(pairs)) * C.size_t(unsafe.Sizeof(s)))
defer C.free(unsafe.Pointer(c_keys))
defer C.free(unsafe.Pointer(c_values))
defer C.free(unsafe.Pointer(c_lengths))
i := 0
for key, value := range pairs {
// key
c_key_ptr := (**C.char)(unsafe.Pointer(uintptr(c_keys) + uintptr(i) * ptrSize))
*c_key_ptr = C.CString(key)
defer C.free(unsafe.Pointer(*c_key_ptr))
// value and its length
c_value_ptr := (**C.char)(unsafe.Pointer(uintptr(c_values) + uintptr(i) * ptrSize))
var c_length C.size_t
if len(value) > 0 {
*c_value_ptr = (*C.char)(unsafe.Pointer(&value[0]))
c_length = C.size_t(len(value))
} else {
*c_value_ptr = nil
c_length = C.size_t(0)
}
c_length_ptr := (*C.size_t)(unsafe.Pointer(uintptr(c_lengths) + uintptr(i) * ptrSize))
*c_length_ptr = c_length
i++
}
op := C.rados_create_write_op()
C.rados_write_op_omap_set(
op,
(**C.char)(c_keys),
(**C.char)(c_values),
(*C.size_t)(c_lengths),
C.size_t(len(pairs)))
ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0)
C.rados_release_write_op(op)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// OmapListFunc is the type of the function called for each omap key
// visited by ListOmapValues
type OmapListFunc func(key string, value []byte)
// Iterate on a set of keys and their values from an omap
// `startAfter`: iterate only on the keys after this specified one
// `filterPrefix`: iterate only on the keys beginning with this prefix
// `maxReturn`: iterate no more than `maxReturn` key/value pairs
// `listFn`: the function called at each iteration
func (ioctx *IOContext) ListOmapValues(oid string, startAfter string, filterPrefix string, maxReturn int64, listFn OmapListFunc) error {
c_oid := C.CString(oid)
c_start_after := C.CString(startAfter)
c_filter_prefix := C.CString(filterPrefix)
c_max_return := C.uint64_t(maxReturn)
defer C.free(unsafe.Pointer(c_oid))
defer C.free(unsafe.Pointer(c_start_after))
defer C.free(unsafe.Pointer(c_filter_prefix))
op := C.rados_create_read_op()
var c_iter C.rados_omap_iter_t
var c_prval C.int
C.rados_read_op_omap_get_vals(
op,
c_start_after,
c_filter_prefix,
c_max_return,
&c_iter,
&c_prval,
)
ret := C.rados_read_op_operate(op, ioctx.ioctx, c_oid, 0)
if int(c_prval) != 0 {
return RadosError(int(c_prval))
} else if int(ret) != 0 {
return RadosError(int(ret))
}
for {
var c_key *C.char
var c_val *C.char
var c_len C.size_t
ret = C.rados_omap_get_next(c_iter, &c_key, &c_val, &c_len)
if int(ret) != 0 {
return RadosError(int(ret))
}
if c_key == nil {
break
}
listFn(C.GoString(c_key), C.GoBytes(unsafe.Pointer(c_val), C.int(c_len)))
}
C.rados_omap_get_end(c_iter)
C.rados_release_read_op(op)
return nil
}
// Fetch a set of keys and their values from an omap and returns then as a map
// `startAfter`: retrieve only the keys after this specified one
// `filterPrefix`: retrieve only the keys beginning with this prefix
// `maxReturn`: retrieve no more than `maxReturn` key/value pairs
func (ioctx *IOContext) GetOmapValues(oid string, startAfter string, filterPrefix string, maxReturn int64) (map[string][]byte, error) {
omap := map[string][]byte{}
err := ioctx.ListOmapValues(
oid, startAfter, filterPrefix, maxReturn,
func(key string, value []byte) {
omap[key] = value
},
)
return omap, err
}
// Fetch all the keys and their values from an omap and returns then as a map
// `startAfter`: retrieve only the keys after this specified one
// `filterPrefix`: retrieve only the keys beginning with this prefix
// `iteratorSize`: internal number of keys to fetch during a read operation
func (ioctx *IOContext) GetAllOmapValues(oid string, startAfter string, filterPrefix string, iteratorSize int64) (map[string][]byte, error) {
omap := map[string][]byte{}
omapSize := 0
for {
err := ioctx.ListOmapValues(
oid, startAfter, filterPrefix, iteratorSize,
func (key string, value []byte) {
omap[key] = value
startAfter = key
},
)
if err != nil {
return omap, err
}
// End of omap
if len(omap) == omapSize {
break
}
omapSize = len(omap)
}
return omap, nil
}
// Remove the specified `keys` from the omap `oid`
func (ioctx *IOContext) RmOmapKeys(oid string, keys []string) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
var c *C.char
ptrSize := unsafe.Sizeof(c)
c_keys := C.malloc(C.size_t(len(keys)) * C.size_t(ptrSize))
defer C.free(unsafe.Pointer(c_keys))
i := 0
for _, key := range keys {
c_key_ptr := (**C.char)(unsafe.Pointer(uintptr(c_keys) + uintptr(i) * ptrSize))
*c_key_ptr = C.CString(key)
defer C.free(unsafe.Pointer(*c_key_ptr))
i++
}
op := C.rados_create_write_op()
C.rados_write_op_omap_rm_keys(
op,
(**C.char)(c_keys),
C.size_t(len(keys)))
ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0)
C.rados_release_write_op(op)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}
// Clear the omap `oid`
func (ioctx *IOContext) CleanOmap(oid string) error {
c_oid := C.CString(oid)
defer C.free(unsafe.Pointer(c_oid))
op := C.rados_create_write_op()
C.rados_write_op_omap_clear(op)
ret := C.rados_write_op_operate(op, ioctx.ioctx, c_oid, nil, 0)
C.rados_release_write_op(op)
if ret == 0 {
return nil
} else {
return RadosError(int(ret))
}
}

View file

@ -0,0 +1,54 @@
package rados
// #cgo LDFLAGS: -lrados
// #include <stdlib.h>
// #include <rados/librados.h>
import "C"
import (
"fmt"
"unsafe"
)
type RadosError int
func (e RadosError) Error() string {
return fmt.Sprintf("rados: ret=%d", e)
}
// Version returns the major, minor, and patch components of the version of
// the RADOS library linked against.
func Version() (int, int, int) {
var c_major, c_minor, c_patch C.int
C.rados_version(&c_major, &c_minor, &c_patch)
return int(c_major), int(c_minor), int(c_patch)
}
// NewConn creates a new connection object. It returns the connection and an
// error, if any.
func NewConn() (*Conn, error) {
conn := &Conn{}
ret := C.rados_create(&conn.cluster, nil)
if ret == 0 {
return conn, nil
} else {
return nil, RadosError(int(ret))
}
}
// NewConnWithUser creates a new connection object with a custom username.
// It returns the connection and an error, if any.
func NewConnWithUser(user string) (*Conn, error) {
c_user := C.CString(user)
defer C.free(unsafe.Pointer(c_user))
conn := &Conn{}
ret := C.rados_create(&conn.cluster, c_user)
if ret == 0 {
return conn, nil
} else {
return nil, RadosError(int(ret))
}
}

View file

@ -0,0 +1,703 @@
package rados_test
import "testing"
//import "bytes"
import "github.com/noahdesu/go-ceph/rados"
import "github.com/stretchr/testify/assert"
import "os"
import "os/exec"
import "io"
import "io/ioutil"
import "time"
import "net"
import "fmt"
import "sort"
import "encoding/json"
func GetUUID() string {
out, _ := exec.Command("uuidgen").Output()
return string(out[:36])
}
func TestVersion(t *testing.T) {
var major, minor, patch = rados.Version()
assert.False(t, major < 0 || major > 1000, "invalid major")
assert.False(t, minor < 0 || minor > 1000, "invalid minor")
assert.False(t, patch < 0 || patch > 1000, "invalid patch")
}
func TestGetSetConfigOption(t *testing.T) {
conn, _ := rados.NewConn()
// rejects invalid options
err := conn.SetConfigOption("wefoijweojfiw", "welfkwjelkfj")
assert.Error(t, err, "Invalid option")
// verify SetConfigOption changes a values
log_file_val, err := conn.GetConfigOption("log_file")
assert.NotEqual(t, log_file_val, "/dev/null")
err = conn.SetConfigOption("log_file", "/dev/null")
assert.NoError(t, err, "Invalid option")
log_file_val, err = conn.GetConfigOption("log_file")
assert.Equal(t, log_file_val, "/dev/null")
}
func TestParseDefaultConfigEnv(t *testing.T) {
conn, _ := rados.NewConn()
log_file_val, _ := conn.GetConfigOption("log_file")
assert.NotEqual(t, log_file_val, "/dev/null")
err := os.Setenv("CEPH_ARGS", "--log-file /dev/null")
assert.NoError(t, err)
err = conn.ParseDefaultConfigEnv()
assert.NoError(t, err)
log_file_val, _ = conn.GetConfigOption("log_file")
assert.Equal(t, log_file_val, "/dev/null")
}
func TestParseCmdLineArgs(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
mon_host_val, _ := conn.GetConfigOption("mon_host")
assert.NotEqual(t, mon_host_val, "1.1.1.1")
args := []string{"--mon-host", "1.1.1.1"}
err := conn.ParseCmdLineArgs(args)
assert.NoError(t, err)
mon_host_val, _ = conn.GetConfigOption("mon_host")
assert.Equal(t, mon_host_val, "1.1.1.1")
}
func TestGetClusterStats(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
// grab current stats
prev_stat, err := conn.GetClusterStats()
fmt.Printf("prev_stat: %+v\n", prev_stat)
assert.NoError(t, err)
// make some changes to the cluster
buf := make([]byte, 1<<20)
for i := 0; i < 10; i++ {
objname := GetUUID()
pool.Write(objname, buf, 0)
}
// wait a while for the stats to change
for i := 0; i < 30; i++ {
stat, err := conn.GetClusterStats()
assert.NoError(t, err)
// wait for something to change
if stat == prev_stat {
fmt.Printf("curr_stat: %+v (trying again...)\n", stat)
time.Sleep(time.Second)
} else {
// success
fmt.Printf("curr_stat: %+v (change detected)\n", stat)
conn.Shutdown()
return
}
}
pool.Destroy()
conn.Shutdown()
t.Error("Cluster stats aren't changing")
}
func TestGetFSID(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
fsid, err := conn.GetFSID()
assert.NoError(t, err)
assert.NotEqual(t, fsid, "")
conn.Shutdown()
}
func TestGetInstanceID(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
id := conn.GetInstanceID()
assert.NotEqual(t, id, 0)
conn.Shutdown()
}
func TestMakeDeletePool(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
// get current list of pool
pools, err := conn.ListPools()
assert.NoError(t, err)
// check that new pool name is unique
new_name := GetUUID()
for _, poolname := range pools {
if new_name == poolname {
t.Error("Random pool name exists!")
return
}
}
// create pool
err = conn.MakePool(new_name)
assert.NoError(t, err)
// get updated list of pools
pools, err = conn.ListPools()
assert.NoError(t, err)
// verify that the new pool name exists
found := false
for _, poolname := range pools {
if new_name == poolname {
found = true
}
}
if !found {
t.Error("Cannot find newly created pool")
}
// delete the pool
err = conn.DeletePool(new_name)
assert.NoError(t, err)
// verify that it is gone
// get updated list of pools
pools, err = conn.ListPools()
assert.NoError(t, err)
// verify that the new pool name exists
found = false
for _, poolname := range pools {
if new_name == poolname {
found = true
}
}
if found {
t.Error("Deleted pool still exists")
}
conn.Shutdown()
}
func TestPingMonitor(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
// mon id that should work with vstart.sh
reply, err := conn.PingMonitor("a")
if err == nil {
assert.NotEqual(t, reply, "")
return
}
// mon id that should work with micro-osd.sh
reply, err = conn.PingMonitor("0")
if err == nil {
assert.NotEqual(t, reply, "")
return
}
// try to use a hostname as the monitor id
mon_addr, _ := conn.GetConfigOption("mon_host")
hosts, _ := net.LookupAddr(mon_addr)
for _, host := range hosts {
reply, err := conn.PingMonitor(host)
if err == nil {
assert.NotEqual(t, reply, "")
return
}
}
t.Error("Could not find a valid monitor id")
conn.Shutdown()
}
func TestReadConfigFile(t *testing.T) {
conn, _ := rados.NewConn()
// check current log_file value
log_file_val, err := conn.GetConfigOption("log_file")
assert.NoError(t, err)
assert.NotEqual(t, log_file_val, "/dev/null")
// create a temporary ceph.conf file that changes the log_file conf
// option.
file, err := ioutil.TempFile("/tmp", "go-rados")
assert.NoError(t, err)
_, err = io.WriteString(file, "[global]\nlog_file = /dev/null\n")
assert.NoError(t, err)
// parse the config file
err = conn.ReadConfigFile(file.Name())
assert.NoError(t, err)
// check current log_file value
log_file_val, err = conn.GetConfigOption("log_file")
assert.NoError(t, err)
assert.Equal(t, log_file_val, "/dev/null")
// cleanup
file.Close()
os.Remove(file.Name())
}
func TestWaitForLatestOSDMap(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
err := conn.WaitForLatestOSDMap()
assert.NoError(t, err)
conn.Shutdown()
}
func TestReadWrite(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
// make pool
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
bytes_in := []byte("input data")
err = pool.Write("obj", bytes_in, 0)
assert.NoError(t, err)
bytes_out := make([]byte, len(bytes_in))
n_out, err := pool.Read("obj", bytes_out, 0)
assert.Equal(t, n_out, len(bytes_in))
assert.Equal(t, bytes_in, bytes_out)
pool.Destroy()
}
func TestObjectStat(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
bytes_in := []byte("input data")
err = pool.Write("obj", bytes_in, 0)
assert.NoError(t, err)
stat, err := pool.Stat("obj")
assert.Equal(t, uint64(len(bytes_in)), stat.Size)
assert.NotNil(t, stat.ModTime)
pool.Destroy()
conn.Shutdown()
}
func TestGetPoolStats(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
// grab current stats
prev_stat, err := pool.GetPoolStats()
fmt.Printf("prev_stat: %+v\n", prev_stat)
assert.NoError(t, err)
// make some changes to the cluster
buf := make([]byte, 1<<20)
for i := 0; i < 10; i++ {
objname := GetUUID()
pool.Write(objname, buf, 0)
}
// wait a while for the stats to change
for i := 0; i < 30; i++ {
stat, err := pool.GetPoolStats()
assert.NoError(t, err)
// wait for something to change
if stat == prev_stat {
fmt.Printf("curr_stat: %+v (trying again...)\n", stat)
time.Sleep(time.Second)
} else {
// success
fmt.Printf("curr_stat: %+v (change detected)\n", stat)
conn.Shutdown()
return
}
}
pool.Destroy()
conn.Shutdown()
t.Error("Pool stats aren't changing")
}
func TestGetPoolName(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
ioctx, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
poolname_ret, err := ioctx.GetPoolName()
assert.NoError(t, err)
assert.Equal(t, poolname, poolname_ret)
ioctx.Destroy()
conn.Shutdown()
}
func TestMonCommand(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
command, err := json.Marshal(map[string]string{"prefix": "df", "format": "json"})
assert.NoError(t, err)
buf, info, err := conn.MonCommand(command)
assert.NoError(t, err)
assert.Equal(t, info, "")
var message map[string]interface{}
err = json.Unmarshal(buf, &message)
assert.NoError(t, err)
conn.Shutdown()
}
func TestObjectIterator(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
poolname := GetUUID()
err := conn.MakePool(poolname)
assert.NoError(t, err)
ioctx, err := conn.OpenIOContext(poolname)
assert.NoError(t, err)
objectList := []string{}
err = ioctx.ListObjects(func(oid string) {
objectList = append(objectList, oid)
})
assert.NoError(t, err)
assert.True(t, len(objectList) == 0)
createdList := []string{}
for i := 0; i < 200; i++ {
oid := GetUUID()
bytes_in := []byte("input data")
err = ioctx.Write(oid, bytes_in, 0)
assert.NoError(t, err)
createdList = append(createdList, oid)
}
assert.True(t, len(createdList) == 200)
err = ioctx.ListObjects(func(oid string) {
objectList = append(objectList, oid)
})
assert.NoError(t, err)
assert.Equal(t, len(objectList), len(createdList))
sort.Strings(objectList)
sort.Strings(createdList)
assert.Equal(t, objectList, createdList)
}
func TestNewConnWithUser(t *testing.T) {
_, err := rados.NewConnWithUser("admin")
assert.Equal(t, err, nil)
}
func TestReadWriteXattr(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
// make pool
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
bytes_in := []byte("input data")
err = pool.Write("obj", bytes_in, 0)
assert.NoError(t, err)
my_xattr_in := []byte("my_value")
err = pool.SetXattr("obj", "my_key", my_xattr_in)
assert.NoError(t, err)
my_xattr_out := make([]byte, len(my_xattr_in))
n_out, err := pool.GetXattr("obj", "my_key", my_xattr_out)
assert.Equal(t, n_out, len(my_xattr_in))
assert.Equal(t, my_xattr_in, my_xattr_out)
pool.Destroy()
}
func TestListXattrs(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
// make pool
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
bytes_in := []byte("input data")
err = pool.Write("obj", bytes_in, 0)
assert.NoError(t, err)
input_xattrs := make(map[string][]byte)
for i := 0; i < 200; i++ {
name := fmt.Sprintf("key_%d", i)
data := []byte(GetUUID())
err = pool.SetXattr("obj", name, data)
assert.NoError(t, err)
input_xattrs[name] = data
}
output_xattrs := make(map[string][]byte)
output_xattrs, err = pool.ListXattrs("obj")
assert.NoError(t, err)
assert.Equal(t, len(input_xattrs), len(output_xattrs))
assert.Equal(t, input_xattrs, output_xattrs)
pool.Destroy()
}
func TestRmXattr(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
bytes_in := []byte("input data")
err = pool.Write("obj", bytes_in, 0)
assert.NoError(t, err)
key := "key1"
val := []byte("val1")
err = pool.SetXattr("obj", key, val)
assert.NoError(t, err)
key = "key2"
val = []byte("val2")
err = pool.SetXattr("obj", key, val)
assert.NoError(t, err)
xattr_list := make(map[string][]byte)
xattr_list, err = pool.ListXattrs("obj")
assert.NoError(t, err)
assert.Equal(t, len(xattr_list), 2)
pool.RmXattr("obj", "key2")
xattr_list, err = pool.ListXattrs("obj")
assert.NoError(t, err)
assert.Equal(t, len(xattr_list), 1)
found := false
for key, _ = range xattr_list {
if key == "key2" {
found = true
}
}
if found {
t.Error("Deleted pool still exists")
}
pool.Destroy()
}
func TestReadWriteOmap(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
// Set
orig := map[string][]byte{
"key1": []byte("value1"),
"key2": []byte("value2"),
"prefixed-key3": []byte("value3"),
"empty": []byte(""),
}
err = pool.SetOmap("obj", orig)
assert.NoError(t, err)
// List
remaining := map[string][]byte{}
for k, v := range orig {
remaining[k] = v
}
err = pool.ListOmapValues("obj", "", "", 4, func(key string, value []byte) {
assert.Equal(t, remaining[key], value)
delete(remaining, key)
})
assert.NoError(t, err)
assert.Equal(t, 0, len(remaining))
// Get (with a fixed number of keys)
fetched, err := pool.GetOmapValues("obj", "", "", 4)
assert.NoError(t, err)
assert.Equal(t, orig, fetched)
// Get All (with an iterator size bigger than the map size)
fetched, err = pool.GetAllOmapValues("obj", "", "", 100)
assert.NoError(t, err)
assert.Equal(t, orig, fetched)
// Get All (with an iterator size smaller than the map size)
fetched, err = pool.GetAllOmapValues("obj", "", "", 1)
assert.NoError(t, err)
assert.Equal(t, orig, fetched)
// Remove
err = pool.RmOmapKeys("obj", []string{"key1", "prefixed-key3"})
assert.NoError(t, err)
fetched, err = pool.GetOmapValues("obj", "", "", 4)
assert.NoError(t, err)
assert.Equal(t, map[string][]byte{
"key2": []byte("value2"),
"empty": []byte(""),
}, fetched)
// Clear
err = pool.CleanOmap("obj")
assert.NoError(t, err)
fetched, err = pool.GetOmapValues("obj", "", "", 4)
assert.NoError(t, err)
assert.Equal(t, map[string][]byte{}, fetched)
pool.Destroy()
}
func TestReadFilterOmap(t *testing.T) {
conn, _ := rados.NewConn()
conn.ReadDefaultConfigFile()
conn.Connect()
pool_name := GetUUID()
err := conn.MakePool(pool_name)
assert.NoError(t, err)
pool, err := conn.OpenIOContext(pool_name)
assert.NoError(t, err)
orig := map[string][]byte{
"key1": []byte("value1"),
"prefixed-key3": []byte("value3"),
"key2": []byte("value2"),
}
err = pool.SetOmap("obj", orig)
assert.NoError(t, err)
// filter by prefix
fetched, err := pool.GetOmapValues("obj", "", "prefixed", 4)
assert.NoError(t, err)
assert.Equal(t, map[string][]byte{
"prefixed-key3": []byte("value3"),
}, fetched)
// "start_after" a key
fetched, err = pool.GetOmapValues("obj", "key1", "", 4)
assert.NoError(t, err)
assert.Equal(t, map[string][]byte{
"prefixed-key3": []byte("value3"),
"key2": []byte("value2"),
}, fetched)
// maxReturn
fetched, err = pool.GetOmapValues("obj", "", "key", 1)
assert.NoError(t, err)
assert.Equal(t, map[string][]byte{
"key1": []byte("value1"),
}, fetched)
pool.Destroy()
}