"""Airflow DAG that loads the ``d_customer_account`` table in BigQuery.

The DAG contains a single task that runs one multi-statement standard-SQL
script: it declares a placeholder key, then INSERTs into
``{DATASET_NAME}.d_customer_account`` a projection built from the ``ACCNT``
source table joined with two derived sub-queries.
"""
import os
import datetime

from airflow import models
from airflow.providers.google.cloud.operators import bigquery
from airflow.utils import trigger_rule

# BigQuery location used for the query job.
location = 'EU'

# Both values can be overridden through the environment; the defaults are the
# development project and dataset.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "vf-uk-ngbi-dev-gen-01")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "EDW_ACCOUNT")
TABLE_1 = "d_customer_account"

# The script is assembled from adjacent string literals, one SQL line each.
# Only the INSERT line interpolates Python values, so it is the only f-string.
#
# NOTE(review): the source table is hard-coded as
# vf-uk-ngbi-dev-gen-01.EDW_ACCOUNT.ACCNT and does not follow PROJECT_ID /
# DATASET_NAME -- confirm whether that is intentional.
# NOTE(review): the script references the named query parameters
# @Extraction_Dttm, @Insert_Load_Id and @Update_Load_Id, but nothing in this
# file binds them (no query_params is passed to the operator below) --
# confirm how they are meant to be supplied.
execute_insert_query = (
    # DUMMY_KEY is the hex SHA-256 of '*NA'; it fills every key column that
    # has no real source value.
    "DECLARE DUMMY_KEY STRING;\n"
    "SET DUMMY_KEY = to_hex(sha256('*NA'));\n"
    f"INSERT {DATASET_NAME}.{TABLE_1} \n"
    # --- Projection: surrogate keys and descriptive attributes -------------
    "select to_hex(sha256(cast(BASE_ACCN_MAIN.SOURCE_SYSTEM_CD || BASE_ACCN_MAIN.SOURCE_ACCOUNT_ID || BASE_ACCN_MAIN.EFFECTIVE_DT || BASE_ACCN_MAIN.EFFECTIVE_TM as string))) as dw_uniq_cust_acct_id,\n"
    "to_hex(sha256(cast(BASE_ACCN_MAIN.ACCOUNT_CD as string))) AS dw_cust_acct_id,\n"
    "to_hex(sha256(cast(BASE_ACCN_MAIN.ACCOUNT_CD as string))) as dw_sub_cust_acct_id ,\n"
    "cast(BASE_ACCN_MAIN.ACCOUNT_CD as string) as cust_acct_cd,\n"
    "cast(BASE_ACCN_MAIN.ACCOUNT_CD as string) as sub_cust_acct_cd,\n"
    "cast(BASE_ACCN_MAIN.SOURCE_ACCOUNT_ID as string) as cust_acct_internal_cd,\n"
    "cast(BASE_ACCN_MAIN.SOURCE_SYSTEM_CD as string) as ss_cd,\n"
    "to_hex(sha256('123')) as dw_cust_id,\n"
    "to_hex(sha256('343')) as dw_sub_cust_id,\n"
    "DUMMY_KEY as dw_alt_cust_id,\n"
    "DUMMY_KEY as dw_sub_alt_cust_id,\n"
    "DUMMY_KEY as dw_credit_rating_id,\n"
    "DUMMY_KEY as dw_billing_cycle_id,\n"
    "DUMMY_KEY as dw_bill_format_id,\n"
    "to_hex(sha256(BASE_ACCN_MAIN.SOURCE_SYSTEM_CD || 'Primary' || BASE_ACCN_MAIN.ACCOUNT_TYP || BASE_ACCN_MAIN.ACCOUNT_CAT || BASE_ACCN_MAIN.ACCOUNT_SUB_CAT)) as dw_cust_acct_cat_id,\n"
    # NOTE(review): unlike the neighbouring *_id columns this one is not
    # wrapped in to_hex(sha256(...)) -- confirm that is intended.
    "BASE_ACCN_MAIN.SOURCE_SYSTEM_CD || BASE_ACCN_MAIN.ACCOUNT_STATUS_CD as dw_cust_acct_stat_id,\n"
    "DUMMY_KEY as dw_pymt_meth_id,\n"
    "to_hex(sha256(BASE_ACCN_MAIN.SOURCE_SYSTEM_CD || BASE_ACCN_MAIN.IN_COLLECTIONS_FLG || BASE_ACCN_MAIN.WRITE_OFF_INDICATOR || DUMMY_KEY)) as dw_cust_acct_coll_stat_id,\n"
    "to_hex(sha256(BASE_ACCN_MAIN.SOURCE_SYSTEM_CD || BASE_ACCN_MAIN.ACCOUNT_STATUS_CD)) as dw_cust_acct_rpt_stat_id,\n"
    "DUMMY_KEY as dw_cust_acct_stat_rsn_id,\n"
    "DUMMY_KEY as dw_cust_acct_barring_stat_id,\n"
    "BASE_ACCN_MAIN.ACCOUNT_NM as cust_acct_name,\n"
    # Creation date is the earlier of the two per-account minimum dates
    # computed in SUB_TBL1.
    "least(SUB_TBL1.CREATION_DT1,SUB_TBL1.CREATION_DT2) as cust_acct_creation_dt,\n"
    "cast(SUB_TBL2.CONNECTION_DT as DATE) as cust_acct_conn_dt,\n"
    "cast(SUB_TBL2.DISCONNECTION_DT as DATE) as cust_acct_dconn_dt,\n"
    "cast(SUB_TBL2.RECONNECTION_DT as DATE) as cust_acct_reconnection_dt,\n"
    "null as cust_acct_barring_dt,\n"
    "null as cust_acct_bill_responsibility,\n"
    # ANONYMOUS_IND 'N' -> 1, 'Y' -> 0, anything else -> 2.
    "case when BASE_ACCN_MAIN.ANONYMOUS_IND = 'N' then 1\n"
    " when BASE_ACCN_MAIN.ANONYMOUS_IND = 'Y' then 0 \n"
    " else 2\n"
    "END as cust_acct_valid_flag,\n"
    "'Default VAT' as cust_acct_tax_rate,\n"
    "null as cust_acct_tax_ident_num,\n"
    "null as extended_attr_1,\n"
    "null as extended_attr_2,\n"
    "BASE_ACCN_MAIN.BLACK_TARIFF_FLAG as Extended_Attribute_3,\n"
    "null as extended_attr_4,\n"
    "null as extended_attr_5,\n"
    "null as extended_attr_6,\n"
    "null as extended_attr_7,\n"
    "null as extended_attr_8,\n"
    "null as extended_attr_9,\n"
    "null as extended_attr_10,\n"
    "@Extraction_Dttm as extraction_dttm ,\n"
    "cast(@Insert_Load_Id as string) as insert_load_id, \n"
    "cast(@Update_Load_Id as string) as update_load_id,\n"
    "cast(BASE_ACCN_MAIN.INSERT_TS as DATE) as load_dttm, \n"
    "cast(BASE_ACCN_MAIN.UPDATE_TS as DATE) as update_dttm\n"
    # --- BASE_ACCN_MAIN: ACCNT rows for source systems 60/61, keeping the
    # rank-1 row per (SOURCE_SYSTEM_CD, EFFECTIVE_DT, EFFECTIVE_TM,
    # SOURCE_ACCOUNT_ID) ordered by EXPIRY_DT desc, ANONYMOUS_IND asc,
    # ACCOUNT_STATUS_CD asc. The same filter is repeated in each sub-query.
    "from ( select ab.* from(\n"
    "Select\n"
    "rank() over (partition by SOURCE_SYSTEM_CD, EFFECTIVE_DT,cast(EFFECTIVE_TM as STRING),SOURCE_ACCOUNT_ID order by EXPIRY_DT desc, ANONYMOUS_IND asc,\n"
    "ACCOUNT_STATUS_CD asc) as rnk,\n"
    "a.*\n"
    "from vf-uk-ngbi-dev-gen-01.EDW_ACCOUNT.ACCNT a\n"
    "WHERE SOURCE_SYSTEM_CD in (60, 61)\n"
    ") ab where ab.RNK=1)BASE_ACCN_MAIN\n"
    # --- SUB_TBL1: per ACCOUNT_ID, the minimum EFFECTIVE_DT and the minimum
    # ACCOUNT_CREATED_DT over the same filtered rows.
    "LEFT JOIN\n"
    "(Select\n"
    "min(BASE_TABLE.EFFECTIVE_DT) as CREATION_DT1,\n"
    "min(BASE_TABLE.ACCOUNT_CREATED_DT) as CREATION_DT2,\n"
    "ACCOUNT_ID\n"
    "from (Select c.* from (\n"
    "Select\n"
    "rank() over (partition by SOURCE_SYSTEM_CD, EFFECTIVE_DT, cast(EFFECTIVE_TM as STRING), SOURCE_ACCOUNT_ID order by EXPIRY_DT desc, ANONYMOUS_IND asc, ACCOUNT_STATUS_CD asc) as rnk,\n"
    "a.*\n"
    "from vf-uk-ngbi-dev-gen-01.EDW_ACCOUNT.ACCNT a\n"
    "WHERE SOURCE_SYSTEM_CD in (60, 61)\n"
    ") c where c.rnk = 1) BASE_TABLE\n"
    "group by\n"
    "ACCOUNT_ID) SUB_TBL1\n"
    "ON SUB_TBL1.ACCOUNT_ID = BASE_ACCN_MAIN.ACCOUNT_ID\n"
    # --- SUB_TBL2: status-transition dates. Each CURR row (account,
    # effective date/time, status) is paired with earlier-ranked PREV rows of
    # the same account; the rank-1 pairing (latest PREV) is kept and a date is
    # emitted only when the status differs from the previous one.
    "left join\n"
    "(SELECT \n"
    "CURR_ID, CURR_EFFECTIVE_DT, CURR_EFF_TM, CURR_STATUS,\n"
    "case when CURR_STATUS = 'Z' AND (PREV_STATUS <> 'Z' OR PREV_STATUS IS NULL) THEN CURR_PREACTIVATION_DT ELSE null END as PREACTIVATION_DT,\n"
    "case when CURR_STATUS = 'A' AND (PREV_STATUS <> 'A' OR PREV_STATUS IS NULL) THEN CURR_CONNECTION_DT ELSE null END as CONNECTION_DT,\n"
    "case when CURR_STATUS IN ( 'N','C') AND (PREV_STATUS NOT IN ( 'N','C') OR PREV_STATUS IS NULL) THEN CURR_DISCONNECTION_DT ELSE null END as DISCONNECTION_DT,\n"
    "case when CURR_STATUS = 'A' AND PREV_STATUS IN ( 'N','C') THEN CURR_CONNECTION_DT ELSE null END as RECONNECTION_DT\n"
    "FROM (\n"
    "SELECT CURR.*,\n"
    "RANK() OVER (PARTITION BY CURR_ID, CURR_EFFECTIVE_DT, CURR_STATUS ORDER BY PREV_EFFECTIVE_DT DESC, PREV_EFF_TM DESC, PREV_STATUS DESC) AS RNK,\n"
    "PREV.*\n"
    "FROM\n"
    # CURR: one row per (account, effective date, status, effective time),
    # dense-ranked chronologically within the account.
    "( SELECT C.*, \n"
    "DENSE_RANK() OVER (PARTITION BY CURR_ID ORDER BY CURR_EFFECTIVE_DT, CURR_EFF_TM) AS CURR_RANK\n"
    "FROM\n"
    "(\n"
    "SELECT B1.account_id AS CURR_ID, B1.effective_dt AS CURR_EFFECTIVE_DT, B1.account_status_cd AS CURR_STATUS, B1.effective_tm AS CURR_EFF_TM,\n"
    "min(case when B1.account_status_cd ='A' THEN B1.effective_dt END) AS CURR_CONNECTION_DT,\n"
    "min(case when B1.account_status_cd ='Z' THEN B1.effective_dt END) AS CURR_PREACTIVATION_DT,\n"
    "min(case when B1.account_status_cd IN ('N','C') THEN B1.effective_dt END) AS CURR_DISCONNECTION_DT\n"
    "FROM\n"
    "(\n"
    "select b.* from (\n"
    "select\n"
    "rank() over (partition by SOURCE_SYSTEM_CD, EFFECTIVE_DT, cast(EFFECTIVE_TM as string), SOURCE_ACCOUNT_ID order by EXPIRY_DT desc, ANONYMOUS_IND asc, ACCOUNT_STATUS_CD asc) as rnk,\n"
    "a.*\n"
    "from vf-uk-ngbi-dev-gen-01.EDW_ACCOUNT.ACCNT a\n"
    "WHERE SOURCE_SYSTEM_CD in (60, 61)\n"
    ") b where b.rnk = 1\n"
    ") B1\n"
    "GROUP BY B1.account_id, B1.effective_dt, B1.account_status_cd, B1.effective_tm\n"
    ") C ) CURR\n"
    # PREV: the same shape as CURR, joined on the account with a strictly
    # lower chronological rank.
    "LEFT JOIN (\n"
    "SELECT P.*, \n"
    "DENSE_RANK() OVER (PARTITION BY PREV_ID ORDER BY PREV_EFFECTIVE_DT, PREV_EFF_TM) AS PREV_RANK\n"
    "FROM\n"
    "(SELECT B2.account_id AS PREV_ID, B2.effective_dt AS PREV_EFFECTIVE_DT, B2.account_status_cd AS PREV_STATUS, B2.effective_tm AS PREV_EFF_TM,\n"
    "min(case when B2.account_status_cd ='A' THEN B2.effective_dt END) AS PREV_CONNECTION_DT,\n"
    "min(case when B2.account_status_cd ='Z' THEN B2.effective_dt END) AS PREV_PREACTIVATION_DT,\n"
    "min(case when B2.account_status_cd IN ('N','C') THEN B2.effective_dt END) AS PREV_DISCONNECTION_DT\n"
    "FROM\n"
    "(\n"
    "select b.* from (\n"
    "select\n"
    "rank() over (partition by SOURCE_SYSTEM_CD, EFFECTIVE_DT, cast(EFFECTIVE_TM as string), SOURCE_ACCOUNT_ID order by EXPIRY_DT desc, ANONYMOUS_IND asc, ACCOUNT_STATUS_CD asc) as rnk,\n"
    "a.*\n"
    "from vf-uk-ngbi-dev-gen-01.EDW_ACCOUNT.ACCNT a\n"
    "WHERE SOURCE_SYSTEM_CD in (60, 61)\n"
    ") b where b.rnk = 1\n"
    ") B2\n"
    "GROUP BY B2.account_id, B2.effective_dt,B2.account_status_cd, B2.effective_tm\n"
    ") P ) PREV\n"
    "ON CURR_ID = PREV_ID\n"
    "AND PREV_RANK < CURR_RANK\n"
    ")D WHERE RNK = 1) SUB_TBL2\n"
    "\n"
    # SUB_TBL2 is matched back to the main row on account, effective
    # date/time and status.
    "on\n"
    "SUB_TBL2.CURR_ID=BASE_ACCN_MAIN.ACCOUNT_ID\n"
    "and \n"
    "SUB_TBL2.CURR_EFFECTIVE_DT=BASE_ACCN_MAIN.EFFECTIVE_DT\n"
    "and\n"
    "SUB_TBL2.CURR_EFF_TM=BASE_ACCN_MAIN.EFFECTIVE_TM\n"
    "and\n"
    "SUB_TBL2.CURR_STATUS=BASE_ACCN_MAIN.ACCOUNT_STATUS_CD\n"
)

