Skip to content

Commit 04c1fef

Browse files
[AIRFLOW-6676] added GCSDeleteBucketOperator (#7307)
1 parent 92c72f4 commit 04c1fef

File tree

6 files changed

+145
-5
lines changed

6 files changed

+145
-5
lines changed

‎airflow/providers/google/cloud/example_dags/example_gcs.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
from airflow import models
2525
from airflow.operators.bash import BashOperator
2626
from airflow.providers.google.cloud.operators.gcs import (
27-
GCSBucketCreateAclEntryOperator, GCSCreateBucketOperator, GCSDeleteObjectsOperator,
28-
GcsFileTransformOperator, GCSListObjectsOperator, GCSObjectCreateAclEntryOperator, GCSToLocalOperator,
27+
GCSBucketCreateAclEntryOperator, GCSCreateBucketOperator, GCSDeleteBucketOperator,
28+
GCSDeleteObjectsOperator, GcsFileTransformOperator, GCSListObjectsOperator,
29+
GCSObjectCreateAclEntryOperator, GCSToLocalOperator,
2930
)
3031
from airflow.providers.google.cloud.operators.gcs_to_gcs import GCSToGCSOperator
3132
from airflow.providers.google.cloud.operators.local_to_gcs import LocalFilesystemToGCSOperator
@@ -126,7 +127,32 @@
126127
task_id="delete_files", bucket_name=BUCKET_1, objects=[BUCKET_FILE_LOCATION]
127128
)
128129

130+
# [START howto_operator_gcs_delete_bucket]
131+
delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1)
132+
delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_2)
133+
# [END howto_operator_gcs_delete_bucket]
134+
129135
[create_bucket1, create_bucket2] >> list_buckets >> list_buckets_result
130136
[create_bucket1, create_bucket2] >> upload_file
131137
upload_file >> [download_file, copy_file]
132138
upload_file >> gcs_bucket_create_acl_entry_task >> gcs_object_create_acl_entry_task >> delete_files
139+
140+
create_bucket1 >> delete_bucket_1
141+
create_bucket2 >> delete_bucket_2
142+
create_bucket2 >> copy_file
143+
create_bucket1 >> copy_file
144+
list_buckets >> delete_bucket_1
145+
upload_file >> delete_bucket_1
146+
create_bucket1 >> upload_file >> delete_bucket_1
147+
transform_file >> delete_bucket_1
148+
gcs_bucket_create_acl_entry_task >> delete_bucket_1
149+
gcs_object_create_acl_entry_task >> delete_bucket_1
150+
download_file >> delete_bucket_1
151+
copy_file >> delete_bucket_1
152+
copy_file >> delete_bucket_2
153+
delete_files >> delete_bucket_1
154+
155+
156+
if __name__ == '__main__':
157+
dag.clear(reset_dag_runs=True)
158+
dag.run()

‎airflow/providers/google/cloud/hooks/gcs.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from typing import Optional, Set, Tuple, Union
2929
from urllib.parse import urlparse
3030

31+
from google.api_core.exceptions import NotFound
3132
from google.cloud import storage
3233

3334
from airflow.exceptions import AirflowException
@@ -322,6 +323,27 @@ def delete(self, bucket_name, object_name):
322323

323324
self.log.info('Blob %s deleted.', object_name)
324325

