Oven logo

Oven

Published

An integration package connecting Elasticsearch and LangChain

pip install langchain-elasticsearch

Package Downloads

Weekly DownloadsMonthly Downloads

Authors

Requires Python

<4.0,>=3.9

langchain-elasticsearch

This package contains the LangChain integration with Elasticsearch.

Installation

pip install -U langchain-elasticsearch

Elasticsearch setup

Elastic Cloud

You need a running Elasticsearch deployment. The easiest way to start one is through Elastic Cloud. You can sign up for a free trial.

  1. Create a deployment
  2. Get your Cloud ID:
    1. In the Elastic Cloud console, click "Manage" next to your deployment
    2. Copy the Cloud ID and paste it into the es_cloud_id parameter below
  3. Create an API key:
    1. In the Elastic Cloud console, click "Open" next to your deployment
    2. In the left-hand side menu, go to "Stack Management", then to "API Keys"
    3. Click "Create API key"
    4. Enter a name for the API key and click "Create"
    5. Copy the API key and paste it into the es_api_key parameter below

Elastic Cloud

Alternatively, you can run Elasticsearch via Docker as described in the docs.

Usage

ElasticsearchStore

The ElasticsearchStore class exposes Elasticsearch as a vector store.

from langchain_elasticsearch import ElasticsearchStore

embeddings = ... # use a LangChain Embeddings class or ElasticsearchEmbeddings

vectorstore = ElasticsearchStore(
    es_cloud_id="your-cloud-id",
    es_api_key="your-api-key",
    index_name="your-index-name",
    embeddings=embeddings,
)

ElasticsearchRetriever

The ElasticsearchRetriever class can be user to implement more complex queries. This can be useful for power users and necessary if data was ingested outside of LangChain (for example using a web crawler).

def fuzzy_query(search_query: str) -> Dict:
    return {
        "query": {
            "match": {
                text_field: {
                    "query": search_query,
                    "fuzziness": "AUTO",
                }
            },
        },
    }


fuzzy_retriever = ElasticsearchRetriever.from_es_params(
    es_cloud_id="your-cloud-id",
    es_api_key="your-api-key",
    index_name="your-index-name",
    body_func=fuzzy_query,
    content_field=text_field,
)

fuzzy_retriever.get_relevant_documents("fooo")

ElasticsearchEmbeddings

The ElasticsearchEmbeddings class provides an interface to generate embeddings using a model deployed in an Elasticsearch cluster.

from langchain_elasticsearch import ElasticsearchEmbeddings

embeddings = ElasticsearchEmbeddings.from_credentials(
    model_id="your-model-id",
    input_field="your-input-field",
    es_cloud_id="your-cloud-id",
    es_api_key="your-api-key",
)

ElasticsearchChatMessageHistory

The ElasticsearchChatMessageHistory class stores chat histories in Elasticsearch.

from langchain_elasticsearch import ElasticsearchChatMessageHistory

chat_history = ElasticsearchChatMessageHistory(
    index="your-index-name",
    session_id="your-session-id",
    es_cloud_id="your-cloud-id",
    es_api_key="your-api-key",
)

ElasticsearchCache

A caching layer for LLMs that uses Elasticsearch.

Simple example:

from langchain.globals import set_llm_cache

from langchain_elasticsearch import ElasticsearchCache

set_llm_cache(
    ElasticsearchCache(
        es_url="http://localhost:9200",
        index_name="llm-chat-cache",
        metadata={"project": "my_chatgpt_project"},
    )
)

The index_name parameter can also accept aliases. This allows to use the ILM: Manage the index lifecycle that we suggest to consider for managing retention and controlling cache growth.

Look at the class docstring for all parameters.

Index the generated text

The cached data won't be searchable by default. The developer can customize the building of the Elasticsearch document in order to add indexed text fields, where to put, for example, the text generated by the LLM.

This can be done by subclassing end overriding methods. The new cache class can be applied also to a pre-existing cache index:

import json
from typing import Any, Dict, List

from langchain.globals import set_llm_cache
from langchain_core.caches import RETURN_VAL_TYPE

from langchain_elasticsearch import ElasticsearchCache


class SearchableElasticsearchCache(ElasticsearchCache):
    @property
    def mapping(self) -> Dict[str, Any]:
        mapping = super().mapping
        mapping["mappings"]["properties"]["parsed_llm_output"] = {
            "type": "text",
            "analyzer": "english",
        }
        return mapping

    def build_document(
        self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE
    ) -> Dict[str, Any]:
        body = super().build_document(prompt, llm_string, return_val)
        body["parsed_llm_output"] = self._parse_output(body["llm_output"])
        return body

    @staticmethod
    def _parse_output(data: List[str]) -> List[str]:
        return [
            json.loads(output)["kwargs"]["message"]["kwargs"]["content"]
            for output in data
        ]


set_llm_cache(
    SearchableElasticsearchCache(
       es_url="http://localhost:9200", 
       index_name="llm-chat-cache"
    )
)

When overriding the mapping and the document building, please only make additive modifications, keeping the base mapping intact.

ElasticsearchEmbeddingsCache

Store and temporarily cache embeddings.

Caching embeddings is obtained by using the CacheBackedEmbeddings, it can be instantiated using CacheBackedEmbeddings.from_bytes_store method.

from langchain.embeddings import CacheBackedEmbeddings
from langchain_openai import OpenAIEmbeddings

from langchain_elasticsearch import ElasticsearchEmbeddingsCache

underlying_embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

store = ElasticsearchEmbeddingsCache(
    es_url="http://localhost:9200",
    index_name="llm-chat-cache",
    metadata={"project": "my_chatgpt_project"},
    namespace="my_chatgpt_project",
)

embeddings = CacheBackedEmbeddings.from_bytes_store(
    underlying_embeddings=OpenAIEmbeddings(),
    document_embedding_cache=store,
    query_embedding_cache=store,
)

Similarly to the chat cache, one can subclass ElasticsearchEmbeddingsCache in order to index vectors for search.

from typing import Any, Dict, List
from langchain_elasticsearch import ElasticsearchEmbeddingsCache

class SearchableElasticsearchStore(ElasticsearchEmbeddingsCache):
    @property
    def mapping(self) -> Dict[str, Any]:
        mapping = super().mapping
        mapping["mappings"]["properties"]["vector"] = {
            "type": "dense_vector",
            "dims": 1536,
            "index": True,
            "similarity": "dot_product",
        }
        return mapping

    def build_document(self, llm_input: str, vector: List[float]) -> Dict[str, Any]:
        body = super().build_document(llm_input, vector)
        body["vector"] = vector
        return body