| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- # Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from typing import Any, Dict, Optional, Union
- from ....utils.deps import pipeline_requires_extra
- from ...common.batch_sampler import ImageBatchSampler
- from ...common.reader import ReadImage
- from ...utils.benchmark import benchmark
- from ...utils.hpi import HPIConfig
- from ...utils.pp_option import PaddlePredictorOption
- from ..base import BasePipeline
- from ..components import CropByBoxes, FaissBuilder, FaissIndexer
- from .result import ShiTuResult
- @benchmark.time_methods
- @pipeline_requires_extra("cv")
- class ShiTuV2Pipeline(BasePipeline):
- """ShiTuV2 Pipeline"""
- entities = "PP-ShiTuV2"
- def __init__(
- self,
- config: Dict,
- device: str = None,
- pp_option: PaddlePredictorOption = None,
- use_hpip: bool = False,
- hpi_config: Optional[Union[Dict[str, Any], HPIConfig]] = None,
- ):
- super().__init__(
- device=device, pp_option=pp_option, use_hpip=use_hpip, hpi_config=hpi_config
- )
- self._topk, self._rec_threshold, self._hamming_radius, self._det_threshold = (
- config.get("rec_topk", 5),
- config.get("rec_threshold", 0.5),
- config.get("hamming_radius", None),
- config.get("det_threshold", 0.5),
- )
- index = config.get("index", None)
- self.img_reader = ReadImage(format="BGR")
- self.det_model = self.create_model(config["SubModules"]["Detection"])
- self.rec_model = self.create_model(config["SubModules"]["Recognition"])
- self.crop_by_boxes = CropByBoxes()
- self.indexer = FaissIndexer(index=index) if index else None
- self.batch_sampler = ImageBatchSampler(
- batch_size=self.det_model.batch_sampler.batch_size
- )
- def predict(self, input, index=None, **kwargs):
- indexer = FaissIndexer(index) if index is not None else self.indexer
- assert indexer
- kwargs = {k: v for k, v in kwargs.items() if v is not None}
- topk = kwargs.get("rec_topk", self._topk)
- rec_threshold = kwargs.get("rec_threshold", self._rec_threshold)
- hamming_radius = kwargs.get("hamming_radius", self._hamming_radius)
- det_threshold = kwargs.get("det_threshold", self._det_threshold)
- for img_id, batch_data in enumerate(self.batch_sampler(input)):
- raw_imgs = self.img_reader(batch_data.instances)
- all_det_res = list(self.det_model(raw_imgs, threshold=det_threshold))
- for input_data, raw_img, det_res in zip(
- batch_data.instances, raw_imgs, all_det_res
- ):
- rec_res = self.get_rec_result(
- raw_img, det_res, indexer, rec_threshold, hamming_radius, topk
- )
- yield self.get_final_result(input_data, raw_img, det_res, rec_res)
- def get_rec_result(
- self, raw_img, det_res, indexer, rec_threshold, hamming_radius, topk
- ):
- if len(det_res["boxes"]) == 0:
- w, h = raw_img.shape[:2]
- det_res["boxes"].append(
- {
- "cls_id": 0,
- "label": "full_img",
- "score": 0,
- "coordinate": [0, 0, h, w],
- }
- )
- subs_of_img = list(self.crop_by_boxes(raw_img, det_res["boxes"]))
- img_list = [img["img"] for img in subs_of_img]
- all_rec_res = list(self.rec_model(img_list))
- all_rec_res = indexer(
- [rec_res["feature"] for rec_res in all_rec_res],
- score_thres=rec_threshold,
- hamming_radius=hamming_radius,
- topk=topk,
- )
- output = {"label": [], "score": []}
- for res in all_rec_res:
- output["label"].append(res["label"])
- output["score"].append(res["score"])
- return output
- def get_final_result(self, input_data, raw_img, det_res, rec_res):
- single_img_res = {"input_path": input_data, "input_img": raw_img, "boxes": []}
- for i, obj in enumerate(det_res["boxes"]):
- rec_scores = rec_res["score"][i]
- rec_scores = rec_scores if rec_scores is not None else [None]
- labels = rec_res["label"][i]
- labels = labels if labels is not None else [None]
- single_img_res["boxes"].append(
- {
- "labels": labels,
- "rec_scores": rec_scores,
- "det_score": obj["score"],
- "coordinate": obj["coordinate"],
- }
- )
- return ShiTuResult(single_img_res)
- def build_index(
- self,
- gallery_imgs,
- gallery_label,
- metric_type="IP",
- index_type="HNSW32",
- **kwargs
- ):
- return FaissBuilder.build(
- gallery_imgs,
- gallery_label,
- self.rec_model.predict,
- metric_type=metric_type,
- index_type=index_type,
- )
- def remove_index(self, remove_ids, index):
- return FaissBuilder.remove(remove_ids, index)
- def append_index(
- self,
- gallery_imgs,
- gallery_label,
- index,
- ):
- return FaissBuilder.append(
- gallery_imgs,
- gallery_label,
- self.rec_model.predict,
- index,
- )
|