Skip to content

Commit c931d88

Browse files
Add deferrable mode to CloudBuildCreateBuildOperator (#27783)
1 parent 24745c7 commit c931d88

File tree

10 files changed

+1082
-78
lines changed

10 files changed

+1082
-78
lines changed

‎airflow/providers/google/cloud/hooks/cloud_build.py

Lines changed: 87 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020

2121
from typing import Sequence
2222

23+
from google.api_core.exceptions import AlreadyExists
2324
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
2425
from google.api_core.operation import Operation
2526
from google.api_core.retry import Retry
26-
from google.cloud.devtools.cloudbuild import CloudBuildClient
27+
from google.cloud.devtools.cloudbuild_v1 import CloudBuildAsyncClient, CloudBuildClient, GetBuildRequest
2728
from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
2829

2930
from airflow.exceptions import AirflowException
@@ -77,6 +78,14 @@ def _get_build_id_from_operation(self, operation: Operation) -> str:
7778
except Exception:
7879
raise AirflowException("Could not retrieve Build ID from Operation.")
7980

81+
def wait_for_operation(self, operation: Operation, timeout: float | None = None):
    """
    Waits for a long-lasting operation to complete and returns its result.

    :param operation: The operation to wait on. Its ``result`` method is called first and,
        if that raises, its ``exception`` method is called to obtain the error.
    :param timeout: Optional, value forwarded as ``timeout`` to both ``operation.result``
        and ``operation.exception``.
    :return: Whatever ``operation.result`` returns.
    :raises AirflowException: If ``operation.result`` raises. The message is built from the
        value returned by ``operation.exception`` and the original error is attached as
        the explicit cause.
    """
    try:
        return operation.result(timeout=timeout)
    except Exception as err:
        # Ask the operation for the error it holds, then surface it as an Airflow error.
        # ``from err`` keeps the exception raised by ``result`` as the explicit cause so the
        # original traceback is not lost behind the wrapper.
        error = operation.exception(timeout=timeout)
        raise AirflowException(error) from err
88+
8089
def get_conn(self) -> CloudBuildClient:
8190
"""
8291
Retrieves the connection to Google Cloud Build.
@@ -123,6 +132,41 @@ def cancel_build(
123132

124133
return build
125134

135+
@GoogleBaseHook.fallback_to_default_project_id
def create_build_without_waiting_for_result(
    self,
    build: dict | Build,
    project_id: str = PROVIDE_PROJECT_ID,
    retry: Retry | _MethodDefault = DEFAULT,
    timeout: float | None = None,
    metadata: Sequence[tuple[str, str]] = (),
) -> tuple[Operation, str]:
    """
    Submits a build with the given configuration and returns immediately, without waiting for it.

    :param build: The build resource to create. If a dict is provided, it must be of the same form
        as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
    :param project_id: Optional, Google Cloud Project project_id sent in the create request.
        If set to None or missing, the default project_id from the GCP connection is used.
    :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional, additional metadata that is provided to the method.
    :return: A tuple of the operation returned by the client and the build id extracted from it.
    """
    cloud_build_client = self.get_conn()

    self.log.info("Start creating build...")

    create_request = {"project_id": project_id, "build": build}
    pending_operation = cloud_build_client.create_build(
        request=create_request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )
    # The id is read from the operation itself; the operation is handed back untouched so
    # the caller decides whether (and how) to wait on it.
    return pending_operation, self._get_build_id_from_operation(pending_operation)
169+
126170
@GoogleBaseHook.fallback_to_default_project_id
127171
def create_build(
128172
self,
@@ -150,7 +194,7 @@ def create_build(
150194
"""
151195
client = self.get_conn()
152196

153-
self.log.info("Start creating build.")
197+
self.log.info("Start creating build...")
154198

155199
operation = client.create_build(
156200
request={"project_id": project_id, "build": build},
@@ -195,14 +239,17 @@ def create_build_trigger(
195239
"""
196240
client = self.get_conn()
197241

198-
self.log.info("Start creating build trigger.")
242+
self.log.info("Start creating build trigger...")
199243

200-
trigger = client.create_build_trigger(
201-
request={"project_id": project_id, "trigger": trigger},
202-
retry=retry,
203-
timeout=timeout,
204-
metadata=metadata,
205-
)
244+
try:
245+
trigger = client.create_build_trigger(
246+
request={"project_id": project_id, "trigger": trigger},
247+
retry=retry,
248+
timeout=timeout,
249+
metadata=metadata,
250+
)
251+
except AlreadyExists:
252+
raise AirflowException("Cloud Build Trigger with such parameters already exists.")
206253

207254
self.log.info("Build trigger has been created.")
208255

@@ -492,7 +539,6 @@ def run_build_trigger(
492539
client = self.get_conn()
493540

494541
self.log.info("Start running build trigger: %s.", trigger_id)
495-
496542
operation = client.run_build_trigger(
497543
request={"project_id": project_id, "trigger_id": trigger_id, "source": source},
498544
retry=retry,
@@ -504,7 +550,6 @@ def run_build_trigger(
504550

505551
if not wait:
506552
return self.get_build(id_=id_, project_id=project_id)
507-
508553
operation.result()
509554

510555
self.log.info("Build trigger has been run: %s.", trigger_id)
@@ -550,3 +595,34 @@ def update_build_trigger(
550595
self.log.info("Build trigger has been updated: %s.", trigger_id)
551596

552597
return trigger
598+
599+
600+
class CloudBuildAsyncHook(GoogleBaseHook):
    """Hook that exposes Google Cloud Build Service calls as coroutines."""

    @GoogleBaseHook.fallback_to_default_project_id
    async def get_cloud_build(
        self,
        id_: str,
        project_id: str = PROVIDE_PROJECT_ID,
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
    ) -> Build:
        """
        Fetches a single Cloud Build by its id.

        :param id_: The id of the build to fetch; must not be empty.
        :param project_id: The project_id placed in the ``GetBuildRequest``.
        :param retry: Forwarded as ``retry`` to the client's ``get_build`` call.
        :param timeout: Forwarded as ``timeout`` to the client's ``get_build`` call.
        :param metadata: Forwarded as ``metadata`` to the client's ``get_build`` call.
        :return: The value awaited from the client's ``get_build`` call.
        :raises AirflowException: If ``id_`` is empty.
        """
        if not id_:
            raise AirflowException("Google Cloud Build id is required.")

        # NOTE(review): the client is constructed with no arguments, so nothing held by this
        # hook is handed to it here -- confirm that this is intended.
        async_client = CloudBuildAsyncClient()

        return await async_client.get_build(
            request=GetBuildRequest(project_id=project_id, id=id_),
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

‎airflow/providers/google/cloud/operators/cloud_build.py

Lines changed: 72 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
CloudBuildTriggerDetailsLink,
3838
CloudBuildTriggersListLink,
3939
)
40+
from airflow.providers.google.cloud.triggers.cloud_build import CloudBuildCreateBuildTrigger
41+
from airflow.providers.google.common.consts import GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
4042
from airflow.utils import yaml
4143

4244
if TYPE_CHECKING:
@@ -147,7 +149,13 @@ class CloudBuildCreateBuildOperator(BaseOperator):
147149
If set as a sequence, the identities from the list must grant
148150
Service Account Token Creator IAM role to the directly preceding identity, with first
149151
account from the list granting this role to the originating account (templated).
150-
152+
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
153+
if any. For this to work, the service account making the request must have
154+
domain-wide delegation enabled.
155+
:param retry: Designation of what errors, if any, should be retried.
156+
:param timeout: The timeout for this request.
157+
:param metadata: Strings which should be sent along with the request as metadata.
158+
:param deferrable: Run operator in the deferrable mode
151159
"""
152160

153161
template_fields: Sequence[str] = ("project_id", "build", "gcp_conn_id", "impersonation_chain")
@@ -164,19 +172,25 @@ def __init__(
164172
metadata: Sequence[tuple[str, str]] = (),
165173
gcp_conn_id: str = "google_cloud_default",
166174
impersonation_chain: str | Sequence[str] | None = None,
175+
delegate_to: str | None = None,
176+
poll_interval: float = 4.0,
177+
deferrable: bool = False,
167178
**kwargs,
168179
) -> None:
169180
super().__init__(**kwargs)
181+
self.build = build
182+
# Not template fields to keep original value
183+
self.build_raw = build
170184
self.project_id = project_id
171185
self.wait = wait
172186
self.retry = retry
173187
self.timeout = timeout
174188
self.metadata = metadata
175189
self.gcp_conn_id = gcp_conn_id
176190
self.impersonation_chain = impersonation_chain
177-
self.build = build
178-
# Not template fields to keep original value
179-
self.build_raw = build
191+
self.delegate_to = delegate_to
192+
self.poll_interval = poll_interval
193+
self.deferrable = deferrable
180194

181195
def prepare_template(self) -> None:
182196
# if no file is specified, skip
@@ -189,29 +203,69 @@ def prepare_template(self) -> None:
189203
self.build = json.loads(file.read())
190204

191205
def execute(self, context: Context):
    """
    Creates the build and, depending on the operator settings, waits for it.

    The build id is pushed to XCom under the key ``id`` as soon as the build is created.
    What happens next depends on ``wait`` and ``deferrable``:

    * ``wait`` is false: the current build is fetched and returned as a dict.
    * ``wait`` and ``deferrable`` are true: the task defers to ``CloudBuildCreateBuildTrigger``
      and is resumed in the method named by ``GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME``.
    * otherwise: the call blocks on the operation and the finished build is returned as a dict.

    :param context: The task context, used for the XCom push and for persisting the build link.
    :return: The build converted with ``Build.to_dict`` (for the non-deferred paths).
    """
    hook = CloudBuildHook(
        gcp_conn_id=self.gcp_conn_id,
        impersonation_chain=self.impersonation_chain,
        delegate_to=self.delegate_to,
    )
    build = BuildProcessor(build=self.build).process_body()

    self.cloud_build_operation, self.id_ = hook.create_build_without_waiting_for_result(
        build=build,
        project_id=self.project_id,
        retry=self.retry,
        timeout=self.timeout,
        metadata=self.metadata,
    )
    self.xcom_push(context, key="id", value=self.id_)

    if not self.wait:
        # The build id is already known, so the link can be persisted even though the
        # build is not awaited; without this the fire-and-forget path has no build link.
        project_id = self.project_id or hook.project_id
        if project_id:
            CloudBuildLink.persist(
                context=context,
                task_instance=self,
                project_id=project_id,
                build_id=self.id_,
            )
        return Build.to_dict(hook.get_build(id_=self.id_, project_id=self.project_id))

    if self.deferrable:
        self.defer(
            trigger=CloudBuildCreateBuildTrigger(
                id_=self.id_,
                project_id=self.project_id,
                gcp_conn_id=self.gcp_conn_id,
                impersonation_chain=self.impersonation_chain,
                delegate_to=self.delegate_to,
                poll_interval=self.poll_interval,
            ),
            method_name=GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME,
        )
    else:
        cloud_build_instance_result = hook.wait_for_operation(
            timeout=self.timeout, operation=self.cloud_build_operation
        )
        project_id = self.project_id or hook.project_id
        if project_id:
            CloudBuildLink.persist(
                context=context,
                task_instance=self,
                project_id=project_id,
                build_id=cloud_build_instance_result.id,
            )
        return Build.to_dict(cloud_build_instance_result)
249+
250+
def execute_complete(self, context: Context, event: dict):
    """
    Handles the event delivered when the deferred task is resumed.

    :param context: The task context, used for persisting the build link.
    :param event: The trigger payload. ``status`` and ``message`` are always read; ``id_`` and
        ``instance`` are read when ``status`` is ``"success"``.
    :return: ``event["instance"]`` when the status is ``"success"``.
    :raises AirflowException: For any other status, carrying ``event["message"]``.
    """
    if event["status"] != "success":
        raise AirflowException(f"Unexpected error in the operation: {event['message']}")

    hook = CloudBuildHook(
        gcp_conn_id=self.gcp_conn_id,
        impersonation_chain=self.impersonation_chain,
        delegate_to=self.delegate_to,
    )
    self.log.info("Cloud Build completed with response %s ", event["message"])

    # Fall back to the hook's project when the operator was not given one explicitly.
    link_project = self.project_id or hook.project_id
    if link_project:
        CloudBuildLink.persist(
            context=context,
            task_instance=self,
            project_id=link_project,
            build_id=event["id_"],
        )
    return event["instance"]
215269

216270

217271
class CloudBuildCreateBuildTriggerOperator(BaseOperator):

0 commit comments

Comments
 (0)