Skip to content

Commit

Permalink
Implement Chain Exchange protocol over pubsub
Browse files Browse the repository at this point in the history
Implement chain exchange protocol over pubsub as a mechanism to
propagate `ECChain` across the network with reasonable spam protection.

To protect against spam the mechanism employs two separate caches for
chains that are generally discovered across the network and the ones
explicitly looked up or broadcasted by the local node. Both caches are
capped LRU, where the LRU recent-ness is used as a way to prioritise
chains we cache while keeping the total memory footprint fixed. This
approach is not the most memory efficient but is simpler to implement
as the LRU encapsulates a lot of the complexity.

The code has a lot of TODOs as places to improve or question to the
reviewer. To action most of the TODOs further refactoring across the
code is needed which is intended to be actioned in separate commits.

The code path introduced here is not integrated into F3 host; future PRs
will iteratively integrate the mechanism across F3 host and other
places.

Part of #792
  • Loading branch information
masih committed Dec 19, 2024
1 parent d4f3a0c commit 7f3d3ae
Show file tree
Hide file tree
Showing 7 changed files with 687 additions and 0 deletions.
135 changes: 135 additions & 0 deletions chainexchange/cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions chainexchange/chainexchange.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package chainexchange

import (
"context"

"github.com/filecoin-project/go-f3/gpbft"
)

type Key []byte

type Keyer interface {
Key(gpbft.ECChain) Key
}

type Message struct {
Instance uint64
Chain gpbft.ECChain
}

type ChainExchange interface {
Keyer
Broadcast(context.Context, Message) error
GetChainByInstance(context.Context, uint64, Key) (gpbft.ECChain, bool)
RemoveChainsByInstance(context.Context, uint64) error
}

func (k Key) IsZero() bool { return len(k) == 0 }
134 changes: 134 additions & 0 deletions chainexchange/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package chainexchange

import (
"errors"

"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

type Option func(*options) error

type options struct {
topicName string
topicScoreParams *pubsub.TopicScoreParams
subscriptionBufferSize int
pubsub *pubsub.PubSub
progress gpbft.Progress
maxChainLength int
maxInstanceLookahead uint64
maxDiscoveredChainsPerInstance int
maxWantedChainsPerInstance int
}

func newOptions(o ...Option) (*options, error) {
opts := &options{
topicScoreParams: psutil.PubsubTopicScoreParams,
subscriptionBufferSize: 32,
maxChainLength: gpbft.ChainMaxLen,
maxInstanceLookahead: manifest.DefaultCommitteeLookback,
maxDiscoveredChainsPerInstance: 1000,
maxWantedChainsPerInstance: 1000,
}
for _, apply := range o {
if err := apply(opts); err != nil {
return nil, err
}

Check warning on line 38 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L37-L38

Added lines #L37 - L38 were not covered by tests
}
if opts.progress == nil {
return nil, errors.New("gpbft progress must be set")
}

Check warning on line 42 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L41-L42

Added lines #L41 - L42 were not covered by tests
if opts.pubsub == nil {
return nil, errors.New("pubsub must be set")
}

Check warning on line 45 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L44-L45

Added lines #L44 - L45 were not covered by tests
if opts.topicName == "" {
return nil, errors.New("topic name must be set")
}

Check warning on line 48 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L47-L48

Added lines #L47 - L48 were not covered by tests
return opts, nil
}

func WithTopicName(name string) Option {
return func(o *options) error {
if name == "" {
return errors.New("topic name cannot be empty")
}

Check warning on line 56 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L55-L56

Added lines #L55 - L56 were not covered by tests
o.topicName = name
return nil
}
}

func WithTopicScoreParams(params *pubsub.TopicScoreParams) Option {
return func(o *options) error {
o.topicScoreParams = params
return nil
}
}

func WithSubscriptionBufferSize(size int) Option {
return func(o *options) error {
if size < 1 {
return errors.New("subscription buffer size must be at least 1")
}
o.subscriptionBufferSize = size
return nil

Check warning on line 75 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L69-L75

Added lines #L69 - L75 were not covered by tests
}
}

func WithPubSub(pubsub *pubsub.PubSub) Option {
return func(o *options) error {
if pubsub == nil {
return errors.New("pubsub cannot be nil")
}

Check warning on line 83 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L82-L83

Added lines #L82 - L83 were not covered by tests
o.pubsub = pubsub
return nil
}
}

func WithProgress(progress gpbft.Progress) Option {
return func(o *options) error {
if progress == nil {
return errors.New("progress cannot be nil")
}

Check warning on line 93 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L92-L93

Added lines #L92 - L93 were not covered by tests
o.progress = progress
return nil
}
}

func WithMaxChainLength(length int) Option {
return func(o *options) error {
if length < 1 {
return errors.New("max chain length must be at least 1")
}
o.maxChainLength = length
return nil

Check warning on line 105 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L99-L105

Added lines #L99 - L105 were not covered by tests
}
}

func WithMaxInstanceLookahead(lookahead uint64) Option {
return func(o *options) error {
o.maxInstanceLookahead = lookahead
return nil
}

Check warning on line 113 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L109-L113

Added lines #L109 - L113 were not covered by tests
}

func WithMaxDiscoveredChainsPerInstance(max int) Option {
return func(o *options) error {
if max < 1 {
return errors.New("max discovered chains per instance must be at least 1")
}
o.maxDiscoveredChainsPerInstance = max
return nil

Check warning on line 122 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L116-L122

Added lines #L116 - L122 were not covered by tests
}
}

func WithMaxWantedChainsPerInstance(max int) Option {
return func(o *options) error {
if max < 1 {
return errors.New("max wanted chains per instance must be at least 1")
}
o.maxWantedChainsPerInstance = max
return nil

Check warning on line 132 in chainexchange/options.go

View check run for this annotation

Codecov / codecov/patch

chainexchange/options.go#L126-L132

Added lines #L126 - L132 were not covered by tests
}
}
Loading

0 comments on commit 7f3d3ae

Please sign in to comment.