Commit 82e6226

Fix Datafusion system tests (#32749)
1 parent 56c41d4 commit 82e6226

File tree

3 files changed: 87 additions & 64 deletions

airflow/providers/google/cloud/hooks/datafusion.py

Lines changed: 17 additions & 6 deletions
@@ -319,7 +319,7 @@ def create_pipeline(
         namespace: str = "default",
     ) -> None:
         """
-        Creates a Cloud Data Fusion pipeline.
+        Creates a batch Cloud Data Fusion pipeline.
 
         :param pipeline_name: Your pipeline name.
         :param pipeline: The pipeline definition. For more information check:
@@ -343,12 +343,12 @@ def delete_pipeline(
         namespace: str = "default",
     ) -> None:
         """
-        Deletes a Cloud Data Fusion pipeline.
+        Deletes a batch Cloud Data Fusion pipeline.
 
         :param pipeline_name: Your pipeline name.
         :param version_id: Version of pipeline to delete
         :param instance_url: Endpoint on which the REST APIs is accessible for the instance.
-        :param namespace: f your pipeline belongs to a Basic edition instance, the namespace ID
+        :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID
             is always default. If your pipeline belongs to an Enterprise edition instance, you
             can create a namespace.
         """
@@ -357,9 +357,20 @@ def delete_pipeline(
             url = os.path.join(url, "versions", version_id)
 
         response = self._cdap_request(url=url, method="DELETE", body=None)
-        self._check_response_status_and_data(
-            response, f"Deleting a pipeline failed with code {response.status}"
-        )
+        # Check for 409 error: the previous step for starting/stopping pipeline could still be in progress.
+        # Waiting some time before retry.
+        for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+            try:
+                self._check_response_status_and_data(
+                    response, f"Deleting a pipeline failed with code {response.status}: {response.data}"
+                )
+                break
+            except AirflowException as exc:
+                if "409" in str(exc):
+                    sleep(time_to_wait)
+                    response = self._cdap_request(url=url, method="DELETE", body=None)
+                else:
+                    raise
 
     def list_pipelines(
         self,
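The new deletion path retries on HTTP 409 responses, since a previous start/stop request may still be settling when the DELETE arrives. Below is a minimal standalone sketch of that backoff pattern, assuming exponential_sleep_generator is the google.api_core.retry helper of the same name; delete_once and ConflictError are hypothetical stand-ins for the hook's _cdap_request / _check_response_status_and_data pair, not part of the commit.

from time import sleep

from google.api_core.retry import exponential_sleep_generator


class ConflictError(Exception):
    """Hypothetical stand-in for AirflowException raised on a 409 response."""


def delete_with_backoff(delete_once) -> None:
    """Keep retrying a delete while the previous operation is still in progress."""
    # Yields exponentially growing wait times, capped at 120s, matching initial=10, maximum=120 above.
    for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
        try:
            delete_once()  # raises ConflictError while the pipeline is still starting/stopping
            return
        except ConflictError:
            sleep(time_to_wait)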

tests/system/providers/google/cloud/datafusion/example_datafusion.py

Lines changed: 34 additions & 26 deletions
@@ -23,7 +23,6 @@
 from datetime import datetime
 
 from airflow import models
-from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.datafusion import (
     CloudDataFusionCreateInstanceOperator,
     CloudDataFusionCreatePipelineOperator,
@@ -36,34 +35,36 @@
     CloudDataFusionStopPipelineOperator,
     CloudDataFusionUpdateInstanceOperator,
 )
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
+from airflow.utils.trigger_rule import TriggerRule
 
 # [START howto_data_fusion_env_variables]
 SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 LOCATION = "europe-north1"
-INSTANCE_NAME = "airflow-test-instance"
+DAG_ID = "example_data_fusion"
+INSTANCE_NAME = "test-instance"
 INSTANCE = {
     "type": "BASIC",
     "displayName": INSTANCE_NAME,
     "dataprocServiceAccount": SERVICE_ACCOUNT,
 }
 
-BUCKET_1 = "test-datafusion-1"
-BUCKET_2 = "test-datafusion-2"
-BUCKET_1_URI = f"gs://{BUCKET_1}"
-BUCKET_2_URI = f"gs://{BUCKET_2}"
+BUCKET_NAME_1 = "test-datafusion-1"
+BUCKET_NAME_2 = "test-datafusion-2"
+BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
+BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
 
 PIPELINE_NAME = "test-pipe"
 PIPELINE = {
     "artifact": {
         "name": "cdap-data-pipeline",
-        "version": "6.7.2",
+        "version": "6.8.3",
         "scope": "SYSTEM",
     },
     "description": "Data Pipeline Application",
-    "name": "test-pipe",
+    "name": PIPELINE_NAME,
     "config": {
         "resources": {"memoryMB": 2048, "virtualCores": 1},
         "driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -80,7 +81,7 @@
             "name": "GCSFile",
             "type": "batchsource",
             "label": "GCS",
-            "artifact": {"name": "google-cloud", "version": "0.20.3", "scope": "SYSTEM"},
+            "artifact": {"name": "google-cloud", "version": "0.21.2", "scope": "SYSTEM"},
             "properties": {
                 "project": "auto-detect",
                 "format": "text",
@@ -91,7 +92,7 @@
                 "encrypted": "false",
                 "schema": '{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
-                "path": BUCKET_1_URI,
+                "path": BUCKET_NAME_1_URI,
                 "referenceName": "foo_bucket",
                 "useConnection": "false",
                 "serviceAccountType": "filePath",
@@ -109,7 +110,7 @@
             "name": "GCS",
             "type": "batchsink",
             "label": "GCS2",
-            "artifact": {"name": "google-cloud", "version": "0.20.3", "scope": "SYSTEM"},
+            "artifact": {"name": "google-cloud", "version": "0.21.2", "scope": "SYSTEM"},
             "properties": {
                 "project": "auto-detect",
                 "suffix": "yyyy-MM-dd-HH-mm",
@@ -119,7 +120,7 @@
                 "schema": '{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
                 "referenceName": "bar",
-                "path": BUCKET_2_URI,
+                "path": BUCKET_NAME_2_URI,
                 "serviceAccountType": "filePath",
                 "contentType": "application/octet-stream",
             },
@@ -146,19 +147,20 @@
 # [END howto_data_fusion_env_variables]
 
 with models.DAG(
-    "example_data_fusion",
+    DAG_ID,
     start_date=datetime(2021, 1, 1),
     catchup=False,
+    tags=["example", "datafusion"],
 ) as dag:
     create_bucket1 = GCSCreateBucketOperator(
         task_id="create_bucket1",
-        bucket_name=BUCKET_1,
+        bucket_name=BUCKET_NAME_1,
         project_id=PROJECT_ID,
     )
 
     create_bucket2 = GCSCreateBucketOperator(
         task_id="create_bucket2",
-        bucket_name=BUCKET_2,
+        bucket_name=BUCKET_NAME_2,
         project_id=PROJECT_ID,
     )
 
@@ -255,38 +257,44 @@
         pipeline_name=PIPELINE_NAME,
         instance_name=INSTANCE_NAME,
         task_id="delete_pipeline",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_cloud_data_fusion_delete_pipeline]
 
     # [START howto_cloud_data_fusion_delete_instance_operator]
     delete_instance = CloudDataFusionDeleteInstanceOperator(
-        location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
+        location=LOCATION,
+        instance_name=INSTANCE_NAME,
+        task_id="delete_instance",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_cloud_data_fusion_delete_instance_operator]
 
-    # Add sleep before creating pipeline
-    sleep = BashOperator(task_id="sleep", bash_command="sleep 60")
-
-    # Add sleep before creating pipeline
-    sleep_30 = BashOperator(task_id="sleep_30", bash_command="sleep 30")
+    delete_bucket1 = GCSDeleteBucketOperator(
+        task_id="delete_bucket1", bucket_name=BUCKET_NAME_1, trigger_rule=TriggerRule.ALL_DONE
+    )
+    delete_bucket2 = GCSDeleteBucketOperator(
+        task_id="delete_bucket2", bucket_name=BUCKET_NAME_1, trigger_rule=TriggerRule.ALL_DONE
+    )
 
     (
-        create_bucket1
-        >> create_bucket2
+        # TEST SETUP
+        [create_bucket1, create_bucket2]
+        # TEST BODY
         >> create_instance
         >> get_instance
         >> restart_instance
         >> update_instance
-        >> sleep
         >> create_pipeline
         >> list_pipelines
         >> start_pipeline_async
         >> start_pipeline_sensor
         >> start_pipeline
         >> stop_pipeline
-        >> sleep_30
         >> delete_pipeline
         >> delete_instance
+        # TEST TEARDOWN
+        >> [delete_bucket1, delete_bucket2]
     )
 
     from tests.system.utils.watcher import watcher
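The teardown tasks added here (delete_pipeline, delete_instance, delete_bucket1, delete_bucket2) all carry trigger_rule=TriggerRule.ALL_DONE, so cleanup still runs when an upstream step fails and the fixed BashOperator sleeps are no longer needed. A minimal sketch of that setup / body / teardown shape, with EmptyOperator standing in for the real operators and an illustrative DAG id, is below; it is not code from this commit.

from datetime import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with models.DAG("trigger_rule_sketch", start_date=datetime(2021, 1, 1), catchup=False) as dag:
    setup = EmptyOperator(task_id="create_resources")
    body = EmptyOperator(task_id="run_test")
    # ALL_DONE fires once every upstream task has finished in any state,
    # so the cleanup still runs when run_test fails.
    teardown = EmptyOperator(task_id="delete_resources", trigger_rule=TriggerRule.ALL_DONE)

    setup >> body >> teardown
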
tests/system/providers/google/cloud/datafusion/example_datafusion_async.py

Lines changed: 36 additions & 32 deletions
@@ -23,7 +23,6 @@
 from datetime import datetime
 
 from airflow import models
-from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.datafusion import (
     CloudDataFusionCreateInstanceOperator,
     CloudDataFusionCreatePipelineOperator,
@@ -36,33 +35,35 @@
     CloudDataFusionStopPipelineOperator,
     CloudDataFusionUpdateInstanceOperator,
 )
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
 
 # [START howto_data_fusion_env_variables]
 SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_data_fusion_async"
 LOCATION = "europe-north1"
-INSTANCE_NAME = "airflow-test-instance"
+INSTANCE_NAME = "test-instance-async"
 INSTANCE = {
     "type": "BASIC",
     "displayName": INSTANCE_NAME,
     "dataprocServiceAccount": SERVICE_ACCOUNT,
 }
 
-BUCKET_1 = "test-datafusion-1"
-BUCKET_2 = "test-datafusion-2"
-BUCKET_1_URI = f"gs://{BUCKET_1}"
-BUCKET_2_URI = f"gs://{BUCKET_2}"
+BUCKET_NAME_1 = "test-datafusion-async-1"
+BUCKET_NAME_2 = "test-datafusion-async-2"
+BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
+BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
 
 PIPELINE_NAME = "test-pipe"
 PIPELINE = {
     "artifact": {
         "name": "cdap-data-pipeline",
-        "version": "6.7.2",
+        "version": "6.8.3",
         "scope": "SYSTEM",
     },
     "description": "Data Pipeline Application",
-    "name": "test-pipe",
+    "name": PIPELINE_NAME,
     "config": {
         "resources": {"memoryMB": 2048, "virtualCores": 1},
         "driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -79,7 +80,7 @@
             "name": "GCSFile",
             "type": "batchsource",
             "label": "GCS",
-            "artifact": {"name": "google-cloud", "version": "0.20.3", "scope": "SYSTEM"},
+            "artifact": {"name": "google-cloud", "version": "0.21.2", "scope": "SYSTEM"},
             "properties": {
                 "project": "auto-detect",
                 "format": "text",
@@ -90,7 +91,7 @@
                 "encrypted": "false",
                 "schema": '{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
-                "path": BUCKET_1_URI,
+                "path": BUCKET_NAME_1_URI,
                 "referenceName": "foo_bucket",
                 "useConnection": "false",
                 "serviceAccountType": "filePath",
@@ -108,7 +109,7 @@
             "name": "GCS",
             "type": "batchsink",
             "label": "GCS2",
-            "artifact": {"name": "google-cloud", "version": "0.20.3", "scope": "SYSTEM"},
+            "artifact": {"name": "google-cloud", "version": "0.21.2", "scope": "SYSTEM"},
             "properties": {
                 "project": "auto-detect",
                 "suffix": "yyyy-MM-dd-HH-mm",
@@ -118,7 +119,7 @@
                 "schema": '{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
                 "referenceName": "bar",
-                "path": BUCKET_2_URI,
+                "path": BUCKET_NAME_2_URI,
                 "serviceAccountType": "filePath",
                 "contentType": "application/octet-stream",
             },
@@ -145,19 +146,20 @@
 # [END howto_data_fusion_env_variables]
 
 with models.DAG(
-    "example_data_fusion_async",
+    DAG_ID,
     start_date=datetime(2021, 1, 1),
     catchup=False,
+    tags=["example", "datafusion", "deferrable"],
 ) as dag:
     create_bucket1 = GCSCreateBucketOperator(
         task_id="create_bucket1",
-        bucket_name=BUCKET_1,
+        bucket_name=BUCKET_NAME_1,
         project_id=PROJECT_ID,
     )
 
     create_bucket2 = GCSCreateBucketOperator(
         task_id="create_bucket2",
-        bucket_name=BUCKET_2,
+        bucket_name=BUCKET_NAME_2,
         project_id=PROJECT_ID,
     )
 
@@ -209,7 +211,7 @@
     # [END howto_cloud_data_fusion_list_pipelines]
 
     # [START howto_cloud_data_fusion_start_pipeline_def]
-    start_pipeline_async = CloudDataFusionStartPipelineOperator(
+    start_pipeline_def = CloudDataFusionStartPipelineOperator(
         location=LOCATION,
         pipeline_name=PIPELINE_NAME,
         instance_name=INSTANCE_NAME,
@@ -233,40 +235,42 @@
         pipeline_name=PIPELINE_NAME,
         instance_name=INSTANCE_NAME,
         task_id="delete_pipeline",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_cloud_data_fusion_delete_pipeline]
 
     # [START howto_cloud_data_fusion_delete_instance_operator]
     delete_instance = CloudDataFusionDeleteInstanceOperator(
-        location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
+        location=LOCATION,
+        instance_name=INSTANCE_NAME,
+        task_id="delete_instance",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_cloud_data_fusion_delete_instance_operator]
-    #
-    # Add sleep before creating pipeline
-    sleep_30_1 = BashOperator(task_id="sleep_30_1", bash_command="sleep 30")
-
-    # Add sleep before deleting pipeline
-    sleep_30 = BashOperator(task_id="sleep_30", bash_command="sleep 30")
 
-    # Add sleep before starting pipeline
-    sleep_20 = BashOperator(task_id="sleep_20", bash_command="sleep 40")
+    delete_bucket1 = GCSDeleteBucketOperator(
+        task_id="delete_bucket1", bucket_name=BUCKET_NAME_1, trigger_rule=TriggerRule.ALL_DONE
+    )
+    delete_bucket2 = GCSDeleteBucketOperator(
+        task_id="delete_bucket2", bucket_name=BUCKET_NAME_1, trigger_rule=TriggerRule.ALL_DONE
+    )
 
     (
-        create_bucket1
-        >> create_bucket2
+        # TEST SETUP
+        [create_bucket1, create_bucket2]
+        # TEST BODY
         >> create_instance
         >> get_instance
        >> restart_instance
         >> update_instance
-        >> sleep_30_1
         >> create_pipeline
         >> list_pipelines
-        >> sleep_20
-        >> start_pipeline_async
+        >> start_pipeline_def
         >> stop_pipeline
-        >> sleep_30
         >> delete_pipeline
         >> delete_instance
+        # TEST TEARDOWN
+        >> [delete_bucket1, delete_bucket2]
     )
 
     from tests.system.utils.watcher import watcher
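Both example files keep the system-test harness wiring that follows the watcher import shown as context above. That tail is outside the hunks in this commit; in Airflow's system tests it typically looks roughly like the sketch below.

    from tests.system.utils.watcher import watcher

    # The watcher task marks the whole run failed when teardown tasks with
    # trigger_rule=ALL_DONE would otherwise hide an upstream failure.
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see tests/system/README.md).
test_run = get_test_run(dag)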