Login
main >   audio_workspace >  


AI Sample Managers:

Algonaut Atlas.
XLN Audio - XO.
https://www.producthunt.com/alternatives/sononym

Python ML

https://hackernoon.com/intro-to-audio-analysis-recognizing-sounds-using-machine-learning-qy2r3ufl

Sample name managers:

https://www.adsrsounds.com/product/software/adsr-sample-manager/

Steps to sort samples.

1_find_still_compressed.py
2_uncompress.py
3_find_carrot_and_replace.py
4_list_files_multi.py
5_csv_of_dups.py
6_check_dups.py
7_prepare_copy.py
8_copy.py

Go through archive to see if some need to be uncompressed.

# 1_find_still_compressed.py
import csv
import datetime
import os

# Name of the folder (inside LOCATION) that is scanned for archives; it is
# also embedded in the name of the CSV this script writes.
PROCESS_FOLDERNAME = 'Loops_Unsorted'
# Directory that contains PROCESS_FOLDERNAME and receives the output CSV.
LOCATION = r'F:\Audio\processed'


def save_paths(csv_filename, paths):
    """Write archive paths to a '^'-delimited CSV file inside LOCATION.

    Args:
        csv_filename: Name of the CSV file to create inside LOCATION.
        paths: Iterable of path strings. One row is written per item, in
            iteration order, under a single 'paths' column.
    """
    output_csv_path = os.path.join(LOCATION, csv_filename)
    output_csv_fieldnames = ['paths']

    # The context manager guarantees the file is closed even when a write
    # fails part-way through.
    with open(output_csv_path, 'w', newline='', encoding="utf-8") as output_csv_file:
        writer = csv.DictWriter(output_csv_file, delimiter='^', fieldnames=output_csv_fieldnames)
        writer.writeheader()
        writer.writerows({'paths': path} for path in paths)


if __name__ == "__main__":
    # Walk the processing folder and collect every .rar/.zip archive that has
    # no sibling directory named after it, i.e. that still looks unextracted.
    process_folder_path = os.path.join(LOCATION, PROCESS_FOLDERNAME)
    paths = {}
    for root, dirs, files in os.walk(process_folder_path):
        for file in files:
            if not file.lower().endswith(('.rar', '.zip')):
                continue
            # Strip only the final extension, so "pack.v2.zip" is compared as
            # "pack.v2" (splitting on every dot would compare just "pack").
            if file.rsplit('.', 1)[0] not in dirs:
                paths[os.path.join(root, file)] = True

    # The CSV name carries a timestamp so successive runs never overwrite
    # each other.
    csv_filename = "compressed_{}_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
    save_paths(csv_filename, paths)

Uncompress.

#2_uncompress.py
import csv
import datetime
import os
from pyunpack import Archive
import sys


# Folder name used to build the name of the input CSV
# ("compressed_<PROCESS_FOLDERNAME>.csv").
PROCESS_FOLDERNAME = 'Loops_Unsorted'
# Directory in which that input CSV is looked up.
LOCATION = r'F:\Audio\processed'


if __name__ == "__main__":
    # Load the list of compressed files to extract.
    csv_filename = "compressed_{}.csv".format(PROCESS_FOLDERNAME)
    csv_path = os.path.join(LOCATION, csv_filename)
    if not os.path.exists(csv_path):
        print("File not found: {}".format(csv_path))
        sys.exit()

    compressed_paths = {}
    with open(csv_path, 'r', encoding="utf-8") as csv_file:
        reader = csv.DictReader(csv_file, delimiter='^')
        # 1_find_still_compressed.py names its column 'paths'; fall back to
        # 'path' so hand-made lists using that header keep working.
        fieldnames = reader.fieldnames or []
        path_column = 'paths' if 'paths' in fieldnames else 'path'
        for row in reader:
            compressed_paths[row[path_column]] = True

    # Extract every archive into a sibling directory named after the archive
    # without its final extension; archives whose directory already exists
    # are reported and skipped.
    total = len(compressed_paths)
    for idx, compressed_path in enumerate(compressed_paths, 1):
        print("{}/{}".format(idx, total))
        directory, archive_name = os.path.split(compressed_path)
        new_dir = archive_name.rsplit('.', 1)[0]
        new_path = os.path.join(directory, new_dir)

        if os.path.exists(new_path):
            print("Allready exists: {}\n{}\n".format(compressed_path, new_path))
        else:
            print("{}\n-->\n{}\n".format(compressed_path, new_path))
            os.mkdir(new_path)
            Archive(compressed_path).extractall(new_path)

