ValueError: A BigQuery table or a query must be specified with beam.io.gcp.bigquery.ReadFromBigQuery(ValueError:必须使用beam.io.gcp.bigquery.ReadFromBigQuery指定BigQuery表或查询)
问题描述
我正在尝试传递一个BigQuery表名作为一个ApacheBEAM管道模板的值提供者。根据their documentation和StackOverflow answer,可以将值提供程序传递给apache_beam.io.gcp.bigquery.ReadFromBigQuery。
这就是我的管道代码
class UserOptions(PipelineOptions):
"""Define runtime argument"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--input', type=str)
parser.add_value_provider_argument('--output', type=str)
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
user_options = pipeline_options.view_as(UserOptions)
(p | 'Read from BQ Table' >> beam.io.gcp.bigquery.ReadFromBigQuery(
user_options.input
)
当我本地运行代码时,命令行传递user_options.inputis--input projectid.dataset_id.table
但是,我遇到错误:
ValueError: A BigQuery table or a query must be specified
我已尝试:
传递
projectid:dataset_id.table使用
bigquery.TableReference-&>不可能使用
f'{user_options.input}'传递查询-&>在本地运行时有效,但在GCP上调用模板时无效。错误语句:
缺少数据集,但请求中未设置默认数据集。错误:[{";Message";:";Table name";RounmeValueProvider(Option:Input,type:Str,Default_Value:None)],";请求中未设置默认数据集。";,";域";:";全局";,";原因";:";无效";},";状态";:&QOOT;INVALID_ARGUMENT&QOOT;}}>;
我错过了什么?
推荐答案
table参数必须按名称传递给ReadFromBigQuery。
BigQuerySource(已弃用)接受table作为第一个参数,因此您可以按位置(docs)传递一个。但ReadFromBigQuery预期gcs_location作为第一个参数(docs)。因此,如果您正在将代码从使用BigQuerySource移植到使用ReadFromBigQuery,并且您没有显式地按名称传入表,那么它将失败,并显示您收到的错误。
这里有两个工作示例和一个不工作的示例:
import apache_beam as beam
project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'
if __name__ == "__main__":
args = [
'--temp_location=gs://my_temp_bucket',
]
# This works:
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(table=f"{project_id}:{dataset_id}.{table_id}")
)
# So does this:
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(table=f"{dataset_id}.{table_id}", project=project_id)
)
# But this doesn't work becuase the table argument is not passed in by name.
# The f"{project_id}:{dataset_id}.{table_id}" string is interpreted as the gcs_location.
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(f"{project_id}:{dataset_id}.{table_id}")
)
这篇关于ValueError:必须使用beam.io.gcp.bigquery.ReadFromBigQuery指定BigQuery表或查询的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:ValueError:必须使用beam.io.gcp.bigquery.ReadFromBigQuery指定BigQuery表或查询
基础教程推荐
- 在 Python 中将货币解析为数字 2022-01-01
- 在 Django Admin 中使用内联 OneToOneField 2022-01-01
- Python,确定字符串是否应转换为 Int 或 Float 2022-01-01
- matplotlib 设置 yaxis 标签大小 2022-01-01
- 对多索引数据帧的列进行排序 2022-01-01
- Python 中是否有任何支持将长字符串转储为块文字或折叠块的 yaml 库? 2022-01-01
- 究竟什么是“容器"?在蟒蛇?(以及所有的 python 容器类型是什么?) 2022-01-01
- 比较两个文本文件以找出差异并将它们输出到新的文本文件 2022-01-01
- kivy 应用程序中的一个简单网页作为小部件 2022-01-01
- Kivy 使用 opencv.调整图像大小 2022-01-01
