Skip to content

Commit e7aeacf

Browse files
authored
Add OracleToGCS Transfer (#13246)
1 parent 323084e commit e7aeacf

File tree

7 files changed

+374
-1
lines changed

7 files changed

+374
-1
lines changed

‎CONTRIBUTING.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ apache.hive amazon,microsoft.mssql,mysql,presto,samba,vertica
692692
apache.livy http
693693
dingding http
694694
discord http
695-
google amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,postgres,presto,salesforce,sftp,ssh
695+
google amazon,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh
696696
hashicorp google
697697
microsoft.azure google,oracle
698698
microsoft.mssql odbc

‎airflow/providers/dependencies.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
"microsoft.azure",
3737
"microsoft.mssql",
3838
"mysql",
39+
"oracle",
3940
"postgres",
4041
"presto",
4142
"salesforce",
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import os
19+
20+
from airflow import models
21+
from airflow.providers.google.cloud.transfers.oracle_to_gcs import OracleToGCSOperator
22+
from airflow.utils import dates
23+
24+
# Destination bucket; the GCP_GCS_BUCKET environment variable overrides the default.
GCS_BUCKET = os.environ.get("GCP_GCS_BUCKET", "example-airflow-oracle-gcs")
# Object name the exported data is written to inside the bucket.
FILENAME = 'test_file'

# Query whose result set is exported.
SQL_QUERY = "SELECT * from test_table"

with models.DAG(
    'example_oracle_to_gcs',
    default_args={'start_date': dates.days_ago(1)},
    schedule_interval=None,
    tags=['example'],
) as dag:
    # The START/END markers below delimit the snippet embedded in the how-to guide.
    # [START howto_operator_oracle_to_gcs]
    upload = OracleToGCSOperator(
        task_id='oracle_to_gcs',
        sql=SQL_QUERY,
        bucket=GCS_BUCKET,
        filename=FILENAME,
        export_format='csv',
    )
    # [END howto_operator_oracle_to_gcs]
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
import base64
20+
import calendar
21+
from datetime import date, datetime, timedelta
22+
from decimal import Decimal
23+
from typing import Dict
24+
25+
import cx_Oracle
26+
27+
from airflow.providers.google.cloud.transfers.sql_to_gcs import BaseSQLToGCSOperator
28+
from airflow.providers.oracle.hooks.oracle import OracleHook
29+
from airflow.utils.decorators import apply_defaults
30+
31+
32+
class OracleToGCSOperator(BaseSQLToGCSOperator):
    """Copy data from Oracle to Google Cloud Storage in JSON or CSV format.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:OracleToGCSOperator`

    :param oracle_conn_id: Reference to a specific Oracle hook.
    :type oracle_conn_id: str
    :param ensure_utc: Ensure TIMESTAMP columns are exported as UTC. If set to
        `False`, TIMESTAMP columns will be exported using the Oracle server's
        default timezone.
    :type ensure_utc: bool
    """

    ui_color = '#a0e08c'

    # cx_Oracle column type -> BigQuery column type. Types missing from this
    # map are exported as STRING (see ``field_to_bigquery``).
    type_map = {
        cx_Oracle.DB_TYPE_BINARY_DOUBLE: 'DECIMAL',
        cx_Oracle.DB_TYPE_BINARY_FLOAT: 'DECIMAL',
        cx_Oracle.DB_TYPE_BINARY_INTEGER: 'INTEGER',
        cx_Oracle.DB_TYPE_BOOLEAN: 'BOOLEAN',
        cx_Oracle.DB_TYPE_DATE: 'TIMESTAMP',
        cx_Oracle.DB_TYPE_NUMBER: 'NUMERIC',
        cx_Oracle.DB_TYPE_TIMESTAMP: 'TIMESTAMP',
        cx_Oracle.DB_TYPE_TIMESTAMP_LTZ: 'TIMESTAMP',
        cx_Oracle.DB_TYPE_TIMESTAMP_TZ: 'TIMESTAMP',
    }

    @apply_defaults
    def __init__(self, *, oracle_conn_id='oracle_default', ensure_utc=False, **kwargs):
        super().__init__(**kwargs)
        self.ensure_utc = ensure_utc
        self.oracle_conn_id = oracle_conn_id

    def query(self):
        """Queries Oracle and returns a cursor to the results.

        When ``ensure_utc`` is set, the session time zone is switched to UTC
        before ``self.sql`` is executed on the same cursor.

        :return: the cursor on which ``self.sql`` has been executed
        """
        oracle = OracleHook(oracle_conn_id=self.oracle_conn_id)
        conn = oracle.get_conn()
        cursor = conn.cursor()
        if self.ensure_utc:
            # Ensure TIMESTAMP results are in UTC. Oracle has no MySQL-style
            # ``SET time_zone``; the session time zone is changed with
            # ALTER SESSION.
            tz_query = "ALTER SESSION SET TIME_ZONE = '+00:00'"
            self.log.info('Executing: %s', tz_query)
            cursor.execute(tz_query)
        self.log.info('Executing: %s', self.sql)
        cursor.execute(self.sql)
        return cursor

    def field_to_bigquery(self, field) -> Dict[str, str]:
        """Convert one cursor description entry into a BigQuery schema field.

        :param field: a DB-API column description sequence; index 0 is the
            column name, index 1 the type code and index 6 the ``null_ok`` flag
        :return: dict with the ``name``, ``type`` and ``mode`` of the column
        """
        field_type = self.type_map.get(field[1], "STRING")

        # ``null_ok`` is truthy when the column accepts NULL. A column is only
        # REQUIRED when the driver explicitly reports it as not nullable; an
        # unknown flag (None) and every TIMESTAMP column stay NULLABLE.
        null_ok = field[6]
        is_required = null_ok is not None and not null_ok and field_type != "TIMESTAMP"
        return {
            'name': field[0],
            'type': field_type,
            'mode': "REQUIRED" if is_required else "NULLABLE",
        }

    def convert_type(self, value, schema_type):
        """
        Takes a value from Oracle db, and converts it to a value that's safe for
        JSON/Google Cloud Storage/BigQuery.

        * Datetimes are converted to UTC seconds.
        * Timedeltas are converted to their total number of seconds.
        * Decimals are converted to floats.
        * Dates are converted to ISO formatted string if given schema_type is
          DATE, or UTC seconds otherwise.
        * Binary type fields are converted to integer if given schema_type is
          INTEGER, or encoded with base64 otherwise. Imported BYTES data must
          be base64-encoded according to BigQuery documentation:
          https://cloud.google.com/bigquery/data-types

        Any other value, including ``None``, is returned unchanged.

        :param value: Oracle db column value
        :type value: Any
        :param schema_type: BigQuery data type
        :type schema_type: str
        """
        if value is None:
            return value
        # ``datetime`` is a subclass of ``date``, so it must be tested first.
        if isinstance(value, datetime):
            value = calendar.timegm(value.timetuple())
        elif isinstance(value, timedelta):
            value = value.total_seconds()
        elif isinstance(value, Decimal):
            value = float(value)
        elif isinstance(value, date):
            if schema_type == "DATE":
                value = value.isoformat()
            else:
                value = calendar.timegm(value.timetuple())
        elif isinstance(value, bytes):
            if schema_type == "INTEGER":
                value = int.from_bytes(value, "big")
            else:
                value = base64.standard_b64encode(value).decode('ascii')
        return value

‎airflow/providers/google/provider.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,10 @@ transfers:
633633
target-integration-name: Google Cloud Storage (GCS)
634634
python-module: airflow.providers.google.cloud.transfers.mysql_to_gcs
635635
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/mysql_to_gcs.rst
636+
- source-integration-name: Oracle
637+
target-integration-name: Google Cloud Storage (GCS)
638+
python-module: airflow.providers.google.cloud.transfers.oracle_to_gcs
639+
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst
636640
- source-integration-name: Google Cloud Storage (GCS)
637641
target-integration-name: Google Spreadsheet
638642
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/gcs_to_sheets.rst
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
or more contributor license agreements. See the NOTICE file
3+
distributed with this work for additional information
4+
regarding copyright ownership. The ASF licenses this file
5+
to you under the Apache License, Version 2.0 (the
6+
"License"); you may not use this file except in compliance
7+
with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
software distributed under the License is distributed on an
13+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
KIND, either express or implied. See the License for the
15+
specific language governing permissions and limitations
16+
under the License.
17+
18+
Oracle To Google Cloud Storage Operator
19+
=======================================
20+
The `Google Cloud Storage <https://cloud.google.com/storage/>`__ (GCS) service is
21+
used to store large data from various applications. This page shows how to copy
22+
data from Oracle to GCS.
23+
24+
.. contents::
25+
:depth: 1
26+
:local:
27+
28+
29+
Prerequisite Tasks
30+
^^^^^^^^^^^^^^^^^^
31+
32+
.. include:: /howto/operator/google/_partials/prerequisite_tasks.rst
33+
34+
.. _howto/operator:OracleToGCSOperator:
35+
36+
OracleToGCSOperator
37+
~~~~~~~~~~~~~~~~~~~
38+
39+
:class:`~airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleToGCSOperator` allows you to upload
40+
data from an Oracle database to GCS.
41+
42+
When you use this operator, you can optionally compress the uploaded data with gzip.
43+
44+
Below is an example of using this operator to upload data to GCS.
45+
46+
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_oracle_to_gcs.py
47+
:language: python
48+
:dedent: 0
49+
:start-after: [START howto_operator_oracle_to_gcs]
50+
:end-before: [END howto_operator_oracle_to_gcs]
51+
52+
53+
Reference
54+
---------
55+
56+
For further information, look at:
57+
* `cx_Oracle Documentation <https://cx-oracle.readthedocs.io/en/latest/>`__
58+
* `Google Cloud Storage Documentation <https://cloud.google.com/storage/>`__
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
import unittest
20+
from unittest import mock
21+
22+
import cx_Oracle
23+
24+
from airflow.providers.google.cloud.transfers.oracle_to_gcs import OracleToGCSOperator
25+
26+
# Operator arguments shared by every test case.
TASK_ID = 'test-oracle-to-gcs'
ORACLE_CONN_ID = 'oracle_conn_test'
SQL = 'select 1'
BUCKET = 'gs://test'
JSON_FILENAME = 'test_{}.ndjson'
GZIP = False

# Rows yielded by the mocked cursor, one (some_str, some_num) tuple per row.
ROWS = [
    ('mock_row_content_1', 42),
    ('mock_row_content_2', 43),
    ('mock_row_content_3', 44),
]
# Description exposed by the mocked cursor: one 7-item entry per column, of
# which only the name (index 0) and type (index 1) are populated.
CURSOR_DESCRIPTION = (
    ('some_str', cx_Oracle.DB_TYPE_VARCHAR, None, None, None, None, None),
    ('some_num', cx_Oracle.DB_TYPE_NUMBER, None, None, None, None, None),
)
# Expected newline-delimited JSON output, one line per entry of ROWS.
NDJSON_LINES = [
    b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
    b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
    b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
]

# Expected schema file name and content, split into chunks that are joined
# before comparison.
SCHEMA_FILENAME = 'schema_test.json'
SCHEMA_JSON = [
    b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
    b'{"mode": "NULLABLE", "name": "some_num", "type": "NUMERIC"}]',
]
48+
49+
50+
class TestOracleToGoogleCloudStorageOperator(unittest.TestCase):
    """Tests for OracleToGCSOperator with the Oracle and GCS hooks mocked out."""

    def test_init(self):
        """Test OracleToGCSOperator instance is properly initialized."""
        op = OracleToGCSOperator(task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME)
        self.assertEqual(op.task_id, TASK_ID)
        self.assertEqual(op.sql, SQL)
        self.assertEqual(op.bucket, BUCKET)
        self.assertEqual(op.filename, JSON_FILENAME)

    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
    def test_exec_success_json(self, gcs_hook_mock_class, oracle_hook_mock_class):
        """Test successful run of execute function for JSON"""
        op = OracleToGCSOperator(
            task_id=TASK_ID, oracle_conn_id=ORACLE_CONN_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME
        )

        oracle_hook_mock = oracle_hook_mock_class.return_value
        oracle_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
        oracle_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION

        gcs_hook_mock = gcs_hook_mock_class.return_value

        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
            self.assertEqual(BUCKET, bucket)
            self.assertEqual(JSON_FILENAME.format(0), obj)
            self.assertEqual('application/json', mime_type)
            self.assertEqual(GZIP, gzip)
            with open(tmp_filename, 'rb') as file:
                self.assertEqual(b''.join(NDJSON_LINES), file.read())

        gcs_hook_mock.upload.side_effect = _assert_upload

        op.execute(None)

        oracle_hook_mock_class.assert_called_once_with(oracle_conn_id=ORACLE_CONN_ID)
        oracle_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL)
        # The checks in _assert_upload only run if upload is invoked, so make
        # sure the single data file was actually uploaded.
        gcs_hook_mock.upload.assert_called_once()

    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
    def test_file_splitting(self, gcs_hook_mock_class, oracle_hook_mock_class):
        """Test that ndjson is split by approx_max_file_size_bytes param."""
        oracle_hook_mock = oracle_hook_mock_class.return_value
        oracle_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
        oracle_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION

        gcs_hook_mock = gcs_hook_mock_class.return_value
        # Expected object name -> expected file content.
        expected_upload = {
            JSON_FILENAME.format(0): b''.join(NDJSON_LINES[:2]),
            JSON_FILENAME.format(1): NDJSON_LINES[2],
        }

        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
            self.assertEqual(BUCKET, bucket)
            self.assertEqual('application/json', mime_type)
            self.assertEqual(GZIP, gzip)
            with open(tmp_filename, 'rb') as file:
                self.assertEqual(expected_upload[obj], file.read())

        gcs_hook_mock.upload.side_effect = _assert_upload

        # Cap the file size at exactly the first two lines so that the third
        # row is expected to land in a second file.
        op = OracleToGCSOperator(
            task_id=TASK_ID,
            sql=SQL,
            bucket=BUCKET,
            filename=JSON_FILENAME,
            approx_max_file_size_bytes=len(expected_upload[JSON_FILENAME.format(0)]),
        )
        op.execute(None)

        # Without this the test would pass even if nothing were uploaded.
        self.assertEqual(len(expected_upload), gcs_hook_mock.upload.call_count)

    @mock.patch('airflow.providers.google.cloud.transfers.oracle_to_gcs.OracleHook')
    @mock.patch('airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook')
    def test_schema_file(self, gcs_hook_mock_class, oracle_hook_mock_class):
        """Test writing schema files."""
        oracle_hook_mock = oracle_hook_mock_class.return_value
        oracle_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
        oracle_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION

        gcs_hook_mock = gcs_hook_mock_class.return_value

        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):  # pylint: disable=unused-argument
            # Only the schema upload is inspected; the data file is ignored here.
            if obj == SCHEMA_FILENAME:
                with open(tmp_filename, 'rb') as file:
                    self.assertEqual(b''.join(SCHEMA_JSON), file.read())

        gcs_hook_mock.upload.side_effect = _assert_upload

        op = OracleToGCSOperator(
            task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME, schema_filename=SCHEMA_FILENAME
        )
        op.execute(None)

        # once for the file and once for the schema
        self.assertEqual(2, gcs_hook_mock.upload.call_count)

0 commit comments

Comments
 (0)