Skip to content

Commit 0cabdf6

Browse files
committed
Closes #3373
2 parents e827642 + 4519681 commit 0cabdf6

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

sdks/python/apache_beam/pipeline.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,18 @@ def from_runner_api(proto, runner, options):
515515
p.applied_labels = set([
516516
t.unique_name for t in proto.components.transforms.values()])
517517
for id in proto.components.pcollections:
518-
context.pcollections.get_by_id(id).pipeline = p
518+
pcollection = context.pcollections.get_by_id(id)
519+
pcollection.pipeline = p
520+
521+
# Inject PBegin input where necessary.
522+
from apache_beam.io.iobase import Read
523+
from apache_beam.transforms.core import Create
524+
has_pbegin = [Read, Create]
525+
for id in proto.components.transforms:
526+
transform = context.transforms.get_by_id(id)
527+
if not transform.inputs and transform.transform.__class__ in has_pbegin:
528+
transform.inputs = (pvalue.PBegin(p),)
529+
519530
return p
520531

521532

0 commit comments

Comments
 (0)