feat: add block file
This commit is contained in:
parent
c2862576c0
commit
6c1f7b4ef5
|
@ -37,6 +37,7 @@ server_key_path = "certs/server.key"
|
|||
router = "info"
|
||||
api = "info"
|
||||
coreapi = "info"
|
||||
storage = "info"
|
||||
|
||||
[cert]
|
||||
verify = true
|
||||
|
|
3
go.mod
3
go.mod
|
@ -10,7 +10,9 @@ require (
|
|||
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
|
||||
github.com/ethereum/go-ethereum v1.9.18
|
||||
github.com/fatih/color v1.7.0 // indirect
|
||||
github.com/go-playground/assert/v2 v2.0.1
|
||||
github.com/gobuffalo/envy v1.9.0 // indirect
|
||||
github.com/gobuffalo/logger v1.0.0
|
||||
github.com/gobuffalo/packd v1.0.0
|
||||
github.com/gobuffalo/packr v1.30.1
|
||||
github.com/gogo/protobuf v1.3.1
|
||||
|
@ -32,6 +34,7 @@ require (
|
|||
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/prometheus/client_golang v1.5.0
|
||||
github.com/prometheus/tsdb v0.7.1
|
||||
github.com/rogpeppe/go-internal v1.5.2 // indirect
|
||||
github.com/rs/cors v1.7.0
|
||||
github.com/sirupsen/logrus v1.6.0
|
||||
|
|
20
go.sum
20
go.sum
|
@ -150,16 +150,22 @@ github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x
|
|||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
|
||||
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
|
||||
github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
|
||||
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
|
||||
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
|
||||
github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA=
|
||||
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
|
||||
github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
|
||||
github.com/go-playground/assert v1.2.1 h1:ad06XqC+TOv0nJWnbULSlh3ehp5uLuQEojZY5Tq8RgI=
|
||||
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
|
||||
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
github.com/go-sourcemap/sourcemap v2.1.2+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/gobuffalo/envy v1.7.0 h1:GlXgaiBkmrYMHco6t4j7SacKO4XUjvh5pwXh0f4uxXU=
|
||||
github.com/gobuffalo/envy v1.7.0/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI=
|
||||
github.com/gobuffalo/envy v1.9.0 h1:eZR0DuEgVLfeIb1zIKt3bT4YovIMf9O9LXQeCZLXpqE=
|
||||
github.com/gobuffalo/envy v1.9.0/go.mod h1:FurDp9+EDPE4aIUS3ZLyD+7/9fpx7YRt/ukY6jIHf0w=
|
||||
github.com/gobuffalo/logger v1.0.0 h1:xw9Ko9EcC5iAFprrjJ6oZco9UpzS5MQ4jAwghsLHdy4=
|
||||
github.com/gobuffalo/logger v1.0.0/go.mod h1:2zbswyIUa45I+c+FLXuWl9zSWEiVuthsk8ze5s8JvPs=
|
||||
github.com/gobuffalo/packd v0.3.0/go.mod h1:zC7QkmNkYVGKPw4tHpBQ+ml7W/3tIebgeo1b36chA3Q=
|
||||
github.com/gobuffalo/packd v1.0.0 h1:6ERZvJHfe24rfFmA9OaoKBdC7+c9sydrytMg8SdFGBM=
|
||||
|
@ -557,6 +563,12 @@ github.com/meshplus/bitxhub-core v0.1.0-rc1.0.20201023084554-3f7dc6cd1164 h1:Kqj
|
|||
github.com/meshplus/bitxhub-core v0.1.0-rc1.0.20201023084554-3f7dc6cd1164/go.mod h1:MHf0waxqnW4Qwfpq66jqvJP+FritN5OTs/8wlQcNlJY=
|
||||
github.com/meshplus/bitxhub-kit v1.0.0-rc1/go.mod h1:ra/AhOkPvpElI+wXrB9G6DjdcrdxFU3vMwA5MYKr9D0=
|
||||
github.com/meshplus/bitxhub-kit v1.0.0/go.mod h1:7cWyhXWZfrQ3+EaxkRoXfuiG3Y5R9DXYJomeZKkETW8=
|
||||
github.com/meshplus/bitxhub-kit v1.0.1-0.20200903112208-fb288271c55c/go.mod h1:8Pprmnq+2fFi5kJP0qcbwPl/fe22nro0OamjtwD0LJM=
|
||||
github.com/meshplus/bitxhub-kit v1.0.1-0.20200914065214-5161497a783c h1:H5RvXIJK9yjV8kqruGI9Ks5aal6PVvcpsMhn6hREtA8=
|
||||
github.com/meshplus/bitxhub-kit v1.0.1-0.20200914065214-5161497a783c/go.mod h1:Whtgcr25HOF6iJv0Ib5/BPnEXq9iNFO89j8JQkElISk=
|
||||
github.com/meshplus/bitxhub-kit v1.0.1-0.20201020090511-52fcd9cba5dc h1:rB9K68YrxFS4R2JbweF2zDh8Vwk30U00YLgZuiUSUpY=
|
||||
github.com/meshplus/bitxhub-kit v1.0.1-0.20201020090511-52fcd9cba5dc/go.mod h1:Whtgcr25HOF6iJv0Ib5/BPnEXq9iNFO89j8JQkElISk=
|
||||
github.com/meshplus/bitxhub-kit v1.1.1 h1:vkPO88oA3+Kpc0N8lIgfj/U52KBuI+633hPbMYt1xm8=
|
||||
github.com/meshplus/bitxhub-kit v1.1.2-0.20201021105954-468d0a9d7957 h1:1a3wYo2HQw9/yg5LfAPJ1En90pPbMwRlaVssxOLG97w=
|
||||
github.com/meshplus/bitxhub-kit v1.1.2-0.20201021105954-468d0a9d7957/go.mod h1:r4l4iqn0RPJreb/OmoYKfjCjQJrXpZX++6Qc31VG/1k=
|
||||
github.com/meshplus/bitxhub-kit v1.1.2-0.20201023030558-9f36554d5d5d h1:i3tXJcUBDn36YSVfokT0o7iwe/MxbUoaiUFw4EYtwUA=
|
||||
|
@ -564,10 +576,16 @@ github.com/meshplus/bitxhub-kit v1.1.2-0.20201023030558-9f36554d5d5d/go.mod h1:r
|
|||
github.com/meshplus/bitxhub-kit v1.1.2-0.20201023073721-052e6b89ea39 h1:zgkQfnwsJZgyj3aGECrQ3HFOAcekjVRxbWUEERPW5u0=
|
||||
github.com/meshplus/bitxhub-kit v1.1.2-0.20201023073721-052e6b89ea39/go.mod h1:r4l4iqn0RPJreb/OmoYKfjCjQJrXpZX++6Qc31VG/1k=
|
||||
github.com/meshplus/bitxhub-model v1.0.0-rc3/go.mod h1:ZCctQIYTlE3vJ8Lhkrgs9bWwNA+Dw4JzojOSIzLVU6E=
|
||||
github.com/meshplus/bitxhub-model v1.0.0-rc3/go.mod h1:ZCctQIYTlE3vJ8Lhkrgs9bWwNA+Dw4JzojOSIzLVU6E=
|
||||
github.com/meshplus/bitxhub-model v1.0.0-rc4.0.20200514093243-7e8ae60d1c19/go.mod h1:QK8aACbxtZEA3Hk1BOCirW0uxMWLsMrLDpWz9FweIKM=
|
||||
github.com/meshplus/bitxhub-model v1.0.0-rc4.0.20201009112846-79d2e6ddf10d h1:q4Vig+IVhBvARLyeOsFw8gAdor0ajiBl6lGch1N65sk=
|
||||
github.com/meshplus/bitxhub-model v1.0.0-rc4.0.20201009112846-79d2e6ddf10d/go.mod h1:QK8aACbxtZEA3Hk1BOCirW0uxMWLsMrLDpWz9FweIKM=
|
||||
github.com/meshplus/bitxhub-model v1.1.1 h1:/0Si29e14YW1GUbkJbCL8A70yXzxyiV/u36kxFC+gqI=
|
||||
github.com/meshplus/bitxhub-model v1.1.2-0.20201021152621-0b3c17c54b23 h1:ys+2VjPrt6nr5xEVgRsVxowipkF425IOcI5HV53M5bA=
|
||||
github.com/meshplus/bitxhub-model v1.1.2-0.20201021152621-0b3c17c54b23/go.mod h1:4qWBZx5wv7WZzUqiuBsbkQqQ2Ju8aOFpsoNpBBNy8Us=
|
||||
github.com/meshplus/bitxhub-model v1.1.2-0.20201023072831-329ed555ad0d h1:vm9SaxovbNoA/wpC8NrOPdnJuszARj3F1y3N81sNU8o=
|
||||
github.com/meshplus/bitxhub-model v1.1.2-0.20201023072831-329ed555ad0d/go.mod h1:4qWBZx5wv7WZzUqiuBsbkQqQ2Ju8aOFpsoNpBBNy8Us=
|
||||
github.com/meshplus/go-bitxhub-client v1.0.0-rc3/go.mod h1:FpiCyf6KhydcqthrHdvvPhbPIcD92b+Ju8T7WvQtSyM=
|
||||
github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54 h1:5Ip5AB7SxxQHg5SRtf2cCOI2wy1p75MQB12soPtPyf8=
|
||||
github.com/meshplus/go-lightp2p v0.0.0-20200817105923-6b3aee40fa54/go.mod h1:G89UJaeqCQFxFdp8wzy1AdKfMtDEhpySau0pjDNeeaw=
|
||||
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
|
||||
|
@ -644,6 +662,7 @@ github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h
|
|||
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
|
||||
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
||||
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
|
||||
github.com/olekukonko/tablewriter v0.0.2-0.20190409134802-7e037d187b0c/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
|
||||
|
@ -705,6 +724,7 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
|
|||
github.com/prometheus/procfs v0.0.10 h1:QJQN3jYQhkamO4mhfUWqdDH2asK7ONOI9MTWjyAxNKM=
|
||||
github.com/prometheus/procfs v0.0.10/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
|
||||
github.com/prometheus/tsdb v0.6.2-0.20190402121629-4f204dcbc150/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||
github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA=
|
||||
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/meshplus/bitxhub/internal/repo"
|
||||
"github.com/meshplus/bitxhub/internal/router"
|
||||
"github.com/meshplus/bitxhub/internal/storages"
|
||||
"github.com/meshplus/bitxhub/internal/storages/blockfile"
|
||||
"github.com/meshplus/bitxhub/pkg/order"
|
||||
"github.com/meshplus/bitxhub/pkg/order/etcdraft"
|
||||
"github.com/meshplus/bitxhub/pkg/peermgr"
|
||||
|
@ -108,8 +109,13 @@ func generateBitXHubWithoutOrder(rep *repo.Repo) (*BitXHub, error) {
|
|||
return nil, fmt.Errorf("create tm-leveldb: %w", err)
|
||||
}
|
||||
|
||||
bf, err := blockfile.NewBlockFile(repoRoot, loggers.Logger(loggers.Storage))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("blockfile initialize: %w", err)
|
||||
}
|
||||
|
||||
// 0. load ledger
|
||||
rwLdg, err := ledger.New(rep, bcStorage, ldb, nil, loggers.Logger(loggers.Executor))
|
||||
rwLdg, err := ledger.New(rep, bcStorage, ldb, bf, nil, loggers.Logger(loggers.Executor))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create RW ledger: %w", err)
|
||||
}
|
||||
|
@ -122,7 +128,7 @@ func generateBitXHubWithoutOrder(rep *repo.Repo) (*BitXHub, error) {
|
|||
}
|
||||
|
||||
// create read only ledger
|
||||
viewLdg, err := ledger.New(rep, bcStorage, ldb, rwLdg.AccountCache(), loggers.Logger(loggers.Executor))
|
||||
viewLdg, err := ledger.New(rep, bcStorage, ldb, bf, rwLdg.AccountCache(), loggers.Logger(loggers.Executor))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create readonly ledger: %w", err)
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/meshplus/bitxhub/internal/ledger/mock_ledger"
|
||||
"github.com/meshplus/bitxhub/internal/model/events"
|
||||
"github.com/meshplus/bitxhub/internal/repo"
|
||||
"github.com/meshplus/bitxhub/internal/storages/blockfile"
|
||||
"github.com/meshplus/bitxhub/pkg/cert"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -266,7 +267,10 @@ func TestBlockExecutor_ExecuteBlock_Transfer(t *testing.T) {
|
|||
repo.DefaultConfig()
|
||||
accountCache, err := ledger.NewAccountCache()
|
||||
assert.Nil(t, err)
|
||||
ldg, err := ledger.New(createMockRepo(t), blockchainStorage, ldb, accountCache, log.NewWithModule("ledger"))
|
||||
logger := log.NewWithModule("executor_test")
|
||||
blockFile, err := blockfile.NewBlockFile(repoRoot, logger)
|
||||
assert.Nil(t, err)
|
||||
ldg, err := ledger.New(createMockRepo(t), blockchainStorage, ldb, blockFile, accountCache, log.NewWithModule("ledger"))
|
||||
require.Nil(t, err)
|
||||
|
||||
_, from := loadAdminKey(t)
|
||||
|
@ -299,7 +303,7 @@ func TestBlockExecutor_ExecuteBlock_Transfer(t *testing.T) {
|
|||
require.EqualValues(t, uint64(99999997), ldg.GetBalance(from))
|
||||
|
||||
// test executor with readonly ledger
|
||||
viewLedger, err := ledger.New(createMockRepo(t), blockchainStorage, ldb, accountCache, log.NewWithModule("ledger"))
|
||||
viewLedger, err := ledger.New(createMockRepo(t), blockchainStorage, ldb, blockFile, accountCache, log.NewWithModule("ledger"))
|
||||
require.Nil(t, err)
|
||||
|
||||
exec, err := New(viewLedger, log.NewWithModule("executor"), executorType)
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/meshplus/bitxhub-kit/log"
|
||||
"github.com/meshplus/bitxhub-kit/storage/leveldb"
|
||||
"github.com/meshplus/bitxhub-kit/types"
|
||||
"github.com/meshplus/bitxhub/internal/storages/blockfile"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
|
@ -23,7 +24,10 @@ func TestAccount_GetState(t *testing.T) {
|
|||
|
||||
accountCache, err := NewAccountCache()
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, accountCache, log.NewWithModule("ChainLedger"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(repoRoot, logger)
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, blockFile, accountCache, log.NewWithModule("ChainLedger"))
|
||||
assert.Nil(t, err)
|
||||
|
||||
h := hexutil.Encode(bytesutil.LeftPadBytes([]byte{11}, 20))
|
||||
|
|
|
@ -9,25 +9,21 @@ import (
|
|||
"github.com/meshplus/bitxhub-kit/storage"
|
||||
"github.com/meshplus/bitxhub-kit/types"
|
||||
"github.com/meshplus/bitxhub-model/pb"
|
||||
"github.com/meshplus/bitxhub/internal/storages/blockfile"
|
||||
)
|
||||
|
||||
// PutBlock put block into store
|
||||
func (l *ChainLedger) PutBlock(height uint64, block *pb.Block) error {
|
||||
data, err := block.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l.blockchainStore.Put(compositeKey(blockKey, height), data)
|
||||
// deprecated
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetBlock get block with height
|
||||
func (l *ChainLedger) GetBlock(height uint64) (*pb.Block, error) {
|
||||
data := l.blockchainStore.Get(compositeKey(blockKey, height))
|
||||
if data == nil {
|
||||
return nil, storage.ErrorNotFound
|
||||
data, err := l.bf.Get(blockfile.BlockFileBodiesTable, height)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
block := &pb.Block{}
|
||||
|
@ -44,16 +40,16 @@ func (l *ChainLedger) GetBlock(height uint64) (*pb.Block, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var txs []*pb.Transaction
|
||||
for _, hash := range txHashes {
|
||||
tx, err := l.GetTransaction(hash)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get tx with hash %s", hash.String())
|
||||
}
|
||||
txs = append(txs, tx)
|
||||
txs := &pb.Transactions{}
|
||||
txsBytes, err := l.bf.Get(blockfile.BlockFileTXsTable, height)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := txs.Unmarshal(txsBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
block.Transactions = txs
|
||||
block.Transactions = txs.Transactions
|
||||
|
||||
return block, nil
|
||||
}
|
||||
|
@ -85,16 +81,24 @@ func (l *ChainLedger) GetBlockByHash(hash *types.Hash) (*pb.Block, error) {
|
|||
|
||||
// GetTransaction get the transaction using transaction hash
|
||||
func (l *ChainLedger) GetTransaction(hash *types.Hash) (*pb.Transaction, error) {
|
||||
v := l.blockchainStore.Get(compositeKey(transactionKey, hash.String()))
|
||||
if v == nil {
|
||||
metaBytes := l.blockchainStore.Get(compositeKey(transactionMetaKey, hash.String()))
|
||||
if metaBytes == nil {
|
||||
return nil, storage.ErrorNotFound
|
||||
}
|
||||
tx := &pb.Transaction{}
|
||||
if err := tx.Unmarshal(v); err != nil {
|
||||
meta := &pb.TransactionMeta{}
|
||||
if err := meta.Unmarshal(metaBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
txsBytes, err := l.bf.Get(blockfile.BlockFileTXsTable, meta.BlockHeight)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
txs := &pb.Transactions{}
|
||||
if err := txs.Unmarshal(txsBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tx, nil
|
||||
return txs.Transactions[meta.Index], nil
|
||||
}
|
||||
|
||||
func (l *ChainLedger) GetTransactionCount(height uint64) (uint64, error) {
|
||||
|
@ -127,17 +131,24 @@ func (l *ChainLedger) GetTransactionMeta(hash *types.Hash) (*pb.TransactionMeta,
|
|||
|
||||
// GetReceipt get the transaction receipt
|
||||
func (l *ChainLedger) GetReceipt(hash *types.Hash) (*pb.Receipt, error) {
|
||||
data := l.blockchainStore.Get(compositeKey(receiptKey, hash.String()))
|
||||
if data == nil {
|
||||
metaBytes := l.blockchainStore.Get(compositeKey(transactionMetaKey, hash.String()))
|
||||
if metaBytes == nil {
|
||||
return nil, storage.ErrorNotFound
|
||||
}
|
||||
|
||||
r := &pb.Receipt{}
|
||||
if err := r.Unmarshal(data); err != nil {
|
||||
meta := &pb.TransactionMeta{}
|
||||
if err := meta.Unmarshal(metaBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rsBytes, err := l.bf.Get(blockfile.BlockFileReceiptTable, meta.BlockHeight)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rs := &pb.Receipts{}
|
||||
if err := rs.Unmarshal(rsBytes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r, nil
|
||||
return rs.Receipts[meta.Index], nil
|
||||
}
|
||||
|
||||
// PersistExecutionResult persist the execution result
|
||||
|
@ -150,19 +161,23 @@ func (l *ChainLedger) PersistExecutionResult(block *pb.Block, receipts []*pb.Rec
|
|||
|
||||
batcher := l.blockchainStore.NewBatch()
|
||||
|
||||
if err := l.persistReceipts(batcher, receipts); err != nil {
|
||||
rs, err := l.prepareReceipts(batcher, block, receipts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := l.persistTransactions(batcher, block); err != nil {
|
||||
ts, err := l.prepareTransactions(batcher, block)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := l.persistBlock(batcher, block); err != nil {
|
||||
b, err := l.prepareBlock(batcher, block)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := l.persistInterChainMeta(batcher, interchainMeta, block.BlockHeader.Number); err != nil {
|
||||
im, err := interchainMeta.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -178,6 +193,10 @@ func (l *ChainLedger) PersistExecutionResult(block *pb.Block, receipts []*pb.Rec
|
|||
InterchainTxCount: count + l.chainMeta.InterchainTxCount,
|
||||
}
|
||||
|
||||
if err := l.bf.AppendBlock(l.chainMeta.Height, block.BlockHash.Bytes(), b, rs, ts, im); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := l.persistChainMeta(batcher, meta); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -213,9 +232,9 @@ func (l *ChainLedger) GetChainMeta() *pb.ChainMeta {
|
|||
}
|
||||
|
||||
func (l *ChainLedger) GetInterchainMeta(height uint64) (*pb.InterchainMeta, error) {
|
||||
data := l.blockchainStore.Get(compositeKey(interchainMetaKey, height))
|
||||
if data == nil {
|
||||
return nil, storage.ErrorNotFound
|
||||
data, err := l.bf.Get(blockfile.BlockFileInterchainTable, height)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
meta := &pb.InterchainMeta{}
|
||||
|
@ -226,50 +245,41 @@ func (l *ChainLedger) GetInterchainMeta(height uint64) (*pb.InterchainMeta, erro
|
|||
return meta, nil
|
||||
}
|
||||
|
||||
func (l *ChainLedger) persistReceipts(batcher storage.Batch, receipts []*pb.Receipt) error {
|
||||
for _, receipt := range receipts {
|
||||
data, err := receipt.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
batcher.Put(compositeKey(receiptKey, receipt.TxHash.String()), data)
|
||||
func (l *ChainLedger) prepareReceipts(batcher storage.Batch, block *pb.Block, receipts []*pb.Receipt) ([]byte, error) {
|
||||
rs := &pb.Receipts{
|
||||
Receipts: receipts,
|
||||
}
|
||||
|
||||
return nil
|
||||
return rs.Marshal()
|
||||
}
|
||||
|
||||
func (l *ChainLedger) persistTransactions(batcher storage.Batch, block *pb.Block) error {
|
||||
func (l *ChainLedger) prepareTransactions(batcher storage.Batch, block *pb.Block) ([]byte, error) {
|
||||
for i, tx := range block.Transactions {
|
||||
body, err := tx.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
batcher.Put(compositeKey(transactionKey, tx.TransactionHash.String()), body)
|
||||
|
||||
meta := &pb.TransactionMeta{
|
||||
BlockHeight: block.BlockHeader.Number,
|
||||
BlockHash: block.BlockHash.Bytes(),
|
||||
Index: uint64(i),
|
||||
}
|
||||
|
||||
bs, err := meta.Marshal()
|
||||
metaBytes, err := meta.Marshal()
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal tx meta error: %s", err)
|
||||
return nil, fmt.Errorf("marshal tx meta error: %s", err)
|
||||
}
|
||||
|
||||
batcher.Put(compositeKey(transactionMetaKey, tx.TransactionHash.String()), bs)
|
||||
batcher.Put(compositeKey(transactionMetaKey, tx.TransactionHash.String()), metaBytes)
|
||||
}
|
||||
|
||||
return nil
|
||||
ts := &pb.Transactions{
|
||||
Transactions: block.Transactions,
|
||||
}
|
||||
return ts.Marshal()
|
||||
}
|
||||
|
||||
func (l *ChainLedger) persistBlock(batcher storage.Batch, block *pb.Block) error {
|
||||
func (l *ChainLedger) prepareBlock(batcher storage.Batch, block *pb.Block) ([]byte, error) {
|
||||
// Generate block header signature
|
||||
signed, err := l.repo.Key.PrivKey.Sign(block.BlockHash.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
block.Signature = signed
|
||||
|
@ -283,11 +293,10 @@ func (l *ChainLedger) persistBlock(batcher storage.Batch, block *pb.Block) error
|
|||
}
|
||||
bs, err := storedBlock.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
height := block.BlockHeader.Number
|
||||
batcher.Put(compositeKey(blockKey, height), bs)
|
||||
|
||||
var txHashes []types.Hash
|
||||
for _, tx := range block.Transactions {
|
||||
|
@ -296,7 +305,7 @@ func (l *ChainLedger) persistBlock(batcher storage.Batch, block *pb.Block) error
|
|||
|
||||
data, err := json.Marshal(txHashes)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
batcher.Put(compositeKey(blockTxSetKey, height), data)
|
||||
|
@ -304,18 +313,7 @@ func (l *ChainLedger) persistBlock(batcher storage.Batch, block *pb.Block) error
|
|||
hash := block.BlockHash.String()
|
||||
batcher.Put(compositeKey(blockHashKey, hash), []byte(fmt.Sprintf("%d", height)))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *ChainLedger) persistInterChainMeta(batcher storage.Batch, meta *pb.InterchainMeta, height uint64) error {
|
||||
data, err := meta.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
batcher.Put(compositeKey(interchainMetaKey, height), data)
|
||||
|
||||
return nil
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
func (l *ChainLedger) persistChainMeta(batcher storage.Batch, meta *pb.ChainMeta) error {
|
||||
|
@ -339,14 +337,16 @@ func (l *ChainLedger) removeChainDataOnBlock(batch storage.Batch, height uint64)
|
|||
return 0, err
|
||||
}
|
||||
|
||||
batch.Delete(compositeKey(blockKey, height))
|
||||
if err := l.bf.TruncateBlocks(height - 1); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
batch.Delete(compositeKey(blockTxSetKey, height))
|
||||
batch.Delete(compositeKey(blockHashKey, block.BlockHash.String()))
|
||||
batch.Delete(compositeKey(interchainMetaKey, height))
|
||||
|
||||
for _, tx := range block.Transactions {
|
||||
batch.Delete(compositeKey(transactionKey, tx.TransactionHash.String()))
|
||||
batch.Delete(compositeKey(transactionMetaKey, tx.TransactionHash.String()))
|
||||
batch.Delete(compositeKey(receiptKey, tx.TransactionHash.String()))
|
||||
}
|
||||
|
||||
return getInterchainTxCount(interchainMeta), nil
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/meshplus/bitxhub-kit/types"
|
||||
"github.com/meshplus/bitxhub-model/pb"
|
||||
"github.com/meshplus/bitxhub/internal/repo"
|
||||
"github.com/meshplus/bitxhub/internal/storages/blockfile"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
@ -25,6 +26,7 @@ type ChainLedger struct {
|
|||
logger logrus.FieldLogger
|
||||
blockchainStore storage.Storage
|
||||
ldb storage.Storage
|
||||
bf *blockfile.BlockFile
|
||||
minJnlHeight uint64
|
||||
maxJnlHeight uint64
|
||||
events sync.Map
|
||||
|
@ -49,7 +51,7 @@ type BlockData struct {
|
|||
}
|
||||
|
||||
// New create a new ledger instance
|
||||
func New(repo *repo.Repo, blockchainStore storage.Storage, ldb storage.Storage, accountCache *AccountCache, logger logrus.FieldLogger) (*ChainLedger, error) {
|
||||
func New(repo *repo.Repo, blockchainStore storage.Storage, ldb storage.Storage, bf *blockfile.BlockFile, accountCache *AccountCache, logger logrus.FieldLogger) (*ChainLedger, error) {
|
||||
chainMeta, err := loadChainMeta(blockchainStore)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("load chain meta: %w", err)
|
||||
|
@ -79,6 +81,7 @@ func New(repo *repo.Repo, blockchainStore storage.Storage, ldb storage.Storage,
|
|||
chainMeta: chainMeta,
|
||||
blockchainStore: blockchainStore,
|
||||
ldb: ldb,
|
||||
bf: bf,
|
||||
minJnlHeight: minJnlHeight,
|
||||
maxJnlHeight: maxJnlHeight,
|
||||
accounts: make(map[string]*Account),
|
||||
|
@ -185,4 +188,5 @@ func (l *ChainLedger) Events(txHash string) []*pb.Event {
|
|||
func (l *ChainLedger) Close() {
|
||||
l.ldb.Close()
|
||||
l.blockchainStore.Close()
|
||||
l.bf.Close()
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
@ -16,6 +17,7 @@ import (
|
|||
"github.com/meshplus/bitxhub-kit/types"
|
||||
"github.com/meshplus/bitxhub-model/pb"
|
||||
"github.com/meshplus/bitxhub/internal/repo"
|
||||
"github.com/meshplus/bitxhub/internal/storages/blockfile"
|
||||
"github.com/meshplus/bitxhub/pkg/cert"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -30,7 +32,10 @@ func TestNew001(t *testing.T) {
|
|||
ldb, err := leveldb.New(filepath.Join(repoRoot, "ledger"))
|
||||
assert.Nil(t, err)
|
||||
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, nil, log.NewWithModule("executor"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(repoRoot, logger)
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, blockFile, nil, log.NewWithModule("executor"))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, ledger)
|
||||
}
|
||||
|
@ -48,7 +53,10 @@ func TestNew002(t *testing.T) {
|
|||
|
||||
accountCache, err := NewAccountCache()
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, accountCache, log.NewWithModule("executor"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(repoRoot, logger)
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, blockFile, accountCache, log.NewWithModule("executor"))
|
||||
require.NotNil(t, err)
|
||||
require.Nil(t, ledger)
|
||||
}
|
||||
|
@ -64,7 +72,10 @@ func TestNew003(t *testing.T) {
|
|||
|
||||
ldb.Put(compositeKey(journalKey, maxHeightStr), marshalHeight(1))
|
||||
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, nil, log.NewWithModule("executor"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(repoRoot, logger)
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, blockFile, nil, log.NewWithModule("executor"))
|
||||
require.NotNil(t, err)
|
||||
require.Nil(t, ledger)
|
||||
}
|
||||
|
@ -86,7 +97,10 @@ func TestNew004(t *testing.T) {
|
|||
|
||||
ldb.Put(compositeKey(journalKey, 1), data)
|
||||
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, nil, log.NewWithModule("executor"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(repoRoot, logger)
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, blockFile, nil, log.NewWithModule("executor"))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, ledger)
|
||||
}
|
||||
|
@ -109,7 +123,10 @@ func TestNew005(t *testing.T) {
|
|||
|
||||
ldb.Put(compositeKey(journalKey, 5), data)
|
||||
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, nil, log.NewWithModule("executor"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(repoRoot, logger)
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, blockFile, nil, log.NewWithModule("executor"))
|
||||
require.NotNil(t, err)
|
||||
require.Nil(t, ledger)
|
||||
}
|
||||
|
@ -285,7 +302,7 @@ func TestChainLedger_Rollback(t *testing.T) {
|
|||
err = ledger.Rollback(2)
|
||||
assert.Nil(t, err)
|
||||
block, err = ledger.GetBlock(3)
|
||||
assert.Equal(t, storage.ErrorNotFound, err)
|
||||
assert.Equal(t, fmt.Errorf("out of bounds"), err)
|
||||
assert.Nil(t, block)
|
||||
assert.Equal(t, uint64(2), ledger.chainMeta.Height)
|
||||
assert.Equal(t, journal2.ChangedHash.String(), ledger.prevJnlHash.String())
|
||||
|
@ -579,7 +596,7 @@ func TestChainLedger_GetInterchainMeta(t *testing.T) {
|
|||
accounts, journal := ledger.FlushDirtyDataAndComputeJournal()
|
||||
|
||||
meta, err := ledger.GetInterchainMeta(1)
|
||||
require.Equal(t, storage.ErrorNotFound, err)
|
||||
require.Equal(t, fmt.Errorf("out of bounds"), err)
|
||||
require.Nil(t, meta)
|
||||
|
||||
ledger.PersistBlockData(genBlockData(1, accounts, journal))
|
||||
|
@ -589,22 +606,23 @@ func TestChainLedger_GetInterchainMeta(t *testing.T) {
|
|||
require.Nil(t, err)
|
||||
require.Equal(t, 0, len(meta.Counter))
|
||||
|
||||
meta = &pb.InterchainMeta{
|
||||
Counter: make(map[string]*pb.Uint64Slice),
|
||||
L2Roots: make([]types.Hash, 0),
|
||||
}
|
||||
meta.Counter["a"] = &pb.Uint64Slice{}
|
||||
meta.L2Roots = append(meta.L2Roots, *types.NewHash([]byte{}))
|
||||
batch := ledger.blockchainStore.NewBatch()
|
||||
err = ledger.persistInterChainMeta(batch, meta, 2)
|
||||
require.Nil(t, err)
|
||||
batch.Commit()
|
||||
// deprecated for blockfile
|
||||
// meta = &pb.InterchainMeta{
|
||||
// Counter: make(map[string]*pb.Uint64Slice),
|
||||
// L2Roots: make([]types.Hash, 0),
|
||||
// }
|
||||
// meta.Counter["a"] = &pb.Uint64Slice{}
|
||||
// meta.L2Roots = append(meta.L2Roots, *types.NewHash([]byte{}))
|
||||
// batch := ledger.blockchainStore.NewBatch()
|
||||
// err = ledger.persistInterChainMeta(batch, meta, 2)
|
||||
// require.Nil(t, err)
|
||||
// batch.Commit()
|
||||
|
||||
meta2, err := ledger.GetInterchainMeta(2)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, len(meta.Counter), len(meta2.Counter))
|
||||
require.Equal(t, meta.Counter["a"], meta2.Counter["a"])
|
||||
require.Equal(t, len(meta.L2Roots), len(meta2.Counter))
|
||||
// meta2, err := ledger.GetInterchainMeta(2)
|
||||
// require.Nil(t, err)
|
||||
// require.Equal(t, len(meta.Counter), len(meta2.Counter))
|
||||
// require.Equal(t, meta.Counter["a"], meta2.Counter["a"])
|
||||
// require.Equal(t, len(meta.L2Roots), len(meta2.Counter))
|
||||
}
|
||||
|
||||
func TestChainLedger_AddState(t *testing.T) {
|
||||
|
@ -676,6 +694,142 @@ func TestChainLedger_AddEvent(t *testing.T) {
|
|||
assert.Equal(t, 0, len(events))
|
||||
}
|
||||
|
||||
func TestPutBlock(t *testing.T) {
|
||||
repoRoot, err := ioutil.TempDir("", "TestPutBlock")
|
||||
require.Nil(t, err)
|
||||
|
||||
blockStorage, err := leveldb.New(filepath.Join(repoRoot, "storage"))
|
||||
assert.Nil(t, err)
|
||||
ldb, err := leveldb.New(filepath.Join(repoRoot, "ledger"))
|
||||
assert.Nil(t, err)
|
||||
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(repoRoot, logger)
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, blockFile, nil, log.NewWithModule("executor"))
|
||||
block := &pb.Block{}
|
||||
ledger.PutBlock(uint64(0), block)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestGetBlockSign(t *testing.T) {
|
||||
ledger, _ := initLedger(t, "")
|
||||
_, err := ledger.GetBlockSign(uint64(0))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestGetBlockByHash(t *testing.T) {
|
||||
ledger, _ := initLedger(t, "")
|
||||
_, err := ledger.GetBlockByHash(types.NewHash([]byte("1")))
|
||||
assert.Equal(t, storage.ErrorNotFound, err)
|
||||
ledger.blockchainStore.Put(compositeKey(blockHashKey, types.NewHash([]byte("1")).String()), []byte("1"))
|
||||
_, err = ledger.GetBlockByHash(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestGetTransaction(t *testing.T) {
|
||||
ledger, _ := initLedger(t, "")
|
||||
_, err := ledger.GetTransaction(types.NewHash([]byte("1")))
|
||||
assert.Equal(t, storage.ErrorNotFound, err)
|
||||
ledger.blockchainStore.Put(compositeKey(transactionMetaKey, types.NewHash([]byte("1")).String()), []byte("1"))
|
||||
_, err = ledger.GetTransaction(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
err = ledger.bf.AppendBlock(0, []byte("1"), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
require.Nil(t, err)
|
||||
_, err = ledger.GetTransaction(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestGetTransaction1(t *testing.T) {
|
||||
ledger, _ := initLedger(t, "")
|
||||
_, err := ledger.GetTransaction(types.NewHash([]byte("1")))
|
||||
assert.Equal(t, storage.ErrorNotFound, err)
|
||||
meta := pb.TransactionMeta{
|
||||
BlockHeight: 0,
|
||||
}
|
||||
metaBytes, err := meta.Marshal()
|
||||
require.Nil(t, err)
|
||||
ledger.blockchainStore.Put(compositeKey(transactionMetaKey, types.NewHash([]byte("1")).String()), metaBytes)
|
||||
_, err = ledger.GetTransaction(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
err = ledger.bf.AppendBlock(0, []byte("1"), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
require.Nil(t, err)
|
||||
_, err = ledger.GetTransaction(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestGetTransactionMeta(t *testing.T) {
|
||||
ledger, _ := initLedger(t, "")
|
||||
_, err := ledger.GetTransactionMeta(types.NewHash([]byte("1")))
|
||||
assert.Equal(t, storage.ErrorNotFound, err)
|
||||
ledger.blockchainStore.Put(compositeKey(transactionMetaKey, types.NewHash([]byte("1")).String()), []byte("1"))
|
||||
_, err = ledger.GetTransactionMeta(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
err = ledger.bf.AppendBlock(0, []byte("1"), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
require.Nil(t, err)
|
||||
_, err = ledger.GetTransactionMeta(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestGetReceipt(t *testing.T) {
|
||||
ledger, _ := initLedger(t, "")
|
||||
_, err := ledger.GetReceipt(types.NewHash([]byte("1")))
|
||||
assert.Equal(t, storage.ErrorNotFound, err)
|
||||
ledger.blockchainStore.Put(compositeKey(transactionMetaKey, types.NewHash([]byte("1")).String()), []byte("0"))
|
||||
_, err = ledger.GetReceipt(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
err = ledger.bf.AppendBlock(0, []byte("1"), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
require.Nil(t, err)
|
||||
_, err = ledger.GetReceipt(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestGetReceipt1(t *testing.T) {
|
||||
ledger, _ := initLedger(t, "")
|
||||
_, err := ledger.GetTransaction(types.NewHash([]byte("1")))
|
||||
assert.Equal(t, storage.ErrorNotFound, err)
|
||||
meta := pb.TransactionMeta{
|
||||
BlockHeight: 0,
|
||||
}
|
||||
metaBytes, err := meta.Marshal()
|
||||
require.Nil(t, err)
|
||||
ledger.blockchainStore.Put(compositeKey(transactionMetaKey, types.NewHash([]byte("1")).String()), metaBytes)
|
||||
_, err = ledger.GetReceipt(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
err = ledger.bf.AppendBlock(0, []byte("1"), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
require.Nil(t, err)
|
||||
_, err = ledger.GetReceipt(types.NewHash([]byte("1")))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestPrepare(t *testing.T) {
|
||||
ledger, _ := initLedger(t, "")
|
||||
batch := ledger.blockchainStore.NewBatch()
|
||||
transactions := []*pb.Transaction{}
|
||||
transaction := &pb.Transaction{
|
||||
TransactionHash: types.NewHash([]byte("1")),
|
||||
}
|
||||
transactions = append(transactions, transaction)
|
||||
block := &pb.Block{
|
||||
BlockHeader: &pb.BlockHeader{
|
||||
Number: uint64(0),
|
||||
},
|
||||
BlockHash: types.NewHash([]byte{1}),
|
||||
Transactions: transactions,
|
||||
}
|
||||
_, err := ledger.prepareBlock(batch, block)
|
||||
require.Nil(t, err)
|
||||
receipts := []*pb.Receipt{}
|
||||
receipt := &pb.Receipt{
|
||||
TxHash: types.NewHash([]byte("1")),
|
||||
}
|
||||
receipts = append(receipts, receipt)
|
||||
_, err = ledger.prepareReceipts(batch, block, receipts)
|
||||
require.Nil(t, err)
|
||||
_, err = ledger.prepareTransactions(batch, block)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
func genBlockData(height uint64, accounts map[string]*Account, journal *BlockJournal) *BlockData {
|
||||
return &BlockData{
|
||||
Block: &pb.Block{
|
||||
|
@ -725,7 +879,10 @@ func initLedger(t *testing.T, repoRoot string) (*ChainLedger, string) {
|
|||
|
||||
accountCache, err := NewAccountCache()
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, accountCache, log.NewWithModule("executor"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(repoRoot, logger)
|
||||
assert.Nil(t, err)
|
||||
ledger, err := New(createMockRepo(t), blockStorage, ldb, blockFile, accountCache, log.NewWithModule("executor"))
|
||||
require.Nil(t, err)
|
||||
|
||||
return ledger, repoRoot
|
||||
|
|
|
@ -14,6 +14,7 @@ const (
|
|||
App = "app"
|
||||
API = "api"
|
||||
CoreAPI = "coreapi"
|
||||
Storage = "storage"
|
||||
)
|
||||
|
||||
var w *loggerWrapper
|
||||
|
@ -38,6 +39,8 @@ func Initialize(config *repo.Config) {
|
|||
m[API].Logger.SetLevel(log.ParseLevel(config.Log.Module.API))
|
||||
m[CoreAPI] = log.NewWithModule(CoreAPI)
|
||||
m[CoreAPI].Logger.SetLevel(log.ParseLevel(config.Log.Module.CoreAPI))
|
||||
m[Storage] = log.NewWithModule(Storage)
|
||||
m[Storage].Logger.SetLevel(log.ParseLevel(config.Log.Module.Storage))
|
||||
|
||||
w = &loggerWrapper{loggers: m}
|
||||
}
|
||||
|
|
|
@ -87,6 +87,7 @@ type LogModule struct {
|
|||
Router string `toml:"router" json:"router"`
|
||||
API string `toml:"api" json:"api"`
|
||||
CoreAPI string `mapstructure:"coreapi" toml:"coreapi" json:"coreapi"`
|
||||
Storage string `toml:"storage" json:"storage"`
|
||||
}
|
||||
|
||||
type Genesis struct {
|
||||
|
|
|
@ -0,0 +1,181 @@
|
|||
package blockfile
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/prometheus/tsdb/fileutil"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type BlockFile struct {
|
||||
blocks uint64 // Number of blocks
|
||||
|
||||
tables map[string]*BlockTable // Data tables for stroring blocks
|
||||
instanceLock fileutil.Releaser // File-system lock to prevent double opens
|
||||
|
||||
logger logrus.FieldLogger
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
func NewBlockFile(repoRoot string, logger logrus.FieldLogger) (*BlockFile, error) {
|
||||
if info, err := os.Lstat(repoRoot); !os.IsNotExist(err) {
|
||||
if info.Mode()&os.ModeSymlink != 0 {
|
||||
logger.WithField("path", repoRoot).Error("Symbolic link is not supported")
|
||||
return nil, fmt.Errorf("symbolic link datadir is not supported")
|
||||
}
|
||||
}
|
||||
lock, _, err := fileutil.Flock(filepath.Join(repoRoot, "FLOCK"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
blockfile := &BlockFile{
|
||||
tables: make(map[string]*BlockTable),
|
||||
instanceLock: lock,
|
||||
logger: logger,
|
||||
}
|
||||
for name := range BlockFileSchema {
|
||||
table, err := newTable(repoRoot, name, 2*1000*1000*1000, logger)
|
||||
if err != nil {
|
||||
for _, table := range blockfile.tables {
|
||||
table.Close()
|
||||
}
|
||||
_ = lock.Release()
|
||||
return nil, err
|
||||
}
|
||||
blockfile.tables[name] = table
|
||||
}
|
||||
if err := blockfile.repair(); err != nil {
|
||||
for _, table := range blockfile.tables {
|
||||
table.Close()
|
||||
}
|
||||
_ = lock.Release()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return blockfile, nil
|
||||
}
|
||||
|
||||
func (bf *BlockFile) Blocks() (uint64, error) {
|
||||
return atomic.LoadUint64(&bf.blocks), nil
|
||||
}
|
||||
|
||||
func (bf *BlockFile) Get(kind string, number uint64) ([]byte, error) {
|
||||
if table := bf.tables[kind]; table != nil {
|
||||
return table.Retrieve(number - 1)
|
||||
}
|
||||
return nil, fmt.Errorf("unknown table")
|
||||
}
|
||||
|
||||
func (bf *BlockFile) AppendBlock(number uint64, hash, body, receipts, transactions, interchainMetas []byte) (err error) {
|
||||
if atomic.LoadUint64(&bf.blocks) != number {
|
||||
return fmt.Errorf("the append operation is out-order")
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
rerr := bf.repair()
|
||||
if rerr != nil {
|
||||
bf.logger.WithField("err", err).Errorf("Failed to repair blockfile")
|
||||
}
|
||||
bf.logger.WithFields(logrus.Fields{
|
||||
"number": number,
|
||||
"err": err,
|
||||
}).Info("Append block failed")
|
||||
}
|
||||
}()
|
||||
if err := bf.tables[BlockFileHashTable].Append(bf.blocks, hash); err != nil {
|
||||
bf.logger.WithFields(logrus.Fields{
|
||||
"number": bf.blocks,
|
||||
"hash": hash,
|
||||
"err": err,
|
||||
}).Error("Failed to append block hash")
|
||||
return err
|
||||
}
|
||||
if err := bf.tables[BlockFileBodiesTable].Append(bf.blocks, body); err != nil {
|
||||
bf.logger.WithFields(logrus.Fields{
|
||||
"number": bf.blocks,
|
||||
"hash": hash,
|
||||
"err": err,
|
||||
}).Error("Failed to append block body")
|
||||
return err
|
||||
}
|
||||
if err := bf.tables[BlockFileTXsTable].Append(bf.blocks, transactions); err != nil {
|
||||
bf.logger.WithFields(logrus.Fields{
|
||||
"number": bf.blocks,
|
||||
"hash": hash,
|
||||
"err": err,
|
||||
}).Error("Failed to append block transactions")
|
||||
return err
|
||||
}
|
||||
if err := bf.tables[BlockFileReceiptTable].Append(bf.blocks, receipts); err != nil {
|
||||
bf.logger.WithFields(logrus.Fields{
|
||||
"number": bf.blocks,
|
||||
"hash": hash,
|
||||
"err": err,
|
||||
}).Error("Failed to append block receipt")
|
||||
return err
|
||||
}
|
||||
if err := bf.tables[BlockFileInterchainTable].Append(bf.blocks, interchainMetas); err != nil {
|
||||
bf.logger.WithFields(logrus.Fields{
|
||||
"number": bf.blocks,
|
||||
"hash": hash,
|
||||
"err": err,
|
||||
}).Error("Failed to append block interchain metas")
|
||||
return err
|
||||
}
|
||||
atomic.AddUint64(&bf.blocks, 1) // Only modify atomically
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bf *BlockFile) TruncateBlocks(items uint64) error {
|
||||
if atomic.LoadUint64(&bf.blocks) <= items {
|
||||
return nil
|
||||
}
|
||||
for _, table := range bf.tables {
|
||||
if err := table.truncate(items); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
atomic.StoreUint64(&bf.blocks, items)
|
||||
return nil
|
||||
}
|
||||
|
||||
// repair truncates all data tables to the same length.
|
||||
func (bf *BlockFile) repair() error {
|
||||
min := uint64(math.MaxUint64)
|
||||
for _, table := range bf.tables {
|
||||
items := atomic.LoadUint64(&table.items)
|
||||
if min > items {
|
||||
min = items
|
||||
}
|
||||
}
|
||||
for _, table := range bf.tables {
|
||||
if err := table.truncate(min); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
atomic.StoreUint64(&bf.blocks, min)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bf *BlockFile) Close() error {
|
||||
var errs []error
|
||||
bf.closeOnce.Do(func() {
|
||||
for _, table := range bf.tables {
|
||||
if err := table.Close(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
if err := bf.instanceLock.Release(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
})
|
||||
if errs != nil {
|
||||
return fmt.Errorf("%v", errs)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,475 @@
|
|||
package blockfile
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type BlockTable struct {
|
||||
items uint64
|
||||
|
||||
name string
|
||||
path string
|
||||
maxFileSize uint32 // Max file size for data-files
|
||||
|
||||
head *os.File // File descriptor for the data head of the table
|
||||
index *os.File // File description
|
||||
files map[uint32]*os.File // open files
|
||||
headId uint32 // number of the currently active head file
|
||||
tailId uint32 // number of the earliest file
|
||||
|
||||
headBytes uint32 // Number of bytes written to the head file
|
||||
itemOffset uint32 // Offset (number of discarded items)
|
||||
|
||||
logger logrus.FieldLogger
|
||||
lock sync.RWMutex // Mutex protecting the data file descriptors
|
||||
}
|
||||
|
||||
type indexEntry struct {
|
||||
filenum uint32 // stored as uint16 ( 2 bytes)
|
||||
offset uint32 // stored as uint32 ( 4 bytes)
|
||||
}
|
||||
|
||||
const indexEntrySize = 6
|
||||
|
||||
// unmarshallBinary deserializes binary b into the rawIndex entry.
|
||||
func (i *indexEntry) unmarshalBinary(b []byte) error {
|
||||
i.filenum = uint32(binary.BigEndian.Uint16(b[:2]))
|
||||
i.offset = binary.BigEndian.Uint32(b[2:6])
|
||||
return nil
|
||||
}
|
||||
|
||||
// marshallBinary serializes the rawIndex entry into binary.
|
||||
func (i *indexEntry) marshallBinary() []byte {
|
||||
b := make([]byte, indexEntrySize)
|
||||
binary.BigEndian.PutUint16(b[:2], uint16(i.filenum))
|
||||
binary.BigEndian.PutUint32(b[2:6], i.offset)
|
||||
return b
|
||||
}
|
||||
|
||||
func newTable(path string, name string, maxFilesize uint32, logger logrus.FieldLogger) (*BlockTable, error) {
|
||||
if err := os.MkdirAll(path, 0755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
idxName := fmt.Sprintf("%s.ridx", name)
|
||||
offsets, err := openBlockFileForAppend(filepath.Join(path, idxName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
table := &BlockTable{
|
||||
index: offsets,
|
||||
files: make(map[uint32]*os.File),
|
||||
name: name,
|
||||
path: path,
|
||||
maxFileSize: maxFilesize,
|
||||
logger: logger,
|
||||
}
|
||||
if err := table.repair(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return table, nil
|
||||
}
|
||||
|
||||
func (b *BlockTable) repair() error {
|
||||
buffer := make([]byte, indexEntrySize)
|
||||
|
||||
stat, err := b.index.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stat.Size() == 0 {
|
||||
if _, err := b.index.Write(buffer); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if remainder := stat.Size() % indexEntrySize; remainder != 0 {
|
||||
err := truncateBlockFile(b.index, stat.Size()-remainder)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if stat, err = b.index.Stat(); err != nil {
|
||||
return err
|
||||
}
|
||||
offsetsSize := stat.Size()
|
||||
|
||||
// Open the head file
|
||||
var (
|
||||
firstIndex indexEntry
|
||||
lastIndex indexEntry
|
||||
contentSize int64
|
||||
contentExp int64
|
||||
)
|
||||
// Read index zero, determine what file is the earliest
|
||||
// and what item offset to use
|
||||
_, err = b.index.ReadAt(buffer, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = firstIndex.unmarshalBinary(buffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.tailId = firstIndex.filenum
|
||||
b.itemOffset = firstIndex.offset
|
||||
|
||||
_, err = b.index.ReadAt(buffer, offsetsSize-indexEntrySize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = lastIndex.unmarshalBinary(buffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.head, err = b.openFile(lastIndex.filenum, openBlockFileForAppend)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stat, err = b.head.Stat(); err != nil {
|
||||
return err
|
||||
}
|
||||
contentSize = stat.Size()
|
||||
|
||||
// Keep truncating both files until they come in sync
|
||||
contentExp = int64(lastIndex.offset)
|
||||
|
||||
for contentExp != contentSize {
|
||||
b.logger.WithFields(logrus.Fields{
|
||||
"indexed": contentExp,
|
||||
"stored": contentSize,
|
||||
}).Warn("Truncating dangling head")
|
||||
if contentExp < contentSize {
|
||||
if err := truncateBlockFile(b.head, contentExp); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if contentExp > contentSize {
|
||||
b.logger.WithFields(logrus.Fields{
|
||||
"indexed": contentExp,
|
||||
"stored": contentSize,
|
||||
}).Warn("Truncating dangling indexes")
|
||||
offsetsSize -= indexEntrySize
|
||||
_, err = b.index.ReadAt(buffer, offsetsSize-indexEntrySize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var newLastIndex indexEntry
|
||||
err = newLastIndex.unmarshalBinary(buffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// We might have slipped back into an earlier head-file here
|
||||
if newLastIndex.filenum != lastIndex.filenum {
|
||||
// Release earlier opened file
|
||||
b.releaseFile(lastIndex.filenum)
|
||||
if b.head, err = b.openFile(newLastIndex.filenum, openBlockFileForAppend); err != nil {
|
||||
return err
|
||||
}
|
||||
if stat, err = b.head.Stat(); err != nil {
|
||||
// TODO, anything more we can do here?
|
||||
// A data file has gone missing...
|
||||
return err
|
||||
}
|
||||
contentSize = stat.Size()
|
||||
}
|
||||
lastIndex = newLastIndex
|
||||
contentExp = int64(lastIndex.offset)
|
||||
}
|
||||
}
|
||||
// Ensure all reparation changes have been written to disk
|
||||
if err := b.index.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.head.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Update the item and byte counters and return
|
||||
b.items = uint64(b.itemOffset) + uint64(offsetsSize/indexEntrySize-1) // last indexEntry points to the end of the data file
|
||||
b.headBytes = uint32(contentSize)
|
||||
b.headId = lastIndex.filenum
|
||||
|
||||
// Close opened files and preopen all files
|
||||
if err := b.preopen(); err != nil {
|
||||
return err
|
||||
}
|
||||
b.logger.WithFields(logrus.Fields{
|
||||
"items": b.items,
|
||||
"size": b.headBytes,
|
||||
}).Debug("Chain freezer table opened")
|
||||
return nil
|
||||
}
|
||||
|
||||
// truncate discards any recent data above the provided threshold number.
|
||||
func (b *BlockTable) truncate(items uint64) error {
|
||||
b.lock.Lock()
|
||||
defer b.lock.Unlock()
|
||||
|
||||
existing := atomic.LoadUint64(&b.items)
|
||||
if existing <= items {
|
||||
return nil
|
||||
}
|
||||
|
||||
b.logger.WithFields(logrus.Fields{
|
||||
"items": existing,
|
||||
"limit": items,
|
||||
}).Warn("Truncating block file")
|
||||
if err := truncateBlockFile(b.index, int64(items+1)*indexEntrySize); err != nil {
|
||||
return err
|
||||
}
|
||||
// Calculate the new expected size of the data file and truncate it
|
||||
buffer := make([]byte, indexEntrySize)
|
||||
if _, err := b.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil {
|
||||
return err
|
||||
}
|
||||
var expected indexEntry
|
||||
err := expected.unmarshalBinary(buffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We might need to truncate back to older files
|
||||
if expected.filenum != b.headId {
|
||||
// If already open for reading, force-reopen for writing
|
||||
b.releaseFile(expected.filenum)
|
||||
newHead, err := b.openFile(expected.filenum, openBlockFileForAppend)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Release any files _after the current head -- both the previous head
|
||||
// and any files which may have been opened for reading
|
||||
b.releaseFilesAfter(expected.filenum, true)
|
||||
// Set back the historic head
|
||||
b.head = newHead
|
||||
atomic.StoreUint32(&b.headId, expected.filenum)
|
||||
}
|
||||
if err := truncateBlockFile(b.head, int64(expected.offset)); err != nil {
|
||||
return err
|
||||
}
|
||||
// All data files truncated, set internal counters and return
|
||||
atomic.StoreUint64(&b.items, items)
|
||||
atomic.StoreUint32(&b.headBytes, expected.offset)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlockTable) Retrieve(item uint64) ([]byte, error) {
|
||||
b.lock.RLock()
|
||||
|
||||
if b.index == nil || b.head == nil {
|
||||
b.lock.RUnlock()
|
||||
return nil, fmt.Errorf("closed")
|
||||
}
|
||||
if atomic.LoadUint64(&b.items) <= item {
|
||||
b.lock.RUnlock()
|
||||
return nil, fmt.Errorf("out of bounds")
|
||||
}
|
||||
if uint64(b.itemOffset) > item {
|
||||
b.lock.RUnlock()
|
||||
return nil, fmt.Errorf("out of bounds")
|
||||
}
|
||||
startOffset, endOffset, filenum, err := b.getBounds(item - uint64(b.itemOffset))
|
||||
if err != nil {
|
||||
b.lock.RUnlock()
|
||||
return nil, err
|
||||
}
|
||||
dataFile, exist := b.files[filenum]
|
||||
if !exist {
|
||||
b.lock.RUnlock()
|
||||
return nil, fmt.Errorf("missing data file %d", filenum)
|
||||
}
|
||||
blob := make([]byte, endOffset-startOffset)
|
||||
if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil {
|
||||
b.lock.RUnlock()
|
||||
return nil, err
|
||||
}
|
||||
b.lock.RUnlock()
|
||||
|
||||
return blob, nil
|
||||
}
|
||||
|
||||
func (b *BlockTable) Append(item uint64, blob []byte) error {
|
||||
b.lock.RLock()
|
||||
if b.index == nil || b.head == nil {
|
||||
b.lock.RUnlock()
|
||||
return fmt.Errorf("closed")
|
||||
}
|
||||
if atomic.LoadUint64(&b.items) != item {
|
||||
b.lock.RUnlock()
|
||||
return fmt.Errorf("appending unexpected item: want %d, have %d", b.items, item)
|
||||
}
|
||||
bLen := uint32(len(blob))
|
||||
if b.headBytes+bLen < bLen ||
|
||||
b.headBytes+bLen > b.maxFileSize {
|
||||
b.lock.RUnlock()
|
||||
b.lock.Lock()
|
||||
nextID := atomic.LoadUint32(&b.headId) + 1
|
||||
// We open the next file in truncated mode -- if this file already
|
||||
// exists, we need to start over from scratch on it
|
||||
newHead, err := b.openFile(nextID, openBlockFileTruncated)
|
||||
if err != nil {
|
||||
b.lock.Unlock()
|
||||
return err
|
||||
}
|
||||
// Close old file, and reopen in RDONLY mode
|
||||
b.releaseFile(b.headId)
|
||||
_, err = b.openFile(b.headId, openBlockFileForReadOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Swap out the current head
|
||||
b.head = newHead
|
||||
atomic.StoreUint32(&b.headBytes, 0)
|
||||
atomic.StoreUint32(&b.headId, nextID)
|
||||
b.lock.Unlock()
|
||||
b.lock.RLock()
|
||||
}
|
||||
|
||||
defer b.lock.RUnlock()
|
||||
if _, err := b.head.Write(blob); err != nil {
|
||||
return err
|
||||
}
|
||||
newOffset := atomic.AddUint32(&b.headBytes, bLen)
|
||||
idx := indexEntry{
|
||||
filenum: atomic.LoadUint32(&b.headId),
|
||||
offset: newOffset,
|
||||
}
|
||||
// Write indexEntry
|
||||
_, _ = b.index.Write(idx.marshallBinary())
|
||||
|
||||
atomic.AddUint64(&b.items, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlockTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
|
||||
buffer := make([]byte, indexEntrySize)
|
||||
var startIdx, endIdx indexEntry
|
||||
if _, err := b.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil {
|
||||
return 0, 0, 0, err
|
||||
}
|
||||
if err := endIdx.unmarshalBinary(buffer); err != nil {
|
||||
return 0, 0, 0, err
|
||||
}
|
||||
if item != 0 {
|
||||
if _, err := b.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil {
|
||||
return 0, 0, 0, err
|
||||
}
|
||||
if err := startIdx.unmarshalBinary(buffer); err != nil {
|
||||
return 0, 0, 0, err
|
||||
}
|
||||
} else {
|
||||
// the first reading
|
||||
return 0, endIdx.offset, endIdx.filenum, nil
|
||||
}
|
||||
if startIdx.filenum != endIdx.filenum {
|
||||
return 0, endIdx.offset, endIdx.filenum, nil
|
||||
}
|
||||
return startIdx.offset, endIdx.offset, endIdx.filenum, nil
|
||||
}
|
||||
|
||||
func (b *BlockTable) preopen() (err error) {
|
||||
b.releaseFilesAfter(0, false)
|
||||
|
||||
for i := b.tailId; i < b.headId; i++ {
|
||||
if _, err = b.openFile(i, openBlockFileForReadOnly); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
b.head, err = b.openFile(b.headId, openBlockFileForAppend)
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *BlockTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) {
|
||||
var exist bool
|
||||
if f, exist = b.files[num]; !exist {
|
||||
name := fmt.Sprintf("%s.%04d.rdat", b.name, num)
|
||||
f, err = opener(filepath.Join(b.path, name))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b.files[num] = f
|
||||
}
|
||||
return f, err
|
||||
}
|
||||
|
||||
// Close closes all opened files.
|
||||
func (b *BlockTable) Close() error {
|
||||
b.lock.Lock()
|
||||
defer b.lock.Unlock()
|
||||
|
||||
var errs []error
|
||||
if err := b.index.Close(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
b.index = nil
|
||||
|
||||
for _, f := range b.files {
|
||||
if err := f.Close(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
b.head = nil
|
||||
|
||||
if errs != nil {
|
||||
return fmt.Errorf("%v", errs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlockTable) releaseFilesAfter(num uint32, remove bool) {
|
||||
for fnum, f := range b.files {
|
||||
if fnum > num {
|
||||
delete(b.files, fnum)
|
||||
f.Close()
|
||||
if remove {
|
||||
os.Remove(f.Name())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BlockTable) releaseFile(num uint32) {
|
||||
if f, exist := b.files[num]; exist {
|
||||
delete(b.files, num)
|
||||
f.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func truncateBlockFile(file *os.File, size int64) error {
|
||||
if err := file.Truncate(size); err != nil {
|
||||
return err
|
||||
}
|
||||
// Seek to end for append
|
||||
if _, err := file.Seek(0, io.SeekEnd); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func openBlockFileForAppend(filename string) (*os.File, error) {
|
||||
file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Seek to end for append
|
||||
if _, err = file.Seek(0, io.SeekEnd); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return file, nil
|
||||
}
|
||||
|
||||
func openBlockFileTruncated(filename string) (*os.File, error) {
|
||||
return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
}
|
||||
|
||||
func openBlockFileForReadOnly(filename string) (*os.File, error) {
|
||||
return os.OpenFile(filename, os.O_RDONLY, 0644)
|
||||
}
|
|
@ -0,0 +1,275 @@
|
|||
package blockfile
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/meshplus/bitxhub-kit/log"
|
||||
"github.com/meshplus/bitxhub-kit/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func getChunk(size int, b int) []byte {
|
||||
data := make([]byte, size)
|
||||
for i := range data {
|
||||
data[i] = byte(b)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
func TestBlockFileBasics(t *testing.T) {
|
||||
f, err := NewBlockFile(os.TempDir(), log.NewWithModule("blockfile_test"))
|
||||
assert.Nil(t, err)
|
||||
defer f.Close()
|
||||
err = f.TruncateBlocks(uint64(0))
|
||||
assert.Nil(t, err)
|
||||
err = f.AppendBlock(uint64(0), types.NewHash([]byte{1}).Bytes(), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
assert.Nil(t, err)
|
||||
num, err := f.Blocks()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, uint64(1), num)
|
||||
|
||||
_, err = f.Get(BlockFileHashTable, uint64(1))
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestBlockTableBasics(t *testing.T) {
|
||||
// set cutoff at 50 bytes
|
||||
f, err := newTable(os.TempDir(),
|
||||
fmt.Sprintf("unittest-%d", rand.Uint64()), 2*1000*1000*1000, log.NewWithModule("blockfile_test"))
|
||||
assert.Nil(t, err)
|
||||
defer f.Close()
|
||||
// Write 15 bytes 255 times, results in 85 files
|
||||
for x := 0; x < 255; x++ {
|
||||
data := getChunk(15, x)
|
||||
f.Append(uint64(x), data)
|
||||
}
|
||||
for y := 0; y < 255; y++ {
|
||||
exp := getChunk(15, y)
|
||||
got, err := f.Retrieve(uint64(y))
|
||||
assert.Nil(t, err)
|
||||
if !bytes.Equal(got, exp) {
|
||||
t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
|
||||
}
|
||||
}
|
||||
// Check that we cannot read too far
|
||||
_, err = f.Retrieve(uint64(255))
|
||||
assert.Equal(t, fmt.Errorf("out of bounds"), err)
|
||||
}
|
||||
|
||||
func TestAppendBlocKCase1(t *testing.T) {
|
||||
f, err := NewBlockFile(os.TempDir(), log.NewWithModule("blockfile_test"))
|
||||
assert.Nil(t, err)
|
||||
defer f.Close()
|
||||
err = f.TruncateBlocks(uint64(0))
|
||||
assert.Nil(t, err)
|
||||
err = f.AppendBlock(uint64(0), types.NewHash([]byte{1}).Bytes(), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
assert.Nil(t, err)
|
||||
f.tables[BlockFileHashTable].items = 3
|
||||
err = f.AppendBlock(uint64(1), types.NewHash([]byte{1}).Bytes(), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestAppendBlocKCase2(t *testing.T) {
|
||||
f, err := NewBlockFile(os.TempDir(), log.NewWithModule("blockfile_test"))
|
||||
assert.Nil(t, err)
|
||||
defer f.Close()
|
||||
err = f.TruncateBlocks(uint64(0))
|
||||
assert.Nil(t, err)
|
||||
err = f.AppendBlock(uint64(0), types.NewHash([]byte{1}).Bytes(), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
assert.Nil(t, err)
|
||||
f.tables[BlockFileBodiesTable].items = 3
|
||||
err = f.AppendBlock(uint64(1), types.NewHash([]byte{1}).Bytes(), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestAppendBlocKCase3(t *testing.T) {
|
||||
f, err := NewBlockFile(os.TempDir(), log.NewWithModule("blockfile_test"))
|
||||
assert.Nil(t, err)
|
||||
defer f.Close()
|
||||
err = f.TruncateBlocks(uint64(0))
|
||||
assert.Nil(t, err)
|
||||
err = f.AppendBlock(uint64(0), types.NewHash([]byte{1}).Bytes(), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
assert.Nil(t, err)
|
||||
f.tables[BlockFileInterchainTable].items = 3
|
||||
err = f.AppendBlock(uint64(1), types.NewHash([]byte{1}).Bytes(), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestAppendBlocKCase4(t *testing.T) {
|
||||
f, err := NewBlockFile(os.TempDir(), log.NewWithModule("blockfile_test"))
|
||||
assert.Nil(t, err)
|
||||
defer f.Close()
|
||||
err = f.TruncateBlocks(uint64(0))
|
||||
assert.Nil(t, err)
|
||||
err = f.AppendBlock(uint64(0), types.NewHash([]byte{1}).Bytes(), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
assert.Nil(t, err)
|
||||
f.tables[BlockFileReceiptTable].items = 3
|
||||
err = f.AppendBlock(uint64(1), types.NewHash([]byte{1}).Bytes(), []byte("1"), []byte("1"), []byte("1"), []byte("1"))
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestBlockTableBasicsClosing(t *testing.T) {
|
||||
var (
|
||||
fname = fmt.Sprintf("basics-close-%d", rand.Uint64())
|
||||
logger = log.NewWithModule("blockfile_test")
|
||||
f *BlockTable
|
||||
err error
|
||||
)
|
||||
f, err = newTable(os.TempDir(), fname, 2*1000*1000*1000, logger)
|
||||
assert.Nil(t, err)
|
||||
// Write 15 bytes 255 times, results in 85 files
|
||||
for x := 0; x < 255; x++ {
|
||||
data := getChunk(15, x)
|
||||
f.Append(uint64(x), data)
|
||||
f.Close()
|
||||
f, err = newTable(os.TempDir(), fname, 2*1000*1000*1000, logger)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
for y := 0; y < 255; y++ {
|
||||
exp := getChunk(15, y)
|
||||
got, err := f.Retrieve(uint64(y))
|
||||
assert.Nil(t, err)
|
||||
if !bytes.Equal(got, exp) {
|
||||
t.Fatalf("test %d, got \n%x != \n%x", y, got, exp)
|
||||
}
|
||||
f.Close()
|
||||
f, err = newTable(os.TempDir(), fname, 2*1000*1000*1000, logger)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFreezerTruncate(t *testing.T) {
|
||||
fname := fmt.Sprintf("truncation-%d", rand.Uint64())
|
||||
logger := log.NewWithModule("blockfile_test")
|
||||
|
||||
{ // Fill table
|
||||
f, err := newTable(os.TempDir(), fname, 50, logger)
|
||||
assert.Nil(t, err)
|
||||
// Write 15 bytes 30 times
|
||||
for x := 0; x < 30; x++ {
|
||||
data := getChunk(15, x)
|
||||
f.Append(uint64(x), data)
|
||||
}
|
||||
// The last item should be there
|
||||
_, err = f.Retrieve(f.items - 1)
|
||||
assert.Nil(t, err)
|
||||
f.Close()
|
||||
}
|
||||
// Reopen, truncate
|
||||
{
|
||||
f, err := newTable(os.TempDir(), fname, 50, logger)
|
||||
assert.Nil(t, err)
|
||||
defer f.Close()
|
||||
// for x := 0; x < 20; x++ {
|
||||
// f.truncate(uint64(30 - x - 1)) // 150 bytes
|
||||
// }
|
||||
f.truncate(10)
|
||||
if f.items != 10 {
|
||||
t.Fatalf("expected %d items, got %d", 10, f.items)
|
||||
}
|
||||
// 45, 45, 45, 15 -- bytes should be 15
|
||||
if f.headBytes != 15 {
|
||||
t.Fatalf("expected %d bytes, got %d", 15, f.headBytes)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func TestFreezerReadAndTruncate(t *testing.T) {
|
||||
fname := fmt.Sprintf("read_truncate-%d", rand.Uint64())
|
||||
logger := log.NewWithModule("blockfile_test")
|
||||
{ // Fill table
|
||||
f, err := newTable(os.TempDir(), fname, 50, logger)
|
||||
assert.Nil(t, err)
|
||||
// Write 15 bytes 30 times
|
||||
for x := 0; x < 30; x++ {
|
||||
data := getChunk(15, x)
|
||||
f.Append(uint64(x), data)
|
||||
}
|
||||
// The last item should be there
|
||||
_, err = f.Retrieve(f.items - 1)
|
||||
assert.Nil(t, err)
|
||||
f.Close()
|
||||
}
|
||||
// Reopen and read all files
|
||||
{
|
||||
f, err := newTable(os.TempDir(), fname, 50, logger)
|
||||
assert.Nil(t, err)
|
||||
if f.items != 30 {
|
||||
f.Close()
|
||||
t.Fatalf("expected %d items, got %d", 0, f.items)
|
||||
}
|
||||
for y := byte(0); y < 30; y++ {
|
||||
f.Retrieve(uint64(y))
|
||||
}
|
||||
// Now, truncate back to zero
|
||||
f.truncate(0)
|
||||
// Write the data again
|
||||
for x := 0; x < 30; x++ {
|
||||
data := getChunk(15, ^x)
|
||||
err := f.Append(uint64(x), data)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestFreezerRepairFirstFile(t *testing.T) {
|
||||
fname := fmt.Sprintf("truncationfirst-%d", rand.Uint64())
|
||||
logger := log.NewWithModule("blockfile_test")
|
||||
{ // Fill table
|
||||
f, err := newTable(os.TempDir(), fname, 50, logger)
|
||||
assert.Nil(t, err)
|
||||
// Write 80 bytes, splitting out into two files
|
||||
f.Append(0, getChunk(40, 0xFF))
|
||||
f.Append(1, getChunk(40, 0xEE))
|
||||
// The last item should be there
|
||||
_, err = f.Retrieve(f.items - 1)
|
||||
assert.Nil(t, err)
|
||||
f.Close()
|
||||
}
|
||||
// Truncate the file in half
|
||||
fileToCrop := filepath.Join(os.TempDir(), fmt.Sprintf("%s.0001.rdat", fname))
|
||||
{
|
||||
err := assertFileSize(fileToCrop, 40)
|
||||
assert.Nil(t, err)
|
||||
file, err := os.OpenFile(fileToCrop, os.O_RDWR, 0644)
|
||||
assert.Nil(t, err)
|
||||
file.Truncate(20)
|
||||
file.Close()
|
||||
}
|
||||
// Reopen
|
||||
{
|
||||
f, err := newTable(os.TempDir(), fname, 50, logger)
|
||||
assert.Nil(t, err)
|
||||
if f.items != 1 {
|
||||
f.Close()
|
||||
t.Fatalf("expected %d items, got %d", 0, f.items)
|
||||
}
|
||||
// Write 40 bytes
|
||||
f.Append(1, getChunk(40, 0xDD))
|
||||
f.Close()
|
||||
// Should have been truncated down to zero and then 40 written
|
||||
err = assertFileSize(fileToCrop, 40)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func assertFileSize(f string, size int64) error {
|
||||
stat, err := os.Stat(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stat.Size() != size {
|
||||
return fmt.Errorf("error, expected size %d, got %d", size, stat.Size())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package blockfile
|
||||
|
||||
const (
|
||||
// freezerHashTable indicates the name of the freezer canonical hash table.
|
||||
BlockFileHashTable = "hashes"
|
||||
|
||||
// freezerBodiesTable indicates the name of the freezer block body table.
|
||||
BlockFileBodiesTable = "bodies"
|
||||
|
||||
// freezerHeaderTable indicates the name of the freezer header table.
|
||||
BlockFileTXsTable = "transactions"
|
||||
|
||||
// freezerReceiptTable indicates the name of the freezer receipts table.
|
||||
BlockFileReceiptTable = "receipts"
|
||||
|
||||
// freezerReceiptTable indicates the name of the freezer receipts table.
|
||||
BlockFileInterchainTable = "interchain"
|
||||
)
|
||||
|
||||
var BlockFileSchema = map[string]bool{
|
||||
BlockFileHashTable: true,
|
||||
BlockFileBodiesTable: true,
|
||||
BlockFileTXsTable: true,
|
||||
BlockFileReceiptTable: true,
|
||||
BlockFileInterchainTable: true,
|
||||
}
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/meshplus/bitxhub-model/pb"
|
||||
"github.com/meshplus/bitxhub/internal/ledger"
|
||||
"github.com/meshplus/bitxhub/internal/repo"
|
||||
"github.com/meshplus/bitxhub/internal/storages/blockfile"
|
||||
"github.com/meshplus/bitxhub/pkg/cert"
|
||||
"github.com/meshplus/bitxhub/pkg/vm"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -62,7 +63,10 @@ func initCreateContext(t *testing.T, name string) *vm.Context {
|
|||
|
||||
accountCache, err := ledger.NewAccountCache()
|
||||
require.Nil(t, err)
|
||||
ldg, err := ledger.New(createMockRepo(t), store, ldb, accountCache, log.NewWithModule("executor"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(dir, logger)
|
||||
assert.Nil(t, err)
|
||||
ldg, err := ledger.New(createMockRepo(t), store, ldb, blockFile, accountCache, log.NewWithModule("executor"))
|
||||
assert.Nil(t, err)
|
||||
|
||||
return &vm.Context{
|
||||
|
@ -95,7 +99,10 @@ func initValidationContext(t *testing.T, name string) *vm.Context {
|
|||
|
||||
accountCache, err := ledger.NewAccountCache()
|
||||
require.Nil(t, err)
|
||||
ldg, err := ledger.New(createMockRepo(t), store, ldb, accountCache, log.NewWithModule("executor"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(dir, logger)
|
||||
assert.Nil(t, err)
|
||||
ldg, err := ledger.New(createMockRepo(t), store, ldb, blockFile, accountCache, log.NewWithModule("executor"))
|
||||
require.Nil(t, err)
|
||||
|
||||
return &vm.Context{
|
||||
|
@ -127,7 +134,10 @@ func initFabricContext(t *testing.T, name string) *vm.Context {
|
|||
|
||||
accountCache, err := ledger.NewAccountCache()
|
||||
require.Nil(t, err)
|
||||
ldg, err := ledger.New(createMockRepo(t), store, ldb, accountCache, log.NewWithModule("executor"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(dir, logger)
|
||||
assert.Nil(t, err)
|
||||
ldg, err := ledger.New(createMockRepo(t), store, ldb, blockFile, accountCache, log.NewWithModule("executor"))
|
||||
require.Nil(t, err)
|
||||
|
||||
return &vm.Context{
|
||||
|
@ -258,7 +268,10 @@ func BenchmarkRunFabValidation(b *testing.B) {
|
|||
|
||||
accountCache, err := ledger.NewAccountCache()
|
||||
assert.Nil(b, err)
|
||||
ldg, err := ledger.New(&repo.Repo{Key: &repo.Key{PrivKey: privKey}}, store, ldb, accountCache, log.NewWithModule("executor"))
|
||||
logger := log.NewWithModule("account_test")
|
||||
blockFile, err := blockfile.NewBlockFile(dir, logger)
|
||||
assert.Nil(b, err)
|
||||
ldg, err := ledger.New(&repo.Repo{Key: &repo.Key{PrivKey: privKey}}, store, ldb, blockFile, accountCache, log.NewWithModule("executor"))
|
||||
require.Nil(b, err)
|
||||
ctx := &vm.Context{
|
||||
Caller: caller,
|
||||
|
|
Loading…
Reference in New Issue