forked from redpanda-data/redpanda
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request redpanda-data#13 from weeco/add-requesting-examples
Add requesting examples
- Loading branch information
Showing
4 changed files
with
102 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
package connect | ||
package connecting | ||
|
||
import ( | ||
"fmt" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
package connect | ||
package connecting | ||
|
||
import ( | ||
"context" | ||
"crypto/tls" | ||
"fmt" | ||
"net" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
# Message requests | ||
|
||
With kafka-go you can directly construct message requests and send them to your Kafka cluster. There are three options | ||
how you can issue requests: | ||
|
||
- Using the broker's `Request` method | ||
- Using the client's `Request` method | ||
- Using the message's `RequestWith` method | ||
|
||
## Broker Requests | ||
|
||
**Interface:** | ||
|
||
```go | ||
func (b *Broker) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) | ||
``` | ||
|
||
Reference: https://pkg.go.dev/github.com/twmb/kafka-go/pkg/kgo#Broker.Request | ||
|
||
Use this method only if you need to send requests to a specific broker. The actual response type has to be asserted. | ||
Requests sent using this method are not retried. | ||
|
||
|
||
## Client Requests | ||
|
||
```go | ||
func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error) | ||
``` | ||
|
||
Reference: https://pkg.go.dev/github.com/twmb/kafka-go/pkg/kgo#Client.Request | ||
|
||
The client provides a lot functionality making sure that your message request will be sent in the most efficient manner | ||
to the right set of brokers. Additionally it will retry your requests if needed. The actual response type has to be | ||
asserted. | ||
|
||
## Message Requests | ||
|
||
```go | ||
// Example for ListOffsetsRequest | ||
func (v *ListOffsetsRequest) RequestWith(ctx context.Context, r Requestor) (*ListOffsetsResponse, error) | ||
``` | ||
|
||
Reference: https://pkg.go.dev/github.com/twmb/kafka-go/pkg/kmsg | ||
|
||
Each request message in the `kmsg` package has it's own `RequestWith` method which accepts a context and an interface | ||
which `Client` already fulfills. This method uses the client's `Request` method with the advantage that you don't | ||
need to assert the actual response type. | ||
|
||
Most commonly you want to use this method to send requests to Kafka. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package requesting | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"github.com/twmb/kafka-go/pkg/kerr" | ||
"github.com/twmb/kafka-go/pkg/kgo" | ||
"github.com/twmb/kafka-go/pkg/kmsg" | ||
"github.com/twmb/kafka-go/pkg/kversion" | ||
"time" | ||
) | ||
|
||
func requestMetadata() { | ||
seeds := []string{"localhost:9092"} | ||
client, err := kgo.NewClient( | ||
kgo.SeedBrokers(seeds...), | ||
|
||
// Do not try to send requests newer than 2.4.0 to avoid breaking changes in the request struct. | ||
// Sometimes there are breaking changes for newer versions where more properties are required to set. | ||
kgo.MaxVersions(kversion.V2_4_0()), | ||
) | ||
if err != nil { | ||
panic(err) | ||
} | ||
defer client.Close() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) | ||
defer cancel() | ||
|
||
// Construct message request and send it to Kafka | ||
req := kmsg.MetadataRequest{ | ||
Topics: []kmsg.MetadataRequestTopic{}, | ||
} | ||
|
||
res, err := req.RequestWith(ctx, client) | ||
if err != nil { | ||
// Error during request has happened (e. g. context cancelled) | ||
panic(err) | ||
} | ||
|
||
// Check response for Kafka error codes and print them. | ||
// Other requests might have top level error codes, which indicate completed but failed requests. | ||
for _, topic := range res.Topics { | ||
err := kerr.ErrorForCode(topic.ErrorCode) | ||
if err != nil { | ||
fmt.Printf("topic %v response has errored: %v\n", topic.Topic, err.Error()) | ||
} | ||
} | ||
|
||
fmt.Printf("received '%v' topics and '%v' brokers", len(res.Topics), len(res.Brokers)) | ||
} |