# Newer
# Older
# skyldap / batch_add.py
# @One One on 20 Apr 2022 2 KB implement CSV output
import json
import os
import glob

import ldap3
import pandas


class LDAPBatchAdd:
    """Add LDAP entries in bulk, modelled on an existing template entry.

    On construction the instance binds to the configured server and fetches
    the entry at ``template_dn``. Entries added afterwards reuse the
    template's ``objectClass`` values and every DN component after the
    template's first RDN.
    """

    def __init__(self, conf):
        """Bind to the LDAP server and load the template entry.

        Args:
            conf: Mapping that may override the keys ``server``, ``userdn``,
                ``userpassword`` and ``template_dn`` (each defaults to
                ``""``). Any other keys are stored in ``self.config`` as-is.

        Raises:
            AssertionError: If the base-scope search for ``template_dn``
                returns a falsy result.
        """
        # initialize config
        self.config = {
            "server": "",
            "userdn": "",
            "userpassword": "",
            "template_dn": "",
        }
        self.config.update(conf)

        # connect server
        self.server = ldap3.Server(self.config["server"])
        self.conn = ldap3.Connection(
            self.server,
            user=self.config["userdn"],
            password=self.config["userpassword"],
            auto_bind=True)

        # get template
        # The search must not live inside an ``assert`` statement: with
        # ``python -O`` asserts are stripped, so the search would never run.
        found = self.conn.search(
            search_base=self.config["template_dn"],
            search_filter="(objectClass=*)",
            search_scope=ldap3.BASE,
            attributes=ldap3.ALL_ATTRIBUTES)
        if not found:
            # AssertionError is kept so existing callers see the same type.
            raise AssertionError("Template user not found")
        self.template = self.conn.entries[0]

    def add_user(self, fields):
        """Add one entry whose DN is derived from the template's DN.

        The first RDN of the template DN supplies the naming attribute. Its
        value is replaced by ``fields[<naming attribute>]`` when that key is
        present; otherwise the template's own value is kept.

        Args:
            fields: Mapping of attribute name to value; passed unchanged to
                ``self.conn.add`` as the entry's attributes.

        Returns:
            The result of ``self.conn.add``.

        Raises:
            ValueError: If the template's first RDN contains no ``=``.
        """
        # NOTE: plain string splitting -- an escaped comma inside the first
        # RDN and special characters in the replacement value are not handled.
        first_rdn, *remaining_rdns = self.template.entry_dn.split(",")
        # maxsplit=1: the RDN value may itself contain "=".
        attr, template_value = first_rdn.split("=", 1)
        new_rdn = attr + "=" + fields.get(attr, template_value)
        new_dn = ",".join([new_rdn] + remaining_rdns)

        # add
        return self.conn.add(new_dn, self.template.objectClass.value, fields)


def main():
    """Import every CSV under ``data/import`` into LDAP and write result CSVs.

    Reads ``data/config.json`` next to this script and binds to LDAP through
    ``LDAPBatchAdd``. For each ``data/import/*.csv`` file, every row is passed
    through the autocomplete plugins and then added. The rows that were added
    are written to ``data/import_result/<same file name>``.
    """
    # load config (JSON is read as UTF-8 rather than the locale's encoding)
    script_dir = os.path.dirname(os.path.realpath(__file__))
    config_path = os.path.join(script_dir, "data/config.json")
    with open(config_path, "r", encoding="utf-8") as f:
        conf = json.load(f)

    batch_add = LDAPBatchAdd(conf)
    try:
        # load autocomplete; each plugin is constructed with the config
        # section stored under the plugin's own ``__name__``
        import autocomplete
        plugins = [
            plugin(conf.get(plugin.__name__, {}))
            for plugin in autocomplete.plugins
        ]

        # import all files
        import_pattern = os.path.join(script_dir, "data/import/*.csv")
        for csv_filename in glob.glob(import_pattern):
            added_users = []
            # add users
            df = pandas.read_csv(csv_filename)
            for _, row in df.iterrows():
                fields = row.to_dict()
                # autocomplete: each plugin returns the (possibly updated) fields
                for plugin in plugins:
                    fields = plugin.run(fields)
                # add
                if batch_add.add_user(fields):
                    added_users.append(fields)
                else:
                    print("Failed to add: ", fields)

            print("{} user(s) added.".format(len(added_users)))

            # write result csv
            out_path = os.path.join(script_dir, "data/import_result")
            os.makedirs(out_path, exist_ok=True)
            out_filename = os.path.join(out_path, os.path.basename(csv_filename))

            pandas.DataFrame.from_dict(added_users).to_csv(out_filename)
    finally:
        # release the LDAP connection even if an import step fails
        batch_add.conn.unbind()


if __name__ == "__main__":
    main()