Integration With Kafka Not Working
-
Hi,
I'm trying to set up an integration with Kafka, and I finally got it to say that it was able to send a test message successfully, but I never see the message in my broker, nor does my consumer receive it.
First, I added my bootstrap.servers property, and then I was getting timeouts:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
I was finally able to get it to say that it was successful by adding the sasl.jaas.config property, which contains the credentials, like:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="XXXXXXXX" password="XXXXXXXX"
However, oddly enough, it doesn't seem to care what the credentials are set to. Also, if I comment the line out (prefix it with #), it still seems to work. It's only when I remove the line entirely that I get the Timeout exception. It also doesn't seem to care what the topic name is.
BTW, I have a Java Producer and Consumer example working fine that uses these same properties.
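For reference, my working producer is roughly this (a trimmed-down sketch; the broker address, topic, and credentials are placeholders):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder broker and credentials; I use my CloudKarafka values here.
        props.put("bootstrap.servers", "broker-01.example.com:9094");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        // Note the trailing semicolon inside the value; the Java client requires it.
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"XXXXXXXX\" password=\"XXXXXXXX\";");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Block on the future so any broker/auth failure surfaces here.
            producer.send(new ProducerRecord<>("fusion", "test message")).get();
        }
    }
}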
Also, I'm using a free account on cloudkarafka.com if that helps.
But without any logging available, I can't tell what's really going on.
Can you help or give any advice?
Thanks.
-
Hi @chris-bridges,
What version of FusionAuth are you running?
It's been a while, but I think I had FusionAuth running locally against Kafka; I definitely haven't tried it against any cloud providers.
Thanks,
Dan
-
What version of Java are you using in your Java Producer/Consumer, and did you have to add anything to the keystore to get it to work?
Thanks!
-
I'm using Java 11. I didn't have to add anything to the keystore.
The thing is, I can go into Integrations, enable Kafka, and paste the following into the Producer configuration (leaving the topic as "fusion"), and it will tell me "Successfully sent a test message":
bootstrap.servers=junk
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="xxxxx" password="xxxxx"
max.block.ms=5000
request.timeout.ms=2000
Those settings aren't even valid, but it still gives me a success message.
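For comparison, here's a sketch of what I'd expect a real test to do with those junk settings in plain Java (just my assumption about how the test could behave):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class JunkConfigTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "junk"); // not a valid host:port
        props.put("max.block.ms", "5000");
        props.put("request.timeout.ms", "2000");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // The plain Java client rejects this up front: constructing the producer
        // throws because "junk" is not a valid broker URL, and even a syntactically
        // valid but unreachable broker makes send(...).get() fail with a
        // TimeoutException after max.block.ms.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("fusion", "test")).get();
            System.out.println("Success (should not happen)");
        } catch (Exception e) {
            System.out.println("Expected failure: " + e);
        }
    }
}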
Can you reproduce that?
Thanks.
-
@chris-bridges said in Integration With Kafka Not Working:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
Looks like we have a bug in our configuration parser; a value that contains an = sign is breaking our config parsing. There is also a bug in the Test option that causes the test to report success when it should have displayed an error indicating the issue with the = sign. I signed up for a test account with cloudkarafka.com, replicated your configuration, and was able to post a test message and send events with my fix.
Here is my config; I think it is essentially the same as yours:
bootstrap.servers=omnibus-01.srvs.cloudkafka.com:9094,omnibus-02.srvs.cloudkafka.com:9094,omnibus-03.srvs.cloudkafka.com:9094
max.block.ms=5000
request.timeout.ms=2000
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<redacted>" password="<redacted>";
sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
The one possible issue with your config, apart from our bug, is that the value for sasl.jaas.config needs to end with a semicolon (;). We'll fix this under this GH issue: https://github.com/FusionAuth/fusionauth-issues/issues/1107
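If you want to double-check that events are actually landing on the broker in the meantime, a minimal Java consumer using the same SASL settings should show them (a rough sketch; the group id is arbitrary and you'd plug in your own topic and credentials):
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class VerifyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "omnibus-01.srvs.cloudkafka.com:9094");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"<redacted>\" password=\"<redacted>\";");
        props.put("group.id", "fusionauth-debug");   // arbitrary group id
        props.put("auto.offset.reset", "earliest");  // read the topic from the beginning
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("fusion"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(10))) {
                System.out.println(record.value());
            }
        }
    }
}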
Thanks for letting us know and for helping with the config examples.
-
That's great! Thanks for tracking that one down.
I'll be on the lookout for the fix.
BTW, I started using that sasl.jaas.config param since my local client is Java and I was playing around with things (adding properties) trying to get it to work in FusionAuth.
But is there any other way that other clients would typically specify credentials for Kafka (specifically in your UI)? Just curious how others are doing it.
Thanks.
-
Not sure; I am not a Kafka expert by any means.
When we wrote the initial support for Kafka, we likely assumed we would be connecting to something on the local network, so we don't have a great way to accept credentials.
I'd have to do some research into how we can authenticate the Kafka producer outside of this producer configuration property list.
I suppose one option would be to support replacement variables in the configuration, such as ${user} and ${password}, and then add discrete fields for these values that we could optionally not return on the API or display in the UI, similar to how we handle SMTP credentials. Then, when we build the producer, we would parse the configuration and substitute the real values. Feel free to open a GH feature request if you can identify a preferred way to accept credentials outside of the config file.
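To make that idea concrete, it might look roughly like this (purely hypothetical; parseProducerConfig and the stored credential fields are made-up names, none of this exists in FusionAuth today):
// Hypothetical sketch of the replacement-variable idea.
Properties props = parseProducerConfig(rawConfigText); // assumed existing parser
for (String key : props.stringPropertyNames()) {
    String value = props.getProperty(key)
        .replace("${user}", storedUsername)       // discrete field, hidden from API/UI
        .replace("${password}", storedPassword);  // discrete field, hidden from API/UI
    props.setProperty(key, value);
}
// Build the producer from the reified configuration.
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);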
-
@chris-bridges confirmed all is working in version 1.24.0 with CloudKarafka. Thanks for helping us debug this one! (release notes should be out shortly)
-
@robotdan Thanks. BTW, I notice that FusionAuth Cloud is at 1.22.2. When does 1.24.0 hit production? We're using the cloud version.
-
If you are on cloud, you can self-service upgrade whenever you like. We do not automatically upgrade FusionAuth Cloud instances.
Find the deployment section in your account and select Upgrade from the actions: https://account.fusionauth.io/
-
This issue was resolved and released as part of 1.24.0.
More here: https://fusionauth.io/docs/v1/tech/release-notes/#version-1-24-0