
Commit fbfd0ab

koeninger authored and tdas committed
[SPARK-12177][STREAMING][KAFKA] limit api surface area
## What changes were proposed in this pull request?

This is an alternative to the refactoring proposed by #13996

## How was this patch tested?

Unit tests; also tested under Scala 2.10 via mvn -Dscala-2.10

Author: cody koeninger <cody@koeninger.org>

Closes #13998 from koeninger/kafka-0-10-refactor.
1 parent 14cf61e commit fbfd0ab
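
In effect, the refactor hides the concrete Subscribe and Assign case classes behind a single ConsumerStrategies factory object. A rough driver-side sketch of the resulting call pattern (not part of the commit itself; the topic, broker, and group names are placeholders):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ ConsumerStrategies, ConsumerStrategy }

// Placeholder Kafka parameters; any Map[String, Object] with "bootstrap.servers" set works.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:port1,host2:port2",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> "example-group")

// Callers now only see the abstract ConsumerStrategy type; the factory methods below
// come from the ConsumerStrategies object introduced by this patch.
val byTopic: ConsumerStrategy[String, String] =
  ConsumerStrategies.Subscribe[String, String](Seq("example-topic"), kafkaParams)

val byPartition: ConsumerStrategy[String, String] =
  ConsumerStrategies.Assign[String, String](
    Seq(new TopicPartition("example-topic", 0)),
    kafkaParams,
    Map(new TopicPartition("example-topic", 0) -> 123L))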

File tree

13 files changed: +222 -193 lines changed


external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala

Lines changed: 90 additions & 97 deletions
@@ -17,7 +17,7 @@

package org.apache.spark.streaming.kafka010

- import java.{ util => ju }
+ import java.{ lang => jl, util => ju }

import scala.collection.JavaConverters._

@@ -30,15 +30,16 @@ import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
* Choice of how to create and configure underlying Kafka Consumers on driver and executors.
+ * See [[ConsumerStrategies]] to obtain instances.
* Kafka 0.10 consumers can require additional, sometimes complex, setup after object
* instantiation. This interface encapsulates that process, and allows it to be checkpointed.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
- trait ConsumerStrategy[K, V] {
+ abstract class ConsumerStrategy[K, V] {
/**
- * Kafka <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@@ -51,15 +52,14 @@ trait ConsumerStrategy[K, V] {
* has successfully read. Will be empty on initial start, possibly non-empty on restart from
* checkpoint.
*/
- def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
}

/**
- * :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -68,16 +68,15 @@ trait ConsumerStrategy[K, V] {
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
- @Experimental
- case class Subscribe[K, V] private(
- topics: ju.Collection[java.lang.String],
+ private case class Subscribe[K, V](
+ topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]
+ offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] {

def executorKafkaParams: ju.Map[String, Object] = kafkaParams

- def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
+ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(topics)
if (currentOffsets.isEmpty) {
@@ -90,18 +89,52 @@ case class Subscribe[K, V] private(
}
}

+ /**
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ * Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+ private case class Assign[K, V](
+ topicPartitions: ju.Collection[TopicPartition],
+ kafkaParams: ju.Map[String, Object],
+ offsets: ju.Map[TopicPartition, jl.Long]
+ ) extends ConsumerStrategy[K, V] {
+
+ def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
+ val consumer = new KafkaConsumer[K, V](kafkaParams)
+ consumer.assign(topicPartitions)
+ if (currentOffsets.isEmpty) {
+ offsets.asScala.foreach { case (topicPartition, offset) =>
+ consumer.seek(topicPartition, offset)
+ }
+ }
+
+ consumer
+ }
+ }
+
/**
* :: Experimental ::
- * Companion object for creating [[Subscribe]] strategy
+ * object for obtaining instances of [[ConsumerStrategy]]
*/
@Experimental
- object Subscribe {
+ object ConsumerStrategies {
/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -111,43 +144,43 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
- def apply[K, V](
- topics: Iterable[java.lang.String],
+ def Subscribe[K, V](
+ topics: Iterable[jl.String],
kafkaParams: collection.Map[String, Object],
- offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
- Subscribe[K, V](
+ offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, Long](offsets.asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def apply[K, V](
- topics: Iterable[java.lang.String],
- kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
- Subscribe[K, V](
+ def Subscribe[K, V](
+ topics: Iterable[jl.String],
+ kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- ju.Collections.emptyMap[TopicPartition, Long]())
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -157,81 +190,37 @@ object Subscribe {
* auto.offset.reset will be used.
*/
@Experimental
- def create[K, V](
- topics: ju.Collection[java.lang.String],
+ def Subscribe[K, V](
+ topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
- Subscribe[K, V](topics, kafkaParams, offsets)
+ offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](topics, kafkaParams, offsets)
}

/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def create[K, V](
- topics: ju.Collection[java.lang.String],
- kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
- Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
- }
-
- }
-
- /**
- * :: Experimental ::
- * Assign a fixed collection of TopicPartitions
- * @param topicPartitions collection of TopicPartitions to assign
- * @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
- * configuration parameters</a> to be used on driver. The same params will be used on executors,
- * with minor automatic modifications applied.
- * Requires "bootstrap.servers" to be set
- * with Kafka broker(s) specified in host1:port1,host2:port2 form.
- * @param offsets: offsets to begin at on initial startup. If no offset is given for a
- * TopicPartition, the committed offset (if applicable) or kafka param
- * auto.offset.reset will be used.
- */
- @Experimental
- case class Assign[K, V] private(
- topicPartitions: ju.Collection[TopicPartition],
- kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]
- ) extends ConsumerStrategy[K, V] {
-
- def executorKafkaParams: ju.Map[String, Object] = kafkaParams
-
- def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
- val consumer = new KafkaConsumer[K, V](kafkaParams)
- consumer.assign(topicPartitions)
- if (currentOffsets.isEmpty) {
- offsets.asScala.foreach { case (topicPartition, offset) =>
- consumer.seek(topicPartition, offset)
- }
- }
-
- consumer
+ def Subscribe[K, V](
+ topics: ju.Collection[jl.String],
+ kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
- }

- /**
- * :: Experimental ::
- * Companion object for creating [[Assign]] strategy
- */
- @Experimental
- object Assign {
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -241,43 +230,43 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
- def apply[K, V](
+ def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object],
- offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
- Assign[K, V](
+ offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- new ju.HashMap[TopicPartition, Long](offsets.asJava))
+ new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def apply[K, V](
+ def Assign[K, V](
topicPartitions: Iterable[TopicPartition],
- kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
- Assign[K, V](
+ kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
- ju.Collections.emptyMap[TopicPartition, Long]())
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
@@ -287,28 +276,32 @@ object Assign {
* auto.offset.reset will be used.
*/
@Experimental
- def create[K, V](
+ def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
- offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
- Assign[K, V](topicPartitions, kafkaParams, offsets)
+ offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](topicPartitions, kafkaParams, offsets)
}

/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
- def create[K, V](
+ def Assign[K, V](
topicPartitions: ju.Collection[TopicPartition],
- kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
- Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
+ kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+ new Assign[K, V](
+ topicPartitions,
+ kafkaParams,
+ ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
+
}

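The ju.Collection / ju.Map overloads in the diff above take over the role of the old Subscribe.create and Assign.create entry points for Java callers. A hedged sketch, written in Scala but using the java.util types a Java caller would pass (topic, broker, and group names are placeholders):

import java.{ lang => jl, util => ju }
import org.apache.spark.streaming.kafka010.{ ConsumerStrategies, ConsumerStrategy }

// Build the arguments the way a Java program would.
val topics: ju.Collection[jl.String] = ju.Arrays.asList("example-topic")
val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "host1:port1,host2:port2")
kafkaParams.put("group.id", "example-group")

// Resolves to the ju.Collection/ju.Map overload; offsets default to an empty java map.
val strategy: ConsumerStrategy[String, String] =
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
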
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
@transient private var kc: Consumer[K, V] = null
def consumer(): Consumer[K, V] = this.synchronized {
if (null == kc) {
- kc = consumerStrategy.onStart(currentOffsets)
+ kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava)
}
kc
}

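The one-line change above is needed because DirectKafkaInputDStream keeps its offsets as a Scala Map[TopicPartition, Long], while onStart now takes a java.util.Map keyed by java.lang.Long; .asJava alone converts the collection type, not the primitive values. A standalone illustration of the same conversion (the variable names are arbitrary):

import java.{ lang => jl, util => ju }
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition

val scalaOffsets: Map[TopicPartition, Long] = Map(new TopicPartition("t", 0) -> 42L)

// Box each scala.Long into java.lang.Long before exposing the map to Java APIs.
val javaOffsets: ju.Map[TopicPartition, jl.Long] =
  scalaOffsets.mapValues(l => new jl.Long(l)).asJava
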
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -36,7 +36,7 @@ import org.apache.spark.storage.StorageLevel
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka
- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
@@ -66,7 +66,7 @@ private[spark] class KafkaRDD[K, V](
" must be set to false for executor kafka params, else offsets may commit before processing")

// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
- private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =

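The change from 256 ms to 512 ms above only alters the fallback; because the value is read through conf.getLong, applications can still override it. A minimal sketch of doing so (the app name and the chosen value are placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-0-10-example")
  // How long executor-side consumers will poll Kafka for records; this commit raises
  // the built-in default in KafkaRDD to 512 ms, overridden here only as an example.
  .set("spark.streaming.kafka.consumer.poll.ms", "1024")
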
0 commit comments
