|
|
|
|
|
|
| import argparse
|
| import glob
|
| import logging
|
| import os
|
| import sys
|
| from typing import Any, ClassVar, Dict, List
|
| import torch
|
|
|
| from detectron2.config import CfgNode, get_cfg
|
| from detectron2.data.detection_utils import read_image
|
| from detectron2.engine.defaults import DefaultPredictor
|
| from detectron2.structures.instances import Instances
|
| from detectron2.utils.logger import setup_logger
|
|
|
| from densepose import add_densepose_config
|
| from densepose.structures import DensePoseChartPredictorOutput, DensePoseEmbeddingPredictorOutput
|
| from densepose.utils.logger import verbosity_to_level
|
| from densepose.vis.base import CompoundVisualizer
|
| from densepose.vis.bounding_box import ScoredBoundingBoxVisualizer
|
| from densepose.vis.densepose_outputs_vertex import (
|
| DensePoseOutputsTextureVisualizer,
|
| DensePoseOutputsVertexVisualizer,
|
| get_texture_atlases,
|
| )
|
| from densepose.vis.densepose_results import (
|
| DensePoseResultsContourVisualizer,
|
| DensePoseResultsFineSegmentationVisualizer,
|
| DensePoseResultsUVisualizer,
|
| DensePoseResultsVVisualizer,
|
| )
|
| from densepose.vis.densepose_results_textures import (
|
| DensePoseResultsVisualizerWithTexture,
|
| get_texture_atlas,
|
| )
|
| from densepose.vis.extractor import (
|
| CompoundExtractor,
|
| DensePoseOutputsExtractor,
|
| DensePoseResultExtractor,
|
| create_extractor,
|
| )
|
|
|
| DOC = """Apply Net - a tool to print / visualize DensePose results
|
| """
|
|
|
| LOGGER_NAME = "apply_net"
|
| logger = logging.getLogger(LOGGER_NAME)
|
|
|
| _ACTION_REGISTRY: Dict[str, "Action"] = {}
|
|
|
|
|
| class Action:
|
| @classmethod
|
| def add_arguments(cls: type, parser: argparse.ArgumentParser):
|
| parser.add_argument(
|
| "-v",
|
| "--verbosity",
|
| action="count",
|
| help="Verbose mode. Multiple -v options increase the verbosity.",
|
| )
|
|
|
|
|
| def register_action(cls: type):
|
| """
|
| Decorator for action classes to automate action registration
|
| """
|
| global _ACTION_REGISTRY
|
| _ACTION_REGISTRY[cls.COMMAND] = cls
|
| return cls
|
|
|
|
|
| class InferenceAction(Action):
|
| @classmethod
|
| def add_arguments(cls: type, parser: argparse.ArgumentParser):
|
| super(InferenceAction, cls).add_arguments(parser)
|
| parser.add_argument("cfg", metavar="<config>", help="Config file")
|
| parser.add_argument("model", metavar="<model>", help="Model file")
|
| parser.add_argument(
|
| "--opts",
|
| help="Modify config options using the command-line 'KEY VALUE' pairs",
|
| default=[],
|
| nargs=argparse.REMAINDER,
|
| )
|
|
|
| @classmethod
|
| def execute(cls: type, args: argparse.Namespace, human_img):
|
| logger.info(f"Loading config from {args.cfg}")
|
| opts = []
|
| cfg = cls.setup_config(args.cfg, args.model, args, opts)
|
| logger.info(f"Loading model from {args.model}")
|
| predictor = DefaultPredictor(cfg)
|
|
|
|
|
|
|
|
|
|
|
| context = cls.create_context(args, cfg)
|
|
|
|
|
| with torch.no_grad():
|
| outputs = predictor(human_img)["instances"]
|
| out_pose = cls.execute_on_outputs(context, {"image": human_img}, outputs)
|
| cls.postexecute(context)
|
| return out_pose
|
|
|
| @classmethod
|
| def setup_config(
|
| cls: type, config_fpath: str, model_fpath: str, args: argparse.Namespace, opts: List[str]
|
| ):
|
| cfg = get_cfg()
|
| add_densepose_config(cfg)
|
| cfg.merge_from_file(config_fpath)
|
| cfg.merge_from_list(args.opts)
|
| if opts:
|
| cfg.merge_from_list(opts)
|
| cfg.MODEL.WEIGHTS = model_fpath
|
| cfg.freeze()
|
| return cfg
|
|
|
| @classmethod
|
| def _get_input_file_list(cls: type, input_spec: str):
|
| if os.path.isdir(input_spec):
|
| file_list = [
|
| os.path.join(input_spec, fname)
|
| for fname in os.listdir(input_spec)
|
| if os.path.isfile(os.path.join(input_spec, fname))
|
| ]
|
| elif os.path.isfile(input_spec):
|
| file_list = [input_spec]
|
| else:
|
| file_list = glob.glob(input_spec)
|
| return file_list
|
|
|
|
|
| @register_action
|
| class DumpAction(InferenceAction):
|
| """
|
| Dump action that outputs results to a pickle file
|
| """
|
|
|
| COMMAND: ClassVar[str] = "dump"
|
|
|
| @classmethod
|
| def add_parser(cls: type, subparsers: argparse._SubParsersAction):
|
| parser = subparsers.add_parser(cls.COMMAND, help="Dump model outputs to a file.")
|
| cls.add_arguments(parser)
|
| parser.set_defaults(func=cls.execute)
|
|
|
| @classmethod
|
| def add_arguments(cls: type, parser: argparse.ArgumentParser):
|
| super(DumpAction, cls).add_arguments(parser)
|
| parser.add_argument(
|
| "--output",
|
| metavar="<dump_file>",
|
| default="results.pkl",
|
| help="File name to save dump to",
|
| )
|
|
|
| @classmethod
|
| def execute_on_outputs(
|
| cls: type, context: Dict[str, Any], entry: Dict[str, Any], outputs: Instances
|
| ):
|
| image_fpath = entry["file_name"]
|
| logger.info(f"Processing {image_fpath}")
|
| result = {"file_name": image_fpath}
|
| if outputs.has("scores"):
|
| result["scores"] = outputs.get("scores").cpu()
|
| if outputs.has("pred_boxes"):
|
| result["pred_boxes_XYXY"] = outputs.get("pred_boxes").tensor.cpu()
|
| if outputs.has("pred_densepose"):
|
| if isinstance(outputs.pred_densepose, DensePoseChartPredictorOutput):
|
| extractor = DensePoseResultExtractor()
|
| elif isinstance(outputs.pred_densepose, DensePoseEmbeddingPredictorOutput):
|
| extractor = DensePoseOutputsExtractor()
|
| result["pred_densepose"] = extractor(outputs)[0]
|
| context["results"].append(result)
|
|
|
| @classmethod
|
| def create_context(cls: type, args: argparse.Namespace, cfg: CfgNode):
|
| context = {"results": [], "out_fname": args.output}
|
| return context
|
|
|
| @classmethod
|
| def postexecute(cls: type, context: Dict[str, Any]):
|
| out_fname = context["out_fname"]
|
| out_dir = os.path.dirname(out_fname)
|
| if len(out_dir) > 0 and not os.path.exists(out_dir):
|
| os.makedirs(out_dir)
|
| with open(out_fname, "wb") as hFile:
|
| torch.save(context["results"], hFile)
|
| logger.info(f"Output saved to {out_fname}")
|
|
|
|
|
| @register_action
|
| class ShowAction(InferenceAction):
|
| """
|
| Show action that visualizes selected entries on an image
|
| """
|
|
|
| COMMAND: ClassVar[str] = "show"
|
| VISUALIZERS: ClassVar[Dict[str, object]] = {
|
| "dp_contour": DensePoseResultsContourVisualizer,
|
| "dp_segm": DensePoseResultsFineSegmentationVisualizer,
|
| "dp_u": DensePoseResultsUVisualizer,
|
| "dp_v": DensePoseResultsVVisualizer,
|
| "dp_iuv_texture": DensePoseResultsVisualizerWithTexture,
|
| "dp_cse_texture": DensePoseOutputsTextureVisualizer,
|
| "dp_vertex": DensePoseOutputsVertexVisualizer,
|
| "bbox": ScoredBoundingBoxVisualizer,
|
| }
|
|
|
| @classmethod
|
| def add_parser(cls: type, subparsers: argparse._SubParsersAction):
|
| parser = subparsers.add_parser(cls.COMMAND, help="Visualize selected entries")
|
| cls.add_arguments(parser)
|
| parser.set_defaults(func=cls.execute)
|
|
|
| @classmethod
|
| def add_arguments(cls: type, parser: argparse.ArgumentParser):
|
| super(ShowAction, cls).add_arguments(parser)
|
| parser.add_argument(
|
| "visualizations",
|
| metavar="<visualizations>",
|
| help="Comma separated list of visualizations, possible values: "
|
| "[{}]".format(",".join(sorted(cls.VISUALIZERS.keys()))),
|
| )
|
| parser.add_argument(
|
| "--min_score",
|
| metavar="<score>",
|
| default=0.8,
|
| type=float,
|
| help="Minimum detection score to visualize",
|
| )
|
| parser.add_argument(
|
| "--nms_thresh", metavar="<threshold>", default=None, type=float, help="NMS threshold"
|
| )
|
| parser.add_argument(
|
| "--texture_atlas",
|
| metavar="<texture_atlas>",
|
| default=None,
|
| help="Texture atlas file (for IUV texture transfer)",
|
| )
|
| parser.add_argument(
|
| "--texture_atlases_map",
|
| metavar="<texture_atlases_map>",
|
| default=None,
|
| help="JSON string of a dict containing texture atlas files for each mesh",
|
| )
|
| parser.add_argument(
|
| "--output",
|
| metavar="<image_file>",
|
| default="outputres.png",
|
| help="File name to save output to",
|
| )
|
|
|
| @classmethod
|
| def setup_config(
|
| cls: type, config_fpath: str, model_fpath: str, args: argparse.Namespace, opts: List[str]
|
| ):
|
| opts.append("MODEL.ROI_HEADS.SCORE_THRESH_TEST")
|
| opts.append(str(args.min_score))
|
| if args.nms_thresh is not None:
|
| opts.append("MODEL.ROI_HEADS.NMS_THRESH_TEST")
|
| opts.append(str(args.nms_thresh))
|
| cfg = super(ShowAction, cls).setup_config(config_fpath, model_fpath, args, opts)
|
| return cfg
|
|
|
| @classmethod
|
| def execute_on_outputs(
|
| cls: type, context: Dict[str, Any], entry: Dict[str, Any], outputs: Instances
|
| ):
|
| import cv2
|
| import numpy as np
|
| visualizer = context["visualizer"]
|
| extractor = context["extractor"]
|
|
|
|
|
| image = cv2.cvtColor(entry["image"], cv2.COLOR_BGR2GRAY)
|
| image = np.tile(image[:, :, np.newaxis], [1, 1, 3])
|
| data = extractor(outputs)
|
| image_vis = visualizer.visualize(image, data)
|
|
|
| return image_vis
|
| entry_idx = context["entry_idx"] + 1
|
| out_fname = './image-densepose/' + image_fpath.split('/')[-1]
|
| out_dir = './image-densepose'
|
| out_dir = os.path.dirname(out_fname)
|
| if len(out_dir) > 0 and not os.path.exists(out_dir):
|
| os.makedirs(out_dir)
|
| cv2.imwrite(out_fname, image_vis)
|
| logger.info(f"Output saved to {out_fname}")
|
| context["entry_idx"] += 1
|
|
|
| @classmethod
|
| def postexecute(cls: type, context: Dict[str, Any]):
|
| pass
|
|
|
|
|
| @classmethod
|
| def _get_out_fname(cls: type, entry_idx: int, fname_base: str):
|
| base, ext = os.path.splitext(fname_base)
|
| return base + ".{0:04d}".format(entry_idx) + ext
|
|
|
| @classmethod
|
| def create_context(cls: type, args: argparse.Namespace, cfg: CfgNode) -> Dict[str, Any]:
|
| vis_specs = args.visualizations.split(",")
|
| visualizers = []
|
| extractors = []
|
| for vis_spec in vis_specs:
|
| texture_atlas = get_texture_atlas(args.texture_atlas)
|
| texture_atlases_dict = get_texture_atlases(args.texture_atlases_map)
|
| vis = cls.VISUALIZERS[vis_spec](
|
| cfg=cfg,
|
| texture_atlas=texture_atlas,
|
| texture_atlases_dict=texture_atlases_dict,
|
| )
|
| visualizers.append(vis)
|
| extractor = create_extractor(vis)
|
| extractors.append(extractor)
|
| visualizer = CompoundVisualizer(visualizers)
|
| extractor = CompoundExtractor(extractors)
|
| context = {
|
| "extractor": extractor,
|
| "visualizer": visualizer,
|
| "out_fname": args.output,
|
| "entry_idx": 0,
|
| }
|
| return context
|
|
|
|
|
| def create_argument_parser() -> argparse.ArgumentParser:
|
| parser = argparse.ArgumentParser(
|
| description=DOC,
|
| formatter_class=lambda prog: argparse.HelpFormatter(prog, max_help_position=120),
|
| )
|
| parser.set_defaults(func=lambda _: parser.print_help(sys.stdout))
|
| subparsers = parser.add_subparsers(title="Actions")
|
| for _, action in _ACTION_REGISTRY.items():
|
| action.add_parser(subparsers)
|
| return parser
|
|
|
|
|
| def main():
|
| parser = create_argument_parser()
|
| args = parser.parse_args()
|
| verbosity = getattr(args, "verbosity", None)
|
| global logger
|
| logger = setup_logger(name=LOGGER_NAME)
|
| logger.setLevel(verbosity_to_level(verbosity))
|
| args.func(args)
|
|
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|
|
|
|
|
|
|