Skip to content

Commit a39960b

Browse files
committed
This closes #3053
2 parents 03a7f92 + 8cd98bd commit a39960b

File tree

68 files changed

+706
-482
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+706
-482
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricFiltering.java renamed to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricFiltering.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.beam.sdk.metrics;
19+
package org.apache.beam.runners.core.metrics;
2020

2121
import com.google.common.base.Objects;
2222
import java.util.Set;
23+
import org.apache.beam.sdk.metrics.MetricName;
24+
import org.apache.beam.sdk.metrics.MetricNameFilter;
25+
import org.apache.beam.sdk.metrics.MetricsFilter;
2326

2427
/**
2528
* Implements matching for metrics filters. Specifically, matching for metric name,

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java renamed to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/metrics/MetricKey.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.beam.sdk.metrics;
18+
19+
package org.apache.beam.runners.core.metrics;
1920

2021
import com.google.auto.value.AutoValue;
2122
import java.io.Serializable;
2223
import org.apache.beam.sdk.annotations.Experimental;
2324
import org.apache.beam.sdk.annotations.Experimental.Kind;
25+
import org.apache.beam.sdk.metrics.MetricName;
2426

2527
/**
2628
* Metrics are keyed by the step name they are associated with and the name of the metric.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/**
20+
* Utilities for runners to implement metrics.
21+
*/
22+
package org.apache.beam.runners.core.metrics;

sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricFilteringTest.java renamed to runners/core-construction-java/src/test/java/org/apache/beam/runners/core/metrics/MetricFilteringTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.beam.sdk.metrics;
19+
package org.apache.beam.runners.core.metrics;
2020

2121
import static org.junit.Assert.assertFalse;
2222
import static org.junit.Assert.assertTrue;
2323