Rename files whose names contain a caret (^), because ^ is the delimiter used in the CSV files these scripts write.

# 3_find_carrot_and_replace.py
import os
import shutil


# Root of the directory tree whose file names are checked for '^'.
PATH = r'F:\Audio\processed\Loops_Unsorted'

if __name__ == "__main__":
    # Rename every file under PATH whose name contains '^', replacing each
    # '^' with '_'. Directory names are left untouched.
    count = 0
    for root, dirs, files in os.walk(PATH):
        for file in files:
            count += 1
            if '^' not in file:
                continue

            src = os.path.join(root, file)
            dst = os.path.join(root, file.replace('^', '_'))
            print(src)

            # Never replace a file that already carries the target name;
            # report the clash and leave both files in place.
            if os.path.exists(dst):
                print("Skipped, destination already exists: {}".format(dst))
                continue
            shutil.move(src, dst)

    print("{} files looked at.".format(count))

Get file hashes.

# 4_list_files_multi.py
import concurrent.futures
import csv
import datetime
import hashlib
import keyboard
import multiprocessing as mp
import os
import random
import shutil
import sys
import time


# Name of the folder (inside LOCATION) whose files are hashed; it is also
# used to build the names of the hash and extension CSV files.
PROCESS_FOLDERNAME = 'Loops_Unsorted'
# Directory that contains PROCESS_FOLDERNAME and the CSV files.
LOCATION = r'F:\Audio\processed'


