To briefly explain what we are trying to do here: we want to be able to read from and write to Kafka topics.
Our Kafka is protected by Kerberos. That means that before we can access Kafka, we need to obtain a ticket from Kerberos, and to get the ticket we have to provide a keytab, an authentication file issued for each user. All of these steps have to happen automatically, because when we access Kafka through commands there won't be an opportunity to present the keytab manually. To get things done we need to specify the right parameters and configurations in the right places.
Here is my environment (your tools and versions may vary, but the approach should still work):
To begin, let's access the protected Kafka topic from the terminal. Access to the topic should only be granted if we obtain a ticket from Kerberos for the right user. For this we need to prepare a few files: the user's keytab, a jaas.conf, and a kafka_security.properties (it will be smoother if they all live in the same directory). First, the jaas.conf that tells the Kafka client how to log in with Kerberos:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="${PATH_TO_YOUR_KEYTAB}"
  principal="${USER_NAME}@${REALM}";
};
Second, a kafka_security.properties file for the console clients:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
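Before wiring everything together, it can be worth checking that the keytab itself works by requesting a ticket manually (an optional sanity check; the console clients will obtain their ticket automatically through jaas.conf):
kinit -kt ${PATH_TO_YOUR_KEYTAB} ${USER_NAME}@${REALM}
klist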
Then we export KAFKA_OPTS so the console tools pick up jaas.conf and krb5.conf:
export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf"
Now we can start the console producer and consumer:
/bin/kafka-console-producer --broker-list ${KAFKA_BROKERS_WITH_PORTS} --topic ${TOPIC_NAME} --producer.config kafka_security.properties
/bin/kafka-console-consumer --bootstrap-server ${KAFKA_BROKERS_WITH_PORTS} --topic ${TOPIC_NAME} --from-beginning --consumer.config kafka_security.properties
Let’s do the same thing using Spark.
The challenge here is that we want Spark to access Kafka not only from the application driver but also from every executor. That means each executor needs to obtain a ticket from Kerberos using our keytab. To make Spark do this, we need to specify the right configurations.
Firstly, we need the same jaas.conf:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="${YOUR_KEYTAB_FILE}"
  principal="${USER_NAME}@${REALM}";
};
Secondly, we set the Kafka client version for Spark:
export SPARK_KAFKA_VERSION=0.10
The Kafka source needs the following options:
kafka.bootstrap.servers=${KAFKA_BROKERS_WITH_PORTS}
kafka.security.protocol=SASL_PLAINTEXT
kafka.sasl.kerberos.service.name=kafka
kafka.sasl.mechanism=GSSAPI
subscribe=${TOPIC_NAME}
startingOffsets=latest
maxOffsetsPerTrigger=1000
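In the Spark application these options end up in a map; here is a minimal Scala sketch of what the myOptionsMap used below might look like (the broker list and topic name are placeholders):
val myOptionsMap: Map[String, String] = Map(
  "kafka.bootstrap.servers" -> "broker1:9092,broker2:9092", // placeholder brokers
  "kafka.security.protocol" -> "SASL_PLAINTEXT",
  "kafka.sasl.kerberos.service.name" -> "kafka",
  "kafka.sasl.mechanism" -> "GSSAPI",
  "subscribe" -> "my_topic", // placeholder topic
  "startingOffsets" -> "latest",
  "maxOffsetsPerTrigger" -> "1000"
)
With this map in hand, the stream is created like this: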
spark.readStream
  .format("kafka")
  .options(myOptionsMap)
  .load()
To submit the application, we prepare the Java options for the driver and executors and make a copy of the keytab (the keytab is referenced both by spark.yarn.keytab and by --files; submitting the copy under a different name avoids a clash between the two):
JAVA_OPTIONS="-Djava.security.auth.login.config=jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf"
cp $USER_NAME.keytab ${USER_NAME}_2.keytab
spark2-submit \
--master yarn \
--conf "spark.yarn.keytab=${USER_NAME}_2.keytab" \
--conf "spark.yarn.principal=$USER_NAME@$REALM" \
--conf "spark.driver.extraJavaOptions=$JAVA_OPTIONS" \
--conf "spark.executor.extraJavaOptions=$JAVA_OPTIONS" \
--class "org.example.MyClass" \
--jars spark-sql-kafka-0-10_2.11-2.4.0.jar \
--files "jaas.conf,${USER_NAME}.keytab" \
my_spark.jar
Or you can use the same configurations with spark-shell or pyspark.
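For example, an interactive session could be started with the same flags (a sketch assuming the CDH-style spark2-shell command that matches spark2-submit above):
spark2-shell \
  --master yarn \
  --conf "spark.yarn.keytab=${USER_NAME}_2.keytab" \
  --conf "spark.yarn.principal=$USER_NAME@$REALM" \
  --conf "spark.driver.extraJavaOptions=$JAVA_OPTIONS" \
  --conf "spark.executor.extraJavaOptions=$JAVA_OPTIONS" \
  --jars spark-sql-kafka-0-10_2.11-2.4.0.jar \
  --files "jaas.conf,${USER_NAME}.keytab"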
Note: to allow Spark to access HDFS we specify spark.yarn.keytab and spark.yarn.principal. To allow Spark to access Kafka we specify spark.driver.extraJavaOptions and spark.executor.extraJavaOptions and pass the files jaas.conf and ${USER_NAME}.keytab mentioned in those Java options, so that every executor receives a copy of these files for authentication. For the Spark-Kafka dependency we provide a spark-sql-kafka jar suitable for our Spark version; we can also use the --packages option instead of --jars.
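For example, with Spark 2.4.0 and Scala 2.11 the --jars line above could be replaced by the corresponding Maven coordinates:
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0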
Hope everything worked! Now let's do the same trick in PySpark using a Jupyter notebook.
To access the shell environment from Python we will use os.environ:
import os
import sys

os.environ['SPARK_KAFKA_VERSION'] = '0.10'
from pyspark.sql import SparkSession

spark = SparkSession.builder. \
    config('spark.yarn.keytab', '${USER_NAME}_2.keytab'). \
    config('spark.yarn.principal', '$USER_NAME@$REALM'). \
    config('spark.jars', 'spark-sql-kafka-0-10_2.11-2.4.0.jar'). \
    config('spark.driver.extraJavaOptions', '-Djava.security.auth.login.config=jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf'). \
    config('spark.executor.extraJavaOptions', '-Djava.security.auth.login.config=jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf'). \
    config('spark.files', 'jaas.conf,${KEYTAB}'). \
    appName('KafkaSpark').getOrCreate()
kafka_raw = spark.readStream. \
    format('kafka'). \
    option('kafka.bootstrap.servers', '${KAFKA_BROKERS_WITH_PORTS}'). \
    option('kafka.security.protocol', 'SASL_PLAINTEXT'). \
    option('kafka.sasl.kerberos.service.name', 'kafka'). \
    option('kafka.sasl.mechanism', 'GSSAPI'). \
    option('startingOffsets', 'earliest'). \
    option('maxOffsetsPerTrigger', 10). \
    option('subscribe', '${TOPIC_NAME}'). \
    load()
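The Kafka source exposes key and value as binary columns; if you want human-readable console output you can cast them to strings first (an optional step, shown here as a sketch):
# Cast the binary key/value columns to strings for readability.
kafka_readable = kafka_raw.selectExpr('CAST(key AS STRING)', 'CAST(value AS STRING)')
The query below writes kafka_raw as-is; you could point it at kafka_readable instead.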
query = kafka_raw. \
writeStream. \
format("console"). \
start()
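When this runs as a standalone script rather than interactively in a notebook, block until the streaming query stops:
# Keep the process alive while the stream is running.
query.awaitTermination()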