Dataflow 템플릿을 사용하여 스트리밍 파이프라인 만들기
이 빠른 시작에서는 Google에서 제공하는 Dataflow 템플릿을 사용하여 스트리밍 파이프라인을 만드는 방법을 설명합니다. 특히 이 빠른 시작에서는 Pub/Sub Topic to BigQuery 템플릿을 예시로 사용합니다.
Pub/Sub to BigQuery 템플릿은 Pub/Sub 주제에서 JSON 형식의 메시지를 읽고 BigQuery 테이블에 쓸 수 있는 스트리밍 파이프라인입니다.
Google Cloud 콘솔에서 이 태스크에 대한 단계별 안내를 직접 수행하려면 둘러보기를 클릭합니다.
시작하기 전에
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
- Cloud Storage 버킷을 만듭니다.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
-
In the Choose where to store your data section, do the following:
- Select a Location type.
- Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
- If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
- To set up cross-bucket replication, select
Add cross-bucket replication via Storage Transfer Service and
follow these steps:
Set up cross-bucket replication
- In the Bucket menu, select a bucket.
In the Replication settings section, click Configure to configure settings for the replication job.
The Configure cross-bucket replication pane appears.
- To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
- To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
- Click Done.
-
In the Choose how to store your data section, do the following:
- In the Set a default class section, select the following: Standard.
- To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
-
In the Choose how to protect object data section, do the
following:
- Select any of the options under Data protection that you
want to set for your bucket.
- To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
- To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
- To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
- To enable Object Retention Lock, click the Enable object retention checkbox.
- To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
- To choose how your object data will be encrypted, expand the Data encryption section (Data encryption method. ), and select a
- Select any of the options under Data protection that you
want to set for your bucket.
- Click Create.
- 뒤에 나오는 섹션에서 필요하므로 다음을 복사합니다.
- Cloud Storage 버킷 이름
- Google Cloud 프로젝트 ID입니다.
이 ID를 찾으려면 프로젝트 식별을 참조하세요.
빠른 시작 단계를 완료하려면 사용자 계정에 Dataflow 관리자 역할 및 서비스 계정 사용자 역할이 있어야 합니다. Compute Engine 기본 서비스 계정에는 Dataflow 작업자 역할, 스토리지 객체 관리자 역할, Pub/Sub 편집자 역할, BigQuery 데이터 편집자 역할, 뷰어 역할이 있어야 합니다. Google Cloud 콘솔에서 필요한 역할을 추가하려면 다음 안내를 따르세요.
- IAM 페이지로 이동하여 프로젝트를 선택합니다.
IAM으로 이 - 사용자 계정이 포함된 행에서 주 구성원 수정을 클릭합니다. 다른 역할 추가를 클릭하고 Dataflow 관리자 및 서비스 계정 사용자 역할을 추가합니다.
- 저장을 클릭합니다.
- Compute Engine 기본 서비스 계정(PROJECT_NUMBER-compute@developer.gserviceaccount.com)이 포함된 행에서 주 구성원 수정을 클릭합니다.
- 다른 역할 추가를 클릭하고 Dataflow 작업자, 스토리지 객체 관리자, Pub/Sub 편집자, BigQuery 데이터 편집자, 뷰어 역할을 추가합니다.
저장을 클릭합니다.
역할 부여에 대한 상세 설명은 콘솔을 사용하여 IAM 역할 부여를 참조하세요.
- IAM 페이지로 이동하여 프로젝트를 선택합니다.
- 기본적으로 각각의 새 프로젝트는 기본 네트워크로 시작합니다.
프로젝트에 대한 기본 네트워크가 사용 중지되었거나 삭제된 경우 프로젝트에 사용자 계정에 Compute Network 사용자 역할(
roles/compute.networkUser
)이 있는 네트워크가 있어야 합니다.
BigQuery 데이터 세트와 테이블 만들기
Google Cloud 콘솔을 사용하여 Pub/Sub 주제에 적절한 스키마로 BigQuery 데이터 세트와 테이블을 만듭니다.
이 예시에서 데이터 세트 이름은 taxirides
이고 테이블 이름은 realtime
입니다. 이 데이터 세트와 테이블을 만들려면 다음 단계를 따르세요.
- BigQuery 페이지로 이동합니다.
BigQuery로 이동 - 데이터 세트를 만들 프로젝트 옆에 있는 탐색기 패널에서 작업 보기를 클릭한 후 데이터 세트 만들기를 클릭합니다.
- 데이터 세트 만들기 패널에서 다음 단계를 수행합니다.
- 데이터 세트 ID에
taxirides
를 입력합니다. 데이터 세트 ID는 Google Cloud 프로젝트마다 고유합니다. - 위치 유형에 대해 멀티 리전을 선택한 다음 US(미국 내 여러 리전)를 선택합니다. 공개 데이터 세트는
US
멀티 리전 위치에 저장됩니다. 편의상 같은 위치에 데이터 세트를 배치합니다. - 다른 기본 설정을 그대로 두고 데이터 세트 만들기를 클릭합니다.
탐색기 패널에서 프로젝트를 펼칩니다.taxirides
데이터 세트 옆의 작업 보기를 클릭한 후 테이블 만들기를 클릭합니다.- 테이블 만들기 패널에서 다음 단계를 수행합니다.
- 소스 섹션에서 다음 항목으로 테이블 만들기에 빈 테이블을 선택합니다.
- 대상 섹션의 테이블에
realtime
을 입력합니다. - 스키마 섹션에서 텍스트로 수정 전환 버튼을 클릭하고 다음 스키마 정의를 상자에 붙여넣습니다.
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- 파티션 나누기 및 클러스터 설정 섹션의 파티셔닝에서 타임스탬프 필드를 선택합니다.
- 다른 기본 설정은 그대로 두고 테이블 만들기를 클릭합니다.
파이프라인 실행
Google에서 제공하는 Pub/Sub Topic to BigQuery 템플릿을 사용하여 스트리밍 파이프라인을 실행합니다. 파이프라인은 입력 주제에서 수신되는 데이터를 가져옵니다.
- Dataflow 작업 페이지로 이동합니다.
작업으로 이동 템플릿에서 작업 만들기 를 클릭합니다.taxi-data
를 Dataflow 작업의 작업 이름으로 입력합니다.- Dataflow 템플릿에 BigQuery에 대한 Pub/Sub 템플릿을 선택합니다.
- BigQuery 출력 테이블에 다음을 입력합니다.
PROJECT_ID:taxirides.realtime
PROJECT_ID
를 BigQuery 데이터 세트를 만든 프로젝트의 프로젝트 ID로 바꿉니다. - 선택적 소스 파라미터 섹션의 Pub/Sub 주제 입력에서 직접 주제 입력을 클릭합니다.
- 대화상자의 주제 이름에 다음을 입력한 후 저장을 클릭합니다.
projects/pubsub-public-data/topics/taxirides-realtime
공개적으로 사용 가능한 Pub/Sub 주제는 뉴욕시 택시 및 리무진 조합의 공개 데이터 세트를 기반으로 합니다. 다음은 JSON 형식으로 된 이 주제의 샘플 메시지입니다.
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- 임시 위치에 다음을 입력합니다.
gs://BUCKET_NAME/temp/
BUCKET_NAME
을 Cloud Storage 버킷 이름으로 바꿉니다.temp
폴더는 스테이징된 파이프라인 작업과 같은 임시 파일을 저장합니다. - 프로젝트에 기본 네트워크가 없으면 네트워크 및 서브네트워크를 입력합니다. 자세한 내용은 네트워크 및 서브네트워크 지정을 참조하세요.
- 작업 실행을 클릭합니다.
결과 보기
realtime
테이블에 기록된 데이터를 보려면 다음 단계를 따르세요.
BigQuery 페이지로 이동합니다.
새 쿼리 작성을 클릭합니다. 새 편집기 탭이 열립니다.
SELECT * FROM `PROJECT_ID.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
PROJECT_ID
를 BigQuery 데이터 세트를 만든 프로젝트의 프로젝트 ID로 바꿉니다. 데이터가 테이블에 표시되는 데 최대 5분이 걸릴 수 있습니다.실행을 클릭합니다.
이 쿼리에서는 지난 24시간 동안 테이블에 추가된 행을 반환합니다. 표준 SQL을 사용하여 쿼리를 실행할 수도 있습니다.
삭제
이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 다음 단계를 수행합니다.
프로젝트 삭제
비용 청구를 방지하는 가장 쉬운 방법은 빠른 시작용으로 만든 Google Cloud 프로젝트를 삭제하는 것입니다.- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
개별 리소스 삭제
이 빠른 시작에서 사용한 Google Cloud 프로젝트를 유지하려면 개별 리소스를 삭제합니다.
- Dataflow 작업 페이지로 이동합니다.
작업으로 이동 - 작업 목록에서 스트리밍 작업을 선택합니다.
- 탐색에서 중지를 클릭합니다.
- 작업 중지 대화상자에서 파이프라인을 취소하거나 드레이닝한 다음 작업 중지를 클릭합니다.
- BigQuery 페이지로 이동합니다.
BigQuery로 이동 - 탐색기 패널에서 프로젝트를 펼칩니다.
- 삭제하려는 데이터 세트 옆에 있는 작업 보기를 클릭한 다음 열기를 클릭합니다.
- 세부정보 패널에서 데이터 세트 삭제를 클릭하고 안내를 따릅니다.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.