Writing a Python script to check data in MongoDB using the pymongo module
The ids fetched from the Hive tables need to be validated in MongoDB to check that aggregation has been done for them, so that the orders get paid
# importing necessary libraries
import pandas as pd
import pymysql as p
import jaydebeapi as j
import pymongo as pm
# Directory prefix for the input and output CSV files. It is concatenated
# directly with file names (``dir + 'check_ids.csv'``), so the value must end
# with a path separator.
# NOTE: this name shadows the builtin ``dir``; it is kept because the
# functions in this script reference it.
dir = 'path to read file'

# Read the ids to validate into a dataframe and keep them as a list of strings.
d = pd.read_csv(dir + 'check_ids.csv')
ids = d['id'].astype(str).tolist()

# Hive connection (via JDBC) used to fetch client_ref_ids for the ids above.
# The four positional arguments are placeholders passed straight to
# jaydebeapi.connect; presumably driver class, JDBC URL, credentials and jar
# path, in that order -- confirm against the jaydebeapi documentation.
inv_con = j.connect(
    "bifrost",
    "hive",
    ["uds", "pwd"],
    "jaf file",
)
# Module-level cursor shared by inv_check() and gamma_check().
cur = inv_con.cursor()
def inv_check():
    """Look up the client_ref_ids of the input ids in the Hive invoices table.

    Uses the module-level ``ids`` list and the module-level Hive cursor
    ``cur``. Prints how many rows were found.

    Returns:
        list: the ``client_ref_id`` value of every row whose
        ``external_ref_id`` is in ``ids``; empty when ``ids`` is empty.
    """
    print("Checking in invoices table for client_ref_id’s…..")
    if not ids:
        # An empty IN () list is not valid SQL, so skip the query entirely.
        print("ids avail in invoices table:", 0)
        return []
    # Render the IN list element by element: formatting tuple(ids) directly
    # yields "('x',)" for a single id, whose trailing comma breaks the query.
    in_list = ", ".join(repr(i) for i in ids)
    q = (
        'select external_ref_id,client_ref_id '
        'from fin_wallstreet.core_invoices_rtd_mp_20240504 '
        f'where external_ref_id in ({in_list})'
    )
    cur.execute(q)
    df = pd.DataFrame(cur.fetchall(), columns=[c[0] for c in cur.description])
    print("ids avail in invoices table:", len(df))
    return df['client_ref_id'].to_list()
def gamma_check():
    """Find client_ref_ids that are missing from the MongoDB mappings collection.

    Takes the client_ref_ids returned by ``inv_check()``, queries the
    ``mappings`` collection for them, and works out which ones are absent.
    If none are missing the script terminates. Otherwise the missing ids are
    looked up again in Hive (module-level cursor ``cur``) and the resulting
    rows are written to ``app_ingest.csv`` under ``dir``.

    Returns:
        list: the ``external_ref_id`` values of the rows written to the CSV.

    Raises:
        SystemExit: when every client_ref_id is present in the collection.
    """
    from urllib.parse import quote_plus

    client_ref_ids = inv_check()
    print("Checking in Gamma…..")

    # Username and password must be percent-escaped inside a MongoDB URI.
    # NOTE(review): the URI names database 'gamma_invoice_agg_prod' but the
    # query below runs against 'myntra_invoice_agg_prod' -- confirm intended.
    prod_conn = (
        "mongodb://{}:{}@{}:{}/{}"
        "?readPreference=primary&appname=MongoDB+Compass"
        "&directConnection=true&ssl=false"
    ).format(quote_plus('usn'), quote_plus('pwd'), 'localhost', 5109,
             'gamma_invoice_agg_prod')
    gamma_con = pm.MongoClient(prod_conn, connectTimeoutMS=60000)
    try:
        col = gamma_con['myntra_invoice_agg_prod']['mappings']
        # Filter on the ids from Hive; project only client_ref_id.
        query = {'client_ref_id': {'$in': list(client_ref_ids)}}
        projection = {'client_ref_id': 1, '_id': 0}
        found = [doc.get('client_ref_id') for doc in col.find(query, projection)]
    finally:
        # Release the connection even if the query fails.
        gamma_con.close()
    print("ids found in gamma:", len(found))

    # Set lookup keeps the missing-id scan linear instead of quadratic.
    found_set = set(found)
    missing = [i for i in client_ref_ids if i not in found_set]
    print("missing in gamma:", len(missing))
    if not missing:
        # Nothing to ingest: stop the whole script, as the original exit() did.
        raise SystemExit(0)

    # Render the IN list element by element so a single id does not produce
    # the trailing comma of a one-element tuple repr.
    in_list = ", ".join(repr(i) for i in missing)
    # NOTE(review): this table suffix (20240413) differs from the one queried
    # in inv_check (20240504) -- confirm this is intended.
    q = (
        'select external_ref_id,client_ref_id,reco_status,updated_at '
        'from fin_wallstreet.core_invoices_rtd_mp_20240413 where '
        f'client_ref_id in ({in_list})'
    )
    cur.execute(q)
    df = pd.DataFrame(cur.fetchall(), columns=[c[0] for c in cur.description])
    df.to_csv(dir + 'app_ingest.csv', index=False)
    print("The missing ids are saved in app_ingest.csv file…")
    return df['external_ref_id'].to_list()
def _fetch_packet_ids(database, release_ids):
    """Return the packet_id_fk values in *database* for the given release ids."""
    # Render the IN list element by element; formatting a one-element tuple
    # would leave a trailing comma and break the query.
    in_list = ", ".join(repr(i) for i in release_ids)
    con = p.connect(user="usn", passwd="pwd", host="127.0.0.1", port=3306,
                    database=database)
    try:
        with con.cursor() as cursor:
            cursor.execute(
                'select packet_id_fk from order_line '
                f'where order_release_id_fk in ({in_list})'
            )
            return [row[0] for row in cursor.fetchall()]
    finally:
        # Always release the MySQL connection, even if the query fails.
        con.close()


def oms_con():
    """Write the packet ids to ingest to ``ingest_pack_id.csv``.

    Takes the order release ids returned by ``gamma_check()`` and fetches
    their ``packet_id_fk`` from the ``oms_old`` database. When that returns
    fewer rows than there are ids, the same query is also run against ``oms``
    and its rows are appended. One line per packet id is written, in the form
    ``<packet_id>,packet_delivered-MYNTRAMP-<packet_id>``.

    Returns:
        None
    """
    release_ids = gamma_check()
    print("Checking in oms to fetch packet_id…")
    if not release_ids:
        # An empty IN () list is invalid SQL and there is nothing to look up.
        print("No order release ids to look up; nothing to ingest.")
        return

    packet_ids = _fetch_packet_ids("oms_old", release_ids)
    if len(packet_ids) < len(release_ids):
        # Not every release was found in oms_old; query oms as well.
        packet_ids.extend(_fetch_packet_ids("oms", release_ids))

    # The context manager closes the file even if a write fails; each record
    # ends with a real newline so rows do not run together.
    with open(dir + 'ingest_pack_id.csv', 'w') as f:
        f.writelines(
            f"{pid},packet_delivered-MYNTRAMP-{pid}\n" for pid in packet_ids
        )
    print("ids to be ingested are saved in ingest_pack_id.csv file….")
# Run the full check (Hive -> MongoDB -> OMS) only when executed as a script.
if __name__ == '__main__':
    oms_con()
Pooja K