Skip to content

Commit 7a7c5f8

Browse files
Add system tests for Vertex AI operators in new approach (#27053)
1 parent 9c737f6 commit 7a7c5f8

33 files changed

+3053
-123
lines changed

‎airflow/providers/google/cloud/hooks/vertex_ai/auto_ml.py

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,11 @@ def extract_model_id(obj: dict) -> str:
259259
"""Returns unique id of the Model."""
260260
return obj["name"].rpartition("/")[-1]
261261

262+
@staticmethod
263+
def extract_training_id(resource_name: str) -> str:
264+
"""Returns unique id of the Training pipeline."""
265+
return resource_name.rpartition("/")[-1]
266+
262267
def wait_for_operation(self, operation: Operation, timeout: float | None = None):
263268
"""Waits for long-lasting operation to complete."""
264269
try:
@@ -303,7 +308,7 @@ def create_auto_ml_tabular_training_job(
303308
export_evaluated_data_items_bigquery_destination_uri: str | None = None,
304309
export_evaluated_data_items_override_destination: bool = False,
305310
sync: bool = True,
306-
) -> models.Model:
311+
) -> tuple[models.Model | None, str]:
307312
"""
308313
Create an AutoML Tabular Training Job.
309314
@@ -488,9 +493,15 @@ def create_auto_ml_tabular_training_job(
488493
export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
489494
sync=sync,
490495
)
491-
model.wait()
492-
493-
return model
496+
training_id = self.extract_training_id(self._job.resource_name)
497+
if model:
498+
model.wait()
499+
else:
500+
self.log.warning(
501+
"Training did not produce a Managed Model returning None. Training Pipeline is not "
502+
"configured to upload a Model."
503+
)
504+
return model, training_id
494505

495506
@GoogleBaseHook.fallback_to_default_project_id
496507
def create_auto_ml_forecasting_training_job(
@@ -529,7 +540,7 @@ def create_auto_ml_forecasting_training_job(
529540
model_display_name: str | None = None,
530541
model_labels: dict[str, str] | None = None,
531542
sync: bool = True,
532-
) -> models.Model:
543+
) -> tuple[models.Model | None, str]:
533544
"""
534545
Create an AutoML Forecasting Training Job.
535546
@@ -715,9 +726,15 @@ def create_auto_ml_forecasting_training_job(
715726
model_labels=model_labels,
716727
sync=sync,
717728
)
718-
model.wait()
719-
720-
return model
729+
training_id = self.extract_training_id(self._job.resource_name)
730+
if model:
731+
model.wait()
732+
else:
733+
self.log.warning(
734+
"Training did not produce a Managed Model returning None. Training Pipeline is not "
735+
"configured to upload a Model."
736+
)
737+
return model, training_id
721738

722739
@GoogleBaseHook.fallback_to_default_project_id
723740
def create_auto_ml_image_training_job(
@@ -744,7 +761,7 @@ def create_auto_ml_image_training_job(
744761
model_labels: dict[str, str] | None = None,
745762
disable_early_stopping: bool = False,
746763
sync: bool = True,
747-
) -> models.Model:
764+
) -> tuple[models.Model | None, str]:
748765
"""
749766
Create an AutoML Image Training Job.
750767
@@ -885,9 +902,15 @@ def create_auto_ml_image_training_job(
885902
disable_early_stopping=disable_early_stopping,
886903
sync=sync,
887904
)
888-
model.wait()
889-
890-
return model
905+
training_id = self.extract_training_id(self._job.resource_name)
906+
if model:
907+
model.wait()
908+
else:
909+
self.log.warning(
910+
"Training did not produce a Managed Model returning None. AutoML Image Training "
911+
"Pipeline is not configured to upload a Model."
912+
)
913+
return model, training_id
891914

892915
@GoogleBaseHook.fallback_to_default_project_id
893916
def create_auto_ml_text_training_job(
@@ -911,7 +934,7 @@ def create_auto_ml_text_training_job(
911934
model_display_name: str | None = None,
912935
model_labels: dict[str, str] | None = None,
913936
sync: bool = True,
914-
) -> models.Model:
937+
) -> tuple[models.Model | None, str]:
915938
"""
916939
Create an AutoML Text Training Job.
917940
@@ -1016,9 +1039,15 @@ def create_auto_ml_text_training_job(
10161039
model_labels=model_labels,
10171040
sync=sync,
10181041
)
1019-
model.wait()
1020-
1021-
return model
1042+
training_id = self.extract_training_id(self._job.resource_name)
1043+
if model:
1044+
model.wait()
1045+
else:
1046+
self.log.warning(
1047+
"Training did not produce a Managed Model returning None. AutoML Text Training "
1048+
"Pipeline is not configured to upload a Model."
1049+
)
1050+
return model, training_id
10221051

