From fe7973fa7f02dabae9d620b3176181f4b2af3f31 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 2 Aug 2021 17:11:12 +0800 Subject: [PATCH] feat: Add cache layer for write operations (#16) * feat: Add cache layer for write operations Signed-off-by: Xuanwo * fix: Cache not started Signed-off-by: Xuanwo * fix write not correctly Signed-off-by: Xuanwo * format imports Signed-off-by: Xuanwo --- fuse/hanwen/fs.go | 21 +++- go.mod | 8 +- go.sum | 57 +++++---- vfs/cache.go | 296 ++++++++++++++++++++++++++++++++++++++++++++++ vfs/file.go | 58 ++++++++- vfs/fs.go | 37 ++++-- 6 files changed, 439 insertions(+), 38 deletions(-) create mode 100644 vfs/cache.go diff --git a/fuse/hanwen/fs.go b/fuse/hanwen/fs.go index 01d3a08..efa922e 100644 --- a/fuse/hanwen/fs.go +++ b/fuse/hanwen/fs.go @@ -2,6 +2,7 @@ package hanwen import ( "errors" + "fmt" "os" "time" @@ -342,6 +343,12 @@ func (fs *FS) Open(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.OpenOut fs.logger.Error("create file handle", zap.Error(err)) return } + if input.Flags&uint32(os.O_WRONLY) != 0 { + err = fh.PrepareForWrite() + if err != nil { + panic(fmt.Errorf("prepare for write: %v", err)) + } + } return fillOpenOut(fh, out) } @@ -358,7 +365,6 @@ func (fs *FS) Read(cancel <-chan struct{}, input *fuse.ReadIn, buf []byte) (fuse return nil, fuse.EAGAIN } return fuse.ReadResultData(buf[:n]), fuse.OK - } func (fs *FS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekOut) fuse.Status { @@ -387,7 +393,18 @@ func (fs *FS) Release(cancel <-chan struct{}, input *fuse.ReleaseIn) { } func (fs *FS) Write(cancel <-chan struct{}, input *fuse.WriteIn, data []byte) (written uint32, code fuse.Status) { - panic("implement me") + fh, err := fs.fs.GetFileHandle(input.Fh) + if err != nil { + fs.logger.Error("get file handle", zap.Error(err)) + return 0, fuse.EAGAIN + } + + n, err := fh.Write(input.Offset, data) + if err != nil { + fs.logger.Error("read", zap.Error(err)) + return uint32(n), fuse.EAGAIN + } + return uint32(n), fuse.OK } func (fs *FS) CopyFileRange(cancel <-chan struct{}, input *fuse.CopyFileRangeIn) (written uint32, code fuse.Status) { diff --git a/go.mod b/go.mod index 6245637..1c4e7b6 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,14 @@ go 1.15 require ( github.com/Xuanwo/go-bufferpool v0.1.0 - github.com/beyondstorage/go-service-fs/v3 v3.2.0 - github.com/beyondstorage/go-service-s3/v2 v2.2.0 - github.com/beyondstorage/go-storage/v4 v4.2.0 + github.com/beyondstorage/go-service-fs/v3 v3.3.0 + github.com/beyondstorage/go-service-memory v0.1.0 + github.com/beyondstorage/go-service-s3/v2 v2.3.0 + github.com/beyondstorage/go-storage/v4 v4.4.0 github.com/dgraph-io/badger/v3 v3.2103.1 github.com/golang/protobuf v1.4.2 // indirect github.com/hanwen/go-fuse/v2 v2.1.0 + github.com/panjf2000/ants/v2 v2.4.6 github.com/tinylib/msgp v1.1.6 go.uber.org/atomic v1.7.0 go.uber.org/zap v1.18.1 diff --git a/go.sum b/go.sum index addbdcf..0c1fb70 100644 --- a/go.sum +++ b/go.sum @@ -7,21 +7,23 @@ github.com/Xuanwo/go-bufferpool v0.1.0/go.mod h1:Mle++9GGouhOwGj52i9PJLNAPmW2nb8 github.com/Xuanwo/templateutils v0.1.0 h1:WpkWOqQtIQ2vAIpJLa727DdN8WtxhUkkbDGa6UhntJY= github.com/Xuanwo/templateutils v0.1.0/go.mod h1:OdE0DJ+CJxDBq6psX5DPV+gOZi8bhuHuVUpPCG++Wb8= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/aws/aws-sdk-go v1.38.68 h1:aOG8geU4SohNp659eKBHRBgbqSrZ6jNZlfimIuJAwL8= -github.com/aws/aws-sdk-go v1.38.68/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/aws/aws-sdk-go v1.40.1 h1:MlWasxqPLnFKZLPdEFzlEtzIdV7044o44YILe6A9zys= +github.com/aws/aws-sdk-go v1.40.1/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beyondstorage/go-endpoint v1.0.1 h1:F8x2dGLMu9je6g7zPbKoxCXDlug97K26SeCx7KEHgyg= github.com/beyondstorage/go-endpoint v1.0.1/go.mod h1:P2hknaGrziOJJKySv/XnAiVw/d3v12/LZu2gSxEx4nM= -github.com/beyondstorage/go-integration-test/v4 v4.1.1/go.mod h1:ihtCaOJvaHGE0v+IhY6ZUF5NU1IND6xmdrJI9Lq/jhc= -github.com/beyondstorage/go-service-fs/v3 v3.2.0 h1:POC1Z9b7Z6dTQycPnEsjRVtq7PndFInHzsP/A1Y9T+0= -github.com/beyondstorage/go-service-fs/v3 v3.2.0/go.mod h1:dHLOhJtn8Uh1WGuL5m6DN9/ZtWbBFD+CKRhga535wkY= -github.com/beyondstorage/go-service-s3/v2 v2.2.0 h1:aAQa4yXMb1AoeZWaQI5bhDbKARSbg/avRb0yu09Wlrc= -github.com/beyondstorage/go-service-s3/v2 v2.2.0/go.mod h1:R5W0FHPt6RmFAEO/ngoJ04q3G/i+mb38GCJNPZPwtxc= -github.com/beyondstorage/go-storage/v4 v4.2.0 h1:J0xqqy4qEQRtIS2zUWMA5wRXVHx/cxX5fHsU2ezA3+I= -github.com/beyondstorage/go-storage/v4 v4.2.0/go.mod h1:rUNzOXcikYk5w0ewvNsKbztg7ndQDyDvjDuP0bznSLU= -github.com/beyondstorage/specs/go v0.0.0-20210623065218-d1c2d7d81259 h1:mW9XpHLc6pdXBRnsha1VlqF0rNsB/Oc+8l+5UYngmRA= -github.com/beyondstorage/specs/go v0.0.0-20210623065218-d1c2d7d81259/go.mod h1:vF/Q0P1tCvhVAUrxg7i6NvrARRMQVTAuQdDNqpSzR1w= +github.com/beyondstorage/go-integration-test/v4 v4.2.0/go.mod h1:jLyYWSGUjQRH7U1HdaLbXE5sxBgqrtK73q+Q7PGIuSs= +github.com/beyondstorage/go-service-fs/v3 v3.3.0 h1:VOFdcshI4M1ovPbfwBpSdTp2CkzvdDKT6F70WQRdp4A= +github.com/beyondstorage/go-service-fs/v3 v3.3.0/go.mod h1:/keWarsnGKz219wMyGN57OM7s5BUIZ/YlK9LhsXMwtQ= +github.com/beyondstorage/go-service-memory v0.1.0 h1:12beKpRiY15wfTRyLsoQWXKELtWri4C2NprUQmX1AcU= +github.com/beyondstorage/go-service-memory v0.1.0/go.mod h1:GWAXoWnfevG/Weu+UhLzHMJRoZ2ylL3pdYb2kA/w++E= +github.com/beyondstorage/go-service-s3/v2 v2.3.0 h1:mCdCuGmQ/26A1Xhj9tnFcRy3bXf1h5MT/TyFrJW8CAI= +github.com/beyondstorage/go-service-s3/v2 v2.3.0/go.mod h1:B4gLy8g/uhjkLkVymkRES/tIKH2UhB0zmkzqzGaN5LU= +github.com/beyondstorage/go-storage/v4 v4.3.0/go.mod h1:0fdcRCzLKMQe7Ve4zPlyTGgoPYwuINiV79Gx9tCt9tQ= +github.com/beyondstorage/go-storage/v4 v4.3.2/go.mod h1:8FHjTUFuwLl/mmIGpOL9g3RTZPyye7vneFN/JkRj5Tg= +github.com/beyondstorage/go-storage/v4 v4.4.0 h1:sWURraKFjNR4qpwthr45cAGOIx6EOLrrJcz6su4Je30= +github.com/beyondstorage/go-storage/v4 v4.4.0/go.mod h1:mc9VzBImjXDg1/1sLfta2MJH79elfM6m47ZZvZ+q/Uw= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= @@ -56,8 +58,9 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= -github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g= github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -77,8 +80,9 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/pprof v0.0.0-20181127221834-b4f47329b966/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= -github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/hanwen/go-fuse v1.0.0 h1:GxS9Zrn6c35/BnfiVsZVWmsG803xwE7eVRDvcf/BEVc= github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok= @@ -107,6 +111,8 @@ github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LE github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/panjf2000/ants/v2 v2.4.6 h1:drmj9mcygn2gawZ155dRbo+NfXEfAssjZNU1qoIb4gQ= +github.com/panjf2000/ants/v2 v2.4.6/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.9.3 h1:zeC5b1GviRUyKYd6OJPvBU/mcVDVoL1OhT17FCt5dSQ= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= @@ -147,6 +153,7 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= @@ -169,8 +176,9 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -179,8 +187,9 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME= -golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -189,6 +198,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180903190138-2b024373dcd9/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -197,12 +207,17 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw= -golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -213,8 +228,9 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200509030707-2212a7e161a5/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.1 h1:wGiQel/hW0NnEkJUk8lbzkX2gFJU6PFxf1v5OlCfuOs= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -240,6 +256,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/src-d/go-billy.v4 v4.3.0 h1:KtlZ4c1OWbIs4jCv5ZXrTqG8EQocr0g/d4DjNg70aek= gopkg.in/src-d/go-billy.v4 v4.3.0/go.mod h1:tm33zBoOwxjYHZIE+OV8bxTWFMJLrconzFMd38aARFk= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/vfs/cache.go b/vfs/cache.go new file mode 100644 index 0000000..d510282 --- /dev/null +++ b/vfs/cache.go @@ -0,0 +1,296 @@ +package vfs + +import ( + "bytes" + "fmt" + "io" + "sync" + + "github.com/beyondstorage/go-storage/v4/types" + "github.com/panjf2000/ants/v2" + "go.uber.org/zap" +) + +type op struct { + fd uint64 + size int64 +} + +type chunk struct { + lock sync.Mutex + wg *sync.WaitGroup + + // The path for the chunk. + fd uint64 + path string + persistedIdx uint64 + persistedSize int64 + nextIdx uint64 + currentSize int64 + + // If we have CreateMultipart or CreateAppend, we will store the object here. + // So we can check if object == nil to decide use CompleteMultipart or call Write. + object *types.Object + + // Only valid if we have already called CreateMultipart. + parts map[int]*types.Part + nextPartNumber int +} + +func newChunk(fd uint64, path string) *chunk { + return &chunk{ + wg: &sync.WaitGroup{}, + fd: fd, + path: path, + } +} + +type Cache struct { + s types.Storager // Real data store + c types.Storager // Cache data store + logger *zap.Logger + + p *ants.Pool + ch chan op + + chunks map[uint64]*chunk + chunkLock sync.Mutex +} + +func NewCache(s, c types.Storager, logger *zap.Logger) *Cache { + cache := &Cache{ + s: s, + c: c, + logger: logger, + } + + p, err := ants.NewPool(10) + if err != nil { + panic(fmt.Errorf("new pool: %v", err)) + } + + cache.p = p + cache.ch = make(chan op) + cache.chunks = make(map[uint64]*chunk) + return cache +} + +func (c *Cache) Start() { + for v := range c.ch { + c.chunkLock.Lock() + chk := c.chunks[v.fd] + c.chunkLock.Unlock() + + chk.lock.Lock() + chk.nextIdx += 1 + chk.currentSize += v.size + chk.lock.Unlock() + + // Skip persistViaWriteMultipart write operation if we don't have enough data. + if chk.currentSize-chk.persistedSize < 64*1024*1024 { + continue + } + + chk.lock.Lock() + + if chk.object == nil { + o, err := c.s.(types.Multiparter).CreateMultipart(chk.path) + if err != nil { + c.logger.Fatal("create multipart", zap.Error(err)) + } + + chk.object = o + chk.parts = make(map[int]*types.Part) + } + + start := chk.persistedIdx + end := chk.nextIdx + size := chk.currentSize - chk.persistedSize + partNumber := chk.nextPartNumber + chk.persistedSize = chk.currentSize + chk.persistedIdx = chk.nextIdx + chk.nextPartNumber += 1 + chk.lock.Unlock() + + chk.wg.Add(1) + err := c.p.Submit(func() { + defer chk.wg.Done() + + err := c.persistViaWriteMultipart(chk, start, end, size, partNumber) + if err != nil { + c.logger.Error("persistViaWriteMultipart", zap.Error(err)) + } + }) + if err != nil { + c.logger.Fatal("submit task", zap.Error(err)) + } + } +} + +func (c *Cache) complete(chk *chunk) error { + // object == nil means data is small enough to complete in single write operation. + // We can persist it via write. + if chk.object == nil { + start := uint64(0) + end := chk.nextIdx + size := chk.currentSize + + return c.persistViaWrite(chk, start, end, size) + } + + // Check for dirty data. + // + // persistedIdx < nextIdx means we still have data to write. + if chk.persistedIdx < chk.nextIdx { + start := chk.persistedIdx + end := chk.nextIdx + size := chk.currentSize - chk.persistedSize + partNumber := chk.nextPartNumber + + chk.wg.Add(1) + err := c.p.Submit(func() { + defer chk.wg.Done() + + err := c.persistViaWriteMultipart(chk, start, end, size, partNumber) + if err != nil { + c.logger.Error("persistViaWriteMultipart", zap.Error(err)) + } + }) + if err != nil { + c.logger.Fatal("submit task", zap.Error(err)) + } + } + + // It's safe to complete the multipart after wait. + chk.wg.Wait() + + parts := make([]*types.Part, 0, len(chk.parts)) + for i := 0; i < len(chk.parts); i++ { + parts = append(parts, chk.parts[i]) + } + + err := c.s.(types.Multiparter).CompleteMultipart(chk.object, parts) + if err != nil { + return err + } + return nil +} + +func (c *Cache) persistViaWrite(chk *chunk, start, end uint64, size int64) error { + r, err := c.read(chk.fd, start, end) + if err != nil { + return err + } + defer func() { + err = r.Close() + if err != nil { + c.logger.Error("close reader", zap.Error(err)) + return + } + }() + + _, err = c.s.Write(chk.path, r, size) + if err != nil { + c.logger.Error("write", zap.Error(err)) + return err + } + + err = r.Close() + if err != nil { + c.logger.Error("close", zap.Error(err)) + return err + } + return nil +} + +func (c *Cache) persistViaWriteMultipart(chk *chunk, start, end uint64, size int64, partNumber int) error { + r, err := c.read(chk.fd, start, end) + if err != nil { + return err + } + defer func() { + err = r.Close() + if err != nil { + c.logger.Error("close reader", zap.Error(err)) + return + } + }() + + _, part, err := c.s.(types.Multiparter).WriteMultipart(chk.object, r, size, partNumber) + if err != nil { + c.logger.Error("write", zap.Error(err)) + return err + } + + chk.lock.Lock() + chk.parts[partNumber] = part + chk.lock.Unlock() + return nil +} + +func (c *Cache) read(fd, start, end uint64) (r io.ReadCloser, err error) { + r, w := io.Pipe() + + go func() { + for i := start; i < end; i++ { + p := fmt.Sprintf("%d-%d", fd, i) + _, err := c.c.Read(p, w) + if err != nil { + c.logger.Error("read", zap.Error(err)) + return + } + } + err := w.Close() + if err != nil { + c.logger.Error("close writer", zap.Error(err)) + return + } + }() + + return r, nil +} + +func (c *Cache) startWrite(fd uint64, path string) (err error) { + c.chunkLock.Lock() + // FIXME: maybe we need to check the fd before set. + c.chunks[fd] = newChunk(fd, path) + c.chunkLock.Unlock() + return nil +} + +func (c *Cache) write(fd, idx uint64, data []byte) (n int64, err error) { + p := fmt.Sprintf("%d-%d", fd, idx) + + size := int64(len(data)) + n, err = c.c.Write(p, bytes.NewReader(data), size) + if err != nil { + return + } + + c.ch <- op{ + fd: fd, + size: size, + } + return n, nil +} + +func (c *Cache) endWrite(fd uint64) (err error) { + c.chunkLock.Lock() + chk := c.chunks[fd] + c.chunkLock.Unlock() + + err = c.complete(chk) + if err != nil { + c.logger.Error("complete", zap.Error(err)) + return + } + + c.chunkLock.Lock() + delete(c.chunks, fd) + c.chunkLock.Unlock() + return nil +} + +func (c *Cache) Stop() { + close(c.ch) +} diff --git a/vfs/file.go b/vfs/file.go index d561ab6..45d01cc 100644 --- a/vfs/file.go +++ b/vfs/file.go @@ -1,11 +1,12 @@ package vfs import ( - "go.uber.org/zap" + "fmt" "sync" "github.com/Xuanwo/go-bufferpool" "github.com/beyondstorage/go-storage/v4/pairs" + "go.uber.org/zap" "github.com/beyondstorage/beyond-fs/meta" ) @@ -49,14 +50,20 @@ func (fhm *fileHandleMap) Delete(id uint64) { type FileHandle struct { ID uint64 - ino *Inode - fs *FS - meta meta.Service + ino *Inode + fs *FS + meta meta.Service + cache *Cache mu sync.Mutex - buf *bufferpool.Buffer size uint64 offset uint64 + + // Read operations + buf *bufferpool.Buffer + + // Write operations + idx uint64 } func (fh *FileHandle) GetInode() *Inode { @@ -90,3 +97,44 @@ func (fh *FileHandle) Read(offset uint64, buf []byte) (n int, err error) { fh.offset += uint64(byteRead) return int(byteRead), nil } + +func (fh *FileHandle) PrepareForWrite() (err error) { + err = fh.cache.startWrite(fh.ID, fh.ino.Path) + if err != nil { + return + } + return +} + +func (fh *FileHandle) Write(offset uint64, buf []byte) (n int, err error) { + fh.mu.Lock() + defer fh.mu.Unlock() + + if offset != fh.offset { + return 0, fmt.Errorf("random write is not allowd") + } + + fh.fs.logger.Info("write data", + zap.String("path", fh.ino.Path), + zap.Uint64("offset", offset), + zap.Int("size", len(buf))) + byteWritten, err := fh.cache.write(fh.ID, fh.idx, buf) + if err != nil { + fh.fs.logger.Error("write buffer", zap.Error(err)) + return + } + + fh.idx += 1 + fh.size += uint64(byteWritten) + fh.offset += uint64(byteWritten) + + return int(byteWritten), nil +} + +func (fh *FileHandle) CloseForWrite() (err error) { + err = fh.cache.endWrite(fh.ID) + if err != nil { + return + } + return +} diff --git a/vfs/fs.go b/vfs/fs.go index fe09bab..20776a4 100644 --- a/vfs/fs.go +++ b/vfs/fs.go @@ -7,6 +7,7 @@ import ( "time" _ "github.com/beyondstorage/go-service-fs/v3" + _ "github.com/beyondstorage/go-service-memory" _ "github.com/beyondstorage/go-service-s3/v2" "github.com/beyondstorage/go-storage/v4/pairs" "github.com/beyondstorage/go-storage/v4/services" @@ -31,8 +32,9 @@ func NextHandle() uint64 { } type FS struct { - s types.Storager - meta meta.Service + s types.Storager + cache *Cache + meta meta.Service dhm *dirHandleMap fhm *fileHandleMap @@ -51,20 +53,30 @@ func NewFS(cfg *Config) (fs *FS, err error) { return nil, err } + cacheStore, err := services.NewStoragerFromString("memory://") + if err != nil { + return nil, err + } + metaSrv, err := meta.NewBadger() if err != nil { return nil, err } fs = &FS{ - s: store, - meta: metaSrv, + s: store, + // TODO: we will support other service as cache later. + cache: NewCache(store, cacheStore, cfg.Logger), + meta: metaSrv, dhm: newDirHandleMap(), fhm: newFileHandleMap(), logger: cfg.Logger, } + // Start cache service. + go fs.cache.Start() + o := types.NewObject(nil, true) o.ID = store.Metadata().WorkDir o.Path = "" @@ -163,10 +175,11 @@ func (fs *FS) DeleteDir(path string) (err error) { func (fs *FS) CreateFileHandle(ino *Inode) (fh *FileHandle, err error) { fh = &FileHandle{ - ID: NextHandle(), - ino: ino, - fs: fs, - meta: fs.meta, + ID: NextHandle(), + ino: ino, + fs: fs, + meta: fs.meta, + cache: fs.cache, buf: fileBufPool.Get(), size: ino.Size, @@ -181,6 +194,14 @@ func (fs *FS) GetFileHandle(fhid uint64) (fh *FileHandle, err error) { } func (fs *FS) DeleteFileHandle(fhid uint64) (err error) { + fh := fs.fhm.Get(fhid) + if fh == nil { + return nil + } + err = fh.CloseForWrite() + if err != nil { + return + } fs.fhm.Delete(fhid) return nil }