[Feat] Add pipeline serving (#2077)

* Support pipeline serving

* Refactor

* Update det and instance_seg apps

* Fix bugs and enhance typing

* Set device when creating app

* Add TS apps

* Add ppchatocrv3 app

* Fix bugs and add anomaly detection app

* Fix bugs

* Update ppchatocrv3

* Update ppchatocrv3 APIs

* Update serving CLI

* Update ppchatocrv3

* Update and fix

* Fix bugs

* Update APIs

* Fix and update

* Remove bs setting
Lin Manhui, 1 year ago
parent
commit
36d992c2f3
45 changed files with 2269 additions and 74 deletions
  1. .precommit/check_custom.py (+4, -1)
  2. paddlex/__main__.py (+42, -0)
  3. paddlex/inference/components/task_related/det.py (+1, -1)
  4. paddlex/inference/models/__init__.py (+17, -17)
  5. paddlex/inference/pipelines/__init__.py (+46, -19)
  6. paddlex/inference/pipelines/ocr.py (+1, -1)
  7. paddlex/inference/pipelines/ppchatocrv3/ppchatocrv3.py (+1, -1)
  8. paddlex/inference/pipelines/serving/__init__.py (+17, -0)
  9. paddlex/inference/pipelines/serving/_pipeline_apps/__init__.py (+140, -0)
  10. paddlex/inference/pipelines/serving/_pipeline_apps/anomaly_detection.py (+80, -0)
  11. paddlex/inference/pipelines/serving/_pipeline_apps/image_classification.py (+99, -0)
  12. paddlex/inference/pipelines/serving/_pipeline_apps/instance_segmentation.py (+110, -0)
  13. paddlex/inference/pipelines/serving/_pipeline_apps/multi_label_image_classification.py (+88, -0)
  14. paddlex/inference/pipelines/serving/_pipeline_apps/object_detection.py (+88, -0)
  15. paddlex/inference/pipelines/serving/_pipeline_apps/ocr.py (+96, -0)
  16. paddlex/inference/pipelines/serving/_pipeline_apps/ppchatocrv3.py (+515, -0)
  17. paddlex/inference/pipelines/serving/_pipeline_apps/semantic_segmentation.py (+82, -0)
  18. paddlex/inference/pipelines/serving/_pipeline_apps/table_recognition.py (+108, -0)
  19. paddlex/inference/pipelines/serving/_pipeline_apps/ts_ad.py (+66, -0)
  20. paddlex/inference/pipelines/serving/_pipeline_apps/ts_cls.py (+68, -0)
  21. paddlex/inference/pipelines/serving/_pipeline_apps/ts_fc.py (+66, -0)
  22. paddlex/inference/pipelines/serving/app.py (+162, -0)
  23. paddlex/inference/pipelines/serving/file_storage.py (+80, -0)
  24. paddlex/inference/pipelines/serving/models.py (+30, -0)
  25. paddlex/inference/pipelines/serving/server.py (+21, -0)
  26. paddlex/inference/pipelines/serving/utils.py (+110, -0)
  27. paddlex/inference/pipelines/single_model_pipeline.py (+31, -1)
  28. paddlex/inference/pipelines/table_recognition/utils.py (+2, -0)
  29. paddlex/modules/text_detection/model_list.py (+3, -1)
  30. paddlex/paddlex_cli.py (+87, -16)
  31. paddlex/pipelines/OCR.yaml (+0, -1)
  32. paddlex/pipelines/PP-ChatOCRv3-doc.yaml (+2, -3)
  33. paddlex/pipelines/anomaly_detection.yaml (+0, -1)
  34. paddlex/pipelines/image_classification.yaml (+0, -1)
  35. paddlex/pipelines/instance_segmentation.yaml (+0, -1)
  36. paddlex/pipelines/multi_label_image_classification.yaml (+0, -1)
  37. paddlex/pipelines/object_detection.yaml (+0, -1)
  38. paddlex/pipelines/semantic_segmentation.yaml (+0, -1)
  39. paddlex/pipelines/small_object_detection.yaml (+0, -1)
  40. paddlex/pipelines/table_recognition.yaml (+0, -1)
  41. paddlex/pipelines/ts_ad.yaml (+0, -1)
  42. paddlex/pipelines/ts_cls.yaml (+0, -1)
  43. paddlex/pipelines/ts_fc.yaml (+0, -1)
  44. paddlex/utils/logging.py (+5, -0)
  45. setup.py (+1, -1)

+ 4 - 1
.precommit/check_custom.py

@@ -34,11 +34,14 @@ LICENSE_TEXT = """# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
 def check(file_path):
     with open(file_path, "r") as f:
         content = f.read()
+    # Exclude shebang line
+    if content.startswith("#!"):
+        content = content[content.index("\n") + 1 :]
     if not content.startswith(LICENSE_TEXT):
         print(f"License header missing in {file_path}")
         return False
     if "import paddle" in content or "from paddle import " in content:
-        print(f"Please using `lazy_paddle` instead `paddle` when import in {file_path}")
+        print(f"Please use `lazy_paddle` instead `paddle` when import in {file_path}")
         return False
     return True
 

+ 42 - 0
paddlex/__main__.py

@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+
+from .paddlex_cli import main
+
+
+def console_entry() -> int:
+    # See https://docs.python.org/3/library/signal.html#note-on-sigpipe
+    try:
+        # Flush output here to force SIGPIPE to be triggered while inside this
+        # try block.
+        code = main()
+        sys.stdout.flush()
+        sys.stderr.flush()
+        return code
+    except BrokenPipeError:
+        # Python flushes standard streams on exit;
+        # redirect remaining output to devnull to avoid another BrokenPipeError
+        # at shutdown.
+        devnull = os.open(os.devnull, os.O_WRONLY)
+        os.dup2(devnull, sys.stdout.fileno())
+        return 1
+
+
+if __name__ == "__main__":
+    sys.exit(console_entry())

+ 1 - 1
paddlex/inference/components/task_related/det.py

@@ -26,7 +26,7 @@ def restructured_boxes(boxes, labels, img_size):
     w, h = img_size
 
     for box in boxes:
-        xmin, ymin, xmax, ymax = list(map(int, box[2:]))
+        xmin, ymin, xmax, ymax = box[2:]
         xmin = max(0, xmin)
         ymin = max(0, ymin)
         xmax = min(w, xmax)

+ 17 - 17
paddlex/inference/models/__init__.py

@@ -16,6 +16,7 @@
 from pathlib import Path
 from typing import Any, Dict, Optional
 
+from ...utils import errors
 from ..utils.official_models import official_models
 from .base import BasePredictor, BasicPredictor
 from .image_classification import ClasPredictor
@@ -39,26 +40,25 @@ def _create_hp_predictor(
     model_name, model_dir, device, config, hpi_params, *args, **kwargs
 ):
     try:
-        from paddlex_hpi.predictors import HPPredictor
-    except ModuleNotFoundError as e:
+        from paddlex_hpi.models import HPPredictor
+    except ModuleNotFoundError:
         raise RuntimeError(
             "The PaddleX HPI plugin is not properly installed, and the high-performance model inference features are not available."
+        ) from None
+    try:
+        predictor = HPPredictor.get(model_name)(
+            model_dir=model_dir,
+            config=config,
+            device=device,
+            *args,
+            hpi_params=hpi_params,
+            **kwargs,
         )
-    if hpi_params is None:
-        raise ValueError("No HPI params given")
-    if "serial_number" not in hpi_params:
-        raise ValueError("The serial number is required but was not provided.")
-    serial_number = hpi_params["serial_number"]
-    update_license = hpi_params.get("update_license", False)
-    return HPPredictor.get(model_name)(
-        model_dir=model_dir,
-        config=config,
-        device=device,
-        serial_number=serial_number,
-        update_license=update_license,
-        *args,
-        **kwargs,
-    )
+    except errors.others.ClassNotFoundException:
+        raise ValueError(
+            f"{model_name} is not supported by the PaddleX HPI plugin."
+        ) from None
+    return predictor
 
 
 def create_predictor(

+ 46 - 19
paddlex/inference/pipelines/__init__.py

@@ -29,15 +29,28 @@ from .single_model_pipeline import (
     TSCls,
     MultiLableImageClas,
     SmallObjDet,
-    AnomolyDetection,
+    AnomalyDetection,
 )
 from .ocr import OCRPipeline
 from .table_recognition import TableRecPipeline
 from .ppchatocrv3 import PPChatOCRPipeline
 
 
-def create_pipeline(
-    pipeline: str,
+def load_pipeline_config(pipeline: str) -> Dict[str, Any]:
+    if not Path(pipeline).exists():
+        pipeline_path = get_pipeline_path(pipeline)
+        if pipeline_path is None:
+            raise Exception(
+                f"The pipeline ({pipeline}) does not exist! Please use a pipeline name or a config file path!"
+            )
+    else:
+        pipeline_path = pipeline
+    config = parse_config(pipeline_path)
+    return config
+
+
+def create_pipeline_from_config(
+    config: Dict[str, Any],
     device=None,
     pp_option=None,
     use_hpip: bool = False,
@@ -45,22 +58,6 @@ def create_pipeline(
     *args,
     **kwargs,
 ) -> BasePipeline:
-    """build model evaluater
-
-    Args:
-        pipeline (str): the pipeline name, that is name of pipeline class
-
-    Returns:
-        BasePipeline: the pipeline, which is subclass of BasePipeline.
-    """
-    if not Path(pipeline).exists():
-        pipeline_path = get_pipeline_path(pipeline)
-        if pipeline_path is None:
-            raise Exception(
-                f"The pipeline({pipeline}) don't exist! Please use the pipeline name or config yaml file!"
-            )
-    pipeline_path = pipeline
-    config = parse_config(pipeline_path)
     pipeline_name = config["Global"]["pipeline_name"]
     pipeline_setting = config["Pipeline"]
 
@@ -87,4 +84,34 @@ def create_pipeline(
     pipeline = BasePipeline.get(pipeline_name)(
         predictor_kwargs=predictor_kwargs, *args, **pipeline_setting
     )
+
     return pipeline
+
+
+def create_pipeline(
+    pipeline: str,
+    device=None,
+    pp_option=None,
+    use_hpip: bool = False,
+    hpi_params: Optional[Dict[str, Any]] = None,
+    *args,
+    **kwargs,
+) -> BasePipeline:
+    """build model evaluater
+
+    Args:
+        pipeline (str): the pipeline name, that is name of pipeline class
+
+    Returns:
+        BasePipeline: the pipeline, which is subclass of BasePipeline.
+    """
+    config = load_pipeline_config(pipeline)
+    return create_pipeline_from_config(
+        config,
+        device=device,
+        pp_option=pp_option,
+        use_hpip=use_hpip,
+        hpi_params=hpi_params,
+        *args,
+        **kwargs,
+    )
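
The refactor above splits config loading from pipeline construction, so a caller can load and optionally tweak a pipeline config before instantiating the pipeline. A minimal sketch of how the two new helpers compose (the "OCR" pipeline name and the device string are placeholders, not mandated by this commit):

    from paddlex.inference.pipelines import (
        create_pipeline_from_config,
        load_pipeline_config,
    )

    # Accepts either a built-in pipeline name or a path to a config YAML file.
    config = load_pipeline_config("OCR")

    # The config dict can be inspected or modified here before the pipeline
    # is built; create_pipeline() is now just these two calls chained.
    pipeline = create_pipeline_from_config(config, device="gpu:0")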

+ 1 - 1
paddlex/inference/pipelines/ocr.py

@@ -56,7 +56,7 @@ class OCRPipeline(BasePipeline):
             self.text_rec_model.set_predictor(batch_size=text_rec_batch_size)
 
     def predict(self, input, **kwargs):
-        device = kwargs.get("device", "gpu")
+        device = kwargs.get("device", None)
         for det_res in self.text_det_model(
             input, batch_size=kwargs.get("det_batch_size", 1), device=device
         ):

+ 1 - 1
paddlex/inference/pipelines/ppchatocrv3/ppchatocrv3.py

@@ -58,7 +58,7 @@ class PPChatOCRPipeline(TableRecPipeline):
         curve_batch_size=1,
         oricls_batch_size=1,
         recovery=True,
-        device="gpu",
+        device=None,
         predictor_kwargs=None,
     ):
         self.layout_model = layout_model

+ 17 - 0
paddlex/inference/pipelines/serving/__init__.py

@@ -0,0 +1,17 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._pipeline_apps import create_pipeline_app
+from .app import create_app_config
+from .server import run_server

+ 140 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/__init__.py

@@ -0,0 +1,140 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Any, Dict
+
+from fastapi import FastAPI
+
+from ...base import BasePipeline
+from ...ocr import OCRPipeline
+from ...ppchatocrv3 import PPChatOCRPipeline
+from ...single_model_pipeline import (
+    AnomalyDetection,
+    ImageClassification,
+    InstanceSegmentation,
+    MultiLableImageClas,
+    ObjectDetection,
+    SemanticSegmentation,
+    SmallObjDet,
+    TSAd,
+    TSCls,
+    TSFc,
+)
+from ...table_recognition import TableRecPipeline
+from ..app import create_app_config
+from .anomaly_detection import create_pipeline_app as create_anomaly_detection_app
+from .image_classification import create_pipeline_app as create_image_classification_app
+from .instance_segmentation import (
+    create_pipeline_app as create_instance_segmentation_app,
+)
+from .multi_label_image_classification import (
+    create_pipeline_app as create_multi_label_image_classification_app,
+)
+from .object_detection import create_pipeline_app as create_object_detection_app
+from .ocr import create_pipeline_app as create_ocr_app
+from .ppchatocrv3 import create_pipeline_app as create_ppchatocrv3_app
+from .semantic_segmentation import (
+    create_pipeline_app as create_semantic_segmentation_app,
+)
+from .table_recognition import create_pipeline_app as create_table_recognition_app
+from .ts_ad import create_pipeline_app as create_ts_ad_app
+from .ts_cls import create_pipeline_app as create_ts_cls_app
+from .ts_fc import create_pipeline_app as create_ts_fc_app
+
+
+# XXX (Bobholamovic): This is tightly coupled to the name-pipeline mapping,
+# which is dirty but necessary. I want to keep the pipeline definition code
+# untouched while adding the pipeline serving feature. Each pipeline app depends
+# on a specific pipeline class, and a pipeline name must be provided (in the
+# pipeline config) to specify the type of the pipeline.
+def create_pipeline_app(
+    pipeline: BasePipeline, pipeline_config: Dict[str, Any]
+) -> FastAPI:
+    pipeline_name = pipeline_config["Global"]["pipeline_name"]
+    app_config = create_app_config(pipeline_config)
+    if pipeline_name == "image_classification":
+        if not isinstance(pipeline, ImageClassification):
+            raise TypeError(
+                "Expected `pipeline` to be an instance of `ImageClassification`."
+            )
+        return create_image_classification_app(pipeline, app_config)
+    elif pipeline_name == "instance_segmentation":
+        if not isinstance(pipeline, InstanceSegmentation):
+            raise TypeError(
+                "Expected `pipeline` to be an instance of `InstanceSegmentation`."
+            )
+        return create_instance_segmentation_app(pipeline, app_config)
+    elif pipeline_name == "object_detection":
+        if not isinstance(pipeline, ObjectDetection):
+            raise TypeError(
+                "Expected `pipeline` to be an instance of `ObjectDetection`."
+            )
+        return create_object_detection_app(pipeline, app_config)
+    elif pipeline_name == "OCR":
+        if not isinstance(pipeline, OCRPipeline):
+            raise TypeError("Expected `pipeline` to be an instance of `OCRPipeline`.")
+        return create_ocr_app(pipeline, app_config)
+    elif pipeline_name == "semantic_segmentation":
+        if not isinstance(pipeline, SemanticSegmentation):
+            raise TypeError(
+                "Expected `pipeline` to be an instance of `SemanticSegmentation`."
+            )
+        return create_semantic_segmentation_app(pipeline, app_config)
+    elif pipeline_name == "table_recognition":
+        if not isinstance(pipeline, TableRecPipeline):
+            raise TypeError(
+                "Expected `pipeline` to be an instance of `TableRecPipeline`."
+            )
+        return create_table_recognition_app(pipeline, app_config)
+    elif pipeline_name == "ts_ad":
+        if not isinstance(pipeline, TSAd):
+            raise TypeError("Expected `pipeline` to be an instance of `TSAd`.")
+        return create_ts_ad_app(pipeline, app_config)
+    elif pipeline_name == "ts_cls":
+        if not isinstance(pipeline, TSCls):
+            raise TypeError("Expected `pipeline` to be an instance of `TSCls`.")
+        return create_ts_cls_app(pipeline, app_config)
+    elif pipeline_name == "ts_fc":
+        if not isinstance(pipeline, TSFc):
+            raise TypeError("Expected `pipeline` to be an instance of `TSFc`.")
+        return create_ts_fc_app(pipeline, app_config)
+    elif pipeline_name == "multi_label_image_classification":
+        if not isinstance(pipeline, MultiLableImageClas):
+            raise TypeError(
+                "Expected `pipeline` to be an instance of `MultiLableImageClas`."
+            )
+        return create_multi_label_image_classification_app(pipeline, app_config)
+    elif pipeline_name == "small_object_detection":
+        if not isinstance(pipeline, SmallObjDet):
+            raise TypeError("Expected `pipeline` to be an instance of `SmallObjDet`.")
+        return create_object_detection_app(pipeline, app_config)
+    elif pipeline_name == "anomaly_detection":
+        if not isinstance(pipeline, AnomalyDetection):
+            raise TypeError(
+                "Expected `pipeline` to be an instance of `AnomalyDetection`."
+            )
+        return create_anomaly_detection_app(pipeline, app_config)
+    elif pipeline_name == "PP-ChatOCRv3-doc":
+        if not isinstance(pipeline, PPChatOCRPipeline):
+            raise TypeError(
+                "Expected `pipeline` to be an instance of `PPChatOCRPipeline`."
+            )
+        return create_ppchatocrv3_app(pipeline, app_config)
+    else:
+        if BasePipeline.get(pipeline_name):
+            raise ValueError(
+                f"The {pipeline_name} pipeline does not support pipeline serving."
+            )
+        else:
+            raise ValueError("Unknown pipeline name")
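
Putting the serving pieces together, a pipeline can be hosted programmatically roughly as sketched below. Running the FastAPI app with uvicorn is an assumption made for illustration; the commit also adds a `run_server` helper in `serving/server.py`, whose signature is not visible in this diff.

    import uvicorn  # assumption: any ASGI server can host the FastAPI app

    from paddlex.inference.pipelines import (
        create_pipeline_from_config,
        load_pipeline_config,
    )
    from paddlex.inference.pipelines.serving import create_pipeline_app

    config = load_pipeline_config("OCR")  # placeholder pipeline name
    pipeline = create_pipeline_from_config(config)
    # create_pipeline_app dispatches on config["Global"]["pipeline_name"] and
    # type-checks the pipeline instance, as shown above.
    app = create_pipeline_app(pipeline, config)
    uvicorn.run(app, host="0.0.0.0", port=8080)  # placeholder host/port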

+ 80 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/anomaly_detection.py

@@ -0,0 +1,80 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated
+
+from .....utils import logging
+from ...single_model_pipeline import AnomalyDetection
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferRequest(BaseModel):
+    image: str
+
+
+class InferResult(BaseModel):
+    labelMap: List[int]
+    size: Annotated[List[int], Field(min_length=2, max_length=2)]
+    image: str
+
+
+def create_pipeline_app(pipeline: AnomalyDetection, app_config: AppConfig) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    @app.post(
+        "/anomaly-detection",
+        operation_id="infer",
+        responses={422: {"model": Response}},
+    )
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(
+                request.image, aiohttp_session
+            )
+            image = serving_utils.image_bytes_to_array(file_bytes)
+
+            result = (await pipeline.infer(image))[0]
+
+            pred = result["pred"][0].tolist()
+            size = [len(pred), len(pred[0])]
+            label_map = [item for sublist in pred for item in sublist]
+            output_image_base64 = serving_utils.image_to_base64(
+                result.img.convert("RGB")
+            )
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(
+                    labelMap=label_map, size=size, image=output_image_base64
+                ),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app
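
A hedged client-side sketch for the endpoint above: the request carries the image as a base64 string (judging from other handlers in this commit, `get_raw_bytes` also accepts URLs), and the flattened `labelMap` can be reshaped using the returned `size`, which is `[height, width]`. The service URL and file name are placeholders.

    import base64

    import numpy as np
    import requests

    with open("example.png", "rb") as f:  # placeholder input image
        payload = {"image": base64.b64encode(f.read()).decode("ascii")}

    resp = requests.post("http://localhost:8080/anomaly-detection", json=payload)
    result = resp.json()["result"]

    height, width = result["size"]
    label_map = np.array(result["labelMap"], dtype=np.int32).reshape(height, width)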

+ 99 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/image_classification.py

@@ -0,0 +1,99 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from itertools import islice
+from typing import List, Optional
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated
+
+from .....utils import logging
+from ...single_model_pipeline import ImageClassification
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferenceParams(BaseModel):
+    topK: Optional[Annotated[int, Field(gt=0)]] = None
+
+
+class InferRequest(BaseModel):
+    image: str
+    inferenceParams: Optional[InferenceParams] = None
+
+
+class Category(BaseModel):
+    id: int
+    name: str
+    score: float
+
+
+class InferResult(BaseModel):
+    categories: List[Category]
+    image: str
+
+
+def create_pipeline_app(
+    pipeline: ImageClassification, app_config: AppConfig
+) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    @app.post(
+        "/image-classification",
+        operation_id="infer",
+        responses={422: {"model": Response}},
+    )
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(
+                request.image, aiohttp_session
+            )
+            image = serving_utils.image_bytes_to_array(file_bytes)
+            top_k: Optional[int] = None
+            if request.inferenceParams is not None:
+                if request.inferenceParams.topK is not None:
+                    top_k = request.inferenceParams.topK
+
+            result = (await pipeline.infer(image))[0]
+
+            if "label_names" in result:
+                cat_names = result["label_names"]
+            else:
+                cat_names = [str(id_) for id_ in result["class_ids"]]
+            categories: List[Category] = []
+            for id_, name, score in islice(
+                zip(result["class_ids"], cat_names, result["scores"]), None, top_k
+            ):
+                categories.append(Category(id=id_, name=name, score=score))
+            output_image_base64 = serving_utils.image_to_base64(result.img)
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(categories=categories, image=output_image_base64),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app
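
A similar hedged client sketch for the classification endpoint, exercising the optional `inferenceParams.topK` field defined above (URL and file name are placeholders):

    import base64

    import requests

    with open("example.jpg", "rb") as f:  # placeholder input image
        payload = {
            "image": base64.b64encode(f.read()).decode("ascii"),
            "inferenceParams": {"topK": 5},
        }

    resp = requests.post("http://localhost:8080/image-classification", json=payload)
    for category in resp.json()["result"]["categories"]:
        print(category["id"], category["name"], category["score"])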

+ 110 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/instance_segmentation.py

@@ -0,0 +1,110 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+
+import numpy as np
+import pycocotools.mask as mask_util
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated, TypeAlias
+
+from .....utils import logging
+from ...single_model_pipeline import InstanceSegmentation
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferRequest(BaseModel):
+    image: str
+
+
+BoundingBox: TypeAlias = Annotated[List[float], Field(min_length=4, max_length=4)]
+
+
+class Mask(BaseModel):
+    rleResult: str
+    size: Annotated[List[int], Field(min_length=2, max_length=2)]
+
+
+class Instance(BaseModel):
+    bbox: BoundingBox
+    categoryId: int
+    score: float
+    mask: Mask
+
+
+class InferResult(BaseModel):
+    instances: List[Instance]
+    image: str
+
+
+def _rle(mask: np.ndarray) -> str:
+    rle_res = mask_util.encode(np.asarray(mask[..., None], order="F", dtype="uint8"))[0]
+    return rle_res["counts"].decode("utf-8")
+
+
+def create_pipeline_app(
+    pipeline: InstanceSegmentation, app_config: AppConfig
+) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline,
+        app_config=app_config,
+        app_aiohttp_session=True,
+    )
+
+    @app.post(
+        "/instance-segmentation",
+        operation_id="infer",
+        responses={422: {"model": Response}},
+    )
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(
+                request.image, aiohttp_session
+            )
+            image = serving_utils.image_bytes_to_array(file_bytes)
+
+            result = (await pipeline.infer(image))[0]
+
+            instances: List[Instance] = []
+            for obj, mask in zip(result["boxes"], result["masks"]):
+                rle_res = _rle(mask)
+                mask = Mask(rleResult=rle_res, size=mask.shape)
+                instances.append(
+                    Instance(
+                        bbox=obj["coordinate"],
+                        categoryId=obj["cls_id"],
+                        score=obj["score"],
+                        mask=mask,
+                    )
+                )
+            output_image_base64 = serving_utils.image_to_base64(result.img)
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(instances=instances, image=output_image_base64),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app
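
Because `_rle` returns only the `counts` string of a COCO run-length encoding, a client that wants the binary mask back has to recombine it with the returned `size`. A hedged sketch that mirrors the encoding above:

    import pycocotools.mask as mask_util

    def decode_mask(instance: dict):
        # Rebuild the COCO RLE dict from the response fields of one `instances`
        # entry and decode it back into an H x W uint8 array.
        rle = {
            "counts": instance["mask"]["rleResult"].encode("utf-8"),
            "size": instance["mask"]["size"],
        }
        return mask_util.decode(rle)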

+ 88 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/multi_label_image_classification.py

@@ -0,0 +1,88 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+
+from .....utils import logging
+from ...single_model_pipeline import ImageClassification
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferRequest(BaseModel):
+    image: str
+
+
+class Category(BaseModel):
+    id: int
+    name: str
+    score: float
+
+
+class InferResult(BaseModel):
+    categories: List[Category]
+    image: str
+
+
+def create_pipeline_app(
+    pipeline: ImageClassification, app_config: AppConfig
+) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    @app.post(
+        "/multilabel-image-classification",
+        operation_id="infer",
+        responses={422: {"model": Response}},
+    )
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(
+                request.image, aiohttp_session
+            )
+            image = serving_utils.image_bytes_to_array(file_bytes)
+
+            result = (await pipeline.infer(image))[0]
+
+            if "label_names" in result:
+                cat_names = result["label_names"]
+            else:
+                cat_names = [str(id_) for id_ in result["class_ids"]]
+            categories: List[Category] = []
+            for id_, name, score in zip(
+                result["class_ids"], cat_names, result["scores"]
+            ):
+                categories.append(Category(id=id_, name=name, score=score))
+            output_image_base64 = serving_utils.image_to_base64(result.img)
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(categories=categories, image=output_image_base64),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app

+ 88 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/object_detection.py

@@ -0,0 +1,88 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated, TypeAlias
+
+from .....utils import logging
+from ...single_model_pipeline import ObjectDetection
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferRequest(BaseModel):
+    image: str
+
+
+BoundingBox: TypeAlias = Annotated[List[float], Field(min_length=4, max_length=4)]
+
+
+class DetectedObject(BaseModel):
+    bbox: BoundingBox
+    categoryId: int
+    score: float
+
+
+class InferResult(BaseModel):
+    detectedObjects: List[DetectedObject]
+    image: str
+
+
+def create_pipeline_app(pipeline: ObjectDetection, app_config: AppConfig) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    @app.post(
+        "/object-detection", operation_id="infer", responses={422: {"model": Response}}
+    )
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(
+                request.image, aiohttp_session
+            )
+            image = serving_utils.image_bytes_to_array(file_bytes)
+
+            result = (await pipeline.infer(image))[0]
+
+            objects: List[DetectedObject] = []
+            for obj in result["boxes"]:
+                objects.append(
+                    DetectedObject(
+                        bbox=obj["coordinate"],
+                        categoryId=obj["cls_id"],
+                        score=obj["score"],
+                    )
+                )
+            output_image_base64 = serving_utils.image_to_base64(result.img)
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(detectedObjects=objects, image=output_image_base64),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app
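
The `bbox` values are absolute pixel coordinates in `[xmin, ymin, xmax, ymax]` order (see the `restructured_boxes` change in `det.py` above), so a client can overlay them directly. A hedged sketch of doing so with OpenCV; the score threshold is an arbitrary example value:

    import cv2

    def draw_boxes(image, detected_objects, threshold=0.5):
        # `image` is a BGR ndarray; `detected_objects` is the `detectedObjects`
        # list from the response body.
        for obj in detected_objects:
            if obj["score"] < threshold:
                continue
            xmin, ymin, xmax, ymax = map(int, obj["bbox"])
            cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
            cv2.putText(image, str(obj["categoryId"]), (xmin, max(ymin - 5, 0)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        return image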

+ 96 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/ocr.py

@@ -0,0 +1,96 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Optional
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated, TypeAlias
+
+from .....utils import logging
+from ...ocr import OCRPipeline
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferenceParams(BaseModel):
+    maxLongSide: Optional[Annotated[int, Field(gt=0)]] = None
+
+
+class InferRequest(BaseModel):
+    image: str
+    inferenceParams: Optional[InferenceParams] = None
+
+
+Point: TypeAlias = Annotated[List[int], Field(min_length=2, max_length=2)]
+Polygon: TypeAlias = Annotated[List[Point], Field(min_length=3)]
+
+
+class Text(BaseModel):
+    poly: Polygon
+    text: str
+    score: float
+
+
+class InferResult(BaseModel):
+    texts: List[Text]
+    image: str
+
+
+def create_pipeline_app(pipeline: OCRPipeline, app_config: AppConfig) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    @app.post("/ocr", operation_id="infer", responses={422: {"model": Response}})
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        if request.inferenceParams:
+            max_long_side = request.inferenceParams.maxLongSide
+            if max_long_side:
+                raise HTTPException(
+                    status_code=422,
+                    detail="`max_long_side` is currently not supported.",
+                )
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(
+                request.image, aiohttp_session
+            )
+            image = serving_utils.image_bytes_to_array(file_bytes)
+
+            result = (await pipeline.infer(image))[0]
+
+            texts: List[Text] = []
+            for poly, text, score in zip(
+                result["dt_polys"], result["rec_text"], result["rec_score"]
+            ):
+                texts.append(Text(poly=poly, text=text, score=score))
+            output_image_base64 = serving_utils.image_to_base64(result.img)
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(texts=texts, image=output_image_base64),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app
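
A hedged client sketch for the OCR endpoint; each `texts` entry carries the detection polygon, the recognized string, and its score, while `image` is the base64-encoded visualization. URL and file names are placeholders.

    import base64

    import requests

    with open("doc.jpg", "rb") as f:  # placeholder input image
        payload = {"image": base64.b64encode(f.read()).decode("ascii")}

    resp = requests.post("http://localhost:8080/ocr", json=payload)
    result = resp.json()["result"]

    for text in result["texts"]:
        print(text["score"], text["text"], text["poly"])

    with open("ocr_vis.jpg", "wb") as f:  # save the rendered OCR visualization
        f.write(base64.b64decode(result["image"]))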

+ 515 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/ppchatocrv3.py

@@ -0,0 +1,515 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import os
+import re
+import uuid
+from typing import Awaitable, Final, List, Literal, Optional, Tuple, Union
+from urllib.parse import parse_qs, urlparse
+
+import cv2
+import numpy as np
+from fastapi import FastAPI, HTTPException
+from numpy.typing import ArrayLike
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated, TypeAlias, assert_never
+
+from .....utils import logging
+from ...ppchatocrv3 import PPChatOCRPipeline
+from .. import file_storage
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+_DEFAULT_MAX_IMG_SIZE: Final[Tuple[int, int]] = (2000, 2000)
+_DEFAULT_MAX_NUM_IMGS: Final[int] = 10
+
+
+FileType: TypeAlias = Literal[0, 1]
+
+
+class InferenceParams(BaseModel):
+    maxLongSide: Optional[Annotated[int, Field(gt=0)]] = None
+
+
+class AnalyzeImageRequest(BaseModel):
+    file: str
+    fileType: Optional[FileType] = None
+    useOricls: bool = True
+    useCurve: bool = True
+    useUvdoc: bool = True
+    inferenceParams: Optional[InferenceParams] = None
+
+
+Point: TypeAlias = Annotated[List[int], Field(min_length=2, max_length=2)]
+Polygon: TypeAlias = Annotated[List[Point], Field(min_length=3)]
+BoundingBox: TypeAlias = Annotated[List[float], Field(min_length=4, max_length=4)]
+
+
+class Text(BaseModel):
+    poly: Polygon
+    text: str
+    score: float
+
+
+class Table(BaseModel):
+    bbox: BoundingBox
+    html: str
+
+
+class VisionResult(BaseModel):
+    texts: List[Text]
+    tables: List[Table]
+    inputImage: str
+    ocrImage: str
+    layoutImage: str
+
+
+class AnalyzeImageResult(BaseModel):
+    visionResults: List[VisionResult]
+    visionInfo: dict
+
+
+class AIStudioParams(BaseModel):
+    accessToken: str
+    apiType: Literal["aistudio"] = "aistudio"
+
+
+class QianfanParams(BaseModel):
+    apiKey: str
+    secretKey: str
+    apiType: Literal["qianfan"] = "qianfan"
+
+
+LLMName: TypeAlias = Literal[
+    "ernie-3.5",
+    "ernie-3.5-8k",
+    "ernie-lite",
+    "ernie-4.0",
+    "ernie-4.0-turbo-8k",
+    "ernie-speed",
+    "ernie-speed-128k",
+    "ernie-tiny-8k",
+    "ernie-char-8k",
+]
+LLMParams: TypeAlias = Union[AIStudioParams, QianfanParams]
+
+
+class BuildVectorStoreRequest(BaseModel):
+    visionInfo: dict
+    minChars: Optional[int] = None
+    llmRequestInterval: Optional[float] = None
+    llmName: Optional[LLMName] = None
+    llmParams: Optional[Annotated[LLMParams, Field(discriminator="apiType")]] = None
+
+
+class BuildVectorStoreResult(BaseModel):
+    vectorStore: dict
+
+
+class RetrieveKnowledgeRequest(BaseModel):
+    keys: List[str]
+    vectorStore: dict
+    visionInfo: dict
+    llmName: Optional[LLMName] = None
+    llmParams: Optional[Annotated[LLMParams, Field(discriminator="apiType")]] = None
+
+
+class RetrieveKnowledgeResult(BaseModel):
+    retrievalResult: str
+
+
+class ChatRequest(BaseModel):
+    keys: List[str]
+    visionInfo: dict
+    taskDescription: Optional[str] = None
+    rules: Optional[str] = None
+    fewShot: Optional[str] = None
+    useVectorStore: bool = True
+    vectorStore: Optional[dict] = None
+    retrievalResult: Optional[str] = None
+    returnPrompts: bool = True
+    llmName: Optional[LLMName] = None
+    llmParams: Optional[Annotated[LLMParams, Field(discriminator="apiType")]] = None
+
+
+class Prompts(BaseModel):
+    ocr: str
+    table: str
+    html: str
+
+
+class ChatResult(BaseModel):
+    chatResult: str
+    prompts: Optional[Prompts] = None
+
+
+def _generate_request_id() -> str:
+    return str(uuid.uuid4())
+
+
+def _infer_file_type(url: str) -> FileType:
+    # Is it more reliable to guess the file type based on the response headers?
+    SUPPORTED_IMG_EXTS: Final[List[str]] = [".jpg", ".jpeg", ".png"]
+
+    url_parts = urlparse(url)
+    ext = os.path.splitext(url_parts.path)[1]
+    # HACK: The support for BOS URLs with query params is implementation-based,
+    # not interface-based.
+    is_bos_url = (
+        re.fullmatch(r"(?:bj|bd|su|gz|cd|hkg|fwh|fsh)\.bcebos\.com", url_parts.netloc)
+        is not None
+    )
+    if is_bos_url and url_parts.query:
+        params = parse_qs(url_parts.query)
+        if (
+            "responseContentDisposition" not in params
+            or len(params["responseContentDisposition"]) != 1
+        ):
+            raise ValueError("`responseContentDisposition` not found")
+        match_ = re.match(
+            r"attachment;filename=(.*)", params["responseContentDisposition"][0]
+        )
+        if not match_ or not match_.groups()[0] is not None:
+            raise ValueError(
+                "Failed to extract the filename from `responseContentDisposition`"
+            )
+        ext = os.path.splitext(match_.groups()[0])[1]
+    ext = ext.lower()
+    if ext == ".pdf":
+        return 0
+    elif ext in SUPPORTED_IMG_EXTS:
+        return 1
+    else:
+        raise ValueError("Unsupported file type")
+
+
+def _llm_params_to_dict(llm_params: LLMParams) -> dict:
+    if llm_params.apiType == "aistudio":
+        return {"api_type": "aistudio", "access_token": llm_params.accessToken}
+    elif llm_params.apiType == "qianfan":
+        return {
+            "api_type": "qianfan",
+            "ak": llm_params.apiKey,
+            "sk": llm_params.secretKey,
+        }
+    else:
+        assert_never(llm_params.apiType)
+
+
+def _bytes_to_arrays(
+    file_bytes: bytes,
+    file_type: FileType,
+    *,
+    max_img_size: Tuple[int, int],
+    max_num_imgs: int,
+) -> List[np.ndarray]:
+    if file_type == 0:
+        images = serving_utils.read_pdf(
+            file_bytes, resize=True, max_num_imgs=max_num_imgs
+        )
+    elif file_type == 1:
+        images = [serving_utils.image_bytes_to_array(file_bytes)]
+    else:
+        assert_never(file_type)
+    h, w = images[0].shape[0:2]
+    if w > max_img_size[1] or h > max_img_size[0]:
+        if w / h > max_img_size[0] / max_img_size[1]:
+            factor = max_img_size[0] / w
+        else:
+            factor = max_img_size[1] / h
+        images = [cv2.resize(img, (int(factor * w), int(factor * h))) for img in images]
+    return images
+
+
+def _postprocess_image(
+    img: ArrayLike,
+    request_id: str,
+    filename: str,
+    file_storage_config: file_storage.FileStorageConfig,
+) -> str:
+    key = f"{request_id}/{filename}"
+    ext = os.path.splitext(filename)[1]
+    img = np.asarray(img)
+    _, encoded_img = cv2.imencode(ext, img)
+    encoded_img = encoded_img.tobytes()
+    return file_storage.postprocess_file(
+        encoded_img, config=file_storage_config, key=key
+    )
+
+
+def create_pipeline_app(pipeline: PPChatOCRPipeline, app_config: AppConfig) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    if "file_storage_config" in ctx.extra:
+        ctx.extra["file_storage_config"] = file_storage.parse_file_storage_config(
+            ctx.extra["file_storage_config"]
+        )
+    else:
+        ctx.extra["file_storage_config"] = file_storage.InMemoryStorageConfig()
+    ctx.extra.setdefault("max_img_size", _DEFAULT_MAX_IMG_SIZE)
+    ctx.extra.setdefault("max_num_imgs", _DEFAULT_MAX_NUM_IMGS)
+
+    @app.post(
+        "/chatocr-vision",
+        operation_id="analyzeImage",
+        responses={422: {"model": Response}},
+    )
+    async def _analyze_image(
+        request: AnalyzeImageRequest,
+    ) -> ResultResponse[AnalyzeImageResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        request_id = _generate_request_id()
+
+        if request.fileType is None:
+            if serving_utils.is_url(request.file):
+                try:
+                    file_type = _infer_file_type(request.file)
+                except Exception as e:
+                    logging.exception(e)
+                    raise HTTPException(
+                        status_code=422,
+                        detail="The file type cannot be inferred from the URL. Please specify the file type explicitly.",
+                    )
+            else:
+                raise HTTPException(status_code=422, detail="Unknown file type")
+        else:
+            file_type = request.fileType
+
+        if request.inferenceParams:
+            max_long_side = request.inferenceParams.maxLongSide
+            if max_long_side:
+                raise HTTPException(
+                    status_code=422,
+                    detail="`max_long_side` is currently not supported.",
+                )
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(
+                request.file, aiohttp_session
+            )
+            images = await serving_utils.call_async(
+                _bytes_to_arrays,
+                file_bytes,
+                file_type,
+                max_img_size=ctx.extra["max_img_size"],
+                max_num_imgs=ctx.extra["max_num_imgs"],
+            )
+
+            result = await pipeline.infer(
+                images,
+                use_oricls=request.useOricls,
+                use_curve=request.useCurve,
+                use_uvdoc=request.useUvdoc,
+            )
+
+            vision_results: List[VisionResult] = []
+            for i, (img, item) in enumerate(zip(images, result[0])):
+                pp_img_futures: List[Awaitable] = []
+                future = serving_utils.call_async(
+                    _postprocess_image,
+                    img,
+                    request_id=request_id,
+                    filename=f"input_image_{i}.jpg",
+                    file_storage_config=ctx.extra["file_storage_config"],
+                )
+                pp_img_futures.append(future)
+                future = serving_utils.call_async(
+                    _postprocess_image,
+                    item["ocr_result"].img,
+                    request_id=request_id,
+                    filename=f"ocr_image_{i}.jpg",
+                    file_storage_config=ctx.extra["file_storage_config"],
+                )
+                pp_img_futures.append(future)
+                future = serving_utils.call_async(
+                    _postprocess_image,
+                    item["layout_result"].img,
+                    request_id=request_id,
+                    filename=f"layout_image_{i}.jpg",
+                    file_storage_config=ctx.extra["file_storage_config"],
+                )
+                pp_img_futures.append(future)
+                texts: List[Text] = []
+                for poly, text, score in zip(
+                    item["ocr_result"]["dt_polys"],
+                    item["ocr_result"]["rec_text"],
+                    item["ocr_result"]["rec_score"],
+                ):
+                    texts.append(Text(poly=poly, text=text, score=score))
+                tables = [
+                    Table(bbox=r["layout_bbox"], html=r["html"])
+                    for r in item["table_result"]
+                ]
+                input_img, ocr_img, layout_img = await asyncio.gather(*pp_img_futures)
+                vision_result = VisionResult(
+                    texts=texts,
+                    tables=tables,
+                    inputImage=input_img,
+                    ocrImage=ocr_img,
+                    layoutImage=layout_img,
+                )
+                vision_results.append(vision_result)
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=AnalyzeImageResult(
+                    visionResults=vision_results,
+                    visionInfo=result[1],
+                ),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    @app.post(
+        "/chatocr-vector",
+        operation_id="buildVectorStore",
+        responses={422: {"model": Response}},
+    )
+    async def _build_vector_store(
+        request: BuildVectorStoreRequest,
+    ) -> ResultResponse[BuildVectorStoreResult]:
+        pipeline = ctx.pipeline
+
+        try:
+            kwargs = {"visual_info": request.visionInfo}
+            if request.minChars is not None:
+                kwargs["min_characters"] = request.minChars
+            else:
+                kwargs["min_characters"] = 0
+            if request.llmRequestInterval is not None:
+                kwargs["llm_request_interval"] = request.llmRequestInterval
+            if request.llmName is not None:
+                kwargs["llm_name"] = request.llmName
+            if request.llmParams is not None:
+                kwargs["llm_params"] = _llm_params_to_dict(request.llmParams)
+
+            result = await serving_utils.call_async(
+                pipeline.pipeline.get_vector_text, **kwargs
+            )
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=BuildVectorStoreResult(vectorStore=result),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    @app.post(
+        "/chatocr-retrieval",
+        operation_id="retrieveKnowledge",
+        responses={422: {"model": Response}},
+    )
+    async def _retrieve_knowledge(
+        request: RetrieveKnowledgeRequest,
+    ) -> ResultResponse[RetrieveKnowledgeResult]:
+        pipeline = ctx.pipeline
+
+        try:
+            kwargs = {
+                "key_list": request.keys,
+                "vector": request.vectorStore,
+                "visual_info": request.visionInfo,
+            }
+            if request.llmName is not None:
+                kwargs["llm_name"] = request.llmName
+            if request.llmParams is not None:
+                kwargs["llm_params"] = _llm_params_to_dict(request.llmParams)
+
+            result = await serving_utils.call_async(
+                pipeline.pipeline.get_retrieval_text, **kwargs
+            )
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=RetrieveKnowledgeResult(retrievalResult=result["retrieval"]),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    @app.post(
+        "/chatocr-chat", operation_id="chat", responses={422: {"model": Response}}
+    )
+    async def _chat(
+        request: ChatRequest,
+    ) -> ResultResponse[ChatResult]:
+        pipeline = ctx.pipeline
+
+        try:
+            kwargs = {
+                "key_list": request.keys,
+                "visual_info": request.visionInfo,
+            }
+            if request.taskDescription is not None:
+                kwargs["user_task_description"] = request.taskDescription
+            if request.rules is not None:
+                kwargs["rules"] = request.rules
+            if request.fewShot is not None:
+                kwargs["few_shot"] = request.fewShot
+            kwargs["use_vector"] = request.useVectorStore
+            if request.vectorStore is not None:
+                kwargs["vector"] = request.vectorStore
+            if request.retrievalResult is not None:
+                kwargs["retrieval_result"] = request.retrievalResult
+            kwargs["save_prompt"] = request.returnPrompts
+            if request.llmName is not None:
+                kwargs["llm_name"] = request.llmName
+            if request.llmParams is not None:
+                kwargs["llm_params"] = _llm_params_to_dict(request.llmParams)
+
+            result = await serving_utils.call_async(pipeline.pipeline.chat, **kwargs)
+
+            if result["prompt"]:
+                prompts = Prompts(
+                    ocr=result["prompt"]["ocr_prompt"],
+                    table=result["prompt"]["table_prompt"],
+                    html=result["prompt"]["html_prompt"],
+                )
+                chat_result = ChatResult(
+                    chatResult=result["chat_res"],
+                    prompts=prompts,
+                )
+            else:
+                chat_result = ChatResult(
+                    chatResult=result["chat_res"],
+                )
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=chat_result,
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app
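
The four endpoints above are designed to be chained: analyze the document, optionally build a vector store and retrieve context, then chat. A hedged end-to-end client sketch with placeholder URL, file, keys, and Qianfan credentials; only fields defined in the request models above are used.

    import base64

    import requests

    base_url = "http://localhost:8080"  # placeholder
    llm_params = {"apiType": "qianfan", "apiKey": "...", "secretKey": "..."}

    with open("contract.pdf", "rb") as f:  # placeholder input; fileType 0 = PDF
        file_b64 = base64.b64encode(f.read()).decode("ascii")

    vision = requests.post(
        f"{base_url}/chatocr-vision",
        json={"file": file_b64, "fileType": 0},
    ).json()["result"]

    vector = requests.post(
        f"{base_url}/chatocr-vector",
        json={"visionInfo": vision["visionInfo"], "llmParams": llm_params},
    ).json()["result"]

    retrieval = requests.post(
        f"{base_url}/chatocr-retrieval",
        json={
            "keys": ["party names"],
            "vectorStore": vector["vectorStore"],
            "visionInfo": vision["visionInfo"],
            "llmParams": llm_params,
        },
    ).json()["result"]

    chat = requests.post(
        f"{base_url}/chatocr-chat",
        json={
            "keys": ["party names"],
            "visionInfo": vision["visionInfo"],
            "vectorStore": vector["vectorStore"],
            "retrievalResult": retrieval["retrievalResult"],
            "llmParams": llm_params,
        },
    ).json()["result"]

    print(chat["chatResult"])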

+ 82 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/semantic_segmentation.py

@@ -0,0 +1,82 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated
+
+from .....utils import logging
+from ...single_model_pipeline import SemanticSegmentation
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferRequest(BaseModel):
+    image: str
+
+
+class InferResult(BaseModel):
+    labelMap: List[int]
+    size: Annotated[List[int], Field(min_length=2, max_length=2)]
+    image: str
+
+
+def create_pipeline_app(
+    pipeline: SemanticSegmentation, app_config: AppConfig
+) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    @app.post(
+        "/semantic-segmentation",
+        operation_id="infer",
+        responses={422: {"model": Response}},
+    )
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(
+                request.image, aiohttp_session
+            )
+            image = serving_utils.image_bytes_to_array(file_bytes)
+
+            result = (await pipeline.infer(image))[0]
+
+            pred = result["pred"][0].tolist()
+            size = [len(pred), len(pred[0])]
+            label_map = [item for sublist in pred for item in sublist]
+            output_image_base64 = serving_utils.image_to_base64(
+                result.img.convert("RGB")
+            )
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(
+                    labelMap=label_map, size=size, image=output_image_base64
+                ),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app

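The semantic-segmentation response flattens the per-pixel class map row-major into `labelMap` and reports its shape in `size` as [height, width]. A small sketch of rebuilding the 2-D mask on the client side:

import numpy as np

def label_map_to_mask(result: dict) -> np.ndarray:
    # size is [height, width]; labelMap is the row-major flattening built above.
    height, width = result["size"]
    return np.asarray(result["labelMap"], dtype=np.int64).reshape(height, width)
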
+ 108 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/table_recognition.py

@@ -0,0 +1,108 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Optional
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel, Field
+from typing_extensions import Annotated, TypeAlias
+
+from .....utils import logging
+from ...table_recognition import TableRecPipeline
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferenceParams(BaseModel):
+    maxLongSide: Optional[Annotated[int, Field(gt=0)]] = None
+
+
+class InferRequest(BaseModel):
+    image: str
+    inferenceParams: Optional[InferenceParams] = None
+
+
+Point: TypeAlias = Annotated[List[int], Field(min_length=2, max_length=2)]
+BoundingBox: TypeAlias = Annotated[List[float], Field(min_length=4, max_length=4)]
+
+
+class Table(BaseModel):
+    bbox: BoundingBox
+    html: str
+
+
+class InferResult(BaseModel):
+    tables: List[Table]
+    layoutImage: str
+    ocrImage: str
+
+
+def create_pipeline_app(pipeline: TableRecPipeline, app_config: AppConfig) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    @app.post(
+        "/table-recognition", operation_id="infer", responses={422: {"model": Response}}
+    )
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        if request.inferenceParams:
+            max_long_side = request.inferenceParams.maxLongSide
+            if max_long_side:
+                raise HTTPException(
+                    status_code=422,
+                    detail="`max_long_side` is currently not supported.",
+                )
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(
+                request.image, aiohttp_session
+            )
+            image = serving_utils.image_bytes_to_array(file_bytes)
+
+            result = (await pipeline.infer(image))[0]
+
+            tables: List[Table] = []
+            for item in result["table_result"]:
+                tables.append(
+                    Table(
+                        bbox=item["layout_bbox"],
+                        html=item["html"],
+                    )
+                )
+            layout_image_base64 = serving_utils.image_to_base64(
+                result["layout_result"].img
+            )
+            ocr_image_base64 = serving_utils.image_to_base64(result["ocr_result"].img)
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(
+                    tables=tables,
+                    layoutImage=layout_image_base64,
+                    ocrImage=ocr_image_base64,
+                ),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app

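A minimal client sketch for the `/table-recognition` endpoint added above. The `image` field may be a URL or a Base64-encoded file; the host and port below are placeholders (they match the CLI defaults introduced later in this change), and `requests` is assumed to be available on the client:

import base64

import requests

with open("sample.jpg", "rb") as f:  # hypothetical local image
    payload = {"image": base64.b64encode(f.read()).decode("ascii")}

resp = requests.post("http://localhost:8080/table-recognition", json=payload)
resp.raise_for_status()
for table in resp.json()["result"]["tables"]:
    print(table["bbox"], table["html"][:80])
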
+ 66 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/ts_ad.py

@@ -0,0 +1,66 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+
+from .....utils import logging
+from ...single_model_pipeline import TSAd
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferRequest(BaseModel):
+    csv: str
+
+
+class InferResult(BaseModel):
+    csv: str
+
+
+def create_pipeline_app(pipeline: TSAd, app_config: AppConfig) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    @app.post(
+        "/time-series-anomaly-detection",
+        operation_id="infer",
+        responses={422: {"model": Response}},
+    )
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(request.csv, aiohttp_session)
+            df = serving_utils.csv_bytes_to_data_frame(file_bytes)
+
+            result = (await pipeline.infer(df))[0]
+
+            output_csv = serving_utils.data_frame_to_base64(result["anomaly"])
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(csv=output_csv),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app

+ 68 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/ts_cls.py

@@ -0,0 +1,68 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+
+from .....utils import logging
+from ...single_model_pipeline import TSCls
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferRequest(BaseModel):
+    csv: str
+
+
+class InferResult(BaseModel):
+    label: str
+    score: float
+
+
+def create_pipeline_app(pipeline: TSCls, app_config: AppConfig) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    @app.post(
+        "/time-series-classification",
+        operation_id="infer",
+        responses={422: {"model": Response}},
+    )
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(request.csv, aiohttp_session)
+            df = serving_utils.csv_bytes_to_data_frame(file_bytes)
+
+            result = (await pipeline.infer(df))[0]
+
+            label = str(result["classification"].at[0, "classid"])
+            score = float(result["classification"].at[0, "score"])
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(label=label, score=score),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app

+ 66 - 0
paddlex/inference/pipelines/serving/_pipeline_apps/ts_fc.py

@@ -0,0 +1,66 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+
+from .....utils import logging
+from ...single_model_pipeline import TSFc
+from .. import utils as serving_utils
+from ..app import AppConfig, create_app
+from ..models import Response, ResultResponse
+
+
+class InferRequest(BaseModel):
+    csv: str
+
+
+class InferResult(BaseModel):
+    csv: str
+
+
+def create_pipeline_app(pipeline: TSFc, app_config: AppConfig) -> FastAPI:
+    app, ctx = create_app(
+        pipeline=pipeline, app_config=app_config, app_aiohttp_session=True
+    )
+
+    @app.post(
+        "/time-series-forecasting",
+        operation_id="infer",
+        responses={422: {"model": Response}},
+    )
+    async def _infer(request: InferRequest) -> ResultResponse[InferResult]:
+        pipeline = ctx.pipeline
+        aiohttp_session = ctx.aiohttp_session
+
+        try:
+            file_bytes = await serving_utils.get_raw_bytes(request.csv, aiohttp_session)
+            df = serving_utils.csv_bytes_to_data_frame(file_bytes)
+
+            result = (await pipeline.infer(df))[0]
+
+            output_csv = serving_utils.data_frame_to_base64(result["forecast"])
+
+            return ResultResponse(
+                logId=serving_utils.generate_log_id(),
+                errorCode=0,
+                errorMsg="Success",
+                result=InferResult(csv=output_csv),
+            )
+
+        except Exception as e:
+            logging.exception(e)
+            raise HTTPException(status_code=500, detail="Internal server error")
+
+    return app

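The three time-series apps above share the same request shape: `csv` is a URL or a Base64-encoded CSV file. A round-trip sketch for the forecasting endpoint (host and port are placeholders); the anomaly-detection endpoint returns a CSV the same way, while the classification endpoint returns `label` and `score` directly:

import base64
import io

import pandas as pd
import requests

with open("series.csv", "rb") as f:  # hypothetical input series
    payload = {"csv": base64.b64encode(f.read()).decode("ascii")}

resp = requests.post("http://localhost:8080/time-series-forecasting", json=payload)
resp.raise_for_status()
forecast = pd.read_csv(io.BytesIO(base64.b64decode(resp.json()["result"]["csv"])))
print(forecast.head())
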
+ 162 - 0
paddlex/inference/pipelines/serving/app.py

@@ -0,0 +1,162 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import contextlib
+import json
+from typing import (
+    Any,
+    AsyncGenerator,
+    Callable,
+    Dict,
+    Generic,
+    List,
+    Optional,
+    Tuple,
+    TypeVar,
+)
+
+import aiohttp
+import fastapi
+from fastapi.encoders import jsonable_encoder
+from fastapi.exceptions import RequestValidationError
+from fastapi.responses import JSONResponse
+from pydantic import BaseModel
+from starlette.exceptions import HTTPException
+from typing_extensions import Final, ParamSpec
+
+from ..base import BasePipeline
+from .models import Response
+from .utils import call_async, generate_log_id
+
+SERVING_CONFIG_KEY: Final[str] = "Serving"
+
+_PipelineT = TypeVar("_PipelineT", bound=BasePipeline)
+_P = ParamSpec("_P")
+_R = TypeVar("_R")
+
+
+class PipelineWrapper(Generic[_PipelineT]):
+    def __init__(self, pipeline: _PipelineT) -> None:
+        super().__init__()
+        self._pipeline = pipeline
+        self._lock = asyncio.Lock()
+
+    @property
+    def pipeline(self) -> _PipelineT:
+        return self._pipeline
+
+    async def infer(self, *args: Any, **kwargs: Any) -> List[Any]:
+        def _infer() -> List[Any]:
+            output = list(self._pipeline(*args, **kwargs))
+            return output
+
+        return await self.call(_infer)
+
+    async def call(
+        self, func: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs
+    ) -> _R:
+        async with self._lock:
+            return await call_async(func, *args, **kwargs)
+
+
+class AppConfig(BaseModel):
+    extra: Optional[Dict[str, Any]] = None
+
+
+class AppContext(Generic[_PipelineT]):
+    def __init__(self, *, config: AppConfig) -> None:
+        super().__init__()
+        self._config = config
+        self.extra: Dict[str, Any] = {}
+        self._pipeline: Optional[PipelineWrapper[_PipelineT]] = None
+        self._aiohttp_session: Optional[aiohttp.ClientSession] = None
+
+    @property
+    def config(self) -> AppConfig:
+        return self._config
+
+    @property
+    def pipeline(self) -> PipelineWrapper[_PipelineT]:
+        if not self._pipeline:
+            raise AttributeError("`pipeline` has not been set.")
+        return self._pipeline
+
+    @pipeline.setter
+    def pipeline(self, val: PipelineWrapper[_PipelineT]) -> None:
+        self._pipeline = val
+
+    @property
+    def aiohttp_session(self) -> aiohttp.ClientSession:
+        if not self._aiohttp_session:
+            raise AttributeError("`aiohttp_session` has not been set.")
+        return self._aiohttp_session
+
+    @aiohttp_session.setter
+    def aiohttp_session(self, val: aiohttp.ClientSession) -> None:
+        self._aiohttp_session = val
+
+
+def create_app_config(pipeline_config: Dict[str, Any], **kwargs: Any) -> AppConfig:
+    app_config = pipeline_config.get(SERVING_CONFIG_KEY, {})
+    app_config.update(kwargs)
+    return AppConfig.model_validate(app_config)
+
+
+def create_app(
+    *, pipeline: _PipelineT, app_config: AppConfig, app_aiohttp_session: bool = True
+) -> Tuple[fastapi.FastAPI, AppContext[_PipelineT]]:
+    @contextlib.asynccontextmanager
+    async def _app_lifespan(app: fastapi.FastAPI) -> AsyncGenerator[None, None]:
+        ctx.pipeline = PipelineWrapper[_PipelineT](pipeline)
+        if app_aiohttp_session:
+            ctx.aiohttp_session = aiohttp.ClientSession(
+                cookie_jar=aiohttp.DummyCookieJar()
+            )
+        yield
+        if app_aiohttp_session:
+            await ctx.aiohttp_session.close()
+
+    app = fastapi.FastAPI(lifespan=_app_lifespan)
+    ctx = AppContext[_PipelineT](config=app_config)
+
+    @app.get("/health", operation_id="checkHealth")
+    async def _check_health() -> Response:
+        return Response(logId=generate_log_id(), errorCode=0, errorMsg="Healthy")
+
+    @app.exception_handler(RequestValidationError)
+    async def _validation_exception_handler(
+        request: fastapi.Request, exc: RequestValidationError
+    ) -> JSONResponse:
+        json_compatible_data = jsonable_encoder(
+            Response(
+                logId=generate_log_id(),
+                errorCode=422,
+                errorMsg=json.dumps(exc.errors()),
+            )
+        )
+        return JSONResponse(content=json_compatible_data, status_code=422)
+
+    @app.exception_handler(HTTPException)
+    async def _http_exception_handler(
+        request: fastapi.Request, exc: HTTPException
+    ) -> JSONResponse:
+        json_compatible_data = jsonable_encoder(
+            Response(
+                logId=generate_log_id(), errorCode=exc.status_code, errorMsg=exc.detail
+            )
+        )
+        return JSONResponse(content=json_compatible_data, status_code=exc.status_code)
+
+    return app, ctx

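`PipelineWrapper` is what lets the synchronous pipelines be used from async handlers: both `infer()` and `call()` run their work in the default thread executor and are serialized behind a single `asyncio.Lock`, so concurrent requests never drive one pipeline in parallel. A minimal sketch, assuming a hypothetical callable pipeline object `my_pipeline`:

async def demo(my_pipeline) -> None:
    wrapper = PipelineWrapper(my_pipeline)
    # Equivalent to list(my_pipeline("input.jpg")), but run off the event loop.
    results = await wrapper.infer("input.jpg")
    # Arbitrary blocking callables can be funneled through the same lock.
    count = await wrapper.call(len, results)
    print(count)
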
+ 80 - 0
paddlex/inference/pipelines/serving/file_storage.py

@@ -0,0 +1,80 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base64
+import uuid
+from typing import Any, Dict, Literal, Optional, Union
+
+from baidubce.auth.bce_credentials import BceCredentials
+from baidubce.bce_client_configuration import BceClientConfiguration
+from baidubce.services.bos.bos_client import BosClient
+from pydantic import BaseModel, Discriminator, SecretStr, TypeAdapter
+from typing_extensions import Annotated, assert_never
+
+
+class InMemoryStorageConfig(BaseModel):
+    type: Literal["memory"] = "memory"
+
+
+class BOSConfig(BaseModel):
+    endpoint: str
+    ak: SecretStr
+    sk: SecretStr
+    bucket_name: str
+    key_prefix: Optional[str] = None
+    connection_timeout_in_mills: Optional[int] = None
+
+    type: Literal["bos"] = "bos"
+
+
+FileStorageConfig = Union[InMemoryStorageConfig, BOSConfig]
+
+
+def parse_file_storage_config(dic: Dict[str, Any]) -> FileStorageConfig:
+    # XXX: mypy deduces a wrong type
+    return TypeAdapter(
+        Annotated[FileStorageConfig, Discriminator("type")]
+    ).validate_python(
+        dic
+    )  # type: ignore[return-value]
+
+
+def postprocess_file(
+    file: bytes, config: FileStorageConfig, key: Optional[str] = None
+) -> str:
+    if config.type == "memory":
+        return base64.b64encode(file).decode("ascii")
+    elif config.type == "bos":
+        # TODO: Currently BOS clients are created on the fly since they are not
+        # thread-safe. Should we use a background thread with a queue or use a
+        # dedicated thread?
+        bos_cfg = BceClientConfiguration(
+            credentials=BceCredentials(
+                config.ak.get_secret_value(), config.sk.get_secret_value()
+            ),
+            endpoint=config.endpoint,
+            connection_timeout_in_mills=config.connection_timeout_in_mills,
+        )
+        client = BosClient(bos_cfg)
+        if key is None:
+            key = str(uuid.uuid4())
+        if config.key_prefix:
+            key = f"{config.key_prefix}{key}"
+        client.put_object_from_string(bucket=config.bucket_name, key=key, data=file)
+        url = client.generate_pre_signed_url(
+            config.bucket_name, key, expiration_in_seconds=-1
+        ).decode("ascii")
+        return url
+    else:
+        assert_never(config.type)

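Storage configurations are discriminated by their `type` field, so a plain dict (for instance taken from the `Serving` section of a pipeline config) can be validated into the right model. A sketch with placeholder values; the in-memory backend makes `postprocess_file` return the Base64 text itself, while the BOS backend uploads the object and returns a pre-signed URL:

# All values below are placeholders.
memory_cfg = parse_file_storage_config({"type": "memory"})
bos_cfg = parse_file_storage_config(
    {
        "type": "bos",
        "endpoint": "bj.bcebos.com",
        "ak": "my-access-key",
        "sk": "my-secret-key",
        "bucket_name": "my-bucket",
        "key_prefix": "paddlex-outputs/",
    }
)

encoded = postprocess_file(b"hello, world", config=memory_cfg)  # Base64 string
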
+ 30 - 0
paddlex/inference/pipelines/serving/models.py

@@ -0,0 +1,30 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Generic, TypeVar
+
+from pydantic import BaseModel
+
+
+class Response(BaseModel):
+    logId: str
+    errorCode: int
+    errorMsg: str
+
+
+ResultT = TypeVar("ResultT", bound=BaseModel)
+
+
+class ResultResponse(Response, Generic[ResultT]):
+    result: ResultT

+ 21 - 0
paddlex/inference/pipelines/serving/server.py

@@ -0,0 +1,21 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import uvicorn
+from fastapi import FastAPI
+
+
+def run_server(app: FastAPI, *, host: str, port: int, debug: bool) -> None:
+    # XXX: Currently, `debug` is not used.
+    uvicorn.run(app, host=host, port=port, log_level="info")

+ 110 - 0
paddlex/inference/pipelines/serving/utils.py

@@ -0,0 +1,110 @@
+# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import base64
+import io
+import uuid
+from functools import partial
+from typing import Awaitable, Callable, List, Optional, TypeVar
+from urllib.parse import urlparse
+
+import aiohttp
+import cv2
+import fitz
+import numpy as np
+import pandas as pd
+import yarl
+from PIL import Image
+from typing_extensions import ParamSpec
+
+_P = ParamSpec("_P")
+_R = TypeVar("_R")
+
+
+def generate_log_id() -> str:
+    return str(uuid.uuid4())
+
+
+def is_url(s: str) -> bool:
+    if not (s.startswith("http://") or s.startswith("https://")):
+        # Quick rejection
+        return False
+    result = urlparse(s)
+    return all([result.scheme, result.netloc]) and result.scheme in ("http", "https")
+
+
+async def get_raw_bytes(file: str, session: aiohttp.ClientSession) -> bytes:
+    if is_url(file):
+        async with session.get(yarl.URL(file, encoded=True)) as resp:
+            return await resp.read()
+    else:
+        return base64.b64decode(file)
+
+
+def image_bytes_to_array(data: bytes) -> np.ndarray:
+    return cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
+
+
+def image_to_base64(image: Image.Image) -> str:
+    with io.BytesIO() as f:
+        image.save(f, format="JPEG")
+        image_base64 = base64.b64encode(f.getvalue()).decode("ascii")
+    return image_base64
+
+
+def csv_bytes_to_data_frame(data: bytes) -> pd.DataFrame:
+    with io.StringIO(data.decode("utf-8")) as f:
+        df = pd.read_csv(f)
+    return df
+
+
+def data_frame_to_base64(df: pd.DataFrame) -> str:
+    return base64.b64encode(df.to_csv().encode("utf-8")).decode("ascii")
+
+
+def read_pdf(
+    bytes_: bytes, resize: bool = False, max_num_imgs: Optional[int] = None
+) -> List[np.ndarray]:
+    images: List[np.ndarray] = []
+    img_size = None
+    with fitz.open("pdf", bytes_) as doc:
+        for page in doc:
+            if max_num_imgs is not None and len(images) >= max_num_imgs:
+                break
+            # TODO: Do not always use zoom=2.0
+            zoom = 2.0
+            deg = 0
+            mat = fitz.Matrix(zoom, zoom).prerotate(deg)
+            pixmap = page.get_pixmap(matrix=mat, alpha=False)
+            image = np.frombuffer(pixmap.samples, dtype=np.uint8).reshape(
+                pixmap.h, pixmap.w, pixmap.n
+            )
+            image = np.ascontiguousarray(image[..., ::-1])
+            if resize:
+                if img_size is None:
+                    img_size = (image.shape[1], image.shape[0])
+                else:
+                    if (image.shape[1], image.shape[0]) != img_size:
+                        image = cv2.resize(image, img_size)
+            images.append(image)
+    return images
+
+
+def call_async(
+    func: Callable[_P, _R], /, *args: _P.args, **kwargs: _P.kwargs
+) -> Awaitable[_R]:
+    return asyncio.get_running_loop().run_in_executor(
+        None, partial(func, *args, **kwargs)
+    )

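`call_async` simply schedules a blocking callable on the running loop's default executor; it is how the handlers above keep model inference and file decoding off the event loop. A self-contained sketch:

import asyncio
import time

def slow_square(x: int) -> int:
    time.sleep(0.1)  # stand-in for blocking work such as model inference
    return x * x

async def main() -> None:
    # Each call runs in the default thread executor, so the coroutine stays responsive.
    results = await asyncio.gather(*(call_async(slow_square, i) for i in range(4)))
    print(results)  # [0, 1, 4, 9]

asyncio.run(main())
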
+ 31 - 1
paddlex/inference/pipelines/single_model_pipeline.py

@@ -35,38 +35,68 @@ class _SingleModelPipeline(BasePipeline):
 class ImageClassification(_SingleModelPipeline):
     entities = "image_classification"
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
 
 class ObjectDetection(_SingleModelPipeline):
     entities = "object_detection"
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
 
 class InstanceSegmentation(_SingleModelPipeline):
     entities = "instance_segmentation"
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
 
 class SemanticSegmentation(_SingleModelPipeline):
     entities = "semantic_segmentation"
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
 
 class TSFc(_SingleModelPipeline):
     entities = "ts_fc"
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
 
 class TSAd(_SingleModelPipeline):
     entities = "ts_ad"
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
 
 class TSCls(_SingleModelPipeline):
     entities = "ts_cls"
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
 
 class MultiLableImageClas(_SingleModelPipeline):
     entities = "multi_label_image_classification"
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
 
 class SmallObjDet(_SingleModelPipeline):
     entities = "small_object_detection"
 
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
 
-class AnomolyDetection(_SingleModelPipeline):
+
+class AnomalyDetection(_SingleModelPipeline):
     entities = "anomaly_detection"
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)

+ 2 - 0
paddlex/inference/pipelines/table_recognition/utils.py

@@ -297,6 +297,8 @@ def get_ori_coordinate_for_table(x, y, table_bbox):
     Returns:
         list: list of original coordinates, eg. [[x1, y1, x2, y2, x3, y3, x4, y4]]
     """
+    if not table_bbox:
+        return table_bbox
     offset = np.array([x, y] * 4)
     table_bbox = np.array(table_bbox)
     if table_bbox.shape[-1] == 2:

+ 3 - 1
paddlex/modules/text_detection/model_list.py

@@ -16,5 +16,7 @@ MODELS = [
     "PP-OCRv4_mobile_det",
     "PP-OCRv4_server_det",
     "PP-OCRv4_mobile_seal_det",
-    "PP-OCRv4_server_seal_det"
+    "PP-OCRv4_server_seal_det",
 ]
+
+CURVE_MODELS = ["PP-OCRv4_mobile_seal_det", "PP-OCRv4_server_seal_det"]

+ 87 - 16
paddlex/paddlex_cli.py

@@ -14,15 +14,37 @@
 
 import os
 import argparse
-import textwrap
-from types import SimpleNamespace
+import subprocess
+import sys
+import tempfile
 
 from . import create_pipeline
+from .inference.pipelines import create_pipeline_from_config, load_pipeline_config
 from .repo_manager import setup, get_all_supported_repo_names
 from .utils import logging
 from .utils.interactive_get_pipeline import interactive_get_pipeline
 
 
+def _install_serving_deps():
+    SERVING_DEPS = [
+        "aiohttp>=3.9",
+        "bce-python-sdk>=0.8",
+        "fastapi>=0.110",
+        "pydantic>=2",
+        "starlette>=0.36",
+        "typing_extensions>=4.11",
+        "uvicorn>=0.16",
+        "yarl>=1.9",
+    ]
+    with tempfile.NamedTemporaryFile("w", suffix=".txt", encoding="utf-8") as f:
+        for dep in SERVING_DEPS:
+            f.write(dep + "\n")
+        f.flush()
+        return subprocess.check_call(
+            [sys.executable, "-m", "pip", "install", "-r", f.name]
+        )
+
+
 def args_cfg():
     """parse cli arguments"""
 
@@ -43,7 +65,7 @@ def args_cfg():
 
     ################# install pdx #################
     parser.add_argument("--install", action="store_true", default=False, help="")
-    parser.add_argument("devkits", nargs="*", default=[])
+    parser.add_argument("plugins", nargs="*", default=[])
     parser.add_argument("--no_deps", action="store_true")
     parser.add_argument("--platform", type=str, default="github.com")
     parser.add_argument(
@@ -65,8 +87,16 @@ def args_cfg():
     parser.add_argument("--input", type=str, default=None, help="")
     parser.add_argument("--save_path", type=str, default=None, help="")
     parser.add_argument("--device", type=str, default=None, help="")
+    parser.add_argument("--use_hpip", action="store_true")
+    parser.add_argument("--serial_number", type=str)
+    parser.add_argument("--update_license", action="store_true")
     parser.add_argument("--get_pipeline_config", type=str, default=None, help="")
 
+    ################# serving #################
+    parser.add_argument("--serve", action="store_true")
+    parser.add_argument("--host", type=str, default="0.0.0.0")
+    parser.add_argument("--port", type=int, default=8080)
+
     return parser.parse_args()
 
 
@@ -77,22 +107,38 @@ def install(args):
     # Disable eager initialization
     os.environ["PADDLE_PDX_EAGER_INIT"] = "False"
 
-    repo_names = args.devkits
-    if len(repo_names) == 0:
-        repo_names = get_all_supported_repo_names()
-    setup(
-        repo_names=repo_names,
-        no_deps=args.no_deps,
-        platform=args.platform,
-        update_repos=args.update_repos,
-        use_local_repos=args.use_local_repos,
-    )
-    return
+    plugins = args.plugins[:]
+
+    if "serving" in plugins:
+        plugins.remove("serving")
+        _install_serving_deps()
+
+    if plugins:
+        repo_names = plugins
+        if len(repo_names) == 0:
+            repo_names = get_all_supported_repo_names()
+        setup(
+            repo_names=repo_names,
+            no_deps=args.no_deps,
+            platform=args.platform,
+            update_repos=args.update_repos,
+            use_local_repos=args.use_local_repos,
+        )
+        return
+
 
+def _get_hpi_params(serial_number, update_license):
+    return {"serial_number": serial_number, "update_license": update_license}
 
-def pipeline_predict(pipeline, input, device=None, save_path=None):
+
+def pipeline_predict(
+    pipeline, input, device, save_path, use_hpip, serial_number, update_license
+):
     """pipeline predict"""
-    pipeline = create_pipeline(pipeline, device=device)
+    hpi_params = _get_hpi_params(serial_number, update_license)
+    pipeline = create_pipeline(
+        pipeline, device=device, use_hpip=use_hpip, hpi_params=hpi_params
+    )
     result = pipeline(input)
     for res in result:
         res.print(json_format=False)
@@ -100,12 +146,34 @@ def pipeline_predict(pipeline, input, device=None, save_path=None):
             res.save_all(save_path=save_path)
 
 
+def serve(pipeline, *, device, use_hpip, serial_number, update_license, host, port):
+    from .inference.pipelines.serving import create_pipeline_app, run_server
+
+    hpi_params = _get_hpi_params(serial_number, update_license)
+    pipeline_config = load_pipeline_config(pipeline)
+    pipeline = create_pipeline_from_config(
+        pipeline_config, device=device, use_hpip=use_hpip, hpi_params=hpi_params
+    )
+    app = create_pipeline_app(pipeline, pipeline_config)
+    run_server(app, host=host, port=port, debug=False)
+
+
 # for CLI
 def main():
     """API for commad line"""
     args = args_cfg()
     if args.install:
         install(args)
+    elif args.serve:
+        serve(
+            args.pipeline,
+            device=args.device,
+            use_hpip=args.use_hpip,
+            serial_number=args.serial_number,
+            update_license=args.update_license,
+            host=args.host,
+            port=args.port,
+        )
     else:
         if args.get_pipeline_config is not None:
             interactive_get_pipeline(args.get_pipeline_config)
@@ -115,4 +183,7 @@ def main():
                 args.input,
                 args.device,
                 args.save_path,
+                use_hpip=args.use_hpip,
+                serial_number=args.serial_number,
+                update_license=args.update_license,
             )

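For reference, a rough programmatic equivalent of `paddlex --serve --pipeline OCR`, mirroring `serve()` above; the pipeline name, device, and host/port are placeholders, and `hpi_params` follows the dict built by `_get_hpi_params`:

from paddlex.inference.pipelines import create_pipeline_from_config, load_pipeline_config
from paddlex.inference.pipelines.serving import create_pipeline_app, run_server

config = load_pipeline_config("OCR")  # built-in pipeline name or a local config path
pipeline = create_pipeline_from_config(
    config,
    device=None,
    use_hpip=False,
    hpi_params={"serial_number": None, "update_license": False},
)
app = create_pipeline_app(pipeline, config)
run_server(app, host="0.0.0.0", port=8080, debug=False)
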
+ 0 - 1
paddlex/pipelines/OCR.yaml

@@ -6,4 +6,3 @@ Pipeline:
   text_det_model: PP-OCRv4_mobile_det
   text_rec_model: PP-OCRv4_mobile_rec
   text_rec_batch_size: 1
-  device: "gpu:0"

+ 2 - 3
paddlex/pipelines/PP-ChatOCRv3-doc.yaml

@@ -13,8 +13,8 @@ Pipeline:
   llm_name: "ernie-3.5"
   llm_params: 
     api_type: qianfan
-    ak: 
-    sk: 
+    ak: "api_key" # Set this to a real API key
+    sk: "secret_key"  # Set this to a real secret key
   task_prompt_yaml: None
   user_prompt_yaml:
   layout_batch_size: 1
@@ -25,4 +25,3 @@ Pipeline:
   curve_batch_size: 1
   oricls_batch_size: 1
   recovery: True
-  device: "gpu"

+ 0 - 1
paddlex/pipelines/anomaly_detection.yaml

@@ -5,4 +5,3 @@ Global:
 Pipeline:
   model: STFPM
   batch_size: 1
-  device: "gpu:0"

+ 0 - 1
paddlex/pipelines/image_classification.yaml

@@ -5,4 +5,3 @@ Global:
 Pipeline:
   model: PP-LCNet_x0_5
   batch_size: 1
-  device: "gpu:0"

+ 0 - 1
paddlex/pipelines/instance_segmentation.yaml

@@ -5,4 +5,3 @@ Global:
 Pipeline:
   model: Mask-RT-DETR-S
   batch_size: 1
-  device: "gpu:0"

+ 0 - 1
paddlex/pipelines/multi_label_image_classification.yaml

@@ -5,4 +5,3 @@ Global:
 Pipeline:
   model: PP-LCNet_x1_0_ML
   batch_size: 1
-  device: "gpu:0"

+ 0 - 1
paddlex/pipelines/object_detection.yaml

@@ -5,4 +5,3 @@ Global:
 Pipeline:
   model: PicoDet-S
   batch_size: 1
-  device: "gpu:0"

+ 0 - 1
paddlex/pipelines/semantic_segmentation.yaml

@@ -5,4 +5,3 @@ Global:
 Pipeline:
   model: PP-LiteSeg-T
   batch_size: 1
-  device: "gpu:0"

+ 0 - 1
paddlex/pipelines/small_object_detection.yaml

@@ -5,4 +5,3 @@ Global:
 Pipeline:
   model: PP-YOLOE_plus_SOD-L
   batch_size: 1
-  device: "gpu:0"

+ 0 - 1
paddlex/pipelines/table_recognition.yaml

@@ -10,4 +10,3 @@ Pipeline:
   layout_batch_size: 1
   text_rec_batch_size: 1
   table_batch_size: 1
-  device: "gpu:0"

+ 0 - 1
paddlex/pipelines/ts_ad.yaml

@@ -5,4 +5,3 @@ Global:
 Pipeline:
   model: DLinear_ad
   batch_size: 1
-  device: "gpu:0"

+ 0 - 1
paddlex/pipelines/ts_cls.yaml

@@ -5,4 +5,3 @@ Global:
 Pipeline:
   model: TimesNet_cls
   batch_size: 1
-  device: "gpu:0"

+ 0 - 1
paddlex/pipelines/ts_fc.yaml

@@ -5,4 +5,3 @@ Global:
 Pipeline:
   model: DLinear
   batch_size: 1
-  device: "gpu:0"

+ 5 - 0
paddlex/utils/logging.py

@@ -78,6 +78,11 @@ def critical(msg, *args, **kwargs):
     _logger.critical(msg, *args, **kwargs)
 
 
+def exception(msg, *args, **kwargs):
+    """exception"""
+    _logger.exception(msg, *args, **kwargs)
+
+
 def setup_logging(verbosity: str = None):
     """setup logging level
 

+ 1 - 1
setup.py

@@ -88,7 +88,7 @@ if __name__ == "__main__":
         package_data=pkg_data,
         entry_points={
             "console_scripts": [
-                "paddlex = paddlex.paddlex_cli:main",
+                "paddlex = paddlex.__main__:console_entry",
             ],
         },
         # PyPI package information