# Golang Kafka Client

By [Robin](https://paragraph.com/@robin-8) · 2023-02-23

---

    package main
    
    import (
        "flag"
        "fmt"
        "log"
        "os"
        "os/signal"
        "strconv"
        "strings"
        "sync"
        "syscall"
    
        "github.com/Shopify/sarama"
        "github.com/Shopify/sarama/tools/tls"
    )
    
    // 从命令行终端获取参数
    var (
        brokerList    = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
        topic         = flag.String("topic", "", "REQUIRED: the topic to consume")
        partitions    = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers")
        offset        = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`")
        verbose       = flag.Bool("verbose", false, "Whether to turn on sarama logging")
        tlsEnabled    = flag.Bool("tls-enabled", false, "Whether or not to use TLS when connecting to the broker  (defaults to false)")
        tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether skip TLS server cert verification")
        tlsClientCert = flag.String("tls-client-cert", "", "Client cert for client authentication (use with -tls-enabled and -tls-client-key)")
        tlsClientKey  = flag.String("tls-client-key", "", "Client key for client authentication (use with tls-enabled and -tls-client-cert)")
        saslEnabled   = flag.Bool("sasl-enabled", false, "Whether or not to use SASL authentication when connecting to the broker (defaults to false)")
        saslUser      = flag.String("sasl-user", "", "The username used for SASL (use with -sasl-enable)")
        saslPassword  = flag.String("sasl-password", "", "The password used for SASL (use with -sasl-enable)")
    
        bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.")
    
        logger = log.New(os.Stderr, "", log.LstdFlags)
    )
    
    func main() {
        flag.Parse()
    
        if *brokerList == "" {
            printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
        }
    
        if *topic == "" {
            printUsageErrorAndExit("-topic is required")
        }
    
        if *verbose {
            sarama.Logger = logger
        }
    
        // 初始化读取位置；
        var initialOffset int64
        switch *offset {
        case "oldest":
            initialOffset = sarama.OffsetOldest
        case "newest":
            initialOffset = sarama.OffsetNewest
        default:
            printUsageErrorAndExit("-offset should be `oldest` or `newest`")
        }
    
        // 初始化kafka配置参数
        config := sarama.NewConfig()
        // 配置kafka版本
        config.Version = sarama.V0_10_1_0
        // 配置TLS相关
        if *tlsEnabled {
            tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
            if err != nil {
                printErrorAndExit(69, "Failed to create TLS config: %s", err)
            }
    
            config.Net.TLS.Enable = true
            config.Net.TLS.Config = tlsConfig
            config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
        }
    
        // 配置用户名密码认证
        if *saslEnabled {
            config.Net.SASL.Enable = true
            config.Net.SASL.User = *saslUser
            config.Net.SASL.Password = *saslPassword
        }
    
        // 实例化consumer
        c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
        if err != nil {
            printErrorAndExit(69, "Failed to start consumer: %s", err)
        }
    
        // 获取partition
        partitionList, err := getPartitions(c)
        if err != nil {
            printErrorAndExit(69, "Failed to get the list of partitions: %s", err)
        }
    
        // 定义channel
        var (
            messages = make(chan *sarama.ConsumerMessage, *bufferSize)
            closing  = make(chan struct{})
            wg       sync.WaitGroup
        )
    
        // goroutin 等待系统中断信号
        go func() {
            signals := make(chan os.Signal, 1)
            signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
            <-signals
            logger.Println("Initiating shutdown of consumer...")
            close(closing)
        }()
    
        // 根据partitionlist启动多个goroutin去消费
        for _, partition := range partitionList {
            pc, err := c.ConsumePartition(*topic, partition, initialOffset)
            if err != nil {
                printErrorAndExit(69, "Failed to start consumer for partition %d: %s", partition, err)
            }
    
            go func(pc sarama.PartitionConsumer) {
                <-closing
                pc.AsyncClose()
            }(pc)
    
            wg.Add(1)
            go func(pc sarama.PartitionConsumer) {
                defer wg.Done()
                for message := range pc.Messages() {
                    messages <- message
                }
            }(pc)
        }
    
        go func() {
            for msg := range messages {
                fmt.Printf("Partition:\t%d\n", msg.Partition)
                fmt.Printf("Offset:\t%d\n", msg.Offset)
                fmt.Printf("Key:\t%s\n", string(msg.Key))
                fmt.Printf("Value:\t%s\n", string(msg.Value))
                fmt.Println()
            }
        }()
    
        wg.Wait()
        logger.Println("Done consuming topic", *topic)
        close(messages)
    
        if err := c.Close(); err != nil {
            logger.Println("Failed to close consumer: ", err)
        }
    }
    
    func getPartitions(c sarama.Consumer) ([]int32, error) {
        if *partitions == "all" {
            return c.Partitions(*topic)
        }
    
        tmp := strings.Split(*partitions, ",")
        var pList []int32
        for i := range tmp {
            val, err := strconv.ParseInt(tmp[i], 10, 32)
            if err != nil {
                return nil, err
            }
            pList = append(pList, int32(val))
        }
    
        return pList, nil
    }
    
    func printErrorAndExit(code int, format string, values ...interface{}) {
        fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
        fmt.Fprintln(os.Stderr)
        os.Exit(code)
    }
    
    func printUsageErrorAndExit(format string, values ...interface{}) {
        fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
        fmt.Fprintln(os.Stderr)
        fmt.Fprintln(os.Stderr, "Available command line options:")
        flag.PrintDefaults()
        os.Exit(64)
    }

---

*Originally published on [Robin](https://paragraph.com/@robin-8/golang-kafka-client)*
