
openai.embedders

A module for calling OpenAI's Embeddings models.

BaseEmbedder

Bases: BaseModel, Generic[BaseEmbeddingT], ABC

The abstract base class defining the interface for interacting with LLM embeddings.

Source code in mirascope/rag/embedders.py
class BaseEmbedder(BaseModel, Generic[BaseEmbeddingT], ABC):
    """The base class abstract interface for interacting with LLM embeddings."""

    api_key: ClassVar[Optional[str]] = None
    base_url: ClassVar[Optional[str]] = None
    embedding_params: ClassVar[BaseEmbeddingParams] = BaseEmbeddingParams(
        model="text-embedding-ada-002"
    )
    dimensions: Optional[int] = None
    configuration: ClassVar[BaseConfig] = BaseConfig(llm_ops=[], client_wrappers=[])
    _provider: ClassVar[str] = "base"

    @abstractmethod
    def embed(self, input: list[str]) -> BaseEmbeddingT:
        """A call to the embedder with a single input"""
        ...  # pragma: no cover

    @abstractmethod
    async def embed_async(self, input: list[str]) -> BaseEmbeddingT:
        """Asynchronously call the embedder with a single input"""
        ...  # pragma: no cover

embed(input) abstractmethod

A call to the embedder with a list of inputs

Source code in mirascope/rag/embedders.py
@abstractmethod
def embed(self, input: list[str]) -> BaseEmbeddingT:
    """A call to the embedder with a single input"""
    ...  # pragma: no cover

embed_async(input) abstractmethod async

Asynchronously call the embedder with a list of inputs

Source code in mirascope/rag/embedders.py
@abstractmethod
async def embed_async(self, input: list[str]) -> BaseEmbeddingT:
    """Asynchronously call the embedder with a single input"""
    ...  # pragma: no cover
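
Concrete embedders implement both methods. A minimal sketch of a custom subclass, assuming the module paths shown in the source listings (mirascope/rag/embedders.py, mirascope/openai/types.py) are importable:

from mirascope.openai.types import OpenAIEmbeddingResponse
from mirascope.rag.embedders import BaseEmbedder


class MyEmbedder(BaseEmbedder[OpenAIEmbeddingResponse]):
    """Sketch: a concrete embedder satisfying the abstract interface."""

    def embed(self, input: list[str]) -> OpenAIEmbeddingResponse:
        ...  # call your embedding backend here and wrap the response

    async def embed_async(self, input: list[str]) -> OpenAIEmbeddingResponse:
        ...  # async variant of the same call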

OpenAIEmbedder

Bases: BaseEmbedder[OpenAIEmbeddingResponse]

OpenAI Embedder

Example:

import os
from mirascope.openai import OpenAIEmbedder

os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

openai_embedder = OpenAIEmbedder()
response = openai_embedder.embed(["your text to embed"])
print(response)
Source code in mirascope/openai/embedders.py
class OpenAIEmbedder(BaseEmbedder[OpenAIEmbeddingResponse]):
    """OpenAI Embedder

    Example:

    ```python
    import os
    from mirascope.openai import OpenAIEmbedder

    os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"

    openai_embedder = OpenAIEmbedder()
    response = openai_embedder.embed(["your text to embed"])
    print(response)
    ```
    """

    dimensions: Optional[int] = 1536
    embed_batch_size: Optional[int] = 20
    max_workers: Optional[int] = 64
    embedding_params: ClassVar[OpenAIEmbeddingParams] = OpenAIEmbeddingParams(
        model="text-embedding-3-small"
    )
    _provider: ClassVar[str] = "openai"

    def embed(self, inputs: list[str]) -> OpenAIEmbeddingResponse:
        """Call the embedder with multiple inputs"""
        if self.embed_batch_size is None:
            return self._embed(inputs)

        input_batches = [
            inputs[i : i + self.embed_batch_size]
            for i in range(0, len(inputs), self.embed_batch_size)
        ]

        embedding_responses: list[OpenAIEmbeddingResponse] = [
            response
            for response in ThreadPoolExecutor(self.max_workers).map(
                lambda inputs: self._embed(inputs),
                input_batches,
            )
        ]
        return self._merge_batch_embeddings(embedding_responses)

    async def embed_async(self, inputs: list[str]) -> OpenAIEmbeddingResponse:
        """Asynchronously call the embedder with multiple inputs"""
        if self.embed_batch_size is None:
            return await self._embed_async(inputs)

        input_batches = [
            inputs[i : i + self.embed_batch_size]
            for i in range(0, len(inputs), self.embed_batch_size)
        ]
        embedding_responses: list[OpenAIEmbeddingResponse] = await asyncio.gather(
            *[self._embed_async(inputs) for inputs in input_batches]
        )
        return self._merge_batch_embeddings(embedding_responses)

    def __call__(self, input: list[str]) -> list[list[float]]:
        """Call the embedder with a input

        Chroma expects parameter to be `input`.
        """
        embedding_response = self.embed(input)

        return embedding_response.embeddings

    ############################## PRIVATE METHODS ###################################

    def _embed(self, inputs: list[str]) -> OpenAIEmbeddingResponse:
        """Call the embedder with a single input"""
        client = get_wrapped_client(
            OpenAI(api_key=self.api_key, base_url=self.base_url), self
        )
        kwargs = self.embedding_params.kwargs()
        if self.embedding_params.model != "text-embedding-ada-002":
            kwargs["dimensions"] = self.dimensions
        start_time = datetime.datetime.now().timestamp() * 1000
        embeddings = client.embeddings.create(input=inputs, **kwargs)
        return OpenAIEmbeddingResponse(
            response=embeddings,
            start_time=start_time,
            end_time=datetime.datetime.now().timestamp() * 1000,
        )

    async def _embed_async(self, inputs: list[str]) -> OpenAIEmbeddingResponse:
        """Asynchronously call the embedder with a single input"""
        client = get_wrapped_async_client(
            AsyncOpenAI(api_key=self.api_key, base_url=self.base_url), self
        )
        kwargs = self.embedding_params.kwargs()
        if self.embedding_params.model != "text-embedding-ada-002":
            kwargs["dimensions"] = self.dimensions
        start_time = datetime.datetime.now().timestamp() * 1000
        embeddings = await client.embeddings.create(input=inputs, **kwargs)
        return OpenAIEmbeddingResponse(
            response=embeddings,
            start_time=start_time,
            end_time=datetime.datetime.now().timestamp() * 1000,
        )

    def _merge_batch_embeddings(
        self, openai_embeddings: list[OpenAIEmbeddingResponse]
    ) -> OpenAIEmbeddingResponse:
        """Merge a batch of embeddings into a single embedding"""
        embeddings: list[Embedding] = []
        usage = Usage(
            prompt_tokens=0,
            total_tokens=0,
        )
        start_time = float("inf")
        end_time: float = 0.0
        i: int = 0
        for openai_embedding in openai_embeddings:
            for embedding in openai_embedding.response.data:
                embedding.index = i
                embeddings.append(embedding)
                i += 1
            usage.prompt_tokens += openai_embedding.response.usage.prompt_tokens
            usage.total_tokens += openai_embedding.response.usage.total_tokens
            start_time = min(start_time, openai_embedding.start_time)
            end_time = max(end_time, openai_embedding.end_time)
        create_embedding_response = CreateEmbeddingResponse(
            data=embeddings,
            model=openai_embeddings[0].response.model,
            object=openai_embeddings[0].response.object,
            usage=usage,
        )
        return OpenAIEmbeddingResponse(
            response=create_embedding_response,
            start_time=start_time,
            end_time=end_time,
        )
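
Because __call__ accepts an `input` parameter and returns raw vectors, an OpenAIEmbedder instance can be passed directly where Chroma expects an embedding function. A hedged sketch (the chromadb calls are illustrative and not part of mirascope):

import chromadb

from mirascope.openai import OpenAIEmbedder

client = chromadb.Client()
collection = client.create_collection(
    name="docs", embedding_function=OpenAIEmbedder()
)
collection.add(ids=["1"], documents=["your text to embed"])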

embed(inputs)

Call the embedder with multiple inputs

Source code in mirascope/openai/embedders.py
def embed(self, inputs: list[str]) -> OpenAIEmbeddingResponse:
    """Call the embedder with multiple inputs"""
    if self.embed_batch_size is None:
        return self._embed(inputs)

    input_batches = [
        inputs[i : i + self.embed_batch_size]
        for i in range(0, len(inputs), self.embed_batch_size)
    ]

    embedding_responses: list[OpenAIEmbeddingResponse] = [
        response
        for response in ThreadPoolExecutor(self.max_workers).map(
            lambda inputs: self._embed(inputs),
            input_batches,
        )
    ]
    return self._merge_batch_embeddings(embedding_responses)
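
Inputs are split into chunks of embed_batch_size and embedded concurrently on a thread pool of up to max_workers threads, then merged back into a single response in input order. A usage sketch under those semantics:

from mirascope.openai import OpenAIEmbedder

embedder = OpenAIEmbedder(embed_batch_size=20, max_workers=8)
texts = [f"document {i}" for i in range(100)]
response = embedder.embed(texts)  # 5 batches of 20, embedded concurrently
print(len(response.embeddings))  # 100 vectors, in input order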

embed_async(inputs) async

Asynchronously call the embedder with multiple inputs

Source code in mirascope/openai/embedders.py
async def embed_async(self, inputs: list[str]) -> OpenAIEmbeddingResponse:
    """Asynchronously call the embedder with multiple inputs"""
    if self.embed_batch_size is None:
        return await self._embed_async(inputs)

    input_batches = [
        inputs[i : i + self.embed_batch_size]
        for i in range(0, len(inputs), self.embed_batch_size)
    ]
    embedding_responses: list[OpenAIEmbeddingResponse] = await asyncio.gather(
        *[self._embed_async(inputs) for inputs in input_batches]
    )
    return self._merge_batch_embeddings(embedding_responses)
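
The asynchronous variant fans the batches out with asyncio.gather instead of a thread pool. A minimal sketch:

import asyncio

from mirascope.openai import OpenAIEmbedder


async def main() -> None:
    embedder = OpenAIEmbedder()
    response = await embedder.embed_async(["first text", "second text"])
    print(len(response.embeddings))  # 2


asyncio.run(main())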

OpenAIEmbeddingResponse

Bases: BaseEmbeddingResponse[CreateEmbeddingResponse]

A convenience wrapper around the OpenAI CreateEmbeddingResponse response.

Source code in mirascope/openai/types.py
class OpenAIEmbeddingResponse(BaseEmbeddingResponse[CreateEmbeddingResponse]):
    """A convenience wrapper around the OpenAI `CreateEmbeddingResponse` response."""

    @property
    def embeddings(self) -> list[list[float]]:
        """Returns the raw embeddings."""
        embeddings_model: list[Embedding] = [
            embedding for embedding in self.response.data
        ]
        return [embedding.embedding for embedding in embeddings_model]

embeddings: list[list[float]] property

Returns the raw embeddings.
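
A short sketch of pulling the raw vectors out of a response; the printed width assumes the default dimensions of 1536:

from mirascope.openai import OpenAIEmbedder

embedder = OpenAIEmbedder()
response = embedder.embed(["hello world"])
vectors = response.embeddings  # list[list[float]]
print(len(vectors), len(vectors[0]))  # 1 1536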

get_wrapped_async_client(client, self)

Get a wrapped async client.

Source code in mirascope/base/ops_utils.py
def get_wrapped_async_client(client: T, self: Union[BaseCall, BaseEmbedder]) -> T:
    """Get a wrapped async client."""
    if self.configuration.client_wrappers:
        for op in self.configuration.client_wrappers:
            if op == "langfuse":  # pragma: no cover
                from langfuse.openai import AsyncOpenAI as LangfuseAsyncOpenAI

                client = LangfuseAsyncOpenAI(
                    api_key=self.api_key, base_url=self.base_url
                )
            elif op == "logfire":  # pragma: no cover
                import logfire

                if self._provider == "openai":
                    logfire.instrument_openai(client)  # type: ignore
                elif self._provider == "anthropic":
                    logfire.instrument_anthropic(client)  # type: ignore
            elif callable(op):
                client = op(client)
    return client

get_wrapped_client(client, self)

Get a wrapped client.

Source code in mirascope/base/ops_utils.py
def get_wrapped_client(client: T, self: Union[BaseCall, BaseEmbedder]) -> T:
    """Get a wrapped client."""
    if self.configuration.client_wrappers:
        for op in self.configuration.client_wrappers:  # pragma: no cover
            if op == "langfuse":
                from langfuse.openai import OpenAI as LangfuseOpenAI

                client = LangfuseOpenAI(api_key=self.api_key, base_url=self.base_url)
            elif op == "logfire":  # pragma: no cover
                import logfire

                if self._provider == "openai":
                    logfire.instrument_openai(client)  # type: ignore
                elif self._provider == "anthropic":
                    logfire.instrument_anthropic(client)  # type: ignore
            elif callable(op):
                client = op(client)
    return client
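
Besides the built-in "langfuse" and "logfire" wrappers, any callable in configuration.client_wrappers receives the client and must return it (wrapped or otherwise). A hedged sketch with a hypothetical wrapper, assuming BaseConfig is importable from mirascope.base:

from typing import ClassVar

from mirascope.base import BaseConfig
from mirascope.openai import OpenAIEmbedder


def log_client(client):
    """Hypothetical wrapper: inspect the client, then return it unchanged."""
    print(f"wrapping client: {type(client).__name__}")
    return client


class LoggedEmbedder(OpenAIEmbedder):
    configuration: ClassVar[BaseConfig] = BaseConfig(
        llm_ops=[], client_wrappers=[log_client]
    )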