383 lines
10 KiB
Go
383 lines
10 KiB
Go
|
package sarama
|
||
|
|
||
|
import "errors"
|
||
|
|
||
|
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
|
||
|
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
|
||
|
// Methods with stricter requirements will specify the minimum broker version required.
|
||
|
// You MUST call Close() on a client to avoid leaks
|
||
|
type ClusterAdmin interface {
|
||
|
// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
|
||
|
// It may take several seconds after CreateTopic returns success for all the brokers
|
||
|
// to become aware that the topic has been created. During this time, listTopics
|
||
|
// may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
|
||
|
CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
|
||
|
|
||
|
// Delete a topic. It may take several seconds after the DeleteTopic to returns success
|
||
|
// and for all the brokers to become aware that the topics are gone.
|
||
|
// During this time, listTopics may continue to return information about the deleted topic.
|
||
|
// If delete.topic.enable is false on the brokers, deleteTopic will mark
|
||
|
// the topic for deletion, but not actually delete them.
|
||
|
// This operation is supported by brokers with version 0.10.1.0 or higher.
|
||
|
DeleteTopic(topic string) error
|
||
|
|
||
|
// Increase the number of partitions of the topics according to the corresponding values.
|
||
|
// If partitions are increased for a topic that has a key, the partition logic or ordering of
|
||
|
// the messages will be affected. It may take several seconds after this method returns
|
||
|
// success for all the brokers to become aware that the partitions have been created.
|
||
|
// During this time, ClusterAdmin#describeTopics may not return information about the
|
||
|
// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
|
||
|
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
|
||
|
|
||
|
// Delete records whose offset is smaller than the given offset of the corresponding partition.
|
||
|
// This operation is supported by brokers with version 0.11.0.0 or higher.
|
||
|
DeleteRecords(topic string, partitionOffsets map[int32]int64) error
|
||
|
|
||
|
// Get the configuration for the specified resources.
|
||
|
// The returned configuration includes default values and the Default is true
|
||
|
// can be used to distinguish them from user supplied values.
|
||
|
// Config entries where ReadOnly is true cannot be updated.
|
||
|
// The value of config entries where Sensitive is true is always nil so
|
||
|
// sensitive information is not disclosed.
|
||
|
// This operation is supported by brokers with version 0.11.0.0 or higher.
|
||
|
DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
|
||
|
|
||
|
// Update the configuration for the specified resources with the default options.
|
||
|
// This operation is supported by brokers with version 0.11.0.0 or higher.
|
||
|
// The resources with their configs (topic is the only resource type with configs
|
||
|
// that can be updated currently Updates are not transactional so they may succeed
|
||
|
// for some resources while fail for others. The configs for a particular resource are updated automatically.
|
||
|
AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
|
||
|
|
||
|
// Creates access control lists (ACLs) which are bound to specific resources.
|
||
|
// This operation is not transactional so it may succeed for some ACLs while fail for others.
|
||
|
// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
|
||
|
// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
|
||
|
CreateACL(resource Resource, acl Acl) error
|
||
|
|
||
|
// Lists access control lists (ACLs) according to the supplied filter.
|
||
|
// it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
|
||
|
// This operation is supported by brokers with version 0.11.0.0 or higher.
|
||
|
ListAcls(filter AclFilter) ([]ResourceAcls, error)
|
||
|
|
||
|
// Deletes access control lists (ACLs) according to the supplied filters.
|
||
|
// This operation is not transactional so it may succeed for some ACLs while fail for others.
|
||
|
// This operation is supported by brokers with version 0.11.0.0 or higher.
|
||
|
DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
|
||
|
|
||
|
// Close shuts down the admin and closes underlying client.
|
||
|
Close() error
|
||
|
}
|
||
|
|
||
|
type clusterAdmin struct {
|
||
|
client Client
|
||
|
conf *Config
|
||
|
}
|
||
|
|
||
|
// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
|
||
|
func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
|
||
|
client, err := NewClient(addrs, conf)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
//make sure we can retrieve the controller
|
||
|
_, err = client.Controller()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
ca := &clusterAdmin{
|
||
|
client: client,
|
||
|
conf: client.Config(),
|
||
|
}
|
||
|
return ca, nil
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) Close() error {
|
||
|
return ca.client.Close()
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) Controller() (*Broker, error) {
|
||
|
return ca.client.Controller()
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
|
||
|
|
||
|
if topic == "" {
|
||
|
return ErrInvalidTopic
|
||
|
}
|
||
|
|
||
|
if detail == nil {
|
||
|
return errors.New("You must specify topic details")
|
||
|
}
|
||
|
|
||
|
topicDetails := make(map[string]*TopicDetail)
|
||
|
topicDetails[topic] = detail
|
||
|
|
||
|
request := &CreateTopicsRequest{
|
||
|
TopicDetails: topicDetails,
|
||
|
ValidateOnly: validateOnly,
|
||
|
Timeout: ca.conf.Admin.Timeout,
|
||
|
}
|
||
|
|
||
|
if ca.conf.Version.IsAtLeast(V0_11_0_0) {
|
||
|
request.Version = 1
|
||
|
}
|
||
|
if ca.conf.Version.IsAtLeast(V1_0_0_0) {
|
||
|
request.Version = 2
|
||
|
}
|
||
|
|
||
|
b, err := ca.Controller()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
rsp, err := b.CreateTopics(request)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
topicErr, ok := rsp.TopicErrors[topic]
|
||
|
if !ok {
|
||
|
return ErrIncompleteResponse
|
||
|
}
|
||
|
|
||
|
if topicErr.Err != ErrNoError {
|
||
|
return topicErr.Err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) DeleteTopic(topic string) error {
|
||
|
|
||
|
if topic == "" {
|
||
|
return ErrInvalidTopic
|
||
|
}
|
||
|
|
||
|
request := &DeleteTopicsRequest{
|
||
|
Topics: []string{topic},
|
||
|
Timeout: ca.conf.Admin.Timeout,
|
||
|
}
|
||
|
|
||
|
if ca.conf.Version.IsAtLeast(V0_11_0_0) {
|
||
|
request.Version = 1
|
||
|
}
|
||
|
|
||
|
b, err := ca.Controller()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
rsp, err := b.DeleteTopics(request)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
topicErr, ok := rsp.TopicErrorCodes[topic]
|
||
|
if !ok {
|
||
|
return ErrIncompleteResponse
|
||
|
}
|
||
|
|
||
|
if topicErr != ErrNoError {
|
||
|
return topicErr
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
|
||
|
if topic == "" {
|
||
|
return ErrInvalidTopic
|
||
|
}
|
||
|
|
||
|
topicPartitions := make(map[string]*TopicPartition)
|
||
|
topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
|
||
|
|
||
|
request := &CreatePartitionsRequest{
|
||
|
TopicPartitions: topicPartitions,
|
||
|
Timeout: ca.conf.Admin.Timeout,
|
||
|
}
|
||
|
|
||
|
b, err := ca.Controller()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
rsp, err := b.CreatePartitions(request)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
topicErr, ok := rsp.TopicPartitionErrors[topic]
|
||
|
if !ok {
|
||
|
return ErrIncompleteResponse
|
||
|
}
|
||
|
|
||
|
if topicErr.Err != ErrNoError {
|
||
|
return topicErr.Err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
|
||
|
|
||
|
if topic == "" {
|
||
|
return ErrInvalidTopic
|
||
|
}
|
||
|
|
||
|
topics := make(map[string]*DeleteRecordsRequestTopic)
|
||
|
topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
|
||
|
request := &DeleteRecordsRequest{
|
||
|
Topics: topics,
|
||
|
Timeout: ca.conf.Admin.Timeout,
|
||
|
}
|
||
|
|
||
|
b, err := ca.Controller()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
rsp, err := b.DeleteRecords(request)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
_, ok := rsp.Topics[topic]
|
||
|
if !ok {
|
||
|
return ErrIncompleteResponse
|
||
|
}
|
||
|
|
||
|
//todo since we are dealing with couple of partitions it would be good if we return slice of errors
|
||
|
//for each partition instead of one error
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
|
||
|
|
||
|
var entries []ConfigEntry
|
||
|
var resources []*ConfigResource
|
||
|
resources = append(resources, &resource)
|
||
|
|
||
|
request := &DescribeConfigsRequest{
|
||
|
Resources: resources,
|
||
|
}
|
||
|
|
||
|
b, err := ca.Controller()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
rsp, err := b.DescribeConfigs(request)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
for _, rspResource := range rsp.Resources {
|
||
|
if rspResource.Name == resource.Name {
|
||
|
if rspResource.ErrorMsg != "" {
|
||
|
return nil, errors.New(rspResource.ErrorMsg)
|
||
|
}
|
||
|
for _, cfgEntry := range rspResource.Configs {
|
||
|
entries = append(entries, *cfgEntry)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return entries, nil
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
|
||
|
|
||
|
var resources []*AlterConfigsResource
|
||
|
resources = append(resources, &AlterConfigsResource{
|
||
|
Type: resourceType,
|
||
|
Name: name,
|
||
|
ConfigEntries: entries,
|
||
|
})
|
||
|
|
||
|
request := &AlterConfigsRequest{
|
||
|
Resources: resources,
|
||
|
ValidateOnly: validateOnly,
|
||
|
}
|
||
|
|
||
|
b, err := ca.Controller()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
rsp, err := b.AlterConfigs(request)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
for _, rspResource := range rsp.Resources {
|
||
|
if rspResource.Name == name {
|
||
|
if rspResource.ErrorMsg != "" {
|
||
|
return errors.New(rspResource.ErrorMsg)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
|
||
|
var acls []*AclCreation
|
||
|
acls = append(acls, &AclCreation{resource, acl})
|
||
|
request := &CreateAclsRequest{AclCreations: acls}
|
||
|
|
||
|
b, err := ca.Controller()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
_, err = b.CreateAcls(request)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
|
||
|
|
||
|
request := &DescribeAclsRequest{AclFilter: filter}
|
||
|
|
||
|
b, err := ca.Controller()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
rsp, err := b.DescribeAcls(request)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
var lAcls []ResourceAcls
|
||
|
for _, rAcl := range rsp.ResourceAcls {
|
||
|
lAcls = append(lAcls, *rAcl)
|
||
|
}
|
||
|
return lAcls, nil
|
||
|
}
|
||
|
|
||
|
func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
|
||
|
var filters []*AclFilter
|
||
|
filters = append(filters, &filter)
|
||
|
request := &DeleteAclsRequest{Filters: filters}
|
||
|
|
||
|
b, err := ca.Controller()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
rsp, err := b.DeleteAcls(request)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
var mAcls []MatchingAcl
|
||
|
for _, fr := range rsp.FilterResponses {
|
||
|
for _, mACL := range fr.MatchingAcls {
|
||
|
mAcls = append(mAcls, *mACL)
|
||
|
}
|
||
|
|
||
|
}
|
||
|
return mAcls, nil
|
||
|
}
|