| import pandas as pd |
| import numpy as np |
| import requests |
| import zlib |
| import os |
| import shelve |
| import magic |
| from multiprocessing import Pool |
| from tqdm import tqdm |
|
|
| |
| |
| |
| |
| |
|
|
def _df_split_apply(tup_arg):
    """Run a row-wise function over one chunk of a DataFrame.

    ``tup_arg`` is a ``(chunk_index, chunk, func)`` triple packed into a
    single argument so it can travel through ``Pool.imap_unordered``.

    Returns:
        ``(chunk_index, applied)`` where ``applied`` is
        ``chunk.apply(func, axis=1)``.
    """
    chunk_index, chunk, row_func = tup_arg
    return chunk_index, chunk.apply(row_func, axis=1)
|
|
def df_multiprocess(df, processes, chunk_size, func, dataset_name):
    """Apply ``func`` to every row of ``df`` in a process pool, resumably.

    The frame is cut into consecutive slices of ``chunk_size`` rows and each
    slice is processed by ``_df_split_apply`` in a worker. Every finished
    slice is stored in a shelve file named
    ``<dataset_name>_<func.__name__>_<chunk_size>_results.tmp`` under the key
    ``str(chunk_index)``; slices whose key is already present are skipped, so
    a re-run with the same arguments continues where the last one stopped.

    Args:
        df: DataFrame whose rows are processed.
        processes: Number of worker processes in the pool.
        chunk_size: Number of rows per slice.
        func: Row-wise callable handed to ``DataFrame.apply(..., axis=1)``.
        dataset_name: Prefix of the shelve file name.

    Returns:
        None. Results are only written to the shelve file.
    """
    print("Generating parts...")
    shelf_name = '%s_%s_%s_results.tmp' % (dataset_name, func.__name__, chunk_size)
    with shelve.open(shelf_name) as results:
        pbar = tqdm(total=len(df), position=0)
        try:
            # Resume: account for the chunks already stored by an earlier run.
            pbar.desc = "Resuming"
            finished_chunks = set()
            for key in results.keys():
                finished_chunks.add(int(key))
                pbar.update(len(results[key][1]))

            chunk_starts = range(0, len(df), chunk_size)
            pool_data = (
                (index, df[start:start + chunk_size], func)
                for index, start in enumerate(chunk_starts)
                if index not in finished_chunks
            )
            # len(chunk_starts) also counts the trailing partial chunk, which
            # a plain len(df) / chunk_size division would drop.
            print(len(chunk_starts), "parts.", chunk_size, "per part.", "Using", processes, "processes")

            pbar.desc = "Downloading"
            with Pool(processes) as pool:
                for result in pool.imap_unordered(_df_split_apply, pool_data, 2):
                    results[str(result[0])] = result
                    pbar.update(len(result[1]))
        finally:
            # Close the bar even when a worker raised, so the terminal is
            # left in a clean state.
            pbar.close()

    print("Finished Downloading.")