# Midnight at the start of the previous day, recomputed each time the file
# is parsed.
# NOTE(review): Airflow's documentation recommends a fixed start_date over a
# dynamic one -- consider pinning this.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time(),
)

# Arguments applied to every task in the DAG.
default_dag_args = {
    'start_date': yesterday,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': PROJECT_ID,
}

with models.DAG(
        '011Dag_Test',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Single task: run the INSERT script as a standard-SQL BigQuery job in
    # the configured location.
    execute_update_query = bigquery.BigQueryExecuteQueryOperator(
        task_id="execute_update_query",
        sql=execute_insert_query,
        use_legacy_sql=False,
        location=location,
    )

    # Define DAG dependencies.
    # There is only one task, so this bare expression declares no ordering.
    (
        execute_update_query
    )
Write, Run & Share Python code online using OneCompiler's Python online compiler for free. It's one of the robust, feature-rich online compilers for python language, supporting both the versions which are Python 3 and Python 2.7. Getting started with the OneCompiler's Python editor is easy and fast. The editor shows sample boilerplate code when you choose language as Python or Python2 and start coding.
OneCompiler's Python online editor supports stdin, and users can give inputs to programs using the STDIN textbox under the I/O tab. The following sample Python program takes a name as input and prints a greeting with that name.
import sys
name = sys.stdin.readline()
print("Hello "+ name)
Python is a very popular general-purpose programming language which was created by Guido van Rossum and released in 1991. It is very popular for web development, and you can build almost anything with it: mobile apps, web apps, tools, data analytics, machine learning, etc. It is designed to be simple and easy to read, like the English language. It is highly productive and efficient, making it a very popular language.
Whenever you want to perform a set of operations based on a condition, IF-ELSE is used.
if conditional-expression:
    #code
elif conditional-expression:
    #code
else:
    #code
Indentation is very important in Python; make sure the indentation is followed correctly.
A for loop is used to iterate over collections (list, tuple, set, dictionary) or strings.
# A collection of phone names to iterate over.
mylist=("Iphone","Pixel","Samsung")
for i in mylist:
    # The loop body must be indented under the `for` header.
    print(i)
While is also used to iterate over a set of statements based on a condition. Usually, while is preferred when the number of iterations is not known in advance.
while condition:
    #code
There are four types of collections in Python.
List is a collection which is ordered and can be changed. Lists are specified in square brackets.
mylist=["iPhone","Pixel","Samsung"]
print(mylist)
Tuple is a collection which is ordered and can not be changed. Tuples are specified in round brackets.
myTuple=("iPhone","Pixel","Samsung")
print(myTuple)
The code below throws an error because it tries to assign a new value to an element of an existing tuple.
myTuple=("iPhone","Pixel","Samsung")
print(myTuple)
myTuple[1]="onePlus"
print(myTuple)
Set is a collection which is unordered and unindexed. Sets are specified in curly brackets.
myset = {"iPhone","Pixel","Samsung"}
print(myset)
Dictionary is a collection of key value pairs which is unordered, can be changed, and indexed. They are written in curly brackets with key - value pairs.
mydict = {
"brand" :"iPhone",
"model": "iPhone 11"
}
print(mydict)
Following are the libraries supported by OneCompiler's Python compiler
Name | Description |
---|---|
NumPy | NumPy python library helps users to work on arrays with ease |
SciPy | SciPy is a scientific computation library which depends on NumPy for convenient and fast N-dimensional array manipulation |
SKLearn/Scikit-learn | SKLearn, or Scikit-learn, is the most useful library for machine learning in Python |
Pandas | Pandas is the most efficient Python library for data manipulation and analysis |
DOcplex | DOcplex, IBM Decision Optimization CPLEX Modeling for Python, is a library composed of Mathematical Programming Modeling and Constraint Programming Modeling |