beyond-fs/vfs/fs.go

316 lines
6.0 KiB
Go

package vfs
import (
"bytes"
"errors"
"fmt"
"time"
_ "github.com/beyondstorage/go-service-fs/v3"
_ "github.com/beyondstorage/go-service-memory"
_ "github.com/beyondstorage/go-service-s3/v2"
"github.com/beyondstorage/go-storage/v4/pairs"
"github.com/beyondstorage/go-storage/v4/services"
"github.com/beyondstorage/go-storage/v4/types"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/beyondstorage/beyond-fs/meta"
)
var (
nextInode = atomic.NewUint64(0)
nextHandle = atomic.NewUint64(0)
)
func NextInodeID() uint64 {
return nextInode.Inc()
}
func NextHandle() uint64 {
return nextHandle.Inc()
}
type FS struct {
s types.Storager
cache *Cache
meta meta.Service
dhm *dirHandleMap
fhm *fileHandleMap
logger *zap.Logger
}
type Config struct {
StoragePath string
Logger *zap.Logger
}
func NewFS(cfg *Config) (fs *FS, err error) {
store, err := services.NewStoragerFromString(cfg.StoragePath)
if err != nil {
return nil, err
}
cacheStore, err := services.NewStoragerFromString("memory://")
if err != nil {
return nil, err
}
metaSrv, err := meta.NewBadger()
if err != nil {
return nil, err
}
fs = &FS{
s: store,
// TODO: we will support other service as cache later.
cache: NewCache(store, cacheStore, cfg.Logger),
meta: metaSrv,
dhm: newDirHandleMap(),
fhm: newFileHandleMap(),
logger: cfg.Logger,
}
// Start cache service.
go fs.cache.Start()
o := types.NewObject(nil, true)
o.ID = store.Metadata().WorkDir
o.Path = ""
o.Mode = types.ModeDir
err = fs.SetInode(newInode(1, o))
if err != nil {
return nil, err
}
return fs, err
}
func (fs *FS) Create(parent uint64, name string) (ino *Inode, fh *FileHandle, err error) {
// FIXME: we need to handle file exists.
p, err := fs.GetInode(parent)
if err != nil {
return nil, nil, err
}
path := p.GetEntryPath(name)
_, err = fs.s.Write(path, bytes.NewReader([]byte{}), 0)
if err != nil {
fs.logger.Error("write", zap.String("path", path), zap.Error(err))
return nil, nil, err
}
o := fs.s.Create(path)
o.Path = path
o.Mode = types.ModeRead
o.SetContentLength(0)
o.SetLastModified(time.Now())
ino = newInode(parent, o)
err = fs.SetInode(ino)
if err != nil {
return
}
fh, err = fs.CreateFileHandle(ino)
if err != nil {
return
}
return
}
func (fs *FS) Delete(parent uint64, name string) (err error) {
ino, err := fs.GetEntry(parent, name)
if err != nil {
return
}
err = fs.s.Delete(ino.Path)
if err != nil {
return
}
err = fs.DeleteInode(ino)
if err != nil {
return
}
err = fs.DeleteEntry(parent, name)
if err != nil {
return
}
return
}
func (fs *FS) Stat(parent uint64, name string) (ino *Inode, err error) {
p, err := fs.GetInode(parent)
if err != nil {
return
}
path := p.GetEntryPath(name)
o, err := fs.s.Stat(path)
if err != nil && errors.Is(err, services.ErrObjectNotExist) {
// FIXME: we need to use stat with ModeDir instead.
o, err = fs.s.Stat(path + "/")
if err != nil {
return nil, err
}
o.Path = path
o.Mode = types.ModeDir
}
if err != nil {
return nil, err
}
ino = newInode(p.ID, o)
err = fs.SetInode(ino)
if err != nil {
return
}
return
}
func (fs *FS) DeleteDir(path string) (err error) {
panic("implement me")
}
func (fs *FS) CreateFileHandle(ino *Inode) (fh *FileHandle, err error) {
fh = &FileHandle{
ID: NextHandle(),
ino: ino,
fs: fs,
meta: fs.meta,
cache: fs.cache,
buf: fileBufPool.Get(),
size: ino.Size,
offset: 0,
}
fs.fhm.Set(fh.ID, fh)
return fh, nil
}
func (fs *FS) GetFileHandle(fhid uint64) (fh *FileHandle, err error) {
return fs.fhm.Get(fhid), nil
}
func (fs *FS) DeleteFileHandle(fhid uint64) (err error) {
fh := fs.fhm.Get(fhid)
if fh == nil {
return nil
}
err = fh.CloseForWrite()
if err != nil {
return
}
fs.fhm.Delete(fhid)
return nil
}
func (fs *FS) CreateDirHandle(ino *Inode) (dh *DirHandle, err error) {
it, err := fs.s.List(ino.Path, pairs.WithListMode(types.ListModeDir))
if err != nil {
return
}
dh = &DirHandle{
ID: NextHandle(),
ino: ino,
fs: fs,
it: it,
meta: fs.meta,
}
fs.dhm.Set(dh.ID, dh)
return dh, err
}
func (fs *FS) GetDirHandle(dhid uint64) (dh *DirHandle, err error) {
return fs.dhm.Get(dhid), nil
}
func (fs *FS) DeleteDirHandle(dhid uint64) (err error) {
fs.dhm.Delete(dhid)
return nil
}
func (fs *FS) SetInode(ino *Inode) (err error) {
bs, err := ino.MarshalMsg(nil)
if err != nil {
return fmt.Errorf("marshal inode: %w", err)
}
err = fs.meta.Set(meta.InodeKey(ino.ID), bs)
if err != nil {
return fmt.Errorf("set inode: %w", err)
}
if ino.ID == ino.ParentID {
// Don't set entry key for root directory.
return nil
}
err = fs.meta.Set(meta.EntryKey(ino.ParentID, ino.Name), bs)
if err != nil {
return fmt.Errorf("set entry: %w", err)
}
return nil
}
func (fs *FS) GetInode(id uint64) (ino *Inode, err error) {
bs, err := fs.meta.Get(meta.InodeKey(id))
if err != nil {
return nil, fmt.Errorf("get inode: %w", err)
}
if bs == nil {
return nil, nil
}
ino = &Inode{}
_, err = ino.UnmarshalMsg(bs)
if err != nil {
return nil, fmt.Errorf("unmarshal inode: %w", err)
}
return
}
func (fs *FS) DeleteInode(ino *Inode) (err error) {
err = fs.meta.Delete(meta.InodeKey(ino.ID))
if err != nil {
return fmt.Errorf("del inode: %w", err)
}
return
}
func (fs *FS) DeleteInodeByID(id uint64) (err error) {
err = fs.meta.Delete(meta.InodeKey(id))
if err != nil {
return fmt.Errorf("del inode: %w", err)
}
return
}
func (fs *FS) GetEntry(parent uint64, name string) (ino *Inode, err error) {
bs, err := fs.meta.Get(meta.EntryKey(parent, name))
if err != nil {
return nil, fmt.Errorf("get entry: %w", err)
}
if bs == nil {
// Try get from underlying storage
return fs.Stat(parent, name)
}
ino = &Inode{}
_, err = ino.UnmarshalMsg(bs)
if err != nil {
return nil, fmt.Errorf("unmarshal inode: %w", err)
}
return
}
func (fs *FS) DeleteEntry(parent uint64, name string) (err error) {
err = fs.meta.Delete(meta.EntryKey(parent, name))
if err != nil {
return fmt.Errorf("del inode: %w", err)
}
return
}