Use o spark-bigquery-connector com o Apache Spark para ler e gravar dados do e para o BigQuery.
Este tutorial demonstra um aplicativo PySpark que usa o
spark-bigquery-connector
.
Usar o conector do BigQuery com sua carga de trabalho
Consulte Versões de ambiente de execução sem servidor para Apache Spark para determinar a versão do conector do BigQuery instalada na versão de ambiente de execução da sua carga de trabalho em lote. Se o conector não estiver listado, consulte a próxima seção para instruções sobre como disponibilizar o conector para aplicativos.
Como usar o conector com a versão 2.0 do ambiente de execução do Spark
O conector do BigQuery não está instalado na versão 2.0 do ambiente de execução do Spark. Ao usar a versão 2.0 do ambiente de execução do Spark, é possível disponibilizar o conector para seu aplicativo de uma das seguintes maneiras:
- Use o parâmetro
jars
para apontar para um arquivo JAR do conector ao enviar sua carga de trabalho em lote do Google Cloud Serverless para Apache Spark. O exemplo a seguir especifica um arquivo JAR do conector. Consulte o repositório GoogleCloudDataproc/spark-bigquery-connector no GitHub para conferir uma lista de arquivos JAR de conectores disponíveis.- Exemplo da Google Cloud CLI:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Exemplo da Google Cloud CLI:
- Inclua o arquivo jar do conector no aplicativo Spark como uma dependência. Consulte Como compilar com o conector.
Calcular custos
Neste tutorial, usamos componentes faturáveis do Google Cloud, incluindo:
- Serverless para Apache Spark
- BigQuery
- Cloud Storage
Use a Calculadora de preços para gerar uma estimativa de custo com base no uso previsto.
E/S do BigQuery
Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.
O conector grava a saída do wordcount no BigQuery da seguinte maneira:
Armazenamento em buffer dos dados em arquivos temporários no bucket do Cloud Storage
Copiar os dados em uma operação do bucket do Cloud Storage para o BigQuery
Excluir os arquivos temporários no Cloud Storage depois que a operação de carregamento do BigQuery for concluída. Os arquivos temporários também são excluídos após o encerramento do aplicativo Spark. Se a exclusão falhar, será necessário excluir todos os arquivos temporários indesejados do Cloud Storage, que geralmente são colocados em
gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID
.
Configurar o faturamento
Por padrão, o projeto associado às credenciais ou à conta de serviço é cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Você também pode adicionar a uma operação de leitura ou gravação, da seguinte maneira:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Enviar uma carga de trabalho em lote de contagem de palavras do PySpark
Execute uma carga de trabalho em lote do Spark que conta o número de palavras em um conjunto de dados público.
- Abra um terminal local ou o Cloud Shell
- Crie o
wordcount_dataset
com a ferramenta de linha de comando bq em um terminal local ou no Cloud Shell.bq mk wordcount_dataset
- Crie um bucket do Cloud Storage com a Google Cloud CLI.
Substituagcloud storage buckets create gs://YOUR_BUCKET
YOUR_BUCKET
pelo nome do bucket do Cloud Storage criado. - Crie o arquivo
wordcount.py
localmente em um editor de texto copiando o seguinte código PySpark.#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "YOUR_BUCKET" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Envie a carga de trabalho em lote do PySpark:
Exemplo de saída do terminal:gcloud dataproc batches submit pyspark wordcount.py \ --region=REGION \ --deps-bucket=YOUR_BUCKET
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída no console do Google Cloud , abra a página do BigQuery do projeto, selecione a tabelawordcount_output
e clique em Visualizar.
Para mais informações
- Armazenamento do BigQuery e Spark SQL, Python
- Como criar um arquivo de definição de tabela para uma fonte de dados externa
- Usar dados particionados externamente