def create_checksum(file_path, chunk_size=65536):
    """Return the hexadecimal MD5 digest of a file's contents.

    The file is read in fixed-size chunks so large files are never loaded
    into memory in one piece. The digest does not depend on the chunk size.

    Args:
        file_path: Path of the file to hash.
        chunk_size: Number of bytes read per iteration; must be positive.

    Returns:
        The MD5 digest as a lowercase hexadecimal string.

    Raises:
        ValueError: If chunk_size is smaller than 1.
        OSError: If the file cannot be opened or read.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")

    digest = hashlib.md5()
    with open(file_path, "rb") as f:
        # read() returns b"" at end of file, which ends the loop.
        while chunk := f.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()
    

def hash_file(file_path):
    """Return a (file_path, checksum) pair for the given file.

    The path is returned together with its checksum so a caller can match
    each result back to the file it belongs to.
    """
    checksum = create_checksum(file_path)
    return file_path, checksum


def create_chunks(list_name, n):
    """Yield consecutive slices of list_name holding at most n items each.

    The last slice is shorter when the length is not a multiple of n.
    """
    starts = range(0, len(list_name), n)
    yield from (list_name[start:start + n] for start in starts)


# Print iterations progress
def printProgressBar(iteration, total, prefix='', suffix='', decimals=1, length=100, fill='█', printEnd="\r"):
    """
    Call in a loop to create terminal progress bar
    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)
        printEnd    - Optional  : end character (e.g. "\r", "\r\n") (Str)

    A total of 0 (nothing to process) is drawn as a full, 100% bar instead
    of raising ZeroDivisionError.
    """
    if total:
        fraction = iteration / float(total)
        filledLength = int(length * iteration // total)
    else:
        # Nothing to do counts as already complete.
        fraction = 1.0
        filledLength = length

    percent = ("{0:." + str(decimals) + "f}").format(100 * fraction)
    bar = fill * filledLength + '-' * (length - filledLength)
    # The leading "\r" redraws the bar over the previous one; the trailing
    # spaces wipe leftovers from a longer earlier line.
    print(f'\r{prefix} |{bar}| {iteration}/{total} {percent}% {suffix}      ', end=printEnd)
    # Print New Line on Complete
    if iteration == total:
        print()


def save_hashes(csv_filename, paths_hashed):
    """Write a path -> hash mapping to a '^'-delimited CSV inside LOCATION.

    Args:
        csv_filename: Name of the CSV file to create inside LOCATION.
        paths_hashed: Mapping of file path to its hash. One row is written
            per entry, with the columns 'hash' and 'path'.
    """
    output_csv_path = os.path.join(LOCATION, csv_filename)
    output_csv_fieldnames = ['hash', 'path']

    # The context manager guarantees the file is closed even when a write
    # fails part-way through.
    with open(output_csv_path, 'w', newline='', encoding="utf-8") as output_csv_file:
        writer = csv.DictWriter(output_csv_file, delimiter='^', fieldnames=output_csv_fieldnames)
        writer.writeheader()
        writer.writerows(
            {'hash': file_hash, 'path': path}
            for path, file_hash in paths_hashed.items()
        )


def save_extensions(csv_filename, exts):
    """Write file extensions to a '^'-delimited CSV inside LOCATION.

    Args:
        csv_filename: Name of the CSV file to create inside LOCATION.
        exts: Iterable of extension strings. One row is written per item,
            in iteration order, under a single 'extension' column.
    """
    output_csv_path = os.path.join(LOCATION, csv_filename)
    output_csv_fieldnames = ['extension']

    # The context manager guarantees the file is closed even when a write
    # fails part-way through.
    with open(output_csv_path, 'w', newline='', encoding="utf-8") as output_csv_file:
        writer = csv.DictWriter(output_csv_file, delimiter='^', fieldnames=output_csv_fieldnames)
        writer.writeheader()
        writer.writerows({'extension': ext} for ext in exts)


def _save_snapshot(known_hashes, new_hashes):
    """Write previously loaded plus newly computed hashes to a timestamped CSV."""
    csv_filename = "{}_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
    save_hashes(csv_filename, known_hashes | new_hashes)


if __name__ == "__main__":

    confirmation = ''
    while confirmation not in ['Y', 'N']:
        confirmation = input('Did you remember to run find_carrot_and_replace.py? (Y/N): ').upper()

    if confirmation == 'N':
        sys.exit()

    begin_time = datetime.datetime.now()
    print("Program Launched at: {}".format(begin_time))

    # Load existing hashed files so they are not hashed again.
    csv_hashed = {}
    output_csv_path = os.path.join(LOCATION, "{}.csv".format(PROCESS_FOLDERNAME))
    if os.path.exists(output_csv_path):
        with open(output_csv_path, 'r', encoding="utf-8") as csv_file:
            for row in csv.DictReader(csv_file, delimiter='^'):
                csv_hashed[row['path']] = row['hash']

    # Load all filenames and split them into "already hashed" (counted in
    # start_count) and "still to hash" (collected in to_hash_paths).
    print("Load files.")
    _start = time.time()
    to_hash_paths = {}
    count = 0
    start_count = 0
    process_folder_path = os.path.join(LOCATION, PROCESS_FOLDERNAME)
    extensions = {}
    for root, dirs, files in os.walk(process_folder_path):
        for file in files:
            # A name without a dot is recorded whole as its own "extension".
            extensions[file.rsplit('.', 1)[-1]] = True
            path = os.path.join(root, file)
            if path in csv_hashed:
                start_count += 1
            else:
                to_hash_paths[path] = True
            count += 1

    total_hashes = count
    print("Done loading  {} in {}".format(total_hashes, time.time() - _start))

    ####### Extensions.
    csv_filename = "extensions_{}_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
    save_extensions(csv_filename, extensions)

    # Process hashes sequentially. Holding Q stops early, holding S saves a
    # snapshot, and a snapshot is also saved automatically every 300 seconds.
    _start = time.time()
    _next_time_start = time.time()
    _save_time_start = time.time()
    total_rps = 0
    average_rps = 1
    count_rps = 0
    files_hashed = {}
    count = start_count
    for file_p in to_hash_paths:
        filepath, hash_of_file = hash_file(file_p)
        files_hashed[filepath] = hash_of_file
        printProgressBar(count, total_hashes, prefix = 'Progress (hold Q to exit):', suffix = 'rps({}) Complete'.format(round(average_rps, 2)), length = 50)
        count += 1
        if count % 10 == 0:
            # Running average of files per second, sampled every 10 files.
            now = time.time()
            elapsed = now - _next_time_start
            _next_time_start = now
            # A coarse clock can report 0 elapsed seconds for 10 small
            # files; skip that sample rather than divide by zero.
            if elapsed > 0:
                count_rps += 1
                total_rps += 10 / elapsed
                average_rps = total_rps / count_rps
        if keyboard.is_pressed('q'):
            break
        if keyboard.is_pressed('s'):
            _save_snapshot(csv_hashed, files_hashed)
        if (time.time() - _save_time_start) > 300:
            _save_snapshot(csv_hashed, files_hashed)
            _save_time_start = time.time()

    # With no files at all there is nothing to draw, and a total of 0 must
    # not reach the percentage calculation.
    if total_hashes:
        printProgressBar(count, total_hashes, prefix = 'Progress (hold Q to exit):', suffix = 'rps({}) Complete'.format(round(average_rps, 2)), length = 50)
    print("\n\nLoop took: {}".format(time.time() - _start))

    # NOTE: the commented-out sections below are earlier experiments
    # (confirmation prompts, a thread pool and a process pool), kept for
    # reference only.

    # confirmation = ''
    # while confirmation not in ['Y', 'N']:
    #     confirmation = input('Continue? (Y/N): ').upper()

    # if confirmation == 'N':
    #     sys.exit()

    ####### Thread Pool Executor
    # _start = time.time()
    # files_hashed = {}
    # with concurrent.futures.ThreadPoolExecutor() as executor:
    #     futures = []
    #     for filepath in to_hash_paths:
    #         futures.append(executor.submit(hash_file, file_path=filepath))

    #     count = 1
    #     for future in concurrent.futures.as_completed(futures):
    #         filepath, hash_of_file = future.result()
    #         files_hashed[filepath] = hash_of_file
    #         #print(count)
    #         count += 1

    # print("ThreadPoolExecutor took: {}".format(time.time() - _start))

    # # print(files_hashed)

    # confirmation = ''
    # while confirmation not in ['Y', 'N']:
    #     confirmation = input('Continue? (Y/N): ').upper()

    # if confirmation == 'N':
    #     sys.exit()

    ####### Multi Processing
    # _start = time.time()
    # cpu_c = mp.cpu_count()
    # pool = mp.Pool()
    # files_hashed = {}
    # count = start_count
    # exit_now = False
    # print("Load Pool")
    # _next_time_start = time.time()
    # rps = 0
    # total_rps = 0
    # average_rps = 1
    # count_rps = 0
    # for list_of_files in create_chunks(list(to_hash_paths), cpu_c):
    #     pool.map(hash_file, list_of_files)
    #     hashes = pool.map(hash_file, list_of_files)
    #     for file_path, hash_of_file in hashes:
    #         files_hashed[file_path] = hash_of_file
    #         #print(count)
    #         printProgressBar(count, total_hashes, prefix = 'Progress (hold Q to exit):', suffix = 'rps({}) Complete'.format(round(average_rps, 2)), length = 50)
    #         count += 1
    #         if count % 10 == 0:
    #             count_rps += 1
    #             rps = 10/(time.time() - _next_time_start)
    #             _next_time_start = time.time()
    #             total_rps += rps
    #             average_rps = total_rps / count_rps
    #     if keyboard.is_pressed('q'):
    #         break

    # pool.close()
    # printProgressBar(count, total_hashes, prefix = 'Progress (hold Q to exit):', suffix = 'rps({}) Complete'.format(round(average_rps, 2)), length = 50)
    # print("\n\nMultiprocessing took: {}".format(time.time() - _start))

    # Save hashes of files in CSV (always, including after an early exit
    # with Q).
    _save_snapshot(csv_hashed, files_hashed)
      

List duplicate files.

# 5_csv_of_dups.py
import csv
import datetime
import os
import sys


# Base name of the hash CSV that is read ("<PROCESS_FOLDERNAME>.csv") and of
# the duplicates CSV that is written.
PROCESS_FOLDERNAME = 'Loops_Unsorted'
# Directory in which both CSV files live.
LOCATION = r'F:\Audio\processed'


if __name__ == "__main__":
    # Load existing hashed files (path -> hash).
    hashes_csv_path = os.path.join(LOCATION, "{}.csv".format(PROCESS_FOLDERNAME))
    if not os.path.exists(hashes_csv_path):
        print("Not found: {}".format(hashes_csv_path))
        sys.exit()

    csv_hashed = {}
    with open(hashes_csv_path, 'r', encoding="utf-8") as csv_file:
        for row in csv.DictReader(csv_file, delimiter='^'):
            csv_hashed[row['path']] = row['hash']

    # Group paths by hash.
    multi_hashed = {}
    for path, hash_ in csv_hashed.items():
        multi_hashed.setdefault(hash_, []).append(path)

    # Keep only hashes shared by more than one path. The hash is written on
    # the first row of each group; the remaining rows of the group carry an
    # empty hash column.
    rows = []
    for hash_, paths in multi_hashed.items():
        if len(paths) > 1:
            rows.append({'hash': hash_, 'path': paths[0]})
            rows.extend({'hash': '', 'path': path} for path in paths[1:])

    # Save Grouped Hashes to CSV. The rows are built before the file is
    # opened, and the context manager closes it even if a write fails.
    output_csv_path = os.path.join(LOCATION, "{}_multi_found_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S")))
    output_csv_fieldnames = ['hash', 'path']
    with open(output_csv_path, 'w', newline='', encoding="utf-8") as output_csv_file:
        writer = csv.DictWriter(output_csv_file, delimiter='^', fieldnames=output_csv_fieldnames)
        writer.writeheader()
        writer.writerows(rows)

Put duplicates in a directory for checking.

# 6_check_dups.py
import csv
import datetime
import os
import shutil
import sys


# Folder (inside LOCATION) that receives one sub-folder per duplicated hash.
DUPS_FOLDERNAME = 'Duplicates'
# Directory holding the duplicates CSV and the DUPS_FOLDERNAME folder.
LOCATION = r'F:\Audio\processed'


if __name__ == "__main__":
    # Load dup hashed files, keeping audio files only.
    dup_csv_path = os.path.join(LOCATION, "{}.csv".format('Loops_Unsorted_multi_found'))
    if not os.path.exists(dup_csv_path):
        print("Not found: {}".format(dup_csv_path))
        sys.exit()

    audio_extensions = ('.mp4', '.mp3', '.wav', '.aif', '.aiff')
    csv_hashed = []
    with open(dup_csv_path, 'r', encoding="utf-8") as csv_file:
        current_hash = ''
        for row in csv.DictReader(csv_file, delimiter='^'):
            # 5_csv_of_dups.py writes the hash only on the first row of each
            # group, so rows with an empty hash column belong to the most
            # recent hash seen.
            if row['hash']:
                current_hash = row['hash']
            file_extension = os.path.splitext(row['path'])[1]
            if file_extension.lower() in audio_extensions:
                csv_hashed.append([current_hash, row['path']])

    # Collect the extensions seen for each hash; every hash must map to
    # exactly one, because the copies are renamed "<n><extension>".
    hash_extensions = {}
    for hash_, path in csv_hashed:
        file_extension = os.path.splitext(path)[1]
        hash_extensions.setdefault(hash_, set()).add(file_extension.lower())

    chosen_hash_extensions = {}
    for hash_, extensions in hash_extensions.items():
        if len(extensions) > 1:
            print("Too many extensions: {} {}".format(hash_, ", ".join(extensions)))
            sys.exit()
        chosen_hash_extensions[hash_] = next(iter(extensions))

    # Copy files: each duplicate goes to <LOCATION>/<DUPS_FOLDERNAME>/<hash>/
    # and is named with a per-hash counter starting at 1.
    total_copies = len(csv_hashed)
    count_of_hash = ''
    count = 1
    for count_copies, (hash_, src) in enumerate(csv_hashed, 1):
        # Only possible when the CSV starts with rows that have no hash.
        if not hash_:
            print("No hash found: {}".format(src))
            sys.exit()

        if count_of_hash != hash_:
            count = 1
            count_of_hash = hash_

        dest_dir = os.path.join(LOCATION, DUPS_FOLDERNAME, hash_)
        # makedirs also creates the DUPS_FOLDERNAME parent on the first run.
        os.makedirs(dest_dir, exist_ok=True)
        new_filename = '{}{}'.format(count, chosen_hash_extensions[hash_])
        dest = os.path.join(dest_dir, new_filename)
        print("\n{}\n-->\n{}\n".format(src, dest))
        if not os.path.exists(dest):
            shutil.copy(src, dest)
        else:
            print("Path exists: {}".format(dest))
        print("{}/{}".format(count_copies, total_copies))
        count += 1

Prepare info for copying:

# 7_prepare_copy.py
import csv
import datetime
import os
import shutil
import sys


# Base name of the hash CSV that is read and of the copy-plan CSV written.
PROCESS_FOLDERNAME = 'Loops_Unsorted'
# Folder (inside LOCATION) used as the root of the planned destination paths.
TYPE_FOLDERNAME = 'Type'
# Directory holding the CSV files and the TYPE_FOLDERNAME folder.
LOCATION = r'F:\Audio\processed'


if __name__ == "__main__":
    audio_extensions = ('.mp4', '.mp3', '.wav', '.aif', '.aiff')

    # Load dup hashed files, keeping audio files only.
    dup_csv_path = os.path.join(LOCATION, "{}.csv".format('Loops_Unsorted_multi_found'))
    if not os.path.exists(dup_csv_path):
        print("Not found: {}".format(dup_csv_path))
        sys.exit()

    csv_dup_hashed = []
    with open(dup_csv_path, 'r', encoding="utf-8") as csv_file:
        current_hash = ''
        for row in csv.DictReader(csv_file, delimiter='^'):
            # 5_csv_of_dups.py writes the hash only on the first row of each
            # group, so rows with an empty hash column belong to the most
            # recent hash seen.
            if row['hash']:
                current_hash = row['hash']
            file_extension = os.path.splitext(row['path'])[1]
            if file_extension.lower() in audio_extensions:
                csv_dup_hashed.append([current_hash, row['path']])

    # For each duplicated hash remember one representative path (the last
    # one listed) and the set of extensions seen; a hash whose copies have
    # different extensions aborts the run.
    hash_path = {}
    hash_extensions = {}
    for hash_, path in csv_dup_hashed:
        file_extension = os.path.splitext(path)[1]
        hash_extensions.setdefault(hash_, set()).add(file_extension.lower())
        hash_path[hash_] = path

    for hash_, extensions in hash_extensions.items():
        if len(extensions) > 1:
            print("Too many extensions: {} {}".format(hash_, ", ".join(extensions)))
            sys.exit()

    # Load existing hashed files and pick exactly one source path per hash:
    # the representative for duplicated hashes, otherwise the file's own path.
    hash_csv_path = os.path.join(LOCATION, "{}.csv".format(PROCESS_FOLDERNAME))
    if not os.path.exists(hash_csv_path):
        print("File not found: {}".format(hash_csv_path))
        sys.exit()

    csv_hashed_to_copy = {}
    with open(hash_csv_path, 'r', encoding="utf-8") as csv_file:
        for row in csv.DictReader(csv_file, delimiter='^'):
            file_extension = os.path.splitext(row['path'])[1]
            if file_extension.lower() not in audio_extensions:
                continue
            csv_hashed_to_copy[row['hash']] = hash_path.get(row['hash'], row['path'])

    # Bucket the chosen files by lower-cased extension.
    types = {}
    for hash_, path in csv_hashed_to_copy.items():
        file_extension = os.path.splitext(path)[1].lower()
        types.setdefault(file_extension, {})[hash_] = path

    # Plan the destinations:
    # <LOCATION>/<TYPE_FOLDERNAME>/<ext>/<group>/<position><ext>, with at most
    # 1000 files per group and both numbers zero-padded to five digits.
    # Nothing is created or copied here; only the plan is written out.
    copy_these = []
    for file_type, files_by_hash in types.items():
        print(file_type)
        for index, src in enumerate(files_by_hash.values()):
            group, position = divmod(index, 1000)
            dest_folder = os.path.join(LOCATION, TYPE_FOLDERNAME, file_type[1:], "{:05d}".format(group + 1))
            dest_filename = "{:05d}{}".format(position + 1, file_type)
            copy_these.append([src, os.path.join(dest_folder, dest_filename)])

    # Save the copy plan to CSV.
    output_csv_path = os.path.join(LOCATION, "{}_copy_{}.csv".format(PROCESS_FOLDERNAME, datetime.datetime.now().strftime("%Y%m%d%H%M%S")))
    output_csv_fieldnames = ['src', 'dst']
    with open(output_csv_path, 'w', newline='', encoding="utf-8") as output_csv_file:
        writer = csv.DictWriter(output_csv_file, delimiter='^', fieldnames=output_csv_fieldnames)
        writer.writeheader()
        writer.writerows({'src': src, 'dst': dst} for src, dst in copy_these)

hidden1

hidden2