2424
import java.util.HashSet;
2525
import java.util.Set;
26+
import org.apache.beam.sdk.metrics.MetricName;
27+
import org.apache.beam.sdk.metrics.MetricNameFilter;
28+
import org.apache.beam.sdk.metrics.MetricsFilter;
2629
import org.junit.Test;
2730
import org.junit.runner.RunWith;
2831
import org.junit.runners.JUnit4;

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import com.google.common.collect.FluentIterable;
2323
import com.google.common.collect.Iterables;
2424
import javax.annotation.Nullable;
25-
import org.apache.beam.sdk.metrics.CounterCell;
25+
import org.apache.beam.runners.core.metrics.CounterCell;
2626
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2727
import org.apache.beam.sdk.util.WindowTracing;
2828
import org.apache.beam.sdk.util.WindowedValue;
@@ -71,7 +71,7 @@ public boolean apply(@Nullable WindowedValue<V> input) {
7171
.isBefore(timerInternals.currentInputWatermarkTime());
7272
if (expired) {
7373
// The element is too late for this window.
74-
droppedDueToLateness.update(1L);
74+
droppedDueToLateness.inc();
7575
WindowTracing.debug(
7676
"GroupAlsoByWindow: Dropping element at {} for key: {}; "
7777
+ "window: {} since it is too far behind inputWatermark: {}",

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java renamed to runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/CounterCell.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.beam.sdk.metrics;
18+
19+
package org.apache.beam.runners.core.metrics;
1920

2021
import java.util.concurrent.atomic.AtomicLong;
2122
import org.apache.beam.sdk.annotations.Experimental;
2223
import org.apache.beam.sdk.annotations.Experimental.Kind;
24+
import org.apache.beam.sdk.metrics.Counter;
25+
import org.apache.beam.sdk.metrics.MetricName;
2326

2427
/**
2528
* Tracks the current value (and delta) for a Counter metric for a specific context and bundle.
@@ -30,34 +33,40 @@
3033
* indirection.
3134
*/
3235
@Experimental(Kind.METRICS)
33-
public class CounterCell implements MetricCell<Counter, Long> {
36+
public class CounterCell implements Counter, MetricCell<Long> {
3437

3538
private final DirtyState dirty = new DirtyState();
3639
private final AtomicLong value = new AtomicLong();
40+
private final MetricName name;
3741

3842
/**
3943
* Package-visibility because all {@link CounterCell CounterCells} should be created by
40-
* {@link MetricsContainer#getCounter(MetricName)}.
44+
* {@link MetricsContainerImpl#getCounter(MetricName)}.
4145
*/
42-
CounterCell() {}
46+
CounterCell(MetricName name) {
47+
this.name = name;
48+
}
4349

4450
/**
4551
* Increment the counter by the given amount.
4652
* @param n value to increment by. Can be negative to decrement.
4753
*/
48-
public void update(long n) {
54+
@Override
55+
public void inc(long n) {
4956
value.addAndGet(n);
5057
dirty.afterModification();
5158
}
5259

53-
@Override
54-
public void update(Long n) {
55-
throw new UnsupportedOperationException("CounterCell.update(Long n) should not be used"
56-
+ " as it performs unnecessary boxing/unboxing. Use CounterCell.update(long n) instead.");
60+
public void inc() {
61+
inc(1);
5762
}
5863

59-
@Override public void update(MetricCell<Counter, Long> other) {
60-
update((long) other.getCumulative());
64+
public void dec() {
65+
inc(-1);
66+
}
67+
68+
public void dec(long n) {
69+
inc(-1 * n);
6170
}
6271

6372
@Override
@@ -69,4 +78,9 @@ public DirtyState getDirty() {
6978
public Long getCumulative() {
7079
return value.get();
7180
}
81+
82+
@Override
83+
public MetricName getName() {
84+
return name;
85+
}
7286
}

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java renamed to runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DirtyState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.beam.sdk.metrics;
19+
package org.apache.beam.runners.core.metrics;
2020

2121
import java.io.Serializable;
2222
import java.util.concurrent.atomic.AtomicReference;

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java renamed to runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.beam.sdk.metrics;
18+
19+
package org.apache.beam.runners.core.metrics;
1920

2021
import java.util.concurrent.atomic.AtomicReference;
2122
import org.apache.beam.sdk.annotations.Experimental;
2223
import org.apache.beam.sdk.annotations.Experimental.Kind;
24+
import org.apache.beam.sdk.metrics.Distribution;
25+
import org.apache.beam.sdk.metrics.MetricName;
2326

2427
/**
2528
* Tracks the current value (and delta) for a Distribution metric.
@@ -30,37 +33,35 @@
3033
* of indirection.
3134
*/
3235
@Experimental(Kind.METRICS)
33-
public class DistributionCell implements MetricCell<Distribution, DistributionData> {
36+
public class DistributionCell implements Distribution, MetricCell<DistributionData> {
3437

3538
private final DirtyState dirty = new DirtyState();
3639
private final AtomicReference<DistributionData> value =
3740
new AtomicReference<>(DistributionData.EMPTY);
41+
private final MetricName name;
3842

3943
/**
4044
* Package-visibility because all {@link DistributionCell DistributionCells} should be created by
41-
* {@link MetricsContainer#getDistribution(MetricName)}.
45+
* {@link MetricsContainerImpl#getDistribution(MetricName)}.
4246
*/
43-
DistributionCell() {}
47+
DistributionCell(MetricName name) {
48+
this.name = name;
49+
}
4450

4551
/** Increment the distribution by the given amount. */
52+
@Override
4653
public void update(long n) {
4754
update(DistributionData.singleton(n));
4855
}
4956

50-
@Override
51-
public void update(DistributionData data) {
57+
void update(DistributionData data) {
5258
DistributionData original;
5359
do {
5460
original = value.get();
5561
} while (!value.compareAndSet(original, original.combine(data)));
5662
dirty.afterModification();
5763
}
5864

59-
@Override
60-
public void update(MetricCell<Distribution, DistributionData> other) {
61-
update(other.getCumulative());
62-
}
63-
6465
@Override
6566
public DirtyState getDirty() {
6667
return dirty;
@@ -70,5 +71,10 @@ public DirtyState getDirty() {
7071
public DistributionData getCumulative() {
7172
return value.get();
7273
}
74+
75+
@Override
76+
public MetricName getName() {
77+
return name;
78+
}
7379
}
7480

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java renamed to runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionData.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.beam.sdk.metrics;
18+
19+
package org.apache.beam.runners.core.metrics;
1920

2021
import com.google.auto.value.AutoValue;
2122
import java.io.Serializable;
23+
import org.apache.beam.sdk.metrics.DistributionResult;
2224

2325
/**
2426
* Data describing the the distribution. This should retain enough detail that it can be combined

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java renamed to runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GaugeCell.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.beam.sdk.metrics;
18+
19+
package org.apache.beam.runners.core.metrics;
1920

2021
import java.util.concurrent.atomic.AtomicReference;
2122
import org.apache.beam.sdk.annotations.Experimental;
23+
import org.apache.beam.sdk.metrics.Gauge;
24+
import org.apache.beam.sdk.metrics.MetricName;
2225

2326
/**
2427
* Tracks the current value (and delta) for a {@link Gauge} metric.
@@ -29,35 +32,34 @@
2932
* of indirection.
3033
*/
3134
@Experimental(Experimental.Kind.METRICS)
32-
public class GaugeCell implements MetricCell<Gauge, GaugeData> {
35+
public class GaugeCell implements Gauge, MetricCell<GaugeData> {
3336

3437
private final DirtyState dirty = new DirtyState();
3538
private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty());
39+
private final MetricName name;
40+
41+
/**
42+
* Package-visibility because all {@link GaugeCell GaugeCells} should be created by
43+
* {@link MetricsContainerImpl#getGauge(MetricName)}.
44+
*/
45+
GaugeCell(MetricName name) {
46+
this.name = name;
47+
}
3648

3749
/** Set the gauge to the given value. */
50+
@Override
3851
public void set(long value) {
3952
update(GaugeData.create(value));
4053
}
4154

42-
@Override
43-
public void update(GaugeData data) {
55+
void update(GaugeData data) {
4456
GaugeData original;
4557
do {
4658
original = gaugeValue.get();
4759
} while (!gaugeValue.compareAndSet(original, original.combine(data)));
4860
dirty.afterModification();
4961
}
5062

51-
@Override
52-
public void update(MetricCell<Gauge, GaugeData> other) {
53-
GaugeData original;
54-
do {
55-
original = gaugeValue.get();
56-
} while (!gaugeValue.compareAndSet(original, original.combine(other.getCumulative())));
57-
dirty.afterModification();
58-
update(other.getCumulative());
59-
}
60-
6163
@Override
6264
public DirtyState getDirty() {
6365
return dirty;
@@ -67,4 +69,10 @@ public DirtyState getDirty() {
6769
public GaugeData getCumulative() {
6870
return gaugeValue.get();
6971
}
72+
73+
74+
@Override
75+
public MetricName getName() {
76+
return name;
77+
}
7078
}

0 commit comments

Comments
 (0)