"""Reset and re-provision Zenjob test learners through the Northpass v2 API.

Three steps are available (only the group step runs by default, see the
``__main__`` guard at the bottom):

1. ``delete_people_first`` bulk-deletes the people returned by the
   zenjob.com email filter.
2. ``run`` opens ``ZENJOBURL + email`` in WebKit for every email in the CSV.
3. ``add_ppl_to_groups`` looks each email up by ``sso_uid`` and bulk-adds
   the resulting people to ``TARGET_GROUP_ID``.
"""
import requests
import time
from playwright.sync_api import sync_playwright, Playwright
import pandas as pd
import Apikeys

IMPORTFILE = "~/Downloads/Internal Zenjob Testing group.csv"
APIKEY = Apikeys.ZENJOB
HEADERS = {"X-Api-Key": APIKEY, "accept": "application/json"}
ZENJOBURL = "https://zenjob.northpass.com/?uid="
BASEURL = "https://api.northpass.com/v2/"
# Group every imported learner is added to by add_ppl_to_groups().
TARGET_GROUP_ID = "940a5d24-32af-45f1-8ed4-8a6b4689d9c9"


def _load_emails():
    """Return the "Work Email" column of IMPORTFILE as a plain list."""
    df = pd.read_csv(IMPORTFILE)
    return df["Work Email"].values.tolist()


def delete_people_first():
    """Collect every person matching the zenjob.com filter and bulk-delete them.

    Pages through ``people`` (email contains ``zenjob.com``, partnership type
    not ``Partnerships::Admin``) until a response's ``links`` object has no
    ``next`` entry, then sends all collected ids to ``bulk/people/delete``
    in a single request via ``post_func``.
    """
    print("Deleting everyone with a zenjob email address...")
    count = 0
    delete_list = []
    while True:
        count += 1
        person_url = f"{BASEURL}people?filter[email][cont]=zenjob.com&filter[partnerships_type][not_eq]=Partnerships::Admin&include=partnerships&page={count}"
        person_resp = requests.get(person_url, headers=HEADERS)
        persjson = person_resp.json()
        nextlink = persjson["links"]
        delete_list.extend(people["id"] for people in persjson["data"])
        # The last page is the one whose "links" has no "next" key.
        if "next" not in nextlink:
            break
    del_load = {"payload": delete_list}
    print(len(delete_list))
    del_url = f"{BASEURL}bulk/people/delete"
    print("Running Post Func for Deleting Users")
    post_func(del_url, del_load)


def run(playwright: Playwright):
    """Open the Zenjob URL for every imported email, then add them to the group.

    Each email gets its own browser context so no cookies or session state
    carry over between visits. Presumably visiting the URL is what creates
    the learner on the Northpass side -- TODO confirm.

    Args:
        playwright: The object yielded by ``sync_playwright()``.
    """
    emails = _load_emails()
    # One browser for the whole run; it is always closed, even if a page
    # load raises part-way through the list.
    browser = playwright.webkit.launch()
    try:
        for email in emails:
            context = browser.new_context()
            try:
                page = context.new_page()
                page.goto(f"{ZENJOBURL}{email}")
                print(page.url)
                print(f"{ZENJOBURL}{email}")
            finally:
                context.close()
    finally:
        browser.close()
    add_ppl_to_groups(emails)


# Example URL: https://zenjob.northpass.com/?uid=testing.test@zenjob.com


def add_ppl_to_groups(emails=None):
    """Look up each email by ``sso_uid`` and bulk-add the people to the group.

    Args:
        emails: Emails to process. When omitted, they are read from
            IMPORTFILE, so calling this with no arguments still works.

    Emails for which the lookup returns no person are reported and skipped;
    if no one could be resolved, the bulk request is not sent.
    """
    if emails is None:
        emails = _load_emails()
    print("Assuming everyone was created with playwright, now I'm going to add everyone to the group.")
    learner_list = []
    for person in emails:
        print(person)
        get_person_url = f"{BASEURL}people?filter[sso_uid][eq]={person}"
        get_person_resp = requests.get(get_person_url, headers=HEADERS)
        person_json = get_person_resp.json()
        matches = person_json.get("data") or []
        if not matches:
            # Nothing came back for this uid; keep going with the rest.
            print(f"No person found for {person}, skipping.")
            continue
        learner_uuid = matches[0]["id"]
        print(learner_uuid)
        learner_list.append(learner_uuid)
    if not learner_list:
        print("No people resolved, nothing to add to the group.")
        return
    bulk_to_groups_url = f"{BASEURL}bulk/people/membership"
    bulk_payload = {"payload": {"person_ids": learner_list, "group_ids": [TARGET_GROUP_ID]}}
    print("Running Post Func for Bulk Add to Groups")
    post_func(bulk_to_groups_url, bulk_payload)


def post_func(url, payload):
    """POST ``payload`` as JSON to ``url`` and print the status code and body.

    Args:
        url: Full endpoint URL.
        payload: JSON-serialisable request body.

    A failed request (any ``requests.RequestException``) is printed and the
    function returns without a response to report.
    """
    try:
        resp = requests.post(url, headers=HEADERS, json=payload)
    except requests.RequestException as e:
        print(e)
        return
    print(resp.status_code)
    print(resp.text)


if __name__ == "__main__":
    add_ppl_to_groups()
    # Full reset workflow, currently disabled:
    # delete_people_first()
    # time.sleep(15)
    # print("Sleeping to allow deletions to process.")
    # with sync_playwright() as playwright:
    #     run(playwright)