Commit abb304c

Migrate Google example DAG bigquery_transfer to new design AIP-47 (#24543)
related: #22447, #22430
1 parent 47f54b6 commit abb304c

File tree

2 files changed (+125, −77 lines)


‎airflow/providers/google/cloud/example_dags/example_bigquery_transfer.py

Lines changed: 0 additions & 77 deletions
This file was deleted.

New file: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG for Google BigQuery service.
"""
import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryDeleteDatasetOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
DAG_ID = "example_bigquery_transfer"

DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"

ORIGIN = "origin"
TARGET = "target"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "bigquery"],
) as dag:
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
    )

    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)

    create_origin_table = BigQueryCreateEmptyTableOperator(
        task_id=f"create_{ORIGIN}_table",
        dataset_id=DATASET_NAME,
        table_id=ORIGIN,
        schema_fields=[
            {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
            {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
        ],
    )

    create_target_table = BigQueryCreateEmptyTableOperator(
        task_id=f"create_{TARGET}_table",
        dataset_id=DATASET_NAME,
        table_id=TARGET,
        schema_fields=[
            {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
            {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
        ],
    )

    copy_selected_data = BigQueryToBigQueryOperator(
        task_id="copy_selected_data",
        source_project_dataset_tables=f"{DATASET_NAME}.{ORIGIN}",
        destination_project_dataset_table=f"{DATASET_NAME}.{TARGET}",
    )

    bigquery_to_gcs = BigQueryToGCSOperator(
        task_id="bigquery_to_gcs",
        source_project_dataset_table=f"{DATASET_NAME}.{ORIGIN}",
        destination_cloud_storage_uris=[f"gs://{BUCKET_NAME}/export-bigquery.csv"],
    )

    delete_dataset = BigQueryDeleteDatasetOperator(
        task_id="delete_dataset",
        dataset_id=DATASET_NAME,
        delete_contents=True,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
    )

    (
        # TEST SETUP
        create_bucket
        >> create_dataset
        >> create_origin_table
        >> create_target_table
        # TEST BODY
        >> copy_selected_data
        >> bigquery_to_gcs
        # TEST TEARDOWN
        >> delete_dataset
        >> delete_bucket
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
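For context on the two trailing hooks that AIP-47 adds: the teardown tasks use trigger_rule=ALL_DONE, so they run (and can succeed) even when the test body fails; routing every task into a watcher that fails on any upstream failure keeps the DAG run's final state honest, and get_test_run then exposes the DAG as something pytest can collect and execute. Below is a minimal sketch of both ideas, assuming Airflow 2.x; the names mirror the real helpers in tests/system/utils, but the bodies are simplified and illustrative, not the actual implementation.

# Illustrative sketch only -- the real helpers live in tests/system/utils
# and differ in detail.
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.models.dag import DAG
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    # Fails whenever any upstream task failed, so ALL_DONE teardown
    # tasks cannot mask a failed test body as a green DAG run.
    raise AirflowException("Failing task because one or more upstream tasks failed.")


def get_test_run(dag: DAG):
    # Wrap the DAG in a callable that pytest can collect; recent
    # Airflow versions can execute a DAG in-process via dag.test().
    def test_run():
        dag.test()

    return test_run

With that wrapper in place, the example runs as an ordinary test, e.g. pytest path/to/example_bigquery_transfer.py, where the exact path depends on where the migrated file lands under tests/system/ (see tests/system/README.md#run_via_pytest).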

0 commit comments