[SPARK-12177][Streaming][Kafka] Update KafkaDStreams to new Kafka 0.10 Consumer API #11863
… beta consumer, modify getPreferredLocations to choose a consistent executor per topicpartition
… use new consumer
… for new consumers is finished
…g, but dont handle recalculating the same RDD efficiently
…mer for dynamic topics, listener, etc
…consuming messages on driver
Test build #53680 has finished for PR 11863 at commit
Test build #53682 has finished for PR 11863 at commit
Test build #53686 has started for PR 11863 at commit
…on attempts to serialize ConsumerRecord
Test build #55252 has finished for PR 11863 at commit
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-beta-assembly_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project External Kafka Assembly</name>
I think it may be a good idea to update this, so the two kafka assemblies can be differentiated in the build.
@Experimental
object KafkaUtils extends Logging {
  /**
   * Scala constructor for a batch-oriented interface for consuming from Kafka.
Please add :: Experimental :: at the beginning of comments if you add the @Experimental tag.
Finished my round of reviewing. Some nits and one question about
@zsxwing Thanks for the fixes
 * configuration parameters</a>.
 * Requires "bootstrap.servers" to be set with Kafka broker(s),
 * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
 * @param driverConsumer zero-argument function for you to construct a Kafka Consumer,
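To make the "bootstrap.servers, NOT zookeeper" requirement above concrete, here is a hedged sketch of a typical kafkaParams map for the new consumer. The host/port values and the group id are placeholders for illustration, not values from this PR:

```scala
import org.apache.kafka.common.serialization.StringDeserializer

// Broker addresses, NOT ZooKeeper; hosts and ports here are placeholders.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092,host2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group", // placeholder group id
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```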
fix docs
Overall, this is looking good. Two high level points.
You do need CanCommitOffsets because DirectKafkaInputDStream is now
Test build #61495 has finished for PR 11863 at commit
Aah, right. My bad. In that case, there aren't major issues as far as I can see; let me merge this and test how the docs look. I am pretty sure it's going to cause trouble with two KafkaUtils, and in that case I will handle the package renaming.
Well... after the tests pass.
I'll do the scaladoc fix and the package rename. I think the package rename is fine even if it did work with docs, just to disambiguate things. Will start a separate ticket for documentation updates.
sounds good. thanks!
…10 version number, to disambiguate from the older connector
Test build #61506 has finished for PR 11863 at commit
Test build #3151 has finished for PR 11863 at commit
Test build #3150 has finished for PR 11863 at commit
Test build #61513 has finished for PR 11863 at commit
LGTM. Merging this to master and 2.0. Thank you very much @koeninger for this awesome effort. :)
…0 Consumer API
## What changes were proposed in this pull request?
New Kafka consumer api for the released 0.10 version of Kafka
## How was this patch tested?
Unit tests, manual tests
Author: cody koeninger <cody@koeninger.org>
Closes #11863 from koeninger/kafka-0.9.
(cherry picked from commit dedbcee)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
// make sure constructors can be called from java
final ConsumerStrategy<String, String> sub0 =
  Subscribe.<String, String>apply(topics, kafkaParams, offsets);
This seems to break in Scala 2.10 and not Scala 2.11. This is very weird.
Merging this PR broke 2.10 builds - https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-sbt-scala-2.10/1947/console
[error] /home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java:54: error: incompatible types: Collection<String> cannot be converted to Iterable<String>
[error] Subscribe.<String, String>apply(topics, kafkaParams, offsets);
[error] ^
[error] /home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java:69: error: incompatible types: Collection<TopicPartition> cannot be converted to Iterable<TopicPartition>
[error] Assign.<String, String>apply(parts, kafkaParams, offsets);
[error] ^
We should figure out a way to fix scala 2.10. I don't think we need to revert this though since 2.10 is no longer the default build and it does not fail PRs.
Okay, found the issue. In Scala 2.10, if the companion object of a case class has an explicitly defined apply(), then the synthetic apply method is not generated. In Scala 2.11 it is generated.
I remember now, this type of stuff is why we avoid using case classes in the public API. Do you mind if I convert these to simple classes?
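For context, a minimal sketch of the kind of conversion being proposed (names and signatures are simplified for illustration, not the exact code that landed in #13996): a plain class plus a single explicit factory method sidesteps the 2.10/2.11 difference, because Java callers bind to the one declared method instead of a synthetic case-class apply:

```scala
// A plain class instead of a case class: no synthetic companion apply is
// generated, so Java sees exactly the factory methods we declare.
class Subscribe[K, V] private (
    val topics: java.util.Collection[String],
    val kafkaParams: java.util.Map[String, Object])

object Subscribe {
  // Single explicit factory; behaves identically under Scala 2.10 and 2.11.
  def apply[K, V](
      topics: java.util.Collection[String],
      kafkaParams: java.util.Map[String, Object]): Subscribe[K, V] =
    new Subscribe(topics, kafkaParams)
}
```

With only plain classes in the public API, the binary and source signatures Java compiles against no longer depend on which apply methods the Scala compiler chooses to synthesize.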
I refactored the API to avoid case classes and minimize publicly visible classes - #13996
I played around with the API and I found a few issues
I have opened a new PR to address them - please take a look - #13996
hey, I have a question about the setting "enable.auto.commit": could it be changed? Because I want to save the offset information to a ZooKeeper cluster.
You won't get any reasonable semantics out of auto commit, because it will commit on the driver without regard to what the executors have done.
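To make that point concrete: instead of auto commit, the connector lets you commit offsets yourself after your output action succeeds, via CanCommitOffsets. This is a sketch assuming `stream` is the DStream returned by KafkaUtils.createDirectStream; storing offsets in ZooKeeper instead would mean writing `offsetRanges` there yourself in place of the commitAsync call:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Capture the offset ranges for this batch before any shuffle.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process rdd and write results out ...

  // Commit only after processing succeeds; this runs on the driver,
  // but now it reflects work the executors have actually completed.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```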
I'm wondering why the setting "enable.auto.commit" exists but is set to false by default, and why I couldn't modify it. Anyway, how do I use it?
What changes were proposed in this pull request?
New Kafka consumer api for the released 0.10 version of Kafka
How was this patch tested?
Unit tests, manual tests