feature: capture MetadataRequest and ProduceRequest #39

Open · wants to merge 1 commit into base: feature/kafka
29 changes: 15 additions & 14 deletions core/dispatch.go
@@ -2,25 +2,26 @@ package core

import (
	"fmt"
	"log"
	"time"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
	"github.com/google/gopacket/tcpassembly"
	"github.com/google/gopacket/tcpassembly/tcpreader"
)

type Dispatch struct {
	device  string
	payload []byte
	Plug    *Plug
}

func NewDispatch(plug *Plug, cmd *Cmd) *Dispatch {
	return &Dispatch{
		Plug:   plug,
		device: cmd.Device,
	}
}

@@ -40,16 +41,16 @@ func (d *Dispatch) Capture() {
}

	// Capture
	src := gopacket.NewPacketSource(handle, handle.LinkType())
	packets := src.Packets() // get packet chan

	// Set up assembly
	streamFactory := &ProtocolStreamFactory{
		dispatch: d,
	}
	streamPool := NewStreamPool(streamFactory)
	assembler := NewAssembler(streamPool)
	ticker := time.Tick(time.Minute)

	// Loop until interrupted (Ctrl+C)
	for {
@@ -84,7 +85,7 @@ type ProtocolStream struct {
func (m *ProtocolStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {

	//init stream struct
	stm := &ProtocolStream{
		net:       net,
		transport: transport,
		r:         tcpreader.NewReaderStream(),
@@ -97,4 +98,4 @@ func (m *ProtocolStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
	go m.dispatch.Plug.ResolveStream(net, transport, &(stm.r))

	return &(stm.r)
}
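
Note: the collapsed capture loop above presumably follows the stock gopacket/tcpassembly pattern, feeding each TCP packet to the assembler and flushing stale connections on the ticker. A minimal sketch of that pattern, assuming the collapsed body matches the upstream gopacket example (the packets, assembler, and ticker names come from the visible code; the loop body itself is an assumption):

	for {
		select {
		case packet := <-packets:
			// skip anything that is not a reassemblable TCP packet
			if packet.NetworkLayer() == nil || packet.TransportLayer() == nil ||
				packet.TransportLayer().LayerType() != layers.LayerTypeTCP {
				continue
			}
			tcp := packet.TransportLayer().(*layers.TCP)
			assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
		case <-ticker:
			// flush connections idle for more than two minutes
			assembler.FlushOlderThan(time.Now().Add(-2 * time.Minute))
		}
	}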
Binary file added go-sniffer
Binary file not shown.
Binary file added kafka.pcap
Binary file not shown.
31 changes: 26 additions & 5 deletions plugSrc/kafka/build/const.go
@@ -1,14 +1,14 @@
package build

const (
	ProduceRequest  = 0
	FetchRequest    = 1
	OffsetRequest   = 2
	MetadataRequest = 3
	//Non-user facing control APIs = 4-7
	OffsetCommitRequest     = 8
	OffsetFetchRequest      = 9
	GroupCoordinatorRequest = 10
	JoinGroupRequest        = 11
	HeartbeatRequest        = 12
	LeaveGroupRequest       = 13
@@ -19,6 +19,27 @@ const (
	CreateTopicsReqKind = 19
)

var RquestNameMap = map[int16]string{
	0: "ProduceRequest",
	1: "FetchRequest",
	2: "OffsetRequest",
	3: "MetadataRequest",
	//Non-user facing control APIs = 4-7
	8:  "OffsetCommitRequest",
	9:  "OffsetFetchRequest",
	10: "GroupCoordinatorRequest",
	11: "JoinGroupRequest",
	12: "HeartbeatRequest",
	13: "LeaveGroupRequest",
	14: "SyncGroupRequest",
	15: "DescribeGroupsRequest",
	16: "ListGroupsRequest",
	18: "APIVersionsReqKind",
	19: "CreateTopicsReqKind",
}
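
Note: entry.go below calls GetRquestName, which is not part of this diff. A minimal sketch of what such a helper presumably looks like, assuming it is a plain lookup into RquestNameMap with an invented fallback for unknown keys (and that fmt is imported):

	func GetRquestName(apiKey int16) string {
		// fall back to the raw key for APIs missing from the map (e.g. 4-7, 17)
		if name, ok := RquestNameMap[apiKey]; ok {
			return name
		}
		return fmt.Sprintf("UnknownRequest(%d)", apiKey)
	}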

const (
	ApiV0 = 0
	ApiV1 = 1
141 changes: 100 additions & 41 deletions plugSrc/kafka/build/entry.go
@@ -2,15 +2,17 @@ package build

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strconv"
	"sync"

	"github.com/google/gopacket"
)

const (
	Port    = 9092
	Version = "0.1"
	CmdPort = "-p"
)
@@ -22,18 +24,18 @@
}

type stream struct {
	packets        chan *packet
	correlationMap map[int32]requestHeader
}
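
Note: the new correlationMap exists because a Kafka response header carries only a correlation_id, not the api_key or api_version of the request it answers. Recording each request header by correlationId lets the response side recover the request type and version it needs to decode the body; without a matching entry a response is undecodable and is simply skipped (see resolve below).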

type packet struct {
	isClientFlow bool // client -> server flow
	messageSize  int32

	requestHeader
	responseHeader

	payload io.Reader
}

type requestHeader struct {
@@ -51,50 +53,52 @@ type messageSet struct {
	offset      int64
	messageSize int32
}

func newMessageSet(r io.Reader) messageSet {
	messageSet := messageSet{}
	_, messageSet.offset = ReadInt64(r)
	messageSet.messageSize = ReadInt32(r)

	return messageSet
}

type message struct {
	crc        int32
	magicByte  int8
	attributes int8
	key        []byte
	value      []byte
}

var kafkaInstance *Kafka
var once sync.Once

func NewInstance() *Kafka {
	once.Do(func() {
		kafkaInstance = &Kafka{
			port:    Port,
			version: Version,
			source:  make(map[string]*stream),
		}
	})
	return kafkaInstance
}

func (m *Kafka) SetFlag(flg []string) {
	c := len(flg)
	if c == 0 {
		return
	}
	if c>>1 != 1 {
		panic("invalid number of kafka plugin parameters!")
	}
	for i := 0; i < c; i = i + 2 {
		key := flg[i]
		val := flg[i+1]

		switch key {
		case CmdPort:
			p, err := strconv.Atoi(val)
			if err != nil {
				panic("invalid port number")
			}
@@ -110,7 +114,7 @@ func (m *Kafka) SetFlag(flg []string) {
}
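
Usage note: assuming go-sniffer's usual invocation shape of device, then plugin, then plugin flags, overriding the capture port would look something like go-sniffer en0 kafka -p 9093 (the device name is illustrative).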

func (m *Kafka) BPFFilter() string {
	return "tcp and port " + strconv.Itoa(m.port)
}

func (m *Kafka) Version() string {
@@ -125,8 +129,9 @@ func (m *Kafka) ResolveStream(net, transport gopacket.Flow, buf io.Reader) {
	//resolve packet
	if _, ok := m.source[uuid]; !ok {

		var newStream = stream{
			packets:        make(chan *packet, 100),
			correlationMap: make(map[int32]requestHeader),
		}

		m.source[uuid] = &newStream
@@ -139,7 +144,7 @@

		newPacket := m.newPacket(net, transport, buf)
		if newPacket == nil {
			continue
		}

		m.source[uuid].packets <- newPacket
@@ -151,15 +156,30 @@ func (m *Kafka) newPacket(net, transport gopacket.Flow, r io.Reader) *packet {
	//read packet
	pk := packet{}

	/*
		bs := make([]byte, 0)
		count, err := r.Read(bs)
		if err != nil {
			panic(err)
		}
		fmt.Printf("read: %d, buffer: %b", count, bs)
		return nil
	*/

	//read messageSize
	pk.messageSize = ReadInt32(r)
	if pk.messageSize == 0 {
		return nil
	}
	fmt.Printf("pk.messageSize: %d\n", pk.messageSize)

	//set flow direction
	if transport.Src().String() == strconv.Itoa(m.port) {
		pk.isClientFlow = false

		respHeader := responseHeader{}
		respHeader.correlationId = ReadInt32(r)
		// TODO extract request
		pk.responseHeader = respHeader

		var buf bytes.Buffer
@@ -174,18 +194,18 @@ func (m *Kafka) newPacket(net, transport gopacket.Flow, r io.Reader) *packet {

		pk.payload = &buf

	} else {
		pk.isClientFlow = true

		var clientIdLen = 0
		reqHeader := requestHeader{}
		reqHeader.apiKey = ReadInt16(r)
		reqHeader.apiVersion = ReadInt16(r)
		reqHeader.correlationId = ReadInt32(r)
		reqHeader.clientId, clientIdLen = ReadString(r)
		pk.requestHeader = reqHeader
		var buf bytes.Buffer
		if _, err := io.CopyN(&buf, r, int64(pk.messageSize-10)-int64(clientIdLen)); err != nil {
			if err == io.EOF {
				fmt.Println(net, transport, " closed")
				return nil
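
Note on the size arithmetic above: messageSize counts every byte after the 4-byte length field, and a v0/v1 request header consumes apiKey (2) + apiVersion (2) + correlationId (4) + the clientId length prefix (2) = 10 bytes, plus the clientId bytes themselves. The remaining messageSize-10-clientIdLen bytes are therefore exactly the request body copied into buf.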
@@ -202,31 +222,70 @@ func (m *Kafka) newPacket(net, transport gopacket.Flow, r io.Reader) *packet {
func (stm *stream) resolve() {
	for {
		select {
		case packet := <-stm.packets:
			if packet.isClientFlow {
				stm.correlationMap[packet.requestHeader.correlationId] = packet.requestHeader
				stm.resolveClientPacket(packet)
			} else {
				if rh, ok := stm.correlationMap[packet.responseHeader.correlationId]; ok {
					stm.resolveServerPacket(packet, rh)
				}
			}
		}
	}
}

func (stm *stream) resolveServerPacket(pk *packet, rh requestHeader) {
	var msg interface{}
	payload := pk.payload

	action := Action{
		// use the matched request header: a response packet's own
		// embedded requestHeader is never populated
		Request:    GetRquestName(rh.apiKey),
		Direction:  "isServer",
		ApiVersion: rh.apiVersion,
	}
	switch int(rh.apiKey) {
	case ProduceRequest:
		msg = ReadProduceResponse(payload, rh.apiVersion)
	case MetadataRequest:
		msg = ReadMetadataResponse(payload, rh.apiVersion)
	default:
		fmt.Printf("response ApiKey:%d TODO\n", rh.apiKey)
	}

	if msg != nil {
		action.Message = msg
	}
	bs, err := json.Marshal(action)
	if err != nil {
		fmt.Printf("json marshal action failed, err: %+v\n", err)
	}
	fmt.Printf("%s\n", string(bs))
}

func (stm *stream) resolveClientPacket(pk *packet) {
	var msg interface{}
	action := Action{
		Request:    GetRquestName(pk.apiKey),
		Direction:  "isClient",
		ApiVersion: pk.apiVersion,
	}
	payload := pk.payload

	switch int(pk.apiKey) {
	case ProduceRequest:
		msg = ReadProduceRequest(payload, pk.apiVersion)
	case MetadataRequest:
		msg = ReadMetadataRequest(payload, pk.apiVersion)
	default:
		fmt.Printf("ApiKey:%d TODO\n", pk.apiKey)
	}

	if msg != nil {
		action.Message = msg
	}
	bs, err := json.Marshal(action)
	if err != nil {
		fmt.Printf("json marshal action failed, err: %+v\n", err)
	}
	fmt.Printf("%s\n", string(bs))
}
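
Note: the Action struct is referenced here but not defined in this diff. A minimal sketch of a shape consistent with how it is used (field names and JSON tags are assumptions):

	type Action struct {
		Request    string      `json:"request"`
		Direction  string      `json:"direction"`
		ApiVersion int16       `json:"api_version"`
		Message    interface{} `json:"message,omitempty"`
	}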