326+
def delete_bucket(self, bucket_name: str, force: bool = False):
327+
"""
328+
Delete a bucket object from the Google Cloud Storage.
329+
330+
:param bucket_name: name of the bucket which will be deleted
331+
:type bucket_name: str
332+
:param force: false not allow to delete non empty bucket, set force=True
333+
allows to delete non empty bucket
334+
:type: bool
335+
"""
336+
337+
client = self.get_conn()
338+
bucket = client.bucket(bucket_name)
339+
340+
self.log.info("Deleting %s bucket", bucket_name)
341+
try:
342+
bucket.delete(force=force)
343+
self.log.info("Bucket %s has been deleted", bucket_name)
344+
except NotFound:
345+
self.log.info("Bucket %s not exists", bucket_name)
346+
325347
def list(self, bucket_name, versions=None, max_results=None, prefix=None, delimiter=None):
326348
"""
327349
List all objects from the bucket with the give string prefix in name

‎airflow/providers/google/cloud/operators/gcs.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,3 +624,37 @@ def execute(self, context: Dict):
624624
object_name=self.destination_object,
625625
filename=destination_file.name
626626
)
627+
628+
629+
class GCSDeleteBucketOperator(BaseOperator):
630+
"""
631+
Deletes bucket from a Google Cloud Storage.
632+
633+
.. seealso::
634+
For more information on how to use this operator, take a look at the guide:
635+
:ref:`howto/operator:GCSDeleteBucketOperator`
636+
637+
:param bucket_name: name of the bucket which will be deleted
638+
:type bucket_name: str
639+
:param force: false not allow to delete non empty bucket, set force=True
640+
allows to delete non empty bucket
641+
:type: bool
642+
"""
643+
644+
template_fields = ('bucket_name', "gcp_conn_id")
645+
646+
@apply_defaults
647+
def __init__(self,
648+
bucket_name: str,
649+
force: bool = True,
650+
gcp_conn_id: str = 'google_cloud_default',
651+
*args, **kwargs) -> None:
652+
super().__init__(*args, **kwargs)
653+
654+
self.bucket_name = bucket_name
655+
self.force: bool = force
656+
self.gcp_conn_id = gcp_conn_id
657+
658+
def execute(self, context):
659+
hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
660+
hook.delete_bucket(bucket_name=self.bucket_name, force=self.force)

‎docs/howto/operator/gcp/gcs.rst

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,30 @@ For further information, look at:
138138

139139
* `Client Library Documentation <https://googleapis.github.io/google-cloud-python/latest/storage/index.html>`__
140140
* `Product Documentation <https://cloud.google.com/storage/docs/>`__
141+
142+
.. _howto/operator:GCSDeleteBucketOperator:
143+
144+
Deleting Bucket
145+
^^^^^^^^^^^^^^^
146+
147+
Deleting Bucket allows you to remove bucket object from the Google Cloud Storage.
148+
It is performed through the
149+
:class:`~airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator` operator.
150+
151+
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs.py
152+
:language: python
153+
:dedent: 4
154+
:start-after: [START howto_operator_gcs_delete_bucket]
155+
:end-before: [END howto_operator_gcs_delete_bucket]
156+
157+
You can use :ref:`Jinja templating <jinja-templating>` with
158+
:template-fields:`airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator`
159+
parameters which allows you to dynamically determine values.
160+
161+
Reference
162+
^^^^^^^^^
163+
164+
For further information, look at:
165+
166+
* `Client Library Documentation <https://googleapis.dev/python/storage/latest/buckets.html>`__
167+
* `Product Documentation <https://cloud.google.com/storage/docs/json_api/v1/buckets>`__

‎tests/providers/google/cloud/hooks/test_gcs.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import tempfile
2323
import unittest
2424
from datetime import datetime
25+
from unittest import mock
2526

2627
import dateutil
27-
import mock
2828
from google.cloud import exceptions, storage
2929

3030
from airflow.exceptions import AirflowException
@@ -334,6 +334,26 @@ def test_delete_nonexisting_object(self, mock_service):
334334
with self.assertRaises(exceptions.NotFound):
335335
self.gcs_hook.delete(bucket_name=test_bucket, object_name=test_object)
336336

337+
@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
338+
def test_delete_bucket(self, mock_service):
339+
test_bucket = "test bucket"
340+
341+
self.gcs_hook.delete_bucket(bucket_name=test_bucket)
342+
343+
mock_service.return_value.bucket.assert_called_once_with(test_bucket)
344+
mock_service.return_value.bucket.return_value.delete.assert_called_once()
345+
346+
@mock.patch(GCS_STRING.format('GCSHook.get_conn'), **{
347+
'return_value.bucket.return_value.delete.side_effect': exceptions.NotFound(message="Not Found")
348+
})
349+
def test_delete_nonexisting_bucket(self, mock_service):
350+
test_bucket = "test bucket"
351+
352+
self.gcs_hook.delete_bucket(bucket_name=test_bucket)
353+
354+
mock_service.return_value.bucket.assert_called_once_with(test_bucket)
355+
mock_service.return_value.bucket.return_value.delete.assert_called_once()
356+
337357
@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
338358
def test_object_get_size(self, mock_service):
339359
test_bucket = 'test_bucket'

‎tests/providers/google/cloud/operators/test_gcs.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
import mock
2222

2323
from airflow.providers.google.cloud.operators.gcs import (
24-
GCSBucketCreateAclEntryOperator, GCSCreateBucketOperator, GCSDeleteObjectsOperator,
25-
GcsFileTransformOperator, GCSListObjectsOperator, GCSObjectCreateAclEntryOperator, GCSToLocalOperator,
24+
GCSBucketCreateAclEntryOperator, GCSCreateBucketOperator, GCSDeleteBucketOperator,
25+
GCSDeleteObjectsOperator, GcsFileTransformOperator, GCSListObjectsOperator,
26+
GCSObjectCreateAclEntryOperator, GCSToLocalOperator,
2627
)
2728

2829
TASK_ID = "test-gcs-operator"
@@ -230,3 +231,13 @@ def test_execute(self, mock_hook, mock_subprocess, mock_tempfile):
230231
object_name=destination_object,
231232
filename=destination,
232233
)
234+
235+
236+
class TestGCSDeleteBucketOperator(unittest.TestCase):
237+
@mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
238+
def test_delete_bucket(self, mock_hook):
239+
operator = GCSDeleteBucketOperator(
240+
task_id=TASK_ID, bucket_name=TEST_BUCKET)
241+
242+
operator.execute(None)
243+
mock_hook.return_value.delete_bucket.assert_called_once_with(bucket_name=TEST_BUCKET, force=True)

0 commit comments

Comments
 (0)