Python Data Validation across Multiple Databases

Blogs

Python script to check null values for column in file and validate in database
August 15, 2024
Effortless Container Migration: A Step-by-Step Guide Using PowerShell
August 23, 2024

Python Data Validation across Multiple Databases

This code is a complex script that performs several operations across multiple databases and tables, focusing on data validation and cleanup.
The script is organized into several functions that work sequentially to check data in different environments and perform actions based on the results.
Here’s a detailed breakdown:

Imports and Initial Setup

Imports:
pandas for data manipulation.
pymysql for connecting to MySQL databases.
jaydebeapi for connecting to a Hive database using JDBC.

Directory Path:
The script sets a directory (dir) where files are read from and saved to.

Reading Input Data
CSV Loading:
The script reads a CSV file (filename.csv) and converts the id column to a list of strings (df).

hive_check Function
Hive Database Connection:
Connects to a Hive database using JDBC.

Data Validation:
Queries tab1 in the Hive database to find records where col1 matches any of the IDs in df.
Stores the results in a list r.
Queries tab2 to check if the IDs found in tab1 exist in tab2.
Identifies IDs that are missing from tab2 and returns this list (mis_id).

appl_check Function
Application Database Check:
Calls hive_check to get the list of missing IDs.
Connects to a MySQL database (db2).
Queries tab3 to find IDs that match the missing IDs.
Identifies IDs that are still missing after this check and returns them.

inv_check Function
Invoice Database Check:
Calls appl_check to get the list of IDs that are missing from the application database.
Connects to another MySQL database (db1).
Queries ab2 to check if these IDs have a specific record (reco 11).
If records are found, they are saved to a CSV file (client_ref_id.csv).
If no records are found, queries tab2 and saves the missing IDs to a CSV file (app_misses.csv).

orch_check Function
Orchestration Database Check:
Calls inv_check to get the list of IDs that are missing from the invoice check.
Connects to a MySQL database (db3).
Queries tab5 to check for locked records related to these IDs.
If locks are found, the user is prompted to delete them. If the user chooses to delete, the records are removed from the database.
If no locks are found, the user is prompted to update a partition key in tab2 of another database (db4). If the user agrees, the partition key is updated.

Main Execution Block
The script starts by calling the orch_check function, which initiates the sequence of checks and actions described above.

Below is the python script for the above description

import pandas as pd
import pymysql as py
import jaydebeapi as jd

dir=’/location/’

#df=pd.read_csv(dir+’app.csv’,index_col=False)

df=pd.read_csv(dir+’filename.csv’)

df=df[‘id’].astype(str).tolist()

id=[]

def hive_check():

hive_con=jd.connect(“com..BifrostDriver”,
“bifrost.com”,
[“usn”, “pwd”],
“xyz1.jar”)

cur=hive_con.cursor()

print(“—————————“)
print(” Checking in hive “)
print(“—————————“)
print(“Given ids to check:”,len(df))
#print(df)

print(“Checking in tab1…”)

q=f”select col2 from tab1
where col1 in {tuple(df)}”

r=[]

cur.execute(q)

for i in cur.fetchall():
r.append(i[0])

print(“invoice found:”,len(r))

#print(r)

print(“Checking in tab2 table…”)

q1=f”select col2 from tab2 where
col2 in {tuple(r)}”

row=[] mis_id=[]

cur.execute(q1)
# appl_set = {}
# for ids in cur.fetchall()
# appl_set.add()
for i in cur.fetchall():
row.append(i[0])

for i in r:
if i not in row:
mis_id.append(i)

print(“Ids avail in tab2: “,len(row))

print(“Missing in tab2:”,len(mis_id))

if len(mis_id) > 0:
return mis_id

else:
exit()

def appl_check():

id=hive_check()

appl_con=py.connect(user=’usn’,passwd=’pwd’,host=’127.0.0.1′,database=’db2′,port=1234)

cur=appl_con.cursor()

print(“——————————-“)
print(” Checking in apl “)
print(“——————————-“)

print(“Checking in application db….”)

q=f”select col2 from tab3 where col2 in {tuple(id)}”

r=[] mis_id=[]

cur.execute(q)

for i in cur.fetchall():
r.append(i[0])

print(“ids avail in db2 :”,len(r))

for i in id:
if i not in r:
mis_id.append(i)

print(“Missing in db2:”,len(mis_id))

if len(mis_id) > 0:
return mis_id

else:
exit()

def inv_check():

l=[]

id=appl_check()

inv_con=py.connect(user = “usn”, passwd = “pwd”, host = “127.0.0.1”, port = 1234, database = “db1”)

cur=inv_con.cursor()

print(“Checking in tab2 for reco 11…”)

q1=f”SELECT col2 FROM ab2 WHERE col2 in {tuple(id)}
AND col4 = 11″

cur.execute(q1)

r=[]

for i in cur.fetchall():
r.append(i[0])

if len(r) > 0:

df=pd.DataFrame(r)
df.to_csv(dir+’client_ref_id.csv’,header=False,index=False)

print(“ids with reco 11 in tab2: “,len(r))

return r

else:

q=f”select id,col2 from tab2 where col2 in {tuple(id)}”

print(q)

cur.execute(q)

col=[i[0] for i in cur.description]

print(col)

df=pd.DataFrame(cur.fetchall())

df.to_csv(dir+’app_misses.csv’,index=False)

print(“ids saved in app_misses.csv file…”)

exit()

def orch_check():

id=inv_check()

if len(id) > 0:

p='[[:payable_debit_note,’+'”‘
s='”]]’
#print(id)
k=[p+i+s for i in id]

orch_con=py.connect(user=”usn”,passwd=”pwd”,host=”127.0.0.1″,port=1234,database=”db3″)

cur=orch_con.cursor()

print(“Checking in db3 db…”)

q=f”SELECT * FROM `tab5` WHERE `key` IN {tuple(k)} AND resource_name = ‘payment-statement-payable_debit_note'”
#print(q)

cur.execute(q)

r=cur.fetchall()

row=pd.DataFrame(r)

print(“ids with lock:”,len(row))

if len(row) > 0:

inp=input(“Do you want to delete (y/n):”)

if inp==’y’:

q1=f”delete from tab5 where key in {tuple(k)}”
cur.execute(q1)
print(“the ids with lock are deleted…”)

else:
print(f”delete from tab5 where key in {tuple(k)}”)

else:

inv_con=py.connect(user = “usn”, passwd = “pwd”, host = “127.0.0.1”, port = 1234, database = “db4”)

cur=inv_con.cursor()

inp=input(“Do you want to update partition_key in tab2 (y/n): “)

if inp==’y’:

q2=f”update tab2 set partition_key=’yyyymm’ where col3 in {tuple(id)}”
cur.execute(q)
print(“The partition_key is updated to run aggregation”)

else:

print(f”update tab2 set partition_key=’yyyymm’ where col3 in {tuple(id)}”)

else:
exit()

if __name__ == ‘__main__’:

orch_check()

 


Pooja K

Leave a Reply

Your email address will not be published. Required fields are marked *