
[SPARK-12177][Streaming][Kafka] Update KafkaDStreams to new Kafka 0.10 Consumer API #11863


Closed
wants to merge 42 commits into apache:master from koeninger:kafka-0.9

Conversation

koeninger
Contributor

@koeninger koeninger commented Mar 21, 2016

What changes were proposed in this pull request?

New Kafka consumer api for the released 0.10 version of Kafka

How was this patch tested?

Unit tests, manual tests
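
For context, here is a minimal Scala sketch of the new direct stream API, in the shape it settled into after the follow-up refactor in #13996 (ssc is assumed to be an existing StreamingContext; the broker list, group id, and topic are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

// Placeholder configuration. "enable.auto.commit" is false so that Spark,
// not the Kafka client, decides when offsets are committed.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:port1,host2:port2",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("exampleTopic"), kafkaParams)
)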

@SparkQA

SparkQA commented Mar 21, 2016

Test build #53680 has finished for PR 11863 at commit 546246e.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 21, 2016

Test build #53682 has finished for PR 11863 at commit 477055c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 21, 2016

Test build #53686 has started for PR 11863 at commit ba41956.

@SparkQA

SparkQA commented Apr 7, 2016

Test build #55252 has finished for PR 11863 at commit e559183.

  • This patch fails MiMa tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-beta-assembly_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project External Kafka Assembly</name>
Member

I think it may be a good idea to update this, so the two kafka assemblies can be differentiated in the build.
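
(For example, both the artifactId and the <name> above could carry the Kafka version, along the lines of a hypothetical "Spark Project External Kafka 0.10 Assembly", so the two assemblies can be told apart at a glance.)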

@Experimental
object KafkaUtils extends Logging {
/**
* Scala constructor for a batch-oriented interface for consuming from Kafka.
Member

@zsxwing zsxwing Jun 29, 2016

Please add :: Experimental :: at the beginning of comments if you add the @Experimental tag.
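
For reference, applied to the snippet above the convention reads:

/**
 * :: Experimental ::
 * Scala constructor for a batch-oriented interface for consuming from Kafka.
 */
@Experimental
object KafkaUtils extends Logging {
  // ...
}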

@zsxwing
Member

zsxwing commented Jun 29, 2016

Finished my round of reviewing. Some nits and one question about commitAsync left.

@koeninger
Contributor Author

@zsxwing Thanks for the fixes

* configuration parameters</a>.
* Requires "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form.
* @param driverConsumer zero-argument function for you to construct a Kafka Consumer,
Contributor

@tdas tdas Jun 29, 2016

fix docs

@tdas
Contributor

tdas commented Jun 30, 2016

Overall, this is looking good. Two high level points.

  1. Now we have two subprojects both creating org.apache.spark.streaming.kafka.KafkaUtils. I think this is going to cause problems downstream in the docs and elsewhere. It will also be hard for users to disambiguate which one is being used in code. So I propose changing the package name for the new one to o.a.s.streaming.kafka010, with all the new classes in that package. What do you think? (See the import sketch after this list.)
  2. Don't need CanCommitOffsets.
  3. We need a whole lot of new docs, especially updates in the streaming kafka integration guide. That will be a different PR. Could you start working on that?
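
For illustration, the proposed rename makes the two integrations distinguishable at the import site (one per file, or disambiguated with an import alias):

import org.apache.spark.streaming.kafka.KafkaUtils      // existing 0.8 consumer API
import org.apache.spark.streaming.kafka010.KafkaUtils   // this PR, after the rename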

@koeninger
Contributor Author

You do need CanCommitOffsets, because DirectKafkaInputDStream is now private, so otherwise you have nothing to cast to in order to access that method.
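
A minimal sketch of the pattern in question, assuming stream was created by this PR's createDirectStream:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch ...
  // DirectKafkaInputDStream is private, so the public trait is the only
  // handle through which user code can reach commitAsync:
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}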

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61495 has finished for PR 11863 at commit 31502d9.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Jun 30, 2016

Aah, right. My bad. In that case, there aren't major issues as far as I can see; let me merge this and test how the docs look. I am pretty sure it's going to cause trouble with two KafkaUtils, and in that case I will handle the package renaming.

@tdas
Contributor

tdas commented Jun 30, 2016

Well.. after the tests pass.

@koeninger
Contributor Author

I'll do the scaladoc fix and the package rename. I think the package rename is fine even if it did work with docs, just to disambiguate things.

Will start a separate ticket for documentation updates.

@tdas
Contributor

tdas commented Jun 30, 2016

sounds good. thanks!

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61506 has finished for PR 11863 at commit f863369.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 30, 2016

Test build #3151 has finished for PR 11863 at commit f863369.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 30, 2016

Test build #3150 has finished for PR 11863 at commit f863369.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61513 has finished for PR 11863 at commit cffb0e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Jun 30, 2016

LGTM. Merging this to master and 2.0. Thank you very much @koeninger for this awesome effort. :)

@asfgit asfgit closed this in dedbcee Jun 30, 2016
asfgit pushed a commit that referenced this pull request Jun 30, 2016
[SPARK-12177][Streaming][Kafka] Update KafkaDStreams to new Kafka 0.10 Consumer API

## What changes were proposed in this pull request?

New Kafka consumer api for the released 0.10 version of Kafka

## How was this patch tested?

Unit tests, manual tests

Author: cody koeninger <cody@koeninger.org>

Closes #11863 from koeninger/kafka-0.9.

(cherry picked from commit dedbcee)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>

// make sure constructors can be called from java
final ConsumerStrategy<String, String> sub0 =
Subscribe.<String, String>apply(topics, kafkaParams, offsets);
Contributor

This seems to break in Scala 2.10 but not Scala 2.11. This is very weird.

Merging this PR broke 2.10 builds - https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-sbt-scala-2.10/1947/console

[error] /home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java:54:  error: incompatible types: Collection<String> cannot be converted to Iterable<String>
[error]       Subscribe.<String, String>apply(topics, kafkaParams, offsets);
[error]                                       ^
[error] /home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java:69:  error: incompatible types: Collection<TopicPartition> cannot be converted to Iterable<TopicPartition>
[error]       Assign.<String, String>apply(parts, kafkaParams, offsets);
[error]                                    ^

Contributor

We should figure out a way to fix scala 2.10. I don't think we need to revert this though since 2.10 is no longer the default build and it does not fail PRs.

Contributor

Okay, found the issue. In Scala 2.10, if the companion object of a case class has an explicitly defined apply(), then the synthetic apply method is not generated. In Scala 2.11 it is generated.

I remember now, this type of stuff is why we avoid using case classes in the public API. Do you mind if I convert these to simple classes?
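
A standalone sketch of the 2.10 behavior being described, using a hypothetical class that mirrors the Subscribe situation:

import scala.collection.JavaConverters._

case class Wrapper(xs: java.util.Collection[String])

object Wrapper {
  // Explicit Scala-friendly overload defined on the companion.
  def apply(xs: Iterable[String]): Wrapper =
    new Wrapper(xs.asJavaCollection)
}

// Java callers passing a java.util.Collection rely on the synthetic
// Wrapper.apply(Collection) derived from the case class constructor.
// Scala 2.11 still generates it alongside the explicit overload; Scala 2.10
// suppresses it, leaving only apply(Iterable) and producing the
// incompatible-types errors in the Jenkins log above.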

Contributor

I refactored the API to avoid case classes and minimize publicly visible classes - #13996

@tdas
Contributor

tdas commented Jun 30, 2016

I played around with the API and I found a few issues:

  1. As I mentioned above, case classes lead to problems in the public API. The API could also be simpler, and the same for both Java and Scala users (I don't like the apply vs. create split).
  2. The wrapping between Java and Scala maps sometimes left the map non-serializable, causing serialization issues in the Subscribe and Assign strategies when checkpointing.
  3. ConsumerStrategy is an interface, making it hard to add methods later without breaking compatibility.

I have opened a new PR to address them - please take a look - #13996
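
For reference, a sketch of the shape the API took in that follow-up: ConsumerStrategy became a class rather than an interface, and instances come from factory methods on ConsumerStrategies that read the same from Java and Scala (topics and kafkaParams assumed in scope):

val strategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)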

@BiyuHuang

BiyuHuang commented Aug 3, 2017

Hey, I have a question about the setting "enable.auto.commit": can it be changed? I want to save the offset information to a ZooKeeper cluster.

@koeninger
Contributor Author

koeninger commented Aug 3, 2017 via email

@BiyuHuang

I'm wondering why the setting "enable.auto.commit" exists but is set to false by default, and why I couldn't modify it. Anyway, how do I use it?
