
[SPARK-12177][Streaming][Kafka] limit api surface area #13998


Closed
wants to merge 10 commits into apache:master from koeninger:kafka-0-10-refactor

Conversation

koeninger
Contributor

@koeninger koeninger commented Jun 30, 2016

What changes were proposed in this pull request?

This is an alternative to the refactoring proposed by #13996

How was this patch tested?

unit tests
also tested under scala 2.10 via
mvn -Dscala-2.10

@@ -68,8 +67,7 @@ trait ConsumerStrategy[K, V] {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
Member

nit: keep this

Contributor Author

You just told me in the last PR to remove Experimental annotations from private classes. This is private. What's the actual rule?

Member

Sorry, I commented in the wrong place. I meant ConsumerStrategies.

Contributor Author

Cool, makes sense; I believe I fixed it there.

@tdas
Contributor

tdas commented Jun 30, 2016

Quick feedback

  1. As I mentioned in my PR, traits have issues with Java compatibility when methods are added, even if those methods have default implementations. So I strongly suggest converting ConsumerStrategy and LocationStrategy to abstract classes. You don't lose anything, but you gain better future-proof compatibility (see the sketch after this list).
  2. Another difference from my PR is that you are using a separate class, ConsumerStrategies, for the static builder methods, instead of ConsumerStrategy. It's one more class, but I am okay with that since it can be called easily from Java.
  3. I suggest keeping the style of the strategies similar to my PR, to make them look more enum-ish.
  4. Please make the JavaConsumerStrategySuite pass offsets as HashMap[TopicPartition, Long] instead of HashMap[TopicPartition, Object] just to make sure it works - https://github.com/apache/spark/blob/master/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java#L47
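
A minimal sketch of the conversion suggested in point 1, using only the onStart signature shown in this PR's diff (the real class has more members):

import java.{lang => jl, util => ju}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// An abstract class instead of a trait: defined methods can be added later
// without breaking binary compatibility for already-compiled subclasses.
abstract class ConsumerStrategy[K, V] {
  // Must return a configured Kafka consumer, possibly positioned at the
  // given offsets.
  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
}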

…m look like enums, distinguish java from scala long
@SparkQA

SparkQA commented Jun 30, 2016

Test build #61551 has finished for PR 13998 at commit 2cf8fad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61560 has finished for PR 13998 at commit ef040cf.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -51,11 +51,10 @@ trait ConsumerStrategy[K, V] {
* has successfully read. Will be empty on initial start, possibly non-empty on restart from
* checkpoint.
*/
def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
Contributor

@tdas tdas Jun 30, 2016

+1 ... I forgot that this is one of the subtle things that I fixed.

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61554 has finished for PR 13998 at commit 27d6337.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -44,37 +44,39 @@ public void testConsumerStrategyConstructors() {
kafkaParams.put("bootstrap.servers", "not used");
final scala.collection.Map<String, Object> sKafkaParams =
JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
final Map<TopicPartition, Object> offsets = new HashMap<>();
final Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp1, 23L);
final scala.collection.Map<TopicPartition, Object> sOffsets =
Contributor

Shouldn't this be scala.collection.Map<TopicPartition, scala.Long> to really test this out?

Contributor Author

Scala is going to compile foo(map[TopicPartition, scala.Long]) to bytecode for foo(map[TopicPartition, Object]). It's boxed and erased either way; it just looks a little weird to call it from Java.
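
A quick way to see that erasure (ErasureDemo and foo are hypothetical names, not part of this PR):

import org.apache.kafka.common.TopicPartition

object ErasureDemo {
  // javap -p -s on the compiled class shows the descriptor
  // (Lscala/collection/immutable/Map;)V, with java.lang.Object as the
  // value type in the generic signature: scala.Long is boxed and erased
  // in a generic position.
  def foo(offsets: Map[TopicPartition, Long]): Unit =
    offsets.foreach { case (tp, off) => println(s"$tp -> $off") }
}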

Contributor

Yeah. I guess it's better to test these explicitly nonetheless, rather than writing tests that assume certain JVM behavior. No strong feelings on this though.

@tdas
Contributor

tdas commented Jun 30, 2016

Looking good. The only main concern is the trait vs. abstract class question. Do you have any concerns with converting the strategies to abstract classes?

@koeninger
Contributor Author

Copying from the other PR about the trait vs. abstract class question and adding methods after people have already implemented or subclassed:

You said "When abstract class, you can later add defined methods which the users can override if needed, but does not break compatibility of existing implementations."

This is what I'm taking issue with. It confuses binary compatibility with actual API compatibility. Changing the class and silently slipping a new default method into someone's code may preserve binary compatibility, at the cost of actually breaking their code at runtime and losing them their job.

@tdas
Contributor

tdas commented Jun 30, 2016

[Sorry! I missed the comment in the other PR]

Why would the user code break at runtime if the method with default implementation ensures the same behavior as before?

We have been relying on this for many of our APIs. Methods have been added to the Receiver APIs. Hadoop has added new methods over time in their FileSystem APIs. This is used fundamentally everywhere!

There is no cost to doing this now, and it leaves room to decide in the future whether to add new functionality in the same API or make a new API.

@koeninger
Contributor Author

Basically, if you're adding a new method to an interface, it's because you need new behavior.

This is the fragile base class problem inherent in, well, inheriting behavior. You can guess at what the intended default behavior should be, and put in a default method... but that behavior is going to be wrong for some subclass of users.

The alternative is to actually change the interface (not add a default method), which makes it obvious at compile time that things have changed. You can certainly add an abstract method without an implementation to a Scala trait, since it's just an interface. It means people who made a subclass are forced to think about the new behavior and do the right thing for themselves.
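
A sketch of that compile-time alternative (the flag name is hypothetical):

import java.{lang => jl, util => ju}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

trait ConsumerStrategy[K, V] {
  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
  // New abstract method, deliberately without a default: every existing
  // subclass stops compiling until its author chooses the right behavior.
  def commitBehavior(): Boolean
}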

I understand your bias is towards maintaining binary compatibility; I'm just trying to point out that binary compatibility doesn't actually mean things are going to work right.

@SparkQA

SparkQA commented Jun 30, 2016

Test build #61561 has finished for PR 13998 at commit b442c98.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Jun 30, 2016

If the default method implementation preserves earlier behavior, can you give a concrete scenario where this would break? If not, I see no cost in converting to an abstract class, and potential benefit in the future.

Here is an example from my side. Let's say that for some reason in the future we want the system to automatically commit the read offsets synchronously. That cannot be implemented using a custom onStart() only. We can extend the ConsumerStrategy abstract class to have an overridable flag.

abstract class ConsumerStrategy {
  def shouldCommitOffsets(): Boolean = false // default implementation preserves behavior
  ...
}

Current implementations (Subscribe, Assign, or any custom ones) will not break, neither at compile time nor at runtime. Anyone can take advantage of the new ConsumerStrategy by overriding the method to return true. People can also wrap their existing custom ConsumerStrategy inside a wrapper ConsumerStrategy that overrides that method to return true. All of this without having to think about a new interface and new methods. It's annoying if such small but useful additions require adding a whole other interface, when it can be done very easily this way.
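
A sketch of that wrapper idea, assuming the hypothetical shouldCommitOffsets flag above on the [K, V]-parameterized abstract class from this PR:

import java.{lang => jl, util => ju}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Opts any existing strategy in to the new flag while delegating
// consumer creation to the wrapped instance.
class CommitOffsetsWrapper[K, V](underlying: ConsumerStrategy[K, V])
    extends ConsumerStrategy[K, V] {
  override def shouldCommitOffsets(): Boolean = true
  override def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] =
    underlying.onStart(currentOffsets)
}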

Just to be clear, I am not proposing that we WILL do it this way in the future. All I am proposing is that having it as an abstract class instead of a trait keeps the possibilities open for the future.

@zsxwing
Member

zsxwing commented Jun 30, 2016

@koeninger
Contributor Author

Sure... the most trivial example of how that would break at runtime is that I already created an implementation that defines that method, with the opposite semantics. Is that particularly likely for that exact case you bring up? No. But the point is that binary compatibility is not a magic bullet.
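
A sketch of that clash (hypothetical names), written against the original abstract class before the library adds its own shouldCommitOffsets, with true here meaning "I commit offsets myself, don't auto-commit":

import java.{lang => jl, util => ju}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

class MyStrategy[K, V] extends ConsumerStrategy[K, V] {
  // Same signature the library later slips in, but with opposite semantics.
  // The already-compiled jar still links against the new library (binary
  // compatible), yet at runtime it now silently opts in to auto-commit.
  def shouldCommitOffsets(): Boolean = true
  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] =
    ??? // consumer setup elided in this sketch
}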

It's also not cost free. At least one other cost of an abstract class as opposed to an interface is that you have to have a constructor, so you cannot extend multiple abstract classes in one instance.
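
A sketch of that cost, with hypothetical traits:

// Two traits can be mixed into one instance; two abstract classes cannot,
// because a class gets exactly one superclass (and its constructor).
trait Naming { def name: String }
trait Reporting { def report(): String }

class Both extends Naming with Reporting {
  def name: String = "both"
  def report(): String = s"strategy $name"
}
// If Naming and Reporting were abstract classes,
// "extends Naming with Reporting" would not compile.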

As you say, whether to use the ability to add methods is a decision to be made in the future, so I'll go ahead and make the change and defer the discussion until it actually becomes necessary.

@koeninger
Contributor Author

@zsxwing looking

@koeninger
Contributor Author

@zsxwing made a tweak in this PR that may affect that first failure. LMK if we should do a separate pull just for that.

Looking at the second one.

@tdas
Contributor

tdas commented Jun 30, 2016

@koeninger Thank you :)

@koeninger
Contributor Author

Are those failures repeatable for those particular Hadoop versions, or is this an intermittent thing?

May need to look at redoing how KafkaTestUtils is working, since it's interacting with ZK directly...

@@ -36,7 +36,7 @@ import org.apache.spark.annotation.Experimental
* @tparam V type of Kafka message value
*/
@Experimental
trait ConsumerStrategy[K, V] {
Contributor

Can you add a pointer to the ConsumerStrategies class in the docs for this class?

And similarly for LocationStrategies?
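
A sketch of the requested pointer (the doc wording here is hypothetical):

import org.apache.spark.annotation.Experimental

/**
 * Choice of how to create and configure underlying Kafka Consumers on
 * driver and executors. See [[ConsumerStrategies]] to obtain instances.
 */
@Experimental
trait ConsumerStrategy[K, V]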

@tdas
Contributor

tdas commented Jun 30, 2016

LGTM. Just minor points. Good to merge anytime, but would be good to have the test fixes.

@koeninger
Contributor Author

Added the doc links and my best guess on that first test failure. I think the second is going to take more work to figure out. Your call on whether you want to wait and see if I find something, or go ahead and merge this now.

@tdas
Contributor

tdas commented Jul 1, 2016

Let's merge this when tests pass. @zsxwing I might not get a chance tonight, could you merge when tests pass?

@SparkQA

SparkQA commented Jul 1, 2016

Test build #61585 has finished for PR 13998 at commit d1480e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jul 1, 2016

Test build #61576 has finished for PR 13998 at commit 2652170.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class ConsumerStrategy[K, V]

@zsxwing
Member

zsxwing commented Jul 1, 2016

Seems this one was blocked forever:

[info] Test org.apache.spark.streaming.kafka010.JavaKafkaRDDSuite.testKafkaRDD started
Attempting to post to Github...

@SparkQA

SparkQA commented Jul 1, 2016

Test build #61593 has finished for PR 13998 at commit 22db76f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 1, 2016

Test build #61596 has finished for PR 13998 at commit 2f65fc1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Jul 1, 2016

Merging this to master and 2.0. Will keep monitoring the test flakiness. Thank you very much @koeninger

asfgit closed this in fbfd0ab Jul 1, 2016
asfgit pushed a commit that referenced this pull request Jul 1, 2016
## What changes were proposed in this pull request?
This is an alternative to the refactoring proposed by #13996

## How was this patch tested?

unit tests
also tested under scala 2.10 via
mvn -Dscala-2.10

Author: cody koeninger <cody@koeninger.org>

Closes #13998 from koeninger/kafka-0-10-refactor.

(cherry picked from commit fbfd0ab)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@tdas
Contributor

tdas commented Sep 26, 2016

@koeninger
Contributor Author

Sure, I'll give it another look.

On Sep 26, 2016 3:46 PM, "Tathagata Das" notifications@github.com wrote:

@koeninger https://github.com/koeninger Could you take a look at this test flakiness in https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.4/1792/



@koeninger
Contributor Author

I ran that test 100 times locally without error... do you have any suggestions on repro?


LuciferYang pushed a commit that referenced this pull request May 28, 2024
### What changes were proposed in this pull request?
The pr aims to upgrade `netty` from `4.1.109.Final` to `4.1.110.Final`.

### Why are the changes needed?
- https://netty.io/news/2024/05/22/4-1-110-Final.html
  This version has brought some bug fixes and improvements, such as:
  Fix Zstd throws Exception on read-only volumes (netty/netty#13982)
  Add unix domain socket transport in netty 4.x via JDK16+ ([#13965](netty/netty#13965))
  Backport #13075: Add the AdaptivePoolingAllocator ([#13976](netty/netty#13976))
  Add no-value key handling only for form body ([#13998](netty/netty#13998))
  Add support for specifying SecureRandom in SSLContext initialization ([#14058](netty/netty#14058))

- https://github.com/netty/netty/issues?q=milestone%3A4.1.110.Final+is%3Aclosed

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46744 from panbingkun/SPARK-48420.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
riyaverm-db pushed a commit to riyaverm-db/spark that referenced this pull request Jun 7, 2024
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Aug 7, 2024