vfs: Implement Read support (#14)

Signed-off-by: Xuanwo <github@xuanwo.io>
This commit is contained in:
Xuanwo 2021-07-16 17:36:18 +08:00 committed by GitHub
parent a1d9d0c129
commit 82b0bd746c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 3 deletions

View File

@ -161,8 +161,12 @@ func (fs *FS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name string,
}
node, err := fs.fs.GetEntry(ino.ID, name)
if err != nil && errors.Is(err, services.ErrObjectNotExist) {
return fuse.ENOENT
}
if err != nil {
return
fs.logger.Error("get entry", zap.Error(err))
return fuse.EAGAIN
}
if node == nil {
return fuse.ENOENT
@ -342,7 +346,19 @@ func (fs *FS) Open(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.OpenOut
}
func (fs *FS) Read(cancel <-chan struct{}, input *fuse.ReadIn, buf []byte) (fuse.ReadResult, fuse.Status) {
panic("implement me")
fh, err := fs.fs.GetFileHandle(input.Fh)
if err != nil {
fs.logger.Error("get file handle", zap.Error(err))
return nil, fuse.EAGAIN
}
n, err := fh.Read(input.Offset, buf)
if err != nil {
fs.logger.Error("read", zap.Error(err))
return nil, fuse.EAGAIN
}
return fuse.ReadResultData(buf[:n]), fuse.OK
}
func (fs *FS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekOut) fuse.Status {

View File

@ -1,11 +1,19 @@
package vfs
import (
"go.uber.org/zap"
"sync"
"github.com/Xuanwo/go-bufferpool"
"github.com/beyondstorage/go-storage/v4/pairs"
"github.com/beyondstorage/beyond-fs/meta"
)
var (
fileBufPool = bufferpool.New(4 * 1024 * 1024)
)
type fileHandleMap struct {
lock sync.Mutex
m map[uint64]*FileHandle
@ -44,4 +52,41 @@ type FileHandle struct {
ino *Inode
fs *FS
meta meta.Service
mu sync.Mutex
buf *bufferpool.Buffer
size uint64
offset uint64
}
func (fh *FileHandle) GetInode() *Inode {
return fh.ino
}
func (fh *FileHandle) Read(offset uint64, buf []byte) (n int, err error) {
size := fh.size
if size > uint64(len(buf)) {
size = uint64(len(buf))
}
fh.mu.Lock()
defer fh.mu.Unlock()
fh.buf.Reset()
fh.fs.logger.Info("read data",
zap.String("path", fh.ino.Path),
zap.Uint64("offset", offset),
zap.Uint64("size", size))
byteRead, err := fh.fs.s.Read(fh.ino.Path, fh.buf,
pairs.WithOffset(int64(offset)),
pairs.WithSize(int64(size)))
if err != nil {
fh.fs.logger.Error("read underlying", zap.Error(err))
return
}
copy(buf, fh.buf.Bytes()[:byteRead])
fh.offset += uint64(byteRead)
return int(byteRead), nil
}

View File

@ -2,6 +2,7 @@ package vfs
import (
"bytes"
"errors"
"fmt"
"time"
@ -128,6 +129,34 @@ func (fs *FS) Delete(parent uint64, name string) (err error) {
return
}
func (fs *FS) Stat(parent uint64, name string) (ino *Inode, err error) {
p, err := fs.GetInode(parent)
if err != nil {
return
}
path := p.GetEntryPath(name)
o, err := fs.s.Stat(path)
if err != nil && errors.Is(err, services.ErrObjectNotExist) {
// FIXME: we need to use stat with ModeDir instead.
o, err = fs.s.Stat(path + "/")
if err != nil {
return nil, err
}
o.Path = path
o.Mode = types.ModeDir
}
if err != nil {
return nil, err
}
ino = newInode(p.ID, o)
err = fs.SetInode(ino)
if err != nil {
return
}
return
}
func (fs *FS) DeleteDir(path string) (err error) {
panic("implement me")
}
@ -138,6 +167,10 @@ func (fs *FS) CreateFileHandle(ino *Inode) (fh *FileHandle, err error) {
ino: ino,
fs: fs,
meta: fs.meta,
buf: fileBufPool.Get(),
size: ino.Size,
offset: 0,
}
fs.fhm.Set(fh.ID, fh)
return fh, nil
@ -239,7 +272,8 @@ func (fs *FS) GetEntry(parent uint64, name string) (ino *Inode, err error) {
return nil, fmt.Errorf("get entry: %w", err)
}
if bs == nil {
return nil, nil
// Try get from underlying storage
return fs.Stat(parent, name)
}
ino = &Inode{}