Skip to content

Commit 15bd3a3

Browse files
committed
This closes #2866
2 parents 62ee275 + 316ff6b commit 15bd3a3

File tree

5 files changed

+63
-58
lines changed

5 files changed

+63
-58
lines changed

sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ protected List<MatchResult> match(List<String> specs) throws IOException {
7373
List<Boolean> isGlobBooleans = Lists.newArrayList();
7474

7575
for (GcsPath path : gcsPaths) {
76-
if (GcsUtil.isGlob(path)) {
76+
if (GcsUtil.isWildcard(path)) {
7777
globs.add(path);
7878
isGlobBooleans.add(true);
7979
} else {
@@ -178,8 +178,8 @@ public MatchResult apply(GcsPath gcsPath) {
178178
*/
179179
@VisibleForTesting
180180
MatchResult expand(GcsPath gcsPattern) throws IOException {
181-
String prefix = GcsUtil.getGlobPrefix(gcsPattern.getObject());
182-
Pattern p = Pattern.compile(GcsUtil.globToRegexp(gcsPattern.getObject()));
181+
String prefix = GcsUtil.getNonWildcardPrefix(gcsPattern.getObject());
182+
Pattern p = Pattern.compile(GcsUtil.wildcardToRegexp(gcsPattern.getObject()));
183183

184184
LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
185185
prefix, p.toString());

sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
2424
import org.apache.beam.sdk.io.fs.ResourceId;
2525
import org.apache.beam.sdk.options.PipelineOptions;
26-
import org.apache.beam.sdk.util.GcsUtil;
2726
import org.apache.beam.sdk.util.gcsfs.GcsPath;
2827

2928
/**
@@ -47,8 +46,7 @@ public static GcsPathValidator fromOptions(PipelineOptions options) {
4746
*/
4847
@Override
4948
public void validateInputFilePatternSupported(String filepattern) {
50-
GcsPath gcsPath = getGcsPath(filepattern);
51-
checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject()));
49+
getGcsPath(filepattern);
5250
verifyPath(filepattern);
5351
verifyPathIsAccessible(filepattern, "Could not find file %s");
5452
}

sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,6 @@ public static GcsUtil create(
125125
/** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */
126126
private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");
127127

128-
private static final String RECURSIVE_WILDCARD = "[*]{2}";
129-
130-
/**
131-
* A {@link Pattern} for globs with a recursive wildcard.
132-
*/
133-
private static final Pattern RECURSIVE_GCS_PATTERN =
134-
Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*");
135-
136128
/**
137129
* Maximum number of requests permitted in a GCS batch request.
138130
*/
@@ -159,23 +151,10 @@ public static GcsUtil create(
159151
// Exposed for testing.
160152
final ExecutorService executorService;
161153

162-
/**
163-
* Returns true if the given GCS pattern is supported otherwise fails with an
164-
* exception.
165-
*/
166-
public static boolean isGcsPatternSupported(String gcsPattern) {
167-
if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) {
168-
throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": "
169-
+ " recursive wildcards are not supported.");
170-
}
171-
return true;
172-
}
173-
174154
/**
175155
* Returns the prefix portion of the glob that doesn't contain wildcards.
176156
*/
177-
public static String getGlobPrefix(String globExp) {
178-
checkArgument(isGcsPatternSupported(globExp));
157+
public static String getNonWildcardPrefix(String globExp) {
179158
Matcher m = GLOB_PREFIX.matcher(globExp);
180159
checkArgument(
181160
m.matches(),
@@ -189,15 +168,15 @@ public static String getGlobPrefix(String globExp) {
189168
* @param globExp the glob expression to expand
190169
* @return a string with the regular expression this glob expands to
191170
*/
192-
public static String globToRegexp(String globExp) {
171+
public static String wildcardToRegexp(String globExp) {
193172
StringBuilder dst = new StringBuilder();
194-
char[] src = globExp.toCharArray();
173+
char[] src = globExp.replace("**/*", "**").toCharArray();
195174
int i = 0;
196175
while (i < src.length) {
197176
char c = src[i++];
198177
switch (c) {
199178
case '*':
200-
dst.append("[^/]*");
179+
dst.append(".*");
201180
break;
202181
case '?':
203182
dst.append("[^/]");
@@ -226,9 +205,9 @@ public static String globToRegexp(String globExp) {
226205
}
227206

228207
/**
229-
* Returns true if the given {@code spec} contains glob.
208+
* Returns true if the given {@code spec} contains a wildcard.
230209
*/
231-
public static boolean isGlob(GcsPath spec) {
210+
public static boolean isWildcard(GcsPath spec) {
232211
return GLOB_PREFIX.matcher(spec.getObject()).matches();
233212
}
234213

@@ -254,11 +233,14 @@ protected void setStorageClient(Storage storageClient) {
254233
* exists.
255234
*/
256235
public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
257-
checkArgument(isGcsPatternSupported(gcsPattern.getObject()));
258236
Pattern p = null;
259237
String prefix = null;
260-
if (!isGlob(gcsPattern)) {
261-
// Not a glob.
238+
if (isWildcard(gcsPattern)) {
239+
// Part before the first wildcard character.
240+
prefix = getNonWildcardPrefix(gcsPattern.getObject());
241+
p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
242+
} else {
243+
// Not a wildcard.
262244
try {
263245
// Use a get request to fetch the metadata of the object, and ignore the return value.
264246
// The request has strong global consistency.
@@ -268,10 +250,6 @@ public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
268250
// If the path was not found, return an empty list.
269251
return ImmutableList.of();
270252
}
271-
} else {
272-
// Part before the first wildcard character.
273-
prefix = getGlobPrefix(gcsPattern.getObject());
274-
p = Pattern.compile(globToRegexp(gcsPattern.getObject()));
275253
}
276254

277255
LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),

sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,14 +214,6 @@ public void testExpandNonGlob() throws Exception {
214214
gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
215215
}
216216

217-
// Patterns that contain recursive wildcards ('**') are not supported.
218-
@Test
219-
public void testRecursiveGlobExpansionFails() throws IOException {
220-
thrown.expect(IllegalArgumentException.class);
221-
thrown.expectMessage("Unsupported wildcard usage");
222-
gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**"));
223-
}
224-
225217
@Test
226218
public void testMatchNonGlobs() throws Exception {
227219
List<StorageObjectOrIOException> items = new ArrayList<>();

sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,12 @@ public class GcsUtilTest {
9292

9393
@Test
9494
public void testGlobTranslation() {
95-
assertEquals("foo", GcsUtil.globToRegexp("foo"));
96-
assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o"));
97-
assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?"));
98-
assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*"));
95+
assertEquals("foo", GcsUtil.wildcardToRegexp("foo"));
96+
assertEquals("fo.*o", GcsUtil.wildcardToRegexp("fo*o"));
97+
assertEquals("f.*o\\.[^/]", GcsUtil.wildcardToRegexp("f*o.?"));
98+
assertEquals("foo-[0-9].*", GcsUtil.wildcardToRegexp("foo-[0-9]*"));
99+
assertEquals(".*.*foo", GcsUtil.wildcardToRegexp("**/*foo"));
100+
assertEquals(".*.*foo", GcsUtil.wildcardToRegexp("**foo"));
99101
}
100102

101103
private static GcsOptions gcsOptionsWithTestCredential() {
@@ -260,16 +262,51 @@ public void testGlobExpansion() throws IOException {
260262
}
261263
}
262264

263-
// Patterns that contain recursive wildcards ('**') are not supported.
264265
@Test
265-
public void testRecursiveGlobExpansionFails() throws IOException {
266+
public void testRecursiveGlobExpansion() throws IOException {
266267
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
267268
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
268-
GcsPath pattern = GcsPath.fromUri("gs://testbucket/test**");
269269

270-
thrown.expect(IllegalArgumentException.class);
271-
thrown.expectMessage("Unsupported wildcard usage");
272-
gcsUtil.expand(pattern);
270+
Storage mockStorage = Mockito.mock(Storage.class);
271+
gcsUtil.setStorageClient(mockStorage);
272+
273+
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
274+
Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
275+
Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class);
276+
277+
Objects modelObjects = new Objects();
278+
List<StorageObject> items = new ArrayList<>();
279+
// A directory
280+
items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
281+
282+
// Files within the directory
283+
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file1.txt"));
284+
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file2.txt"));
285+
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file3.txt"));
286+
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/otherfile"));
287+
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/anotherfile"));
288+
items.add(new StorageObject().setBucket("testbucket").setName("test/file4.txt"));
289+
290+
modelObjects.setItems(items);
291+
292+
when(mockStorage.objects()).thenReturn(mockStorageObjects);
293+
when(mockStorageObjects.get("testbucket", "test/directory/otherfile")).thenReturn(
294+
mockStorageGet);
295+
when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList);
296+
when(mockStorageGet.execute()).thenReturn(
297+
new StorageObject().setBucket("testbucket").setName("test/directory/otherfile"));
298+
when(mockStorageList.execute()).thenReturn(modelObjects);
299+
300+
{
301+
GcsPath pattern = GcsPath.fromUri("gs://testbucket/test/**/*.txt");
302+
List<GcsPath> expectedFiles = ImmutableList.of(
303+
GcsPath.fromUri("gs://testbucket/test/directory/file1.txt"),
304+
GcsPath.fromUri("gs://testbucket/test/directory/file2.txt"),
305+
GcsPath.fromUri("gs://testbucket/test/directory/file3.txt"),
306+
GcsPath.fromUri("gs://testbucket/test/file4.txt"));
307+
308+
assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
309+
}
273310
}
274311

275312
// GCSUtil.expand() should fail when matching a single object when that object does not exist.

0 commit comments

Comments
 (0)