diff -Nru golang-github-shopify-sarama-1.20.0/async_producer.go golang-github-shopify-sarama-1.20.1/async_producer.go --- golang-github-shopify-sarama-1.20.0/async_producer.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/async_producer.go 2019-01-10 17:48:46.000000000 +0000 @@ -837,9 +837,11 @@ }) if len(retryTopics) > 0 { - err := bp.parent.client.RefreshMetadata(retryTopics...) - if err != nil { - Logger.Printf("Failed refreshing metadata because of %v\n", err) + if bp.parent.conf.Producer.Idempotent { + err := bp.parent.client.RefreshMetadata(retryTopics...) + if err != nil { + Logger.Printf("Failed refreshing metadata because of %v\n", err) + } } sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { @@ -858,9 +860,13 @@ bp.currentRetries[topic] = make(map[int32]error) } bp.currentRetries[topic][partition] = block.Err + if bp.parent.conf.Producer.Idempotent { + go bp.parent.retryBatch(topic, partition, pSet, block.Err) + } else { + bp.parent.retryMessages(pSet.msgs, block.Err) + } // dropping the following messages has the side effect of incrementing their retry count bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err) - bp.parent.retryBatch(topic, partition, pSet, block.Err) } }) } diff -Nru golang-github-shopify-sarama-1.20.0/async_producer_test.go golang-github-shopify-sarama-1.20.1/async_producer_test.go --- golang-github-shopify-sarama-1.20.0/async_producer_test.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/async_producer_test.go 2019-01-10 17:48:46.000000000 +0000 @@ -300,7 +300,6 @@ for i := 0; i < 10; i++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} } - leader2.Returns(metadataLeader2) leader2.Returns(prodSuccess) expectResults(t, producer, 10, 0) @@ -468,7 +467,6 @@ seedBroker.Returns(metadataLeader1) leader1.Returns(prodNotLeader) seedBroker.Returns(metadataLeader2) - seedBroker.Returns(metadataLeader2) prodSuccess := new(ProduceResponse) prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) @@ -654,7 +652,6 @@ // succeed this time expectResults(t, producer, 5, 0) - seedBroker.Returns(metadataResponse) // put five more through for i := 0; i < 5; i++ { diff -Nru golang-github-shopify-sarama-1.20.0/CHANGELOG.md golang-github-shopify-sarama-1.20.1/CHANGELOG.md --- golang-github-shopify-sarama-1.20.0/CHANGELOG.md 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/CHANGELOG.md 2019-01-10 17:48:46.000000000 +0000 @@ -1,5 +1,29 @@ # Changelog +#### Version 1.20.1 (2019-01-10) + +New Features: +- Add optional replica id in offset request + ([1100](https://github.com/Shopify/sarama/pull/1100)). + +Improvements: +- Implement DescribeConfigs Request + Response v1 & v2 + ([1230](https://github.com/Shopify/sarama/pull/1230)). +- Reuse compression objects + ([1185](https://github.com/Shopify/sarama/pull/1185)). +- Switch from png to svg for GoDoc link in README + ([1243](https://github.com/Shopify/sarama/pull/1243)). +- Fix typo in deprecation notice for FetchResponseBlock.Records + ([1242](https://github.com/Shopify/sarama/pull/1242)). +- Fix typos in consumer metadata response file + ([1244](https://github.com/Shopify/sarama/pull/1244)). + +Bug Fixes: +- Revert to individual msg retries for non-idempotent + ([1203](https://github.com/Shopify/sarama/pull/1203)). +- Respect MaxMessageBytes limit for uncompressed messages + ([1141](https://github.com/Shopify/sarama/pull/1141)). + #### Version 1.20.0 (2018-12-10) New Features: diff -Nru golang-github-shopify-sarama-1.20.0/compress.go golang-github-shopify-sarama-1.20.1/compress.go --- golang-github-shopify-sarama-1.20.0/compress.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/compress.go 2019-01-10 17:48:46.000000000 +0000 @@ -0,0 +1,75 @@ +package sarama + +import ( + "bytes" + "compress/gzip" + "fmt" + "sync" + + "github.com/eapache/go-xerial-snappy" + "github.com/pierrec/lz4" +) + +var ( + lz4WriterPool = sync.Pool{ + New: func() interface{} { + return lz4.NewWriter(nil) + }, + } + + gzipWriterPool = sync.Pool{ + New: func() interface{} { + return gzip.NewWriter(nil) + }, + } +) + +func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) { + switch cc { + case CompressionNone: + return data, nil + case CompressionGZIP: + var ( + err error + buf bytes.Buffer + writer *gzip.Writer + ) + if level != CompressionLevelDefault { + writer, err = gzip.NewWriterLevel(&buf, level) + if err != nil { + return nil, err + } + } else { + writer = gzipWriterPool.Get().(*gzip.Writer) + defer gzipWriterPool.Put(writer) + writer.Reset(&buf) + } + if _, err := writer.Write(data); err != nil { + return nil, err + } + if err := writer.Close(); err != nil { + return nil, err + } + return buf.Bytes(), nil + case CompressionSnappy: + return snappy.Encode(data), nil + case CompressionLZ4: + writer := lz4WriterPool.Get().(*lz4.Writer) + defer lz4WriterPool.Put(writer) + + var buf bytes.Buffer + writer.Reset(&buf) + + if _, err := writer.Write(data); err != nil { + return nil, err + } + if err := writer.Close(); err != nil { + return nil, err + } + return buf.Bytes(), nil + case CompressionZSTD: + return zstdCompressLevel(nil, data, level) + default: + return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)} + } +} diff -Nru golang-github-shopify-sarama-1.20.0/consumer_metadata_response.go golang-github-shopify-sarama-1.20.1/consumer_metadata_response.go --- golang-github-shopify-sarama-1.20.0/consumer_metadata_response.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/consumer_metadata_response.go 2019-01-10 17:48:46.000000000 +0000 @@ -5,7 +5,7 @@ "strconv" ) -//ConsumerMetadataResponse holds the reponse for a consumer gorup meta data request +//ConsumerMetadataResponse holds the response for a consumer group meta data requests type ConsumerMetadataResponse struct { Err KError Coordinator *Broker diff -Nru golang-github-shopify-sarama-1.20.0/debian/changelog golang-github-shopify-sarama-1.20.1/debian/changelog --- golang-github-shopify-sarama-1.20.0/debian/changelog 2019-01-09 13:08:00.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/debian/changelog 2019-01-25 13:17:30.000000000 +0000 @@ -1,3 +1,9 @@ +golang-github-shopify-sarama (1.20.1-1) unstable; urgency=medium + + * New upstream release + + -- Christos Trochalakis Fri, 25 Jan 2019 15:17:30 +0200 + golang-github-shopify-sarama (1.20.0-2) unstable; urgency=medium * Add missing Depends on golang-github-datadog-zstd-dev (Closes: #918511) diff -Nru golang-github-shopify-sarama-1.20.0/decompress.go golang-github-shopify-sarama-1.20.1/decompress.go --- golang-github-shopify-sarama-1.20.0/decompress.go 1970-01-01 00:00:00.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/decompress.go 2019-01-10 17:48:46.000000000 +0000 @@ -0,0 +1,63 @@ +package sarama + +import ( + "bytes" + "compress/gzip" + "fmt" + "io/ioutil" + "sync" + + "github.com/eapache/go-xerial-snappy" + "github.com/pierrec/lz4" +) + +var ( + lz4ReaderPool = sync.Pool{ + New: func() interface{} { + return lz4.NewReader(nil) + }, + } + + gzipReaderPool sync.Pool +) + +func decompress(cc CompressionCodec, data []byte) ([]byte, error) { + switch cc { + case CompressionNone: + return data, nil + case CompressionGZIP: + var ( + err error + reader *gzip.Reader + readerIntf = gzipReaderPool.Get() + ) + if readerIntf != nil { + reader = readerIntf.(*gzip.Reader) + } else { + reader, err = gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, err + } + } + + defer gzipReaderPool.Put(reader) + + if err := reader.Reset(bytes.NewReader(data)); err != nil { + return nil, err + } + + return ioutil.ReadAll(reader) + case CompressionSnappy: + return snappy.Decode(data) + case CompressionLZ4: + reader := lz4ReaderPool.Get().(*lz4.Reader) + defer lz4ReaderPool.Put(reader) + + reader.Reset(bytes.NewReader(data)) + return ioutil.ReadAll(reader) + case CompressionZSTD: + return zstdDecompress(nil, data) + default: + return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)} + } +} diff -Nru golang-github-shopify-sarama-1.20.0/describe_configs_request.go golang-github-shopify-sarama-1.20.1/describe_configs_request.go --- golang-github-shopify-sarama-1.20.0/describe_configs_request.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/describe_configs_request.go 2019-01-10 17:48:46.000000000 +0000 @@ -1,15 +1,17 @@ package sarama +type DescribeConfigsRequest struct { + Version int16 + Resources []*ConfigResource + IncludeSynonyms bool +} + type ConfigResource struct { Type ConfigResourceType Name string ConfigNames []string } -type DescribeConfigsRequest struct { - Resources []*ConfigResource -} - func (r *DescribeConfigsRequest) encode(pe packetEncoder) error { if err := pe.putArrayLength(len(r.Resources)); err != nil { return err @@ -30,6 +32,10 @@ } } + if r.Version >= 1 { + pe.putBool(r.IncludeSynonyms) + } + return nil } @@ -74,6 +80,14 @@ } r.Resources[i].ConfigNames = cfnames } + r.Version = version + if r.Version >= 1 { + b, err := pd.getBool() + if err != nil { + return err + } + r.IncludeSynonyms = b + } return nil } @@ -83,9 +97,16 @@ } func (r *DescribeConfigsRequest) version() int16 { - return 0 + return r.Version } func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch r.Version { + case 1: + return V1_0_0_0 + case 2: + return V2_0_0_0 + default: + return V0_11_0_0 + } } diff -Nru golang-github-shopify-sarama-1.20.0/describe_configs_request_test.go golang-github-shopify-sarama-1.20.1/describe_configs_request_test.go --- golang-github-shopify-sarama-1.20.0/describe_configs_request_test.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/describe_configs_request_test.go 2019-01-10 17:48:46.000000000 +0000 @@ -36,20 +36,30 @@ 0, 0, 0, 1, // 1 config 2, // a topic 0, 3, 'f', 'o', 'o', // topic name: foo + 255, 255, 255, 255, // all configs + } + + singleDescribeConfigsRequestAllConfigsv1 = []byte{ + 0, 0, 0, 1, // 1 config + 2, // a topic + 0, 3, 'f', 'o', 'o', // topic name: foo 255, 255, 255, 255, // no configs + 1, //synoms } ) -func TestDescribeConfigsRequest(t *testing.T) { +func TestDescribeConfigsRequestv0(t *testing.T) { var request *DescribeConfigsRequest request = &DescribeConfigsRequest{ + Version: 0, Resources: []*ConfigResource{}, } testRequest(t, "no requests", request, emptyDescribeConfigsRequest) configs := []string{"segment.ms"} request = &DescribeConfigsRequest{ + Version: 0, Resources: []*ConfigResource{ &ConfigResource{ Type: TopicResource, @@ -62,6 +72,7 @@ testRequest(t, "one config", request, singleDescribeConfigsRequest) request = &DescribeConfigsRequest{ + Version: 0, Resources: []*ConfigResource{ &ConfigResource{ Type: TopicResource, @@ -78,6 +89,7 @@ testRequest(t, "two configs", request, doubleDescribeConfigsRequest) request = &DescribeConfigsRequest{ + Version: 0, Resources: []*ConfigResource{ &ConfigResource{ Type: TopicResource, @@ -88,3 +100,20 @@ testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigs) } + +func TestDescribeConfigsRequestv1(t *testing.T) { + var request *DescribeConfigsRequest + + request = &DescribeConfigsRequest{ + Version: 1, + Resources: []*ConfigResource{ + { + Type: TopicResource, + Name: "foo", + }, + }, + IncludeSynonyms: true, + } + + testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigsv1) +} diff -Nru golang-github-shopify-sarama-1.20.0/describe_configs_response.go golang-github-shopify-sarama-1.20.1/describe_configs_response.go --- golang-github-shopify-sarama-1.20.0/describe_configs_response.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/describe_configs_response.go 2019-01-10 17:48:46.000000000 +0000 @@ -1,8 +1,41 @@ package sarama -import "time" +import ( + "fmt" + "time" +) + +type ConfigSource int8 + +func (s ConfigSource) String() string { + switch s { + case SourceUnknown: + return "Unknown" + case SourceTopic: + return "Topic" + case SourceDynamicBroker: + return "DynamicBroker" + case SourceDynamicDefaultBroker: + return "DynamicDefaultBroker" + case SourceStaticBroker: + return "StaticBroker" + case SourceDefault: + return "Default" + } + return fmt.Sprintf("Source Invalid: %d", int(s)) +} + +const ( + SourceUnknown ConfigSource = 0 + SourceTopic ConfigSource = 1 + SourceDynamicBroker ConfigSource = 2 + SourceDynamicDefaultBroker ConfigSource = 3 + SourceStaticBroker ConfigSource = 4 + SourceDefault ConfigSource = 5 +) type DescribeConfigsResponse struct { + Version int16 ThrottleTime time.Duration Resources []*ResourceResponse } @@ -20,7 +53,15 @@ Value string ReadOnly bool Default bool + Source ConfigSource Sensitive bool + Synonyms []*ConfigSynonym +} + +type ConfigSynonym struct { + ConfigName string + ConfigValue string + Source ConfigSource } func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) { @@ -30,14 +71,16 @@ } for _, c := range r.Resources { - if err = c.encode(pe); err != nil { + if err = c.encode(pe, r.Version); err != nil { return err } } + return nil } func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version throttleTime, err := pd.getInt32() if err != nil { return err @@ -66,14 +109,21 @@ } func (r *DescribeConfigsResponse) version() int16 { - return 0 + return r.Version } func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion { - return V0_11_0_0 + switch r.Version { + case 1: + return V1_0_0_0 + case 2: + return V2_0_0_0 + default: + return V0_11_0_0 + } } -func (r *ResourceResponse) encode(pe packetEncoder) (err error) { +func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) { pe.putInt16(r.ErrorCode) if err = pe.putString(r.ErrorMsg); err != nil { @@ -91,7 +141,7 @@ } for _, c := range r.Configs { - if err = c.encode(pe); err != nil { + if err = c.encode(pe, version); err != nil { return err } } @@ -139,7 +189,7 @@ return nil } -func (r *ConfigEntry) encode(pe packetEncoder) (err error) { +func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) { if err = pe.putString(r.Name); err != nil { return err } @@ -149,12 +199,32 @@ } pe.putBool(r.ReadOnly) - pe.putBool(r.Default) - pe.putBool(r.Sensitive) + + if version <= 0 { + pe.putBool(r.Default) + pe.putBool(r.Sensitive) + } else { + pe.putInt8(int8(r.Source)) + pe.putBool(r.Sensitive) + + if err := pe.putArrayLength(len(r.Synonyms)); err != nil { + return err + } + for _, c := range r.Synonyms { + if err = c.encode(pe, version); err != nil { + return err + } + } + } + return nil } +//https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) { + if version == 0 { + r.Source = SourceUnknown + } name, err := pd.getString() if err != nil { return err @@ -173,16 +243,78 @@ } r.ReadOnly = read - de, err := pd.getBool() - if err != nil { - return err + if version == 0 { + defaultB, err := pd.getBool() + if err != nil { + return err + } + r.Default = defaultB + } else { + source, err := pd.getInt8() + if err != nil { + return err + } + r.Source = ConfigSource(source) } - r.Default = de sensitive, err := pd.getBool() if err != nil { return err } r.Sensitive = sensitive + + if version > 0 { + n, err := pd.getArrayLength() + if err != nil { + return err + } + r.Synonyms = make([]*ConfigSynonym, n) + + for i := 0; i < n; i++ { + s := &ConfigSynonym{} + if err := s.decode(pd, version); err != nil { + return err + } + r.Synonyms[i] = s + } + + } + return nil +} + +func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) { + err = pe.putString(c.ConfigName) + if err != nil { + return err + } + + err = pe.putString(c.ConfigValue) + if err != nil { + return err + } + + pe.putInt8(int8(c.Source)) + + return nil +} + +func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error { + name, err := pd.getString() + if err != nil { + return nil + } + c.ConfigName = name + + value, err := pd.getString() + if err != nil { + return nil + } + c.ConfigValue = value + + source, err := pd.getInt8() + if err != nil { + return nil + } + c.Source = ConfigSource(source) return nil } diff -Nru golang-github-shopify-sarama-1.20.0/describe_configs_response_test.go golang-github-shopify-sarama-1.20.1/describe_configs_response_test.go --- golang-github-shopify-sarama-1.20.0/describe_configs_response_test.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/describe_configs_response_test.go 2019-01-10 17:48:46.000000000 +0000 @@ -10,7 +10,7 @@ 0, 0, 0, 0, // no configs } - describeConfigsResponsePopulated = []byte{ + describeConfigsResponsePopulatedv0 = []byte{ 0, 0, 0, 0, //throttle 0, 0, 0, 1, // response 0, 0, //errorcode @@ -24,9 +24,44 @@ 0, // Default 0, // Sensitive } + + describeConfigsResponsePopulatedv1 = []byte{ + 0, 0, 0, 0, //throttle + 0, 0, 0, 1, // response + 0, 0, //errorcode + 0, 0, //string + 2, // topic + 0, 3, 'f', 'o', 'o', + 0, 0, 0, 1, //configs + 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, 4, '1', '0', '0', '0', + 0, // ReadOnly + 4, // Source + 0, // Sensitive + 0, 0, 0, 0, // No Synonym + } + + describeConfigsResponseWithSynonymv1 = []byte{ + 0, 0, 0, 0, //throttle + 0, 0, 0, 1, // response + 0, 0, //errorcode + 0, 0, //string + 2, // topic + 0, 3, 'f', 'o', 'o', + 0, 0, 0, 1, //configs + 0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, 4, '1', '0', '0', '0', + 0, // ReadOnly + 4, // Source + 0, // Sensitive + 0, 0, 0, 1, // 1 Synonym + 0, 14, 'l', 'o', 'g', '.', 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's', + 0, 4, '1', '0', '0', '0', + 4, // Source + } ) -func TestDescribeConfigsResponse(t *testing.T) { +func TestDescribeConfigsResponsev0(t *testing.T) { var response *DescribeConfigsResponse response = &DescribeConfigsResponse{ @@ -38,7 +73,7 @@ } response = &DescribeConfigsResponse{ - Resources: []*ResourceResponse{ + Version: 0, Resources: []*ResourceResponse{ &ResourceResponse{ ErrorCode: 0, ErrorMsg: "", @@ -56,5 +91,81 @@ }, }, } - testResponse(t, "response with error", response, describeConfigsResponsePopulated) + testResponse(t, "response with error", response, describeConfigsResponsePopulatedv0) +} + +func TestDescribeConfigsResponsev1(t *testing.T) { + var response *DescribeConfigsResponse + + response = &DescribeConfigsResponse{ + Resources: []*ResourceResponse{}, + } + testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0) + if len(response.Resources) != 0 { + t.Error("Expected no groups") + } + + response = &DescribeConfigsResponse{ + Version: 1, + Resources: []*ResourceResponse{ + &ResourceResponse{ + ErrorCode: 0, + ErrorMsg: "", + Type: TopicResource, + Name: "foo", + Configs: []*ConfigEntry{ + &ConfigEntry{ + Name: "segment.ms", + Value: "1000", + ReadOnly: false, + Source: SourceStaticBroker, + Sensitive: false, + Synonyms: []*ConfigSynonym{}, + }, + }, + }, + }, + } + testResponse(t, "response with error", response, describeConfigsResponsePopulatedv1) +} + +func TestDescribeConfigsResponseWithSynonym(t *testing.T) { + var response *DescribeConfigsResponse + + response = &DescribeConfigsResponse{ + Resources: []*ResourceResponse{}, + } + testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0) + if len(response.Resources) != 0 { + t.Error("Expected no groups") + } + + response = &DescribeConfigsResponse{ + Version: 1, + Resources: []*ResourceResponse{ + &ResourceResponse{ + ErrorCode: 0, + ErrorMsg: "", + Type: TopicResource, + Name: "foo", + Configs: []*ConfigEntry{ + &ConfigEntry{ + Name: "segment.ms", + Value: "1000", + ReadOnly: false, + Source: SourceStaticBroker, + Sensitive: false, + Synonyms: []*ConfigSynonym{ + { + ConfigName: "log.segment.ms", + ConfigValue: "1000", + Source: SourceStaticBroker, + }, + }, + }, + }, + }, + }, + } + testResponse(t, "response with error", response, describeConfigsResponseWithSynonymv1) } diff -Nru golang-github-shopify-sarama-1.20.0/fetch_response.go golang-github-shopify-sarama-1.20.1/fetch_response.go --- golang-github-shopify-sarama-1.20.0/fetch_response.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/fetch_response.go 2019-01-10 17:48:46.000000000 +0000 @@ -33,7 +33,7 @@ HighWaterMarkOffset int64 LastStableOffset int64 AbortedTransactions []*AbortedTransaction - Records *Records // deprecated: use FetchResponseBlock.Records + Records *Records // deprecated: use FetchResponseBlock.RecordsSet RecordsSet []*Records Partial bool } diff -Nru golang-github-shopify-sarama-1.20.0/message.go golang-github-shopify-sarama-1.20.1/message.go --- golang-github-shopify-sarama-1.20.0/message.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/message.go 2019-01-10 17:48:46.000000000 +0000 @@ -1,14 +1,8 @@ package sarama import ( - "bytes" - "compress/gzip" "fmt" - "io/ioutil" "time" - - "github.com/eapache/go-xerial-snappy" - "github.com/pierrec/lz4" ) // CompressionCodec represents the various compression codecs recognized by Kafka in messages. @@ -77,53 +71,12 @@ payload = m.compressedCache m.compressedCache = nil } else if m.Value != nil { - switch m.Codec { - case CompressionNone: - payload = m.Value - case CompressionGZIP: - var buf bytes.Buffer - var writer *gzip.Writer - if m.CompressionLevel != CompressionLevelDefault { - writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel) - if err != nil { - return err - } - } else { - writer = gzip.NewWriter(&buf) - } - if _, err = writer.Write(m.Value); err != nil { - return err - } - if err = writer.Close(); err != nil { - return err - } - m.compressedCache = buf.Bytes() - payload = m.compressedCache - case CompressionSnappy: - tmp := snappy.Encode(m.Value) - m.compressedCache = tmp - payload = m.compressedCache - case CompressionLZ4: - var buf bytes.Buffer - writer := lz4.NewWriter(&buf) - if _, err = writer.Write(m.Value); err != nil { - return err - } - if err = writer.Close(); err != nil { - return err - } - m.compressedCache = buf.Bytes() - payload = m.compressedCache - case CompressionZSTD: - c, err := zstdCompressLevel(nil, m.Value, m.CompressionLevel) - if err != nil { - return err - } - m.compressedCache = c - payload = m.compressedCache - default: - return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)} + + payload, err = compress(m.Codec, m.CompressionLevel, m.Value) + if err != nil { + return err } + m.compressedCache = payload // Keep in mind the compressed payload size for metric gathering m.compressedSize = len(payload) } @@ -179,53 +132,18 @@ switch m.Codec { case CompressionNone: // nothing to do - case CompressionGZIP: + default: if m.Value == nil { break } - reader, err := gzip.NewReader(bytes.NewReader(m.Value)) + + m.Value, err = decompress(m.Codec, m.Value) if err != nil { return err } - if m.Value, err = ioutil.ReadAll(reader); err != nil { - return err - } if err := m.decodeSet(); err != nil { return err } - case CompressionSnappy: - if m.Value == nil { - break - } - if m.Value, err = snappy.Decode(m.Value); err != nil { - return err - } - if err := m.decodeSet(); err != nil { - return err - } - case CompressionLZ4: - if m.Value == nil { - break - } - reader := lz4.NewReader(bytes.NewReader(m.Value)) - if m.Value, err = ioutil.ReadAll(reader); err != nil { - return err - } - if err := m.decodeSet(); err != nil { - return err - } - case CompressionZSTD: - if m.Value == nil { - break - } - if m.Value, err = zstdDecompress(nil, m.Value); err != nil { - return err - } - if err := m.decodeSet(); err != nil { - return err - } - default: - return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)} } return pd.pop() diff -Nru golang-github-shopify-sarama-1.20.0/offset_request.go golang-github-shopify-sarama-1.20.1/offset_request.go --- golang-github-shopify-sarama-1.20.0/offset_request.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/offset_request.go 2019-01-10 17:48:46.000000000 +0000 @@ -27,12 +27,20 @@ } type OffsetRequest struct { - Version int16 - blocks map[string]map[int32]*offsetRequestBlock + Version int16 + replicaID int32 + isReplicaIDSet bool + blocks map[string]map[int32]*offsetRequestBlock } func (r *OffsetRequest) encode(pe packetEncoder) error { - pe.putInt32(-1) // replica ID is always -1 for clients + if r.isReplicaIDSet { + pe.putInt32(r.replicaID) + } else { + // default replica ID is always -1 for clients + pe.putInt32(-1) + } + err := pe.putArrayLength(len(r.blocks)) if err != nil { return err @@ -59,10 +67,14 @@ func (r *OffsetRequest) decode(pd packetDecoder, version int16) error { r.Version = version - // Ignore replica ID - if _, err := pd.getInt32(); err != nil { + replicaID, err := pd.getInt32() + if err != nil { return err } + if replicaID >= 0 { + r.SetReplicaID(replicaID) + } + blockCount, err := pd.getArrayLength() if err != nil { return err @@ -113,6 +125,18 @@ } } +func (r *OffsetRequest) SetReplicaID(id int32) { + r.replicaID = id + r.isReplicaIDSet = true +} + +func (r *OffsetRequest) ReplicaID() int32 { + if r.isReplicaIDSet { + return r.replicaID + } + return -1 +} + func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) { if r.blocks == nil { r.blocks = make(map[string]map[int32]*offsetRequestBlock) diff -Nru golang-github-shopify-sarama-1.20.0/offset_request_test.go golang-github-shopify-sarama-1.20.1/offset_request_test.go --- golang-github-shopify-sarama-1.20.0/offset_request_test.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/offset_request_test.go 2019-01-10 17:48:46.000000000 +0000 @@ -23,6 +23,10 @@ 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01} + + offsetRequestReplicaID = []byte{ + 0x00, 0x00, 0x00, 0x2a, + 0x00, 0x00, 0x00, 0x00} ) func TestOffsetRequest(t *testing.T) { @@ -41,3 +45,15 @@ request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1 testRequest(t, "one block", request, offsetRequestOneBlockV1) } + +func TestOffsetRequestReplicaID(t *testing.T) { + request := new(OffsetRequest) + replicaID := int32(42) + request.SetReplicaID(replicaID) + + if found := request.ReplicaID(); found != replicaID { + t.Errorf("replicaID: expected %v, found %v", replicaID, found) + } + + testRequest(t, "with replica ID", request, offsetRequestReplicaID) +} diff -Nru golang-github-shopify-sarama-1.20.0/produce_set.go golang-github-shopify-sarama-1.20.1/produce_set.go --- golang-github-shopify-sarama-1.20.0/produce_set.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/produce_set.go 2019-01-10 17:48:46.000000000 +0000 @@ -222,9 +222,8 @@ // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)): return true - // Would we overflow the size-limit of a compressed message-batch for this partition? - case ps.parent.conf.Producer.Compression != CompressionNone && - ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil && + // Would we overflow the size-limit of a message-batch for this partition? + case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil && ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes: return true // Would we overflow simply in number of messages? diff -Nru golang-github-shopify-sarama-1.20.0/produce_set_test.go golang-github-shopify-sarama-1.20.1/produce_set_test.go --- golang-github-shopify-sarama-1.20.0/produce_set_test.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/produce_set_test.go 2019-01-10 17:48:46.000000000 +0000 @@ -35,10 +35,9 @@ } func TestProduceSetAddingMessages(t *testing.T) { - parent, ps := makeProduceSet() - parent.conf.Producer.Flush.MaxMessages = 1000 - + _, ps := makeProduceSet() msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)} + safeAddMessage(t, ps, msg) if ps.empty() { @@ -48,8 +47,15 @@ if !ps.readyToFlush() { t.Error("by default set should be ready to flush when any message is in place") } +} + +func TestProduceSetAddingMessagesOverflowMessagesLimit(t *testing.T) { + parent, ps := makeProduceSet() + parent.conf.Producer.Flush.MaxMessages = 1000 + + msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)} - for i := 0; i < 999; i++ { + for i := 0; i < 1000; i++ { if ps.wouldOverflow(msg) { t.Error("set shouldn't fill up after only", i+1, "messages") } @@ -61,6 +67,24 @@ } } +func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) { + parent, ps := makeProduceSet() + parent.conf.Producer.MaxMessageBytes = 1000 + + msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)} + + for ps.bufferBytes+msg.byteSize(2) < parent.conf.Producer.MaxMessageBytes { + if ps.wouldOverflow(msg) { + t.Error("set shouldn't fill up before 1000 bytes") + } + safeAddMessage(t, ps, msg) + } + + if !ps.wouldOverflow(msg) { + t.Error("set should be full after 1000 bytes") + } +} + func TestProduceSetPartitionTracking(t *testing.T) { _, ps := makeProduceSet() diff -Nru golang-github-shopify-sarama-1.20.0/README.md golang-github-shopify-sarama-1.20.1/README.md --- golang-github-shopify-sarama-1.20.0/README.md 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/README.md 2019-01-10 17:48:46.000000000 +0000 @@ -1,7 +1,7 @@ sarama ====== -[![GoDoc](https://godoc.org/github.com/Shopify/sarama?status.png)](https://godoc.org/github.com/Shopify/sarama) +[![GoDoc](https://godoc.org/github.com/Shopify/sarama?status.svg)](https://godoc.org/github.com/Shopify/sarama) [![Build Status](https://travis-ci.org/Shopify/sarama.svg?branch=master)](https://travis-ci.org/Shopify/sarama) [![Coverage](https://codecov.io/gh/Shopify/sarama/branch/master/graph/badge.svg)](https://codecov.io/gh/Shopify/sarama) diff -Nru golang-github-shopify-sarama-1.20.0/record_batch.go golang-github-shopify-sarama-1.20.1/record_batch.go --- golang-github-shopify-sarama-1.20.0/record_batch.go 2018-12-10 19:46:42.000000000 +0000 +++ golang-github-shopify-sarama-1.20.1/record_batch.go 2019-01-10 17:48:46.000000000 +0000 @@ -1,14 +1,8 @@ package sarama import ( - "bytes" - "compress/gzip" "fmt" - "io/ioutil" "time" - - "github.com/eapache/go-xerial-snappy" - "github.com/pierrec/lz4" ) const recordBatchOverhead = 49 @@ -174,31 +168,9 @@ return err } - switch b.Codec { - case CompressionNone: - case CompressionGZIP: - reader, err := gzip.NewReader(bytes.NewReader(recBuffer)) - if err != nil { - return err - } - if recBuffer, err = ioutil.ReadAll(reader); err != nil { - return err - } - case CompressionSnappy: - if recBuffer, err = snappy.Decode(recBuffer); err != nil { - return err - } - case CompressionLZ4: - reader := lz4.NewReader(bytes.NewReader(recBuffer)) - if recBuffer, err = ioutil.ReadAll(reader); err != nil { - return err - } - case CompressionZSTD: - if recBuffer, err = zstdDecompress(nil, recBuffer); err != nil { - return err - } - default: - return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)} + recBuffer, err = decompress(b.Codec, recBuffer) + if err != nil { + return err } b.recordsLen = len(recBuffer) @@ -219,50 +191,8 @@ } b.recordsLen = len(raw) - switch b.Codec { - case CompressionNone: - b.compressedRecords = raw - case CompressionGZIP: - var buf bytes.Buffer - var writer *gzip.Writer - if b.CompressionLevel != CompressionLevelDefault { - writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel) - if err != nil { - return err - } - } else { - writer = gzip.NewWriter(&buf) - } - if _, err := writer.Write(raw); err != nil { - return err - } - if err := writer.Close(); err != nil { - return err - } - b.compressedRecords = buf.Bytes() - case CompressionSnappy: - b.compressedRecords = snappy.Encode(raw) - case CompressionLZ4: - var buf bytes.Buffer - writer := lz4.NewWriter(&buf) - if _, err := writer.Write(raw); err != nil { - return err - } - if err := writer.Close(); err != nil { - return err - } - b.compressedRecords = buf.Bytes() - case CompressionZSTD: - c, err := zstdCompressLevel(nil, raw, b.CompressionLevel) - if err != nil { - return err - } - b.compressedRecords = c - default: - return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)} - } - - return nil + b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw) + return err } func (b *RecordBatch) computeAttributes() int16 {