10231052
@GoogleBaseHook.fallback_to_default_project_id
10241053
def create_auto_ml_video_training_job(
@@ -1039,7 +1068,7 @@ def create_auto_ml_video_training_job(
10391068
model_display_name: str | None = None,
10401069
model_labels: dict[str, str] | None = None,
10411070
sync: bool = True,
1042-
) -> models.Model:
1071+
) -> tuple[models.Model | None, str]:
10431072
"""
10441073
Create an AutoML Video Training Job.
10451074
@@ -1141,9 +1170,15 @@ def create_auto_ml_video_training_job(
11411170
model_labels=model_labels,
11421171
sync=sync,
11431172
)
1144-
model.wait()
1145-
1146-
return model
1173+
training_id = self.extract_training_id(self._job.resource_name)
1174+
if model:
1175+
model.wait()
1176+
else:
1177+
self.log.warning(
1178+
"Training did not produce a Managed Model returning None. AutoML Video Training "
1179+
"Pipeline is not configured to upload a Model."
1180+
)
1181+
return model, training_id
11471182

11481183
@GoogleBaseHook.fallback_to_default_project_id
11491184
def delete_training_pipeline(

‎airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,11 @@ def extract_training_id(resource_name: str) -> str:
247247
"""Returns unique id of the Training pipeline."""
248248
return resource_name.rpartition("/")[-1]
249249

250+
@staticmethod
251+
def extract_custom_job_id(custom_job_name: str) -> str:
252+
"""Returns unique id of the Custom Job pipeline."""
253+
return custom_job_name.rpartition("/")[-1]
254+
250255
def wait_for_operation(self, operation: Operation, timeout: float | None = None):
251256
"""Waits for long-lasting operation to complete."""
252257
try:
@@ -292,7 +297,7 @@ def _run_job(
292297
timestamp_split_column_name: str | None = None,
293298
tensorboard: str | None = None,
294299
sync=True,
295-
) -> tuple[models.Model | None, str]:
300+
) -> tuple[models.Model | None, str, str]:
296301
"""Run Job for training pipeline"""
297302
model = job.run(
298303
dataset=dataset,
@@ -323,6 +328,9 @@ def _run_job(
323328
sync=sync,
324329
)
325330
training_id = self.extract_training_id(job.resource_name)
331+
custom_job_id = self.extract_custom_job_id(
332+
job.gca_resource.training_task_metadata.get("backingCustomJob")
333+
)
326334
if model:
327335
model.wait()
328336
else:
@@ -332,7 +340,7 @@ def _run_job(
332340
"model_serving_container_image_uri and model_display_name passed in. "
333341
"Ensure that your training script saves to model to os.environ['AIP_MODEL_DIR']."
334342
)
335-
return model, training_id
343+
return model, training_id, custom_job_id
336344

337345
@GoogleBaseHook.fallback_to_default_project_id
338346
def cancel_pipeline_job(
@@ -613,7 +621,7 @@ def create_custom_container_training_job(
613621
timestamp_split_column_name: str | None = None,
614622
tensorboard: str | None = None,
615623
sync=True,
616-
) -> tuple[models.Model | None, str]:
624+
) -> tuple[models.Model | None, str, str]:
617625
"""
618626
Create Custom Container Training Job
619627
@@ -885,7 +893,7 @@ def create_custom_container_training_job(
885893
if not self._job:
886894
raise AirflowException("CustomJob was not created")
887895

888-
model, training_id = self._run_job(
896+
model, training_id, custom_job_id = self._run_job(
889897
job=self._job,
890898
dataset=dataset,
891899
annotation_schema_uri=annotation_schema_uri,
@@ -915,7 +923,7 @@ def create_custom_container_training_job(
915923
sync=sync,
916924
)
917925

918-
return model, training_id
926+
return model, training_id, custom_job_id
919927

920928
@GoogleBaseHook.fallback_to_default_project_id
921929
def create_custom_python_package_training_job(
@@ -971,7 +979,7 @@ def create_custom_python_package_training_job(
971979
timestamp_split_column_name: str | None = None,
972980
tensorboard: str | None = None,
973981
sync=True,
974-
) -> tuple[models.Model | None, str]:
982+
) -> tuple[models.Model | None, str, str]:
975983
"""
976984
Create Custom Python Package Training Job
977985
@@ -1243,7 +1251,7 @@ def create_custom_python_package_training_job(
12431251
if not self._job:
12441252
raise AirflowException("CustomJob was not created")
12451253

1246-
model, training_id = self._run_job(
1254+
model, training_id, custom_job_id = self._run_job(
12471255
job=self._job,
12481256
dataset=dataset,
12491257
annotation_schema_uri=annotation_schema_uri,
@@ -1273,7 +1281,7 @@ def create_custom_python_package_training_job(
12731281
sync=sync,
12741282
)
12751283

1276-
return model, training_id
1284+
return model, training_id, custom_job_id
12771285

12781286
@GoogleBaseHook.fallback_to_default_project_id
12791287
def create_custom_training_job(
@@ -1329,7 +1337,7 @@ def create_custom_training_job(
13291337
timestamp_split_column_name: str | None = None,
13301338
tensorboard: str | None = None,
13311339
sync=True,
1332-
) -> tuple[models.Model | None, str]:
1340+
) -> tuple[models.Model | None, str, str]:
13331341
"""
13341342
Create Custom Training Job
13351343
@@ -1601,7 +1609,7 @@ def create_custom_training_job(
16011609
if not self._job:
16021610
raise AirflowException("CustomJob was not created")
16031611

1604-
model, training_id = self._run_job(
1612+
model, training_id, custom_job_id = self._run_job(
16051613
job=self._job,
16061614
dataset=dataset,
16071615
annotation_schema_uri=annotation_schema_uri,
@@ -1631,7 +1639,7 @@ def create_custom_training_job(
16311639
sync=sync,
16321640
)
16331641

1634-
return model, training_id
1642+
return model, training_id, custom_job_id
16351643

16361644
@GoogleBaseHook.fallback_to_default_project_id
16371645
def delete_pipeline_job(

0 commit comments

Comments
 (0)