feat: Add cache layer for write operations (#16)
* feat: Add cache layer for write operations Signed-off-by: Xuanwo <github@xuanwo.io> * fix: Cache not started Signed-off-by: Xuanwo <github@xuanwo.io> * fix write not correctly Signed-off-by: Xuanwo <github@xuanwo.io> * format imports Signed-off-by: Xuanwo <github@xuanwo.io>
This commit is contained in:
parent
82b0bd746c
commit
fe7973fa7f
|
@ -2,6 +2,7 @@ package hanwen
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
|
@ -342,6 +343,12 @@ func (fs *FS) Open(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.OpenOut
|
|||
fs.logger.Error("create file handle", zap.Error(err))
|
||||
return
|
||||
}
|
||||
if input.Flags&uint32(os.O_WRONLY) != 0 {
|
||||
err = fh.PrepareForWrite()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("prepare for write: %v", err))
|
||||
}
|
||||
}
|
||||
return fillOpenOut(fh, out)
|
||||
}
|
||||
|
||||
|
@ -358,7 +365,6 @@ func (fs *FS) Read(cancel <-chan struct{}, input *fuse.ReadIn, buf []byte) (fuse
|
|||
return nil, fuse.EAGAIN
|
||||
}
|
||||
return fuse.ReadResultData(buf[:n]), fuse.OK
|
||||
|
||||
}
|
||||
|
||||
func (fs *FS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekOut) fuse.Status {
|
||||
|
@ -387,7 +393,18 @@ func (fs *FS) Release(cancel <-chan struct{}, input *fuse.ReleaseIn) {
|
|||
}
|
||||
|
||||
func (fs *FS) Write(cancel <-chan struct{}, input *fuse.WriteIn, data []byte) (written uint32, code fuse.Status) {
|
||||
panic("implement me")
|
||||
fh, err := fs.fs.GetFileHandle(input.Fh)
|
||||
if err != nil {
|
||||
fs.logger.Error("get file handle", zap.Error(err))
|
||||
return 0, fuse.EAGAIN
|
||||
}
|
||||
|
||||
n, err := fh.Write(input.Offset, data)
|
||||
if err != nil {
|
||||
fs.logger.Error("read", zap.Error(err))
|
||||
return uint32(n), fuse.EAGAIN
|
||||
}
|
||||
return uint32(n), fuse.OK
|
||||
}
|
||||
|
||||
func (fs *FS) CopyFileRange(cancel <-chan struct{}, input *fuse.CopyFileRangeIn) (written uint32, code fuse.Status) {
|
||||
|
|
8
go.mod
8
go.mod
|
@ -4,12 +4,14 @@ go 1.15
|
|||
|
||||
require (
|
||||
github.com/Xuanwo/go-bufferpool v0.1.0
|
||||
github.com/beyondstorage/go-service-fs/v3 v3.2.0
|
||||
github.com/beyondstorage/go-service-s3/v2 v2.2.0
|
||||
github.com/beyondstorage/go-storage/v4 v4.2.0
|
||||
github.com/beyondstorage/go-service-fs/v3 v3.3.0
|
||||
github.com/beyondstorage/go-service-memory v0.1.0
|
||||
github.com/beyondstorage/go-service-s3/v2 v2.3.0
|
||||
github.com/beyondstorage/go-storage/v4 v4.4.0
|
||||
github.com/dgraph-io/badger/v3 v3.2103.1
|
||||
github.com/golang/protobuf v1.4.2 // indirect
|
||||
github.com/hanwen/go-fuse/v2 v2.1.0
|
||||
github.com/panjf2000/ants/v2 v2.4.6
|
||||
github.com/tinylib/msgp v1.1.6
|
||||
go.uber.org/atomic v1.7.0
|
||||
go.uber.org/zap v1.18.1
|
||||
|
|
57
go.sum
57
go.sum
|
@ -7,21 +7,23 @@ github.com/Xuanwo/go-bufferpool v0.1.0/go.mod h1:Mle++9GGouhOwGj52i9PJLNAPmW2nb8
|
|||
github.com/Xuanwo/templateutils v0.1.0 h1:WpkWOqQtIQ2vAIpJLa727DdN8WtxhUkkbDGa6UhntJY=
|
||||
github.com/Xuanwo/templateutils v0.1.0/go.mod h1:OdE0DJ+CJxDBq6psX5DPV+gOZi8bhuHuVUpPCG++Wb8=
|
||||
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
||||
github.com/aws/aws-sdk-go v1.38.68 h1:aOG8geU4SohNp659eKBHRBgbqSrZ6jNZlfimIuJAwL8=
|
||||
github.com/aws/aws-sdk-go v1.38.68/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
|
||||
github.com/aws/aws-sdk-go v1.40.1 h1:MlWasxqPLnFKZLPdEFzlEtzIdV7044o44YILe6A9zys=
|
||||
github.com/aws/aws-sdk-go v1.40.1/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
|
||||
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
||||
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||
github.com/beyondstorage/go-endpoint v1.0.1 h1:F8x2dGLMu9je6g7zPbKoxCXDlug97K26SeCx7KEHgyg=
|
||||
github.com/beyondstorage/go-endpoint v1.0.1/go.mod h1:P2hknaGrziOJJKySv/XnAiVw/d3v12/LZu2gSxEx4nM=
|
||||
github.com/beyondstorage/go-integration-test/v4 v4.1.1/go.mod h1:ihtCaOJvaHGE0v+IhY6ZUF5NU1IND6xmdrJI9Lq/jhc=
|
||||
github.com/beyondstorage/go-service-fs/v3 v3.2.0 h1:POC1Z9b7Z6dTQycPnEsjRVtq7PndFInHzsP/A1Y9T+0=
|
||||
github.com/beyondstorage/go-service-fs/v3 v3.2.0/go.mod h1:dHLOhJtn8Uh1WGuL5m6DN9/ZtWbBFD+CKRhga535wkY=
|
||||
github.com/beyondstorage/go-service-s3/v2 v2.2.0 h1:aAQa4yXMb1AoeZWaQI5bhDbKARSbg/avRb0yu09Wlrc=
|
||||
github.com/beyondstorage/go-service-s3/v2 v2.2.0/go.mod h1:R5W0FHPt6RmFAEO/ngoJ04q3G/i+mb38GCJNPZPwtxc=
|
||||
github.com/beyondstorage/go-storage/v4 v4.2.0 h1:J0xqqy4qEQRtIS2zUWMA5wRXVHx/cxX5fHsU2ezA3+I=
|
||||
github.com/beyondstorage/go-storage/v4 v4.2.0/go.mod h1:rUNzOXcikYk5w0ewvNsKbztg7ndQDyDvjDuP0bznSLU=
|
||||
github.com/beyondstorage/specs/go v0.0.0-20210623065218-d1c2d7d81259 h1:mW9XpHLc6pdXBRnsha1VlqF0rNsB/Oc+8l+5UYngmRA=
|
||||
github.com/beyondstorage/specs/go v0.0.0-20210623065218-d1c2d7d81259/go.mod h1:vF/Q0P1tCvhVAUrxg7i6NvrARRMQVTAuQdDNqpSzR1w=
|
||||
github.com/beyondstorage/go-integration-test/v4 v4.2.0/go.mod h1:jLyYWSGUjQRH7U1HdaLbXE5sxBgqrtK73q+Q7PGIuSs=
|
||||
github.com/beyondstorage/go-service-fs/v3 v3.3.0 h1:VOFdcshI4M1ovPbfwBpSdTp2CkzvdDKT6F70WQRdp4A=
|
||||
github.com/beyondstorage/go-service-fs/v3 v3.3.0/go.mod h1:/keWarsnGKz219wMyGN57OM7s5BUIZ/YlK9LhsXMwtQ=
|
||||
github.com/beyondstorage/go-service-memory v0.1.0 h1:12beKpRiY15wfTRyLsoQWXKELtWri4C2NprUQmX1AcU=
|
||||
github.com/beyondstorage/go-service-memory v0.1.0/go.mod h1:GWAXoWnfevG/Weu+UhLzHMJRoZ2ylL3pdYb2kA/w++E=
|
||||
github.com/beyondstorage/go-service-s3/v2 v2.3.0 h1:mCdCuGmQ/26A1Xhj9tnFcRy3bXf1h5MT/TyFrJW8CAI=
|
||||
github.com/beyondstorage/go-service-s3/v2 v2.3.0/go.mod h1:B4gLy8g/uhjkLkVymkRES/tIKH2UhB0zmkzqzGaN5LU=
|
||||
github.com/beyondstorage/go-storage/v4 v4.3.0/go.mod h1:0fdcRCzLKMQe7Ve4zPlyTGgoPYwuINiV79Gx9tCt9tQ=
|
||||
github.com/beyondstorage/go-storage/v4 v4.3.2/go.mod h1:8FHjTUFuwLl/mmIGpOL9g3RTZPyye7vneFN/JkRj5Tg=
|
||||
github.com/beyondstorage/go-storage/v4 v4.4.0 h1:sWURraKFjNR4qpwthr45cAGOIx6EOLrrJcz6su4Je30=
|
||||
github.com/beyondstorage/go-storage/v4 v4.4.0/go.mod h1:mc9VzBImjXDg1/1sLfta2MJH79elfM6m47ZZvZ+q/Uw=
|
||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
|
||||
|
@ -56,8 +58,9 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
|
|||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I=
|
||||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
|
||||
github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g=
|
||||
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
|
||||
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
|
||||
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
|
@ -77,8 +80,9 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
|
||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/pprof v0.0.0-20181127221834-b4f47329b966/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
|
||||
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
|
||||
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/hanwen/go-fuse v1.0.0 h1:GxS9Zrn6c35/BnfiVsZVWmsG803xwE7eVRDvcf/BEVc=
|
||||
github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok=
|
||||
|
@ -107,6 +111,8 @@ github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LE
|
|||
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/panjf2000/ants/v2 v2.4.6 h1:drmj9mcygn2gawZ155dRbo+NfXEfAssjZNU1qoIb4gQ=
|
||||
github.com/panjf2000/ants/v2 v2.4.6/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ=
|
||||
github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
|
||||
|
@ -147,6 +153,7 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
|
|||
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0=
|
||||
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
|
||||
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
|
||||
|
@ -169,8 +176,9 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl
|
|||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
|
||||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
|
||||
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
|
@ -179,8 +187,9 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
|
|||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME=
|
||||
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
|
||||
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
|
@ -189,6 +198,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
|
|||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180903190138-2b024373dcd9/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
@ -197,12 +207,17 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw=
|
||||
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
|
@ -213,8 +228,9 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
|
|||
golang.org/x/tools v0.0.0-20200509030707-2212a7e161a5/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.1 h1:wGiQel/hW0NnEkJUk8lbzkX2gFJU6PFxf1v5OlCfuOs=
|
||||
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
@ -240,6 +256,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8
|
|||
gopkg.in/src-d/go-billy.v4 v4.3.0 h1:KtlZ4c1OWbIs4jCv5ZXrTqG8EQocr0g/d4DjNg70aek=
|
||||
gopkg.in/src-d/go-billy.v4 v4.3.0/go.mod h1:tm33zBoOwxjYHZIE+OV8bxTWFMJLrconzFMd38aARFk=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
|
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
|
|
@ -0,0 +1,296 @@
|
|||
package vfs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/beyondstorage/go-storage/v4/types"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type op struct {
|
||||
fd uint64
|
||||
size int64
|
||||
}
|
||||
|
||||
type chunk struct {
|
||||
lock sync.Mutex
|
||||
wg *sync.WaitGroup
|
||||
|
||||
// The path for the chunk.
|
||||
fd uint64
|
||||
path string
|
||||
persistedIdx uint64
|
||||
persistedSize int64
|
||||
nextIdx uint64
|
||||
currentSize int64
|
||||
|
||||
// If we have CreateMultipart or CreateAppend, we will store the object here.
|
||||
// So we can check if object == nil to decide use CompleteMultipart or call Write.
|
||||
object *types.Object
|
||||
|
||||
// Only valid if we have already called CreateMultipart.
|
||||
parts map[int]*types.Part
|
||||
nextPartNumber int
|
||||
}
|
||||
|
||||
func newChunk(fd uint64, path string) *chunk {
|
||||
return &chunk{
|
||||
wg: &sync.WaitGroup{},
|
||||
fd: fd,
|
||||
path: path,
|
||||
}
|
||||
}
|
||||
|
||||
type Cache struct {
|
||||
s types.Storager // Real data store
|
||||
c types.Storager // Cache data store
|
||||
logger *zap.Logger
|
||||
|
||||
p *ants.Pool
|
||||
ch chan op
|
||||
|
||||
chunks map[uint64]*chunk
|
||||
chunkLock sync.Mutex
|
||||
}
|
||||
|
||||
func NewCache(s, c types.Storager, logger *zap.Logger) *Cache {
|
||||
cache := &Cache{
|
||||
s: s,
|
||||
c: c,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
p, err := ants.NewPool(10)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("new pool: %v", err))
|
||||
}
|
||||
|
||||
cache.p = p
|
||||
cache.ch = make(chan op)
|
||||
cache.chunks = make(map[uint64]*chunk)
|
||||
return cache
|
||||
}
|
||||
|
||||
func (c *Cache) Start() {
|
||||
for v := range c.ch {
|
||||
c.chunkLock.Lock()
|
||||
chk := c.chunks[v.fd]
|
||||
c.chunkLock.Unlock()
|
||||
|
||||
chk.lock.Lock()
|
||||
chk.nextIdx += 1
|
||||
chk.currentSize += v.size
|
||||
chk.lock.Unlock()
|
||||
|
||||
// Skip persistViaWriteMultipart write operation if we don't have enough data.
|
||||
if chk.currentSize-chk.persistedSize < 64*1024*1024 {
|
||||
continue
|
||||
}
|
||||
|
||||
chk.lock.Lock()
|
||||
|
||||
if chk.object == nil {
|
||||
o, err := c.s.(types.Multiparter).CreateMultipart(chk.path)
|
||||
if err != nil {
|
||||
c.logger.Fatal("create multipart", zap.Error(err))
|
||||
}
|
||||
|
||||
chk.object = o
|
||||
chk.parts = make(map[int]*types.Part)
|
||||
}
|
||||
|
||||
start := chk.persistedIdx
|
||||
end := chk.nextIdx
|
||||
size := chk.currentSize - chk.persistedSize
|
||||
partNumber := chk.nextPartNumber
|
||||
chk.persistedSize = chk.currentSize
|
||||
chk.persistedIdx = chk.nextIdx
|
||||
chk.nextPartNumber += 1
|
||||
chk.lock.Unlock()
|
||||
|
||||
chk.wg.Add(1)
|
||||
err := c.p.Submit(func() {
|
||||
defer chk.wg.Done()
|
||||
|
||||
err := c.persistViaWriteMultipart(chk, start, end, size, partNumber)
|
||||
if err != nil {
|
||||
c.logger.Error("persistViaWriteMultipart", zap.Error(err))
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
c.logger.Fatal("submit task", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) complete(chk *chunk) error {
|
||||
// object == nil means data is small enough to complete in single write operation.
|
||||
// We can persist it via write.
|
||||
if chk.object == nil {
|
||||
start := uint64(0)
|
||||
end := chk.nextIdx
|
||||
size := chk.currentSize
|
||||
|
||||
return c.persistViaWrite(chk, start, end, size)
|
||||
}
|
||||
|
||||
// Check for dirty data.
|
||||
//
|
||||
// persistedIdx < nextIdx means we still have data to write.
|
||||
if chk.persistedIdx < chk.nextIdx {
|
||||
start := chk.persistedIdx
|
||||
end := chk.nextIdx
|
||||
size := chk.currentSize - chk.persistedSize
|
||||
partNumber := chk.nextPartNumber
|
||||
|
||||
chk.wg.Add(1)
|
||||
err := c.p.Submit(func() {
|
||||
defer chk.wg.Done()
|
||||
|
||||
err := c.persistViaWriteMultipart(chk, start, end, size, partNumber)
|
||||
if err != nil {
|
||||
c.logger.Error("persistViaWriteMultipart", zap.Error(err))
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
c.logger.Fatal("submit task", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// It's safe to complete the multipart after wait.
|
||||
chk.wg.Wait()
|
||||
|
||||
parts := make([]*types.Part, 0, len(chk.parts))
|
||||
for i := 0; i < len(chk.parts); i++ {
|
||||
parts = append(parts, chk.parts[i])
|
||||
}
|
||||
|
||||
err := c.s.(types.Multiparter).CompleteMultipart(chk.object, parts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) persistViaWrite(chk *chunk, start, end uint64, size int64) error {
|
||||
r, err := c.read(chk.fd, start, end)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
err = r.Close()
|
||||
if err != nil {
|
||||
c.logger.Error("close reader", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
_, err = c.s.Write(chk.path, r, size)
|
||||
if err != nil {
|
||||
c.logger.Error("write", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = r.Close()
|
||||
if err != nil {
|
||||
c.logger.Error("close", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) persistViaWriteMultipart(chk *chunk, start, end uint64, size int64, partNumber int) error {
|
||||
r, err := c.read(chk.fd, start, end)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
err = r.Close()
|
||||
if err != nil {
|
||||
c.logger.Error("close reader", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
_, part, err := c.s.(types.Multiparter).WriteMultipart(chk.object, r, size, partNumber)
|
||||
if err != nil {
|
||||
c.logger.Error("write", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
chk.lock.Lock()
|
||||
chk.parts[partNumber] = part
|
||||
chk.lock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) read(fd, start, end uint64) (r io.ReadCloser, err error) {
|
||||
r, w := io.Pipe()
|
||||
|
||||
go func() {
|
||||
for i := start; i < end; i++ {
|
||||
p := fmt.Sprintf("%d-%d", fd, i)
|
||||
_, err := c.c.Read(p, w)
|
||||
if err != nil {
|
||||
c.logger.Error("read", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
err := w.Close()
|
||||
if err != nil {
|
||||
c.logger.Error("close writer", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (c *Cache) startWrite(fd uint64, path string) (err error) {
|
||||
c.chunkLock.Lock()
|
||||
// FIXME: maybe we need to check the fd before set.
|
||||
c.chunks[fd] = newChunk(fd, path)
|
||||
c.chunkLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) write(fd, idx uint64, data []byte) (n int64, err error) {
|
||||
p := fmt.Sprintf("%d-%d", fd, idx)
|
||||
|
||||
size := int64(len(data))
|
||||
n, err = c.c.Write(p, bytes.NewReader(data), size)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
c.ch <- op{
|
||||
fd: fd,
|
||||
size: size,
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (c *Cache) endWrite(fd uint64) (err error) {
|
||||
c.chunkLock.Lock()
|
||||
chk := c.chunks[fd]
|
||||
c.chunkLock.Unlock()
|
||||
|
||||
err = c.complete(chk)
|
||||
if err != nil {
|
||||
c.logger.Error("complete", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
c.chunkLock.Lock()
|
||||
delete(c.chunks, fd)
|
||||
c.chunkLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) Stop() {
|
||||
close(c.ch)
|
||||
}
|
58
vfs/file.go
58
vfs/file.go
|
@ -1,11 +1,12 @@
|
|||
package vfs
|
||||
|
||||
import (
|
||||
"go.uber.org/zap"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/Xuanwo/go-bufferpool"
|
||||
"github.com/beyondstorage/go-storage/v4/pairs"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/beyondstorage/beyond-fs/meta"
|
||||
)
|
||||
|
@ -49,14 +50,20 @@ func (fhm *fileHandleMap) Delete(id uint64) {
|
|||
type FileHandle struct {
|
||||
ID uint64
|
||||
|
||||
ino *Inode
|
||||
fs *FS
|
||||
meta meta.Service
|
||||
ino *Inode
|
||||
fs *FS
|
||||
meta meta.Service
|
||||
cache *Cache
|
||||
|
||||
mu sync.Mutex
|
||||
buf *bufferpool.Buffer
|
||||
size uint64
|
||||
offset uint64
|
||||
|
||||
// Read operations
|
||||
buf *bufferpool.Buffer
|
||||
|
||||
// Write operations
|
||||
idx uint64
|
||||
}
|
||||
|
||||
func (fh *FileHandle) GetInode() *Inode {
|
||||
|
@ -90,3 +97,44 @@ func (fh *FileHandle) Read(offset uint64, buf []byte) (n int, err error) {
|
|||
fh.offset += uint64(byteRead)
|
||||
return int(byteRead), nil
|
||||
}
|
||||
|
||||
func (fh *FileHandle) PrepareForWrite() (err error) {
|
||||
err = fh.cache.startWrite(fh.ID, fh.ino.Path)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (fh *FileHandle) Write(offset uint64, buf []byte) (n int, err error) {
|
||||
fh.mu.Lock()
|
||||
defer fh.mu.Unlock()
|
||||
|
||||
if offset != fh.offset {
|
||||
return 0, fmt.Errorf("random write is not allowd")
|
||||
}
|
||||
|
||||
fh.fs.logger.Info("write data",
|
||||
zap.String("path", fh.ino.Path),
|
||||
zap.Uint64("offset", offset),
|
||||
zap.Int("size", len(buf)))
|
||||
byteWritten, err := fh.cache.write(fh.ID, fh.idx, buf)
|
||||
if err != nil {
|
||||
fh.fs.logger.Error("write buffer", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
fh.idx += 1
|
||||
fh.size += uint64(byteWritten)
|
||||
fh.offset += uint64(byteWritten)
|
||||
|
||||
return int(byteWritten), nil
|
||||
}
|
||||
|
||||
func (fh *FileHandle) CloseForWrite() (err error) {
|
||||
err = fh.cache.endWrite(fh.ID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
37
vfs/fs.go
37
vfs/fs.go
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
_ "github.com/beyondstorage/go-service-fs/v3"
|
||||
_ "github.com/beyondstorage/go-service-memory"
|
||||
_ "github.com/beyondstorage/go-service-s3/v2"
|
||||
"github.com/beyondstorage/go-storage/v4/pairs"
|
||||
"github.com/beyondstorage/go-storage/v4/services"
|
||||
|
@ -31,8 +32,9 @@ func NextHandle() uint64 {
|
|||
}
|
||||
|
||||
type FS struct {
|
||||
s types.Storager
|
||||
meta meta.Service
|
||||
s types.Storager
|
||||
cache *Cache
|
||||
meta meta.Service
|
||||
|
||||
dhm *dirHandleMap
|
||||
fhm *fileHandleMap
|
||||
|
@ -51,20 +53,30 @@ func NewFS(cfg *Config) (fs *FS, err error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
cacheStore, err := services.NewStoragerFromString("memory://")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metaSrv, err := meta.NewBadger()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fs = &FS{
|
||||
s: store,
|
||||
meta: metaSrv,
|
||||
s: store,
|
||||
// TODO: we will support other service as cache later.
|
||||
cache: NewCache(store, cacheStore, cfg.Logger),
|
||||
meta: metaSrv,
|
||||
|
||||
dhm: newDirHandleMap(),
|
||||
fhm: newFileHandleMap(),
|
||||
logger: cfg.Logger,
|
||||
}
|
||||
|
||||
// Start cache service.
|
||||
go fs.cache.Start()
|
||||
|
||||
o := types.NewObject(nil, true)
|
||||
o.ID = store.Metadata().WorkDir
|
||||
o.Path = ""
|
||||
|
@ -163,10 +175,11 @@ func (fs *FS) DeleteDir(path string) (err error) {
|
|||
|
||||
func (fs *FS) CreateFileHandle(ino *Inode) (fh *FileHandle, err error) {
|
||||
fh = &FileHandle{
|
||||
ID: NextHandle(),
|
||||
ino: ino,
|
||||
fs: fs,
|
||||
meta: fs.meta,
|
||||
ID: NextHandle(),
|
||||
ino: ino,
|
||||
fs: fs,
|
||||
meta: fs.meta,
|
||||
cache: fs.cache,
|
||||
|
||||
buf: fileBufPool.Get(),
|
||||
size: ino.Size,
|
||||
|
@ -181,6 +194,14 @@ func (fs *FS) GetFileHandle(fhid uint64) (fh *FileHandle, err error) {
|
|||
}
|
||||
|
||||
func (fs *FS) DeleteFileHandle(fhid uint64) (err error) {
|
||||
fh := fs.fhm.Get(fhid)
|
||||
if fh == nil {
|
||||
return nil
|
||||
}
|
||||
err = fh.CloseForWrite()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
fs.fhm.Delete(fhid)
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue