[SPARK-12177][Streaming][Kafka] limit api surface area #13998
Conversation
@@ -68,8 +67,7 @@ trait ConsumerStrategy[K, V] {
 * TopicPartition, the committed offset (if applicable) or kafka param
 * auto.offset.reset will be used.
 */
@Experimental
nit: keep this
You just told me in the last PR to remove Experimental annotations from private classes. This is private. What's the actual rule?
Sorry, I commented in the wrong place. I meant ConsumerStrategies.
Cool, makes sense, believe I fixed it there.
Quick feedback
…m look like enums, distinguish java from scala long
Test build #61551 has finished for PR 13998 at commit
Test build #61560 has finished for PR 13998 at commit
@@ -51,11 +51,10 @@ trait ConsumerStrategy[K, V] {
 * has successfully read. Will be empty on initial start, possibly non-empty on restart from
 * checkpoint.
 */
-def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
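The switch to `ju.Map[TopicPartition, jl.Long]` (aliases for `java.util.Map` and `java.lang.Long`) is about making the trait implementable and callable from Java without Scala collection conversions. A minimal Java-side sketch of what that buys; the `TopicPartition` class here is a hypothetical stand-in for the Kafka type, just to keep the example self-contained:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetsDemo {
    // Minimal stand-in for org.apache.kafka.common.TopicPartition,
    // only so this sketch compiles on its own.
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    public static void main(String[] args) {
        // With onStart taking java.util.Map<TopicPartition, java.lang.Long>,
        // Java callers build offsets with plain collections and autoboxing:
        Map<TopicPartition, Long> offsets = new HashMap<>();
        TopicPartition tp = new TopicPartition("topic", 0);
        offsets.put(tp, 23L);            // long autoboxes to java.lang.Long
        long resumed = offsets.get(tp);  // unboxes cleanly on the way out
        System.out.println(resumed);     // prints 23
    }
}
```

With the earlier `Map[TopicPartition, Long]` (Scala collection, Scala Long) in the signature, the same Java call site needs `JavaConverters` and an erased `Object` value type, as the Java test diff later in this thread shows.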
+1 ... I forgot that this is one of the subtle things that I fixed.
Test build #61554 has finished for PR 13998 at commit
@@ -44,37 +44,39 @@ public void testConsumerStrategyConstructors() {
 kafkaParams.put("bootstrap.servers", "not used");
 final scala.collection.Map<String, Object> sKafkaParams =
   JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
-final Map<TopicPartition, Object> offsets = new HashMap<>();
+final Map<TopicPartition, Long> offsets = new HashMap<>();
 offsets.put(tp1, 23L);
 final scala.collection.Map<TopicPartition, Object> sOffsets =
Shouldn't this be scala.collection.Map<TopicPartition, scala.Long> to really test this out?
Scala is going to compile foo(map[TopicPartition, scala.Long]) to bytecode for foo(map[TopicPartition, Object]). It's boxed and erased either way; it just looks a little weird to call it from Java.
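The boxing-and-erasure point can be seen from plain Java as well. This sketch (hypothetical method name, no Scala involved) uses reflection to show that a generic value type such as `Long` is erased from the method's bytecode signature, which is why `scala.Long` versus `Object` in a generic position makes no difference to the JVM:

```java
import java.lang.reflect.Method;
import java.util.Map;

public class ErasureDemo {
    // The value type parameter below exists only at compile time; after
    // erasure the JVM sees foo(java.util.Map).
    static void foo(Map<String, Long> m) { }

    public static void main(String[] args) throws Exception {
        Method m = ErasureDemo.class.getDeclaredMethod("foo", Map.class);
        // The erased parameter type is the raw Map interface,
        // not Map<String, Long>:
        System.out.println(m.getParameterTypes()[0]);  // prints: interface java.util.Map
    }
}
```

So at the bytecode level a Scala `Map[TopicPartition, scala.Long]` parameter and a `Map[TopicPartition, Object]` parameter are the same thing; the difference only shows up in how awkward the call site looks from Java.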
Yeah. I guess it's better to test these explicitly nonetheless, rather than writing tests that assume certain JVM behavior. No strong feelings on this though.
Looking good. The only main concern is the
Copying from the other PR about trait vs abstract class, and adding methods after people have already implemented or subclassed: You said "When abstract class, you can later add defined methods which the users can override if needed, but does not break compatibility of existing implementations." This is what I'm taking issue with. It's confusing binary compatibility with actual API compatibility. Changing the class and silently slipping a new default method into someone's code may preserve binary compatibility, at the cost of actually breaking their code at runtime and losing them their job.
[Sorry! I missed the comment in the other PR] Why would the user code break at runtime if the method with a default implementation ensures the same behavior as before? We have been relying on this for many of our APIs. Methods have been added to the Receiver APIs. Hadoop has added new methods over time in their FileSystem APIs. This is used fundamentally everywhere! There is no cost for doing this now, and it leaves room to decide in the future whether to add new functionality in the same API or make a new API.
Basically, if you're adding a new method to an interface, it's because you need new behavior. This is the fragile base class problem inherent in, well, inheriting behavior. You can guess at what the intended default behavior should be, and put in a default method... but that behavior is going to be wrong for some subset of users. The alternative is to actually change the interface (not add a default method), which makes it obvious at compile time that things have changed. You can certainly add an abstract method without an implementation to a Scala trait, since it's just an interface. It means people who made a subclass are forced to think about the new behavior and do the right thing for themselves. I understand your bias is towards maintaining binary compatibility; I'm just trying to point out that binary compatibility doesn't actually mean things are going to work right.
Test build #61561 has finished for PR 13998 at commit
If the default method implementation preserves earlier behavior, can you give a concrete scenario where this would break? If not, I see no cost in converting to an abstract class, and potential benefit in the future. Here is an example from my side. Let's say, for some reason, in the future we want the system to automatically commit the read offsets synchronously. That cannot be implemented using custom
Current implementations, Subscribe, Assign, or any custom ones, will not break, neither at compile time nor at runtime. Anyone can take advantage of the new ConsumerStrategy by overriding the method to return true. People can also wrap their existing custom ConsumerStrategy inside a wrapper ConsumerStrategy that sets that method to return true. All of this without having to think about a new interface and new methods. It's annoying if such small but useful additions require adding a whole other interface, when it can be very easily done this way. Just to be clear, I am not proposing that we WILL do it this way in the future. All I am proposing is that having it as
Sure... the most trivial example of how that would break at runtime is that I already created an implementation that defines that method, with the opposite semantics. Is that particularly likely for the exact case you bring up? No. But the point is that binary compatibility is not a magic bullet. It's also not cost free. At least one other cost of an abstract class as opposed to an interface is that you have to have a constructor, so you cannot implement multiple abstract classes in one instance. As you say, whether to use the ability to add methods is a decision to be made in the future, so I'll go ahead and make the change and defer the discussion until it actually becomes necessary.
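The breakage scenario described above can be sketched concretely. In this hypothetical Java example (all names invented; Java's interface default methods stand in for a defined method added to an abstract class), a pre-existing user method silently overrides a later-added default that was intended to have the opposite meaning. Binary compatibility holds, the code links and runs, and it misbehaves:

```java
public class DefaultMethodClash {
    // "v2" of a library interface: autoCommit() was added later as a default,
    // intended to mean "the framework should auto-commit offsets".
    interface ConsumerStrategy {
        String name();
        default boolean autoCommit() { return true; }
    }

    // A user class written against "v1", which already had its own autoCommit()
    // meaning "never auto-commit". Against v2 it still links fine, but the
    // user's method now overrides the new default, with opposite semantics.
    static class UserStrategy implements ConsumerStrategy {
        public String name() { return "user"; }
        public boolean autoCommit() { return false; }
    }

    public static void main(String[] args) {
        ConsumerStrategy s = new UserStrategy();
        // The framework asks the v2 question and gets the v1 answer:
        System.out.println(s.autoCommit());  // prints false, not the intended true

        // A fresh implementor that never defined the method gets the default:
        ConsumerStrategy fresh = () -> "fresh";
        System.out.println(fresh.autoCommit());  // prints true
    }
}
```

Nothing here fails at compile or link time, which is exactly the distinction being drawn between binary compatibility and behavioral compatibility.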
@zsxwing looking
…creases test stability, add explicit group id
@zsxwing made a tweak in this PR that may affect that first failure. LMK if we should do a separate pull just for that. Looking at the second one.
@koeninger Thank you :)
Are those failures repeatable for those particular Hadoop versions, or is this an intermittent thing? May need to look at redoing how KafkaTestUtils is working, since it's interacting with ZK directly...
@@ -36,7 +36,7 @@ import org.apache.spark.annotation.Experimental
 * @tparam V type of Kafka message value
 */
@Experimental
trait ConsumerStrategy[K, V] {
Can you add a pointer to the ConsumerStrategies class in the docs for this class?
And similarly for LocationStrategies?
LGTM. Just minor points. Good to merge anytime, but would be good to have the test fixes.
Added the doc links and my best guess on that first test failure. I think the second is going to take more work to figure out. Your call on whether you want to wait and see if I find something, or go ahead and merge this now.
Let's merge this when tests pass. @zsxwing I might not get a chance tonight,
Test build #61585 has finished for PR 13998 at commit
Jenkins, retest this please.
Test build #61576 has finished for PR 13998 at commit
Seems this one was blocked forever:
Test build #61593 has finished for PR 13998 at commit
Test build #61596 has finished for PR 13998 at commit
Merging this to master and 2.0. Will keep monitoring the test flakiness. Thank you very much @koeninger
## What changes were proposed in this pull request?

This is an alternative to the refactoring proposed by #13996

## How was this patch tested?

unit tests; also tested under scala 2.10 via mvn -Dscala-2.10

Author: cody koeninger <cody@koeninger.org>

Closes #13998 from koeninger/kafka-0-10-refactor.

(cherry picked from commit fbfd0ab)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@koeninger Could you take a look at this test flakiness in
Sure I'll give it another look On Sep 26, 2016 3:46 PM, "Tathagata Das" notifications@github.com wrote:
I ran that test 100 times locally w/out error... you have any suggestions? On Mon, Sep 26, 2016 at 6:40 PM, Cody Koeninger cody@koeninger.org wrote: