Python script for validating data between a source database and Hive tables for two events: invoices and restocked data
Importing Libraries
import pandas as pd
import pymysql as p
import time
import jaydebeapi as jm
pandas is used for data manipulation and analysis.
pymysql is used to connect to a MySQL database.
jaydebeapi is used to connect to a Hive database using JDBC.
time is imported but not used in the code.
abc Function
This function connects to the MySQL database abc and retrieves the distinct invoice and credit-note IDs for records created from 2024-07-01 up to, but not including, 2024-07-27.
def abc():
    print("checking in abc db for ids...")
    con = p.connect(user="user", password="pwd", host="127.0.0.1", port=3306, database="abc")
    cur = con.cursor()
    # Invoice IDs: prefix + col3 + suffix, for records in the window [2024-07-01, 2024-07-27)
    q = 'select distinct concat("PQR_qtytwe_A2B_invoice_",col3,"_XYZ") from table1 where created_on >= "2024-07-01" and created_on < "2024-07-27" and col4="invoice" and col2 is not null'
    cur.execute(q)
    df = pd.DataFrame(cur.fetchall(), columns=['col3'])
    inv = df['col3'].to_list()
    # Credit-note IDs: built the same way, with a different prefix and filter
    q1 = 'select distinct concat("PQR_jnucc_Credit_Note_A2B_",col3,"_XYZ") from table1 where created_on >= "2024-07-01" and created_on < "2024-07-27" and col3="credit_note" and col4 is not null'
    cur.execute(q1)
    df1 = pd.DataFrame(cur.fetchall(), columns=['col3'])
    cred = df1['col3'].to_list()
    # Release the MySQL connection once both result sets are in memory
    cur.close()
    con.close()
    print("ids found for invoices and credit_notes:", len(inv), len(cred))
    return inv, cred
Connects to the MySQL database abc using provided credentials.
Executes a query (q) to retrieve distinct concatenated values from table1 where conditions on created_on, col4, and col2 are met.
Converts the results to a DataFrame and extracts the list of values for inv.
Executes another query (q1) to retrieve distinct concatenated values from table1 with different conditions.
Converts the results to a DataFrame and extracts the list of values for cred.
Prints the count of inv and cred and returns these lists.
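To try the filtering and ID-building logic without a MySQL server, here is a minimal sketch that swaps in Python's built-in sqlite3 module with an in-memory table. The table layout, the sample rows, and the helper name fetch_ids are hypothetical. SQLite concatenates strings with || instead of MySQL's CONCAT(), and the dates are passed as query parameters rather than embedded in the SQL text. For simplicity both calls use the same filter shape (col4 holds the document type and col2 must be non-null), whereas the original q1 filters on different columns. The sample rows show that the upper date bound is exclusive and that DISTINCT collapses repeated IDs.

```python
import sqlite3


def fetch_ids(con, prefix, suffix, doc_type, start, end):
    """Return distinct IDs built as prefix + col3 + suffix (hypothetical helper).

    Mirrors the shape of the abc() queries: a half-open date window
    [start, end) on created_on, a filter on col4, and a non-null col2.
    """
    q = (
        "SELECT DISTINCT ? || col3 || ? FROM table1 "
        "WHERE created_on >= ? AND created_on < ? "
        "AND col4 = ? AND col2 IS NOT NULL"
    )
    cur = con.execute(q, (prefix, suffix, start, end, doc_type))
    return [row[0] for row in cur.fetchall()]


# Hypothetical sample data in an in-memory SQLite database
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE table1 (col2 TEXT, col3 TEXT, col4 TEXT, created_on TEXT)")
con.executemany(
    "INSERT INTO table1 VALUES (?, ?, ?, ?)",
    [
        ("a", "1001", "invoice", "2024-07-01"),      # inside the window
        ("b", "1001", "invoice", "2024-07-05"),      # same col3, collapsed by DISTINCT
        ("c", "1002", "invoice", "2024-07-27"),      # excluded: upper bound is exclusive
        (None, "1003", "invoice", "2024-07-10"),     # excluded: col2 is NULL
        ("d", "2001", "credit_note", "2024-07-02"),  # a credit note
    ],
)

inv = fetch_ids(con, "PQR_qtytwe_A2B_invoice_", "_XYZ", "invoice", "2024-07-01", "2024-07-27")
cred = fetch_ids(con, "PQR_jnucc_Credit_Note_A2B_", "_XYZ", "credit_note", "2024-07-01", "2024-07-27")
print(inv)   # ['PQR_qtytwe_A2B_invoice_1001_XYZ']
print(cred)  # ['PQR_jnucc_Credit_Note_A2B_2001_XYZ']
```

ISO-formatted date strings compare correctly as text, which is why the half-open window works on a TEXT column here; in MySQL the same comparison also excludes every timestamp on 2024-07-27.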
hive_check Function
This function compares the results obtained from abc() with the data in a Hive database.
def hive_check():
    inv, cred = abc()
    inv1 = []
    cred1 = []
    # jaydebeapi.connect(driver class name, JDBC URL, [user, password], path to the driver jar)
    hive_con = jm.connect("driver.com", "path/hive", ["abc", "pwd"], "path/.jar")
    cur = hive_con.cursor()
    c = 50000  # batch size: number of IDs per IN (...) query
    for i in range(0, len(inv), c):
        # Quote each ID explicitly; str(tuple(...)) leaves a trailing comma for a one-element batch
        ids = ", ".join(f"'{x}'" for x in inv[i:i+c])
        q = f"select distinct col3 from sch.tabl2 where col3 in ({ids})"
        cur.execute(q)
        for row in cur.fetchall():
            inv1.append(row[0])
        print('checked for 50k ids (invoice)...')
    for i in range(0, len(cred), c):
        ids = ", ".join(f"'{x}'" for x in cred[i:i+c])
        q = f"select distinct col3 from sch.table3 where col3 in ({ids})"
        cur.execute(q)
        for row in cur.fetchall():
            cred1.append(row[0])
        print("checked for 50k ids (credit notes)...")
    cur.close()
    hive_con.close()
    # IDs present in the source database but not found in Hive
    diff_inv = set(inv) - set(inv1)
    diff_cred = set(cred) - set(cred1)
    print('ids missing in hive table for invoice:', len(diff_inv))
    for n in diff_inv:
        print(n)
    print('ids missing in hive table for credit_note:', len(diff_cred))
    for n in diff_cred:
        print(n)


if __name__ == "__main__":
    hive_check()
Calls abc() to get inv and cred.
Connects to a Hive database using JDBC.
Divides the list of inv into chunks of 50,000 and queries the Hive table sch.tabl2 to check for matching col3.
Appends the results to inv1.
Prints a message every time a chunk of 50,000 IDs is checked.
Repeats the process for cred with the Hive table sch.table3.
Compares the original lists (inv and cred) with the results from Hive (inv1 and cred1).
Prints the count of missing IDs and the missing IDs themselves for both invoices and credit notes.
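The batching and comparison steps can be exercised without a Hive connection. In this minimal sketch the helper names chunked, sql_in_list, and find_missing are hypothetical: chunked splits a list into fixed-size batches, sql_in_list renders a batch as an IN (...) list without the trailing comma that tuple() produces for a single element, and find_missing takes a lookup callable that stands in for running the Hive query on one batch. The fake lookup and the sample IDs are assumptions for demonstration only. Values containing a single quote are rejected rather than escaped, because escaping rules differ between SQL engines.

```python
from typing import Callable, Iterable, Iterator, List, Sequence, Set


def chunked(items: Sequence[str], size: int) -> Iterator[Sequence[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def sql_in_list(batch: Sequence[str]) -> str:
    """Render a batch as a SQL IN list such as ('a', 'b')."""
    for value in batch:
        if "'" in value:
            raise ValueError(f"unsupported quote character in {value!r}")
    return "(" + ", ".join(f"'{value}'" for value in batch) + ")"


def find_missing(
    expected: Sequence[str],
    lookup: Callable[[Sequence[str]], Iterable[str]],
    size: int = 50000,
) -> Set[str]:
    """Return the expected IDs that `lookup` did not report as present."""
    found: Set[str] = set()
    for batch in chunked(expected, size):
        found.update(lookup(batch))
    return set(expected) - found


# Hypothetical stand-in for a Hive table: only these IDs "exist".
hive_rows = {"ID_1", "ID_2", "ID_4"}


def fake_lookup(batch: Sequence[str]) -> List[str]:
    # A real lookup would execute something like
    # "select distinct col3 from sch.tabl2 where col3 in " + sql_in_list(batch)
    return [value for value in batch if value in hive_rows]


expected_ids = ["ID_1", "ID_2", "ID_3", "ID_4", "ID_5"]
missing = find_missing(expected_ids, fake_lookup, size=2)  # three batches: 2 + 2 + 1
print(sorted(missing))        # ['ID_3', 'ID_5']
print(sql_in_list(["ID_1"]))  # ('ID_1')
```

Passing the lookup as a callable keeps the chunking and set-difference logic independent of the database driver, so the same function could wrap a jaydebeapi cursor or be tested with an in-memory fake as above.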
Summary
The code defines two functions to:
Retrieve and process data from a MySQL database.
Compare the retrieved data with data in a Hive database and identify missing records.
Pooja K