Python Automation script for data validation

Blogs

Private Link in Fabric: The Key to Enhanced Security and Compliance
May 21, 2024

Python Automation script for data validation

This Python script performs a data validation task across multiple databases (SPS, Comm Report, OMS, OMS Old) to ensure that certain terms (or identifiers) are correctly tagged or represented. Here’s a detailed explanation of each part of the script:

1. Imports and Directory Setup

import pandas as pd
import pymysql as p
import jaydebeapi as j

dir = ‘/users/downloads/’
df = pd.read_csv(dir + ‘check_ids.csv’)
id = df[‘id’].astype(str).to_list()

Explanation:

  • pandas is imported as pd for handling data frames.
  • pymysql is imported as p for connecting to MySQL databases.
  • jaydebeapi is imported as j for connecting to JDBC databases.
  • dir is set to the directory path where the CSV file check_ids.csv is located.
  • The script reads the CSV file into a Pandas DataFrame (df) and extracts the id column as a list of strings (id)

2. sps_check() Function

def sps_check():
l = [] l1 = []

# Connect to SPS database
sps_con = p.connect(user=””, passwd=””,
host=””,
port=3306, database=”sps”)
cur = sps_con.cursor()

print(“Checking in sps for the term tag…”)

# SQL query to check for term tags in SPS
q = f”select entity_id from order_entity_term where entity_type=’order_line’
and term_type=’seller_incentive’ and entity_id in {tuple(id)} and term_id is not null”
cur.execute(q)

# Fetching results
for i in cur.fetchall():
l.append(i[0])

print(“term ids tagged count:”, len(l))

# Finding IDs not tagged
for i in id:
if i not in l:
l1.append(i)

print(“term id not tagged count:”, len(l1))

# Writing IDs to file if not tagged
if len(l1) > 0:
f = open(dir + ‘term_mis.csv’, ‘w’)
f.writelines(str(i) + ‘n’ for i in l1)
f.close()

print(“ids for which term need to tag saved in term_mis.csv file..”)

Explanation:

  • sps_check() function connects to the MySQL database (myntra_sps) using pymysql.
  • Executes a query (q) to retrieve entity_id from order_entity_term table where conditions are met.
  • Collects results into list l and identifies IDs not found (l1).
  • Writes IDs needing tagging to a CSV file (term_mis.csv) if any are found.

3. comm_report_check() Function

def comm_report_check():
comm_con = j.connect(“com.bifrost.jdbc.BifrostDriver”,
“jdbc:bifrost://bifrostx-gateway..com:80/hive”,
[“”, “”],
“”)
cur = comm_con.cursor()

print(“Checking in com report for term ids…”)

q = f”select revenuereport_ext_ref_id, accrual_attribute_ref_11 from fin_wallstreet.forward_mp_commission_20240605_new
where revenuereport_ext_ref_id in {tuple(id)}”
cur.execute(q)
df = pd.DataFrame(cur.fetchall())
df.columns = [‘ord_id’, ‘term_id’]

# Writing results to CSV files based on conditions
df1 = df[df[‘term_id’] == ‘0’] df2 = df[df[‘term_id’] > ‘0’] df1.to_csv(dir + ‘term_zero.csv’, index=False)

print(“term ids avail in comm report:”, len(df2))
print(“term ids zero in comm report:”, len(df1))

# Return IDs based on conditions
if len(df1) > 0:
ids = df1[‘ord_id’].astype(str).to_list()
else:
exit()

return ids

Explanation:

  • comm_report_check() function connects to a JDBC database using jaydebeapi.
  • Executes a query (q) to retrieve data from fin_wallstreet.forward_mp_commission_20240605_new.
  • Filters results into two DataFrames (df1 for term_id equals ‘0’ and df2 for term_id greater than ‘0’).
  • Writes df1 to term_zero.csv and prints counts of available and zero term IDs.
  • Returns a list of IDs (ids) based on conditions.

4. oms() and oms_old() Functions

def oms():
l = [] ids = comm_report_check()

print(“Fetching line ids for the term id zero…”)
print(“Checking in oms db…”)

# Connect to OMS database
oms_con = p.connect(user=””, passwd=””,
host=””, port=3306, database=”oms”)
cur = oms_con.cursor()

q = f”select id from order_line where order_release_id_fk in {tuple(ids)}”
cur.execute(q)

# Collecting results into list
for i in cur.fetchall():
l.append(i[0])

print(“line ids in oms:”, len(l))

# Fallback to older OMS if no results found
if len(l) == 0:
l = oms_old()

return l

def oms_old():
l = []

# Connect to old OMS database
oms_con = p.connect(user=”darshan.viswana1″, passwd=”April@2024″,
host=”myntra_oms_old-userslave1.mynt.myntra.com”,
port=3306, database=”myntra_oms_old”)
cur = oms_con.cursor()

print(“Checking in oms old for line ids…”)

q = f”select id from order_line where order_release_id_fk in {tuple(id)}”
cur.execute(q)

# Collecting results into list
for i in cur.fetchall():
l.append(i[0])

print(“line ids in oms old:”, len(l))

return l

Explanation:

  • oms() function connects to the OMS database (myntra_oms) using pymysql.
  • Executes a query to retrieve id from order_line where conditions are met.
  • If no results are found (len(l) == 0), it calls oms_old() to query the older OMS database (myntra_oms_old).

5. Main Script Execution

if __name__ == ‘__main__’:
sps_check()

Explanation:

  • Executes the sps_check() function when the script is run directly (not imported as a module).

 

Summary:

This script integrates Python with database connectivity libraries (pymysql for MySQL and jaydebeapi for JDBC) to perform data validation tasks across multiple databases. It reads input IDs from a CSV file, checks term tagging in sps, analyzes term IDs in a commission report (fin_wallstreet.forward_mp_commission_20240605_new), and retrieves related line IDs from myntra_oms and myntra_oms_old databases as necessary. Results are logged and saved into CSV files for further analysis and action. Adjustments may be needed based on specific database schemas and access permissions.

 


Pooja K

Leave a Reply

Your email address will not be published. Required fields are marked *