博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka client使用kerberos
阅读量:6948 次
发布时间:2019-06-27

本文共 3835 字,大约阅读时间需要 12 分钟。

hot3.png

在kafka server端创建client机princ: addprinc test/192.168.1.124@YLH.COM

 

在kafka server端根据client机的ip生成keytab: xst -k ylh.keytab test/192.168.1.124

 

拷贝kafka server的/etc/krb5.conf到/Users/yaolihua/Documents

 

生成/Users/yaolihua/Documents/kafka-client-sasl.conf

KafkaClient{

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=true

storeKey=true

keyTab="/Users/yaolihua/Documents/ylh124.keytab"

principal="test/192.168.1.124@YLH.COM";

};

 

在kafka server授权producer权限给test

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --producer --topic ylh-acl-test

 

在kafka server授权consumer权限给test

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --consumer --group=ylh-group --topic ylh-acl-test

 

代码:

public static void main(String args[]) {

System.setProperty("java.security.krb5.conf", "/Users/yaolihua/Documents/krb5.conf");

System.setProperty("java.security.auth.login.config", "/Users/yaolihua/Documents/kafka-client-sasl.conf");

System.setProperty("sun.security.krb5.debug", "true");

testProducer();

testConsumer();

}

 

private static void testProducer() {

Properties props = new Properties();

props.put("security.protocol", "SASL_PLAINTEXT");

// props.put("sasl.mechanism", "PLAIN");

props.put("sasl.mechanism", "GSSAPI");

props.put("sasl.kerberos.service.name", "test");

// props.put("bootstrap.servers", "134.129.98.33:9012");

props.put("bootstrap.servers", "192.168.1.170:9093");

props.put("acks", "all");

props.put("retries", 1);

props.put("batch.size", 1684);

props.put("linger.ms", 0);

props.put("buffer.memory", 33554432); // buffer空间32M

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 

Producer<String, String> producer = new KafkaProducer<String, String>(props);

for (int i = 0; i < 10; i++) {

String dvalue = "hello " + i;

ProducerRecord record = new ProducerRecord<String, String>("ylh-acl-test", null, dvalue);

producer.send(record, new Callback() {

public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {

if (paramRecordMetadata == null) {

System.out.println("paramRecordMetadata is null ");

paramException.printStackTrace();

return;

}

System.out.println("发送的消息信息 topic:" + paramRecordMetadata.topic() + ", partition:" + paramRecordMetadata.partition());

}

});

}

producer.close();

}

 

private static void testConsumer() {

Properties properties = new Properties();

properties.put("bootstrap.servers", "192.168.1.170:9093");

properties.put("group.id", "ylh-group");

properties.put("enable.auto.commit", "true");

properties.put("auto.offset.reset", "earliest");

properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// properties.put("auto.offset.reset", "none");

properties.put("security.protocol", "SASL_PLAINTEXT");

// properties.put("sasl.mechanism", "PLAIN");

properties.put("sasl.mechanism", "GSSAPI");

properties.put("sasl.kerberos.service.name", "test");

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);

consumer.subscribe(Arrays.asList("ylh-acl-test"));

/* 读取数据,读取超时时间为100ms */

while (true) {

ConsumerRecords<Object, Object> records = consumer.poll(100);

records.forEach(record->{

String format = String.format("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());

System.out.println(format);

});

}

}

 

Congratulations!!!

 

安装kerberos: https://my.oschina.net/u/185335/blog/2963061

kafka sever配置kerberos: https://my.oschina.net/u/185335/blog/2963062

kafka client使用kerberos: https://my.oschina.net/u/185335/blog/2963063

转载于:https://my.oschina.net/u/185335/blog/2963063

你可能感兴趣的文章
Beanstalkd协议 二(任务的生命周期)
查看>>
jvisualvm远程监控 visualgc插件 不受此jvm支持问题
查看>>
(1)Powershell简介
查看>>
zabbix客户端配置
查看>>
Logtail提升采集性能
查看>>
史上最失败的一次营销活动
查看>>
asp.net5发神经一例 ------无法加载依赖
查看>>
数据库水平切分的实现原理解析
查看>>
nova boot from volume在多主机zone下的坑
查看>>
uip中关于web服务器的简单例子
查看>>
Windows 10 ADK 1809 的变更 附下载地址
查看>>
修改win7和win8 preview双系统的开机默认启动项
查看>>
参考案例Shop-React-Native,后端Yii2
查看>>
关于PHP_CMS的一点感想
查看>>
知乎[披萨不就是个大饼铺点肉]问题延伸出的认知风格相关知识
查看>>
新浪微博基于混合云的PHP服务化与弹性扩容
查看>>
centOS7封装
查看>>
通过php 执行git pull 自动部署
查看>>
google乱码 IE浏览器 英文
查看>>
Linux查看系统配置常用命令
查看>>