Skip to content

Commit 65f3b18

Browse files
authored
Sql to gcs with exclude columns (#23695)
1 parent 69f444f commit 65f3b18

File tree

3 files changed

+43
-2
lines changed

3 files changed

+43
-2
lines changed

‎airflow/providers/google/cloud/transfers/sql_to_gcs.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class BaseSQLToGCSOperator(BaseOperator):
7171
If set as a sequence, the identities from the list must grant
7272
Service Account Token Creator IAM role to the directly preceding identity, with first
7373
account from the list granting this role to the originating account (templated).
74+
:param exclude_columns: set of column names to leave out of the exported data files and the generated schema
7475
"""
7576

7677
template_fields: Sequence[str] = (
@@ -103,9 +104,13 @@ def __init__(
103104
gcp_conn_id: str = 'google_cloud_default',
104105
delegate_to: Optional[str] = None,
105106
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
107+
exclude_columns=None,
106108
**kwargs,
107109
) -> None:
108110
super().__init__(**kwargs)
111+
if exclude_columns is None:
112+
exclude_columns = set()
113+
109114
self.sql = sql
110115
self.bucket = bucket
111116
self.filename = filename
@@ -120,6 +125,7 @@ def __init__(
120125
self.gcp_conn_id = gcp_conn_id
121126
self.delegate_to = delegate_to
122127
self.impersonation_chain = impersonation_chain
128+
self.exclude_columns = exclude_columns
123129

124130
def execute(self, context: 'Context'):
125131
self.log.info("Executing query")
@@ -165,7 +171,9 @@ def _write_local_data_files(self, cursor):
165171
names in GCS, and values are file handles to local files that
166172
contain the data for the GCS objects.
167173
"""
168-
schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
174+
org_schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
175+
schema = [column for column in org_schema if column not in self.exclude_columns]
176+
169177
col_type_dict = self._get_col_type_dict()
170178
file_no = 0
171179

@@ -314,7 +322,11 @@ def _write_local_schema_file(self, cursor):
314322
schema = self.schema
315323
else:
316324
self.log.info("Starts generating schema")
317-
schema = [self.field_to_bigquery(field) for field in cursor.description]
325+
schema = [
326+
self.field_to_bigquery(field)
327+
for field in cursor.description
328+
if field[0] not in self.exclude_columns
329+
]
318330

319331
if isinstance(schema, list):
320332
schema = json.dumps(schema, sort_keys=True)
-3.65 KB
Binary file not shown.

‎tests/providers/google/cloud/transfers/test_sql_to_gcs.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@
6161

6262
OUTPUT_DF = pd.DataFrame([['convert_type_return_value'] * 3] * 3, columns=COLUMNS)
6363

64+
# Use a set literal: set('column_c') would iterate the string and build a set of
# single characters, so no column name would ever match and nothing would be excluded.
EXCLUDE_COLUMNS = {'column_c'}
65+
NEW_COLUMNS = [c for c in COLUMNS if c not in EXCLUDE_COLUMNS]
66+
OUTPUT_DF_WITH_EXCLUDE_COLUMNS = pd.DataFrame(
67+
[['convert_type_return_value'] * len(NEW_COLUMNS)] * 3, columns=NEW_COLUMNS
68+
)
69+
6470

6571
class DummySQLToGCSOperator(BaseSQLToGCSOperator):
6672
def field_to_bigquery(self, field) -> Dict[str, str]:
@@ -287,3 +293,26 @@ def test__write_local_data_files_parquet(self):
287293
file.flush()
288294
df = pd.read_parquet(file.name)
289295
assert df.equals(OUTPUT_DF)
296+
297+
def test__write_local_data_files_json_with_exclude_columns(self):
298+
op = DummySQLToGCSOperator(
299+
sql=SQL,
300+
bucket=BUCKET,
301+
filename=FILENAME,
302+
task_id=TASK_ID,
303+
schema_filename=SCHEMA_FILE,
304+
export_format="json",
305+
gzip=False,
306+
schema=SCHEMA,
307+
gcp_conn_id='google_cloud_default',
308+
exclude_columns=EXCLUDE_COLUMNS,
309+
)
310+
cursor = MagicMock()
311+
cursor.__iter__.return_value = INPUT_DATA
312+
cursor.description = CURSOR_DESCRIPTION
313+
314+
files = op._write_local_data_files(cursor)
315+
file = next(files)['file_handle']
316+
file.flush()
317+
df = pd.read_json(file.name, orient='records', lines=True)
318+
assert df.equals(OUTPUT_DF_WITH_EXCLUDE_COLUMNS)

0 commit comments

Comments
 (0)