|
|
| |
def _file_name(row):
    """Build the file-name stem for a row, based on its label and URL.

    The result has the form ``<folder>/<bucket>_<crc>.jpg`` where ``bucket``
    is the row label (``row.name``) integer-divided by 1000 and ``crc`` is the
    unsigned CRC-32 of the UTF-8 encoded ``row['url']``.

    The row itself is left untouched, so calling this repeatedly on the same
    row always yields the same name.
    """
    # Keep the bucket in a local instead of overwriting row.name: mutating the
    # label made a second call divide by 1000 again and return another path.
    bucket = str(int(row.name) // 1000)
    url_crc = zlib.crc32(row['url'].encode('utf-8')) & 0xffffffff
    return "%s/%s_%s.jpg" % (row['folder'], bucket, url_crc)
|
|
| |
def check_mimetype(row):
    """Fill in ``mimetype`` and ``size`` for a row whose file is on disk.

    If ``row['file']`` does not name an existing regular file the row is
    returned unchanged. Otherwise ``row['mimetype']`` is set from
    ``magic.from_file(..., mime=True)`` and ``row['size']`` from the file's
    ``st_size``.
    """
    path = row['file']
    # str() lets non-string placeholders pass through isfile() as "not a file".
    if not os.path.isfile(str(path)):
        return row
    row['mimetype'] = magic.from_file(path, mime=True)
    row['size'] = os.stat(path).st_size
    return row
|
|
| |
| |
def check_download(row):
    """Probe a row's URL with a HEAD request and record the outcome.

    Sets ``row['status']`` to the HTTP status code and ``row['headers']`` to
    the response headers. When the response is OK, ``row['file']`` is set to
    the path the image would be stored under. Any failure while requesting is
    recorded as status 408 and the row is returned without ``file``.

    The target sub-directory is created as a side effect.
    """
    # Split on the LAST underscore only: it separates "<folder>/<bucket>" from
    # "<crc>.jpg", and the folder part may itself contain underscores.
    sub_dir, _, leaf = _file_name(row).rpartition('_')
    # exist_ok avoids the exists()/mkdir() race between pool workers and also
    # creates missing parent directories.
    os.makedirs(sub_dir, exist_ok=True)
    fname = sub_dir + '/' + leaf
    try:
        response = requests.head(row['url'], stream=False, timeout=5, allow_redirects=True)
        row['status'] = response.status_code
        row['headers'] = dict(response.headers)
    except Exception:
        # Best effort: any request failure is reported as a timeout. Using
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # stop the worker.
        row['status'] = 408
        return row
    if response.ok:
        row['file'] = fname
    return row
|
|
def download_image(row):
    """Download the image for a row and record what happened.

    Behaviour:
      * If the target file already exists, the row is marked status 200 and
        ``file``, ``mimetype`` and ``size`` are filled from the file on disk
        without any network access.
      * Otherwise the URL is fetched with a GET request. ``row['status']``
        receives the HTTP status code; on an OK response the body is written
        to disk and ``file``, ``mimetype`` and ``size`` are set.
      * Any failure while requesting, writing or inspecting the file is
        recorded as status 408 and the row is returned without ``file``.

    The target sub-directory is created as a side effect.
    """
    # Split on the LAST underscore only: it separates "<folder>/<bucket>" from
    # "<crc>.jpg", and the folder part may itself contain underscores.
    sub_dir, _, leaf = _file_name(row).rpartition('_')
    # exist_ok avoids the exists()/mkdir() race between pool workers and also
    # creates missing parent directories.
    os.makedirs(sub_dir, exist_ok=True)
    fname = sub_dir + '/' + leaf

    # Skip files that were already downloaded.
    if os.path.isfile(fname):
        row['status'] = 200
        row['file'] = fname
        row['mimetype'] = magic.from_file(fname, mime=True)
        row['size'] = os.stat(fname).st_size
        return row

    try:
        response = requests.get(row['url'], stream=False, timeout=10, allow_redirects=True)
        row['status'] = response.status_code
    except Exception:
        # Best effort: any request failure is reported as a timeout. Using
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # stop the worker.
        row['status'] = 408
        return row

    if not response.ok:
        return row

    try:
        with open(fname, 'wb') as out_file:
            # With stream=False the body has already been read, so
            # response.content is the complete payload.
            out_file.write(response.content)
        row['mimetype'] = magic.from_file(fname, mime=True)
        row['size'] = os.stat(fname).st_size
    except Exception:
        # Do not leave a truncated/unusable file behind: a later run would
        # find it on disk and report it as a successful download.
        try:
            os.remove(fname)
        except OSError:
            pass  # nothing was written, or it is already gone
        row['status'] = 408
        return row
    row['file'] = fname
    return row
|
|
def open_tsv(fname, folder):
    """Load the URL column of a caption/URL TSV file into a DataFrame.

    The file is read as tab-separated with no header row; its columns are
    named ``caption`` and ``url`` and only the second one (``url``) is kept.
    A constant ``folder`` column holding ``folder`` is added.

    Returns:
        DataFrame with the columns ``url`` and ``folder``.
    """
    print("Opening %s Data File..." % fname)
    column_names = ["caption", "url"]
    frame = pd.read_csv(fname, sep='\t', names=column_names, usecols=[1])
    frame['folder'] = folder
    row_count = len(frame)
    print("Processing", row_count, " Images:")
    return frame
|
|
def df_from_shelve(chunk_size, func, dataset_name):
    """Rebuild one DataFrame from the per-chunk results stored in a shelf.

    Opens the shelve file
    ``<dataset_name>_<func.__name__>_<chunk_size>_results.tmp``, takes the
    second element of every stored entry in ascending numeric key order and
    concatenates them with ``pd.concat(..., sort=True)``.
    """
    print("Generating Dataframe from results...")
    shelf_name = '%s_%s_%s_results.tmp' % (dataset_name, func.__name__, chunk_size)
    with shelve.open(shelf_name) as store:
        # Keys are stringified integers; sort them numerically, not lexically.
        chunk_ids = sorted(map(int, store.keys()))
        frames = [store[str(chunk_id)][1] for chunk_id in chunk_ids]
        combined = pd.concat(frames, sort=True)
    return combined
|
|
| |
# Number of worker processes in the pool.
num_processes = 32
# Number of rows handed to a worker per task (one shelve entry per part).
images_per_part = 100

# Folder prefix for the downloaded images; also used as the prefix of the
# shelve file and of the report file.
data_name = "/CC3M/images/validation"

# Only start the pool when run as a script. With the "spawn" start method the
# workers import this module again, and an unguarded pool start would be
# re-executed inside every worker.
if __name__ == "__main__":
    df = open_tsv("/CC3M/validation.tsv", data_name)
    df_multiprocess(df=df, processes=num_processes, chunk_size=images_per_part, func=download_image, dataset_name=data_name)
    df = df_from_shelve(chunk_size=images_per_part, func=download_image, dataset_name=data_name)
    # Report is written without header or index, tab-separated and gzipped.
    df.to_csv("%s_report.tsv.gz" % data_name, compression='gzip', sep='\t', header=False, index=False)
| |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |