Using Apache Kafka


1 Environment Setup

Recommended: disable the firewall and configure the hosts file on every node so the hostnames used below resolve.
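For example, assuming master, slave1, and slave2 map to the three IPs used in the zoo.cfg sample below (this mapping is an assumption for illustration), /etc/hosts on each machine would contain:

192.168.1.100 master
192.168.1.101 slave1
192.168.1.102 slave2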

1.1 ZooKeeper

Extract zookeeper-3.3.2.tar.gz and rename the resulting directory to zookeeper.

1.1.1 Edit the zoo.cfg configuration file

Go into the ~/zookeeper/conf directory and copy zoo_sample.cfg to zoo.cfg:

$ cd ~/zookeeper/conf
$ cp zoo_sample.cfg zoo.cfg

Then edit zoo.cfg as follows:

dataDir=/your_zookeeper_home/data

server.1=192.168.1.100:2888:3888
server.2=192.168.1.101:2888:3888
server.3=192.168.1.102:2888:3888
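The server.N lines list every member of the ensemble; the first port (2888) carries follower-to-leader traffic and the second (3888) is used for leader election. The sample file already supplies sensible defaults for everything else, so a complete minimal zoo.cfg would look roughly like this:

tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
dataDir=/your_zookeeper_home/data
server.1=192.168.1.100:2888:3888
server.2=192.168.1.101:2888:3888
server.3=192.168.1.102:2888:3888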

 

1.1.2 Create and edit the myid file

Under the dataDir directory, create a myid file containing a single number that matches this host's server.N entry (1 on master, 2 on slave1, 3 on slave2):

$ mkdir /your_zookeeper_home/data
$ echo "1" > /your_zookeeper_home/data/myid

Distribute the installation to the other nodes with scp as before; the only difference is the number inside each node's myid file, as sketched below.
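A minimal sketch of that step, assuming the hostnames above and passwordless SSH between the nodes:

$ scp -r ~/zookeeper slave1:~/
$ ssh slave1 'mkdir -p /your_zookeeper_home/data && echo "2" > /your_zookeeper_home/data/myid'
$ scp -r ~/zookeeper slave2:~/
$ ssh slave2 'mkdir -p /your_zookeeper_home/data && echo "3" > /your_zookeeper_home/data/myid'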

1.1.3 Start the ZooKeeper cluster

On every node of the ZooKeeper cluster, run the script that starts the ZooKeeper service:

$ ~/zookeeper/bin/zkServer.sh start

Check the logs; if no exceptions appear, the installation succeeded.
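You can also ask each node for its role; in a healthy three-node ensemble, one node reports leader and the other two report follower:

$ ~/zookeeper/bin/zkServer.sh status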

1.2 Kafka

Extract kafka_2.12-0.10.2.0.tar.gz and rename the resulting directory to kafka.

1.2.1 Edit server.properties (using the master host as an example)

  • broker.id=0
  • listeners=PLAINTEXT://master:9092
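Each broker needs a unique broker.id and should advertise its own hostname, so these two values change on slave1 and slave2. Every broker also has to know where the ZooKeeper ensemble lives; assuming the hosts from section 1.1, the corresponding line is:

zookeeper.connect=master:2181,slave1:2181,slave2:2181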

1.2.2 Start the Kafka cluster

On every broker node, start Kafka with that broker's configuration file:

$ ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties
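The demo in the next section reads from a topic named topic_test. If automatic topic creation is disabled on the brokers, create the topic first (the replication factor and partition count here are illustrative):

$ ~/kafka/bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic topic_test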

 

2 Running a Kafka demo with Spring Kafka

The JUnit test below uses spring-kafka (the package names match its 1.x line; 1.2.x is the release built against the 0.10.2 client). It starts a message listener container on topic_test, sends one record through a KafkaTemplate, and waits for the listener to receive it.

package com.raylew.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Created by Raymond on 2017/3/17.
 */
public class KafkaTest {
    @Test
    public void testAutoCommit() throws Exception {
        ContainerProperties containerProps = new ContainerProperties("topic_test");
        final CountDownLatch latch = new CountDownLatch(1);
        containerProps.setMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> message) {
                System.out.println("received:"+message);
                latch.countDown();
            }

        });
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.setBeanName("testAuto");
        container.start();
        Thread.sleep(1000); // wait a bit for the container to start
        KafkaTemplate<Integer, String> template = createTemplate();
        template.setDefaultTopic("topic_test");
        template.sendDefault(0, "data_test4");
        template.flush();
        // wait for the listener to receive the record before shutting down
        latch.await(60, TimeUnit.SECONDS);
        container.stop();

    }

    private KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<Integer, String>(props);
        KafkaMessageListenerContainer<Integer, String> container =
                new KafkaMessageListenerContainer<>(cf, containerProps);
        return container;
    }

    private KafkaTemplate<Integer, String> createTemplate() {
        Map<String, Object> senderProps = senderProps();
        ProducerFactory<Integer, String> pf =
                new DefaultKafkaProducerFactory<Integer, String>(senderProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        return template;
    }

    // Consumer settings: auto-commit offsets every 100 ms against the master broker.
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG,"client1");
        return props;
    }

    // Producer settings: no retries, slight linger to allow batching, default-sized buffers.
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}

Watch the console output; a line like "received:ConsumerRecord(topic = topic_test, partition = 0, offset = 2, CreateTime = 1489887362729, checksum = 4075805805, serialized key size = 4, serialized value size = 10, key = 0, value = data_test3)" indicates success.
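Independently of the Java test, the console consumer that ships with Kafka gives a quick cross-check that records are actually reaching the broker:

$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic topic_test --from-beginning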

Project repository: