Skip to content

Commit a25855a

Browse files
committed
This closes #1900
2 parents bdcd26c + b83744f commit a25855a

File tree

12 files changed

+823
-1
lines changed

12 files changed

+823
-1
lines changed

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
<beamSurefireArgline />
102102

103103
<!-- If updating dependencies, please update any relevant javadoc offlineLinks -->
104+
<apache.commons.lang.version>3.5</apache.commons.lang.version>
104105
<apex.kryo.version>2.24.0</apex.kryo.version>
105106
<avro.version>1.8.1</avro.version>
106107
<bigquery.version>v2-rev295-1.22.0</bigquery.version>
@@ -458,6 +459,12 @@
458459
<version>${project.version}</version>
459460
</dependency>
460461

462+
<dependency>
463+
<groupId>org.apache.commons</groupId>
464+
<artifactId>commons-lang3</artifactId>
465+
<version>${apache.commons.lang.version}</version>
466+
</dependency>
467+
461468
<dependency>
462469
<groupId>io.grpc</groupId>
463470
<artifactId>grpc-all</artifactId>

runners/apex/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@
9696
</exclusions>
9797
</dependency>
9898

99+
<dependency>
100+
<groupId>org.apache.commons</groupId>
101+
<artifactId>commons-lang3</artifactId>
102+
</dependency>
103+
99104
<dependency>
100105
<!-- javax.annotation.Nullable -->
101106
<groupId>com.google.code.findbugs</groupId>

sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,24 @@
166166
<!-- Intentionally overriding parent name because inheritors should replace the parent. -->
167167
</Match>
168168

169+
<Match>
170+
<Class name="org.apache.beam.sdk.io.LocalResourceId"/>
171+
<Method name="getCurrentDirectory" />
172+
<Bug pattern="NP_NULL_PARAM_DEREF"/>
173+
<!--
174+
Path.getParent() could return null. However, we check the returned Path is not null.
175+
-->
176+
</Match>
177+
178+
<Match>
179+
<Class name="org.apache.beam.sdk.io.gcp.storage.GcsResourceId"/>
180+
<Method name="getCurrentDirectory" />
181+
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
182+
<!--
183+
GcsPath.getParent() could return null. However, we check the returned Path is not null.
184+
-->
185+
</Match>
186+
169187
<Match>
170188
<Class name="org.apache.beam.sdk.util.ZipFiles"/>
171189
<Method name="zipDirectory" />

sdks/java/core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,11 @@
371371
<version>1.9</version>
372372
</dependency>
373373

374+
<dependency>
375+
<groupId>org.apache.commons</groupId>
376+
<artifactId>commons-lang3</artifactId>
377+
</dependency>
378+
374379
<dependency>
375380
<groupId>joda-time</groupId>
376381
<artifactId>joda-time</artifactId>
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io;
19+
20+
import static com.google.common.base.Preconditions.checkArgument;
21+
import static com.google.common.base.Preconditions.checkNotNull;
22+
import static com.google.common.base.Preconditions.checkState;
23+
24+
import java.nio.file.Path;
25+
import java.nio.file.Paths;
26+
import java.util.Objects;
27+
import java.util.UUID;
28+
import org.apache.beam.sdk.io.fs.ResolveOptions;
29+
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
30+
import org.apache.beam.sdk.io.fs.ResourceId;
31+
import org.apache.commons.lang3.SystemUtils;
32+
33+
/**
34+
* {@link ResourceId} implementation for local files.
35+
*/
36+
class LocalResourceId implements ResourceId {
37+
38+
private final Path path;
39+
private final boolean isDirectory;
40+
41+
static LocalResourceId fromPath(Path path, boolean isDirectory) {
42+
checkNotNull(path, "path");
43+
return new LocalResourceId(path, isDirectory);
44+
}
45+
46+
private LocalResourceId(Path path, boolean isDirectory) {
47+
this.path = path.normalize();
48+
this.isDirectory = isDirectory;
49+
}
50+
51+
@Override
52+
public ResourceId resolve(String other, ResolveOptions resolveOptions) {
53+
checkState(
54+
isDirectory,
55+
String.format("Expected the path is a directory, but had [%s].", path));
56+
checkArgument(
57+
resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
58+
|| resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY),
59+
String.format("ResolveOptions: [%s] is not supported.", resolveOptions));
60+
checkArgument(
61+
!(resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
62+
&& other.endsWith("/")),
63+
"The resolved file: [%s] should not end with '/'.", other);
64+
if (SystemUtils.IS_OS_WINDOWS) {
65+
return resolveLocalPathWindowsOS(other, resolveOptions);
66+
} else {
67+
return resolveLocalPath(other, resolveOptions);
68+
}
69+
}
70+
71+
@Override
72+
public ResourceId getCurrentDirectory() {
73+
if (isDirectory) {
74+
return this;
75+
} else {
76+
Path parent = path.getParent();
77+
if (parent == null && path.getNameCount() == 1) {
78+
parent = Paths.get(".");
79+
}
80+
checkState(
81+
parent != null,
82+
String.format("Failed to get the current directory for path: [%s].", path));
83+
return fromPath(
84+
parent,
85+
true /* isDirectory */);
86+
}
87+
}
88+
89+
private LocalResourceId resolveLocalPath(String other, ResolveOptions resolveOptions) {
90+
return new LocalResourceId(
91+
path.resolve(other),
92+
resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY));
93+
}
94+
95+
private LocalResourceId resolveLocalPathWindowsOS(String other, ResolveOptions resolveOptions) {
96+
String uuid = UUID.randomUUID().toString();
97+
Path pathAsterisksReplaced = Paths.get(path.toString().replaceAll("\\*", uuid));
98+
String otherAsterisksReplaced = other.replaceAll("\\*", uuid);
99+
100+
return new LocalResourceId(
101+
Paths.get(
102+
pathAsterisksReplaced.resolve(otherAsterisksReplaced)
103+
.toString()
104+
.replaceAll(uuid, "\\*")),
105+
resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY));
106+
}
107+
108+
@Override
109+
public String getScheme() {
110+
return LocalFileSystemRegistrar.LOCAL_FILE_SCHEME;
111+
}
112+
113+
@Override
114+
public String toString() {
115+
return String.format("LocalResourceId: [%s]", path);
116+
}
117+
118+
@Override
119+
public boolean equals(Object obj) {
120+
if (!(obj instanceof LocalResourceId)) {
121+
return false;
122+
}
123+
LocalResourceId other = (LocalResourceId) obj;
124+
return this.path.equals(other.path)
125+
&& this.isDirectory == other.isDirectory;
126+
}
127+
128+
@Override
129+
public int hashCode() {
130+
return Objects.hash(path, isDirectory);
131+
}
132+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.fs;
19+
20+
/**
21+
* An object that configures {@link ResourceId#resolve}.
22+
*/
23+
public interface ResolveOptions {
24+
25+
/**
26+
* Defines the standard resolve options.
27+
*/
28+
enum StandardResolveOptions implements ResolveOptions {
29+
/**
30+
* Resolve a file.
31+
*/
32+
RESOLVE_FILE,
33+
34+
/**
35+
* Resolve a directory.
36+
*
37+
* <p>This requires {@link ResourceId} implementation to append a delimiter.
38+
*/
39+
RESOLVE_DIRECTORY,
40+
}
41+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.fs;
19+
20+
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
21+
22+
/**
23+
* An identifier which represents a file-like resource.
24+
*
25+
* <p>{@link ResourceId} is hierarchical and composed of a sequence of directory
26+
* and file name elements separated by a special separator or delimiter.
27+
*
28+
* <p>TODO: add examples for how ResourceId is constructed and used.
29+
*/
30+
public interface ResourceId {
31+
32+
/**
33+
* Returns a child {@code ResourceId} under {@code this}.
34+
*
35+
* <p>In order to write file system agnostic code, callers should not include delimiters
36+
* in {@code other}, and should use {@link StandardResolveOptions} to specify
37+
* whether to resolve a file or a directory.
38+
*
39+
* <p>For example:
40+
*
41+
* <pre>{@code
42+
* ResourceId homeDir = ...;
43+
* ResourceId tempOutput = homeDir
44+
* .resolve("tempDir", StandardResolveOptions.RESOLVE_DIRECTORY)
45+
* .resolve("output", StandardResolveOptions.RESOLVE_FILE);
46+
* }</pre>
47+
*
48+
* <p>This {@link ResourceId} should represent a directory.
49+
*
50+
* <p>It is up to each file system to resolve in their own way.
51+
*
52+
* <p>Resolving special characters:
53+
* <ul>
54+
* <li>{@code resourceId.resolve("..", StandardResolveOptions.RESOLVE_DIRECTORY)} returns
55+
* the parent directory of this {@code ResourceId}.
56+
* <li>{@code resourceId.resolve("{@literal *}", StandardResolveOptions.RESOLVE_FILE)} returns
57+
* a {@code ResourceId} which matches all files in this {@code ResourceId}.
58+
* <li>{@code resourceId.resolve("{@literal *}", StandardResolveOptions.RESOLVE_DIRECTORY)}
59+
* returns a {@code ResourceId} which matches all directories in this {@code ResourceId}.
60+
* </ul>
61+
*
62+
* @throws IllegalStateException if this {@link ResourceId} is not a directory.
63+
*
64+
* @throws IllegalArgumentException if {@code other} contains illegal characters
65+
* or is an illegal name. It is recommended that callers use common characters,
66+
* such as {@code [_a-zA-Z0-9.-]}, in {@code other}.
67+
*/
68+
ResourceId resolve(String other, ResolveOptions resolveOptions);
69+
70+
/**
71+
* Returns the {@code ResourceId} that represents the current directory of
72+
* this {@code ResourceId}.
73+
*
74+
* <p>If it is already a directory, trivially returns this.
75+
*/
76+
ResourceId getCurrentDirectory();
77+
78+
/**
79+
* Get the scheme which defines the namespace of the {@link ResourceId}.
80+
*
81+
* <p>The scheme is required to follow URI scheme syntax. See
82+
* <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
83+
*/
84+
String getScheme();
85+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/**
20+
* Apache Beam FileSystem interfaces and their default implementations.
21+
*/
22+
package org.apache.beam.sdk.io.fs;

0 commit comments

Comments
 (0)