|
30 | 30 | )
|
31 | 31 | from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
|
32 | 32 |
|
| 33 | +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") |
33 | 34 | PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
|
34 |
| -DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfer") |
| 35 | +DAG_ID = "example_bigquery_to_mssql" |
| 36 | + |
| 37 | +DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}" |
35 | 38 | DATA_EXPORT_BUCKET_NAME = os.environ.get("GCP_BIGQUERY_EXPORT_BUCKET_NAME", "INVALID BUCKET NAME")
|
36 | 39 | TABLE = "table_42"
|
37 | 40 | destination_table = "mssql_table_test"
|
38 | 41 |
|
39 | 42 | with models.DAG(
|
40 |
| - "example_bigquery_to_mssql", |
41 |
| - schedule_interval=None, # Override to match your needs |
| 43 | + DAG_ID, |
| 44 | + schedule_interval="@once", # Override to match your needs |
42 | 45 | start_date=datetime(2021, 1, 1),
|
43 | 46 | catchup=False,
|
44 |
| - tags=["example"], |
| 47 | + tags=["example", "bigquery"], |
45 | 48 | ) as dag:
|
46 | 49 | bigquery_to_mssql = BigQueryToMsSqlOperator(
|
47 | 50 | task_id="bigquery_to_mssql",
|
|
61 | 64 | {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
|
62 | 65 | ],
|
63 | 66 | )
|
64 |
| - create_dataset >> create_table >> bigquery_to_mssql |
65 | 67 |
|
66 | 68 | delete_dataset = BigQueryDeleteDatasetOperator(
|
67 | 69 | task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
|
68 | 70 | )
|
69 | 71 |
|
70 |
| - bigquery_to_mssql >> delete_dataset |
| 72 | + ( |
| 73 | + # TEST SETUP |
| 74 | + create_dataset |
| 75 | + >> create_table |
| 76 | + # TEST BODY |
| 77 | + >> bigquery_to_mssql |
| 78 | + # TEST TEARDOWN |
| 79 | + >> delete_dataset |
| 80 | + ) |
| 81 | + |
| 82 | + from tests.system.utils.watcher import watcher |
| 83 | + |
| 84 | + # This test needs watcher in order to properly mark success/failure |
| 85 | + # when "tearDown" task with trigger rule is part of the DAG |
| 86 | + list(dag.tasks) >> watcher() |
| 87 | + |
| 88 | +from tests.system.utils import get_test_run # noqa: E402 |
| 89 | + |
| 90 | +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) |
| 91 | +test_run = get_test_run(dag) |
0 commit comments