
Introduction

RAG applications benefit from large datasets: the more information available to the application, the more likely it is to produce accurate answers to users’ queries. But handling large datasets comes with its own set of challenges, which we’ll explore in this post. We'll walk through creating a Retrieval-Augmented Generation (RAG) application using Pinecone serverless, Anyscale, and Langchain. We'll be using a moderately sized dataset from Cohere, which consists of 35 million entries from Wikipedia.

Data Loading with Ray and Anyscale

When dealing with larger datasets like the one we’ll be working with, we have to consider how to process the data in a reasonable amount of time, and how to do so reliably so that the job doesn’t break midway. Fortunately, there are open-source tools as well as cloud solutions that can help us achieve this:

Ray is an open-source framework that provides a simple, universal API for building distributed applications. It is designed to scale Python applications from a single machine to a large cluster with minimal effort. Ray is widely used for machine learning and other computationally intensive tasks, as it can efficiently distribute these tasks across a cluster of machines.

Anyscale offers a cloud service based on the Ray framework. It simplifies the process of deploying, scaling, and managing Ray applications in the cloud. Anyscale's platform enables users to seamlessly scale their Ray applications from a laptop to the cloud without needing to manage the underlying infrastructure, making distributed computing more accessible and efficient.

When dealing with small datasets, we can rely on local operations with Ray, which can even help us apply some local optimizations to achieve fast ingestion speeds. When we start dealing with much bigger datasets, however, we have to parallelize our operations to complete the ingestion in a reasonable amount of time. In our case, the dataset is about 120 GB, so we’ll use Anyscale to ingest the data quickly. Note that you can run this code locally without Anyscale; it will simply run on a single worker (Full code listing).

We’ll start by installing some dependencies:

!pip install --user pinecone-client[grpc]==3.0.0.dev4 pandas numpy pyarrow python-dotenv tenacity ray

Next, we’ll import all the libraries:

import ray
import os
import pandas as pd
import uuid
import json
import numpy as np
import logging
import requests
from time import time
from random import shuffle
from datetime import datetime
from threading import Semaphore  # used by the asynchronous upsert path below
from tenacity import retry, stop_after_attempt, wait_exponential
from dotenv import load_dotenv

We’ll create a .env file that will hold the necessary credentials:

PINECONE_API_KEY=...

And then load the file with load_dotenv:

load_dotenv()

We’ll add a reference to the Pinecone API in the Ray runtime:

ray.init(
    runtime_env={
        "env_vars": {
          "PINECONE_API_KEY": os.getenv("PINECONE_API_KEY"),
        }
    },
)

The env_vars key in the runtime_env dictionary is used to set environment variables for the Ray runtime. This means that the PINECONE_API_KEY environment variable will be available to all tasks and actors running in the Ray runtime, and they can access its value using os.getenv.
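One practical caveat: os.getenv returns None when a variable is unset, and, as far as we know, Ray expects the values in env_vars to be strings. A minimal sketch of a fail-fast check that could run before ray.init follows. The helper name require_env is hypothetical and not part of Ray or python-dotenv.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set; check your .env file")
    return value


# Hypothetical usage with the ray.init call shown earlier:
# ray.init(runtime_env={"env_vars": {"PINECONE_API_KEY": require_env("PINECONE_API_KEY")}})
```

Failing on the driver with a clear message is usually easier to debug than an authentication error raised later inside a worker.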

Next, let’s load the data:

url = "https://huggingface.co/api/datasets/Cohere/wikipedia-22-12-en-embeddings/parquet/default/train"
response = requests.get(url)
input_files = json.loads(response.content)
columns = ['id', 'title', 'text', 'url', 'emb',] 
ds = ray.data.read_parquet(input_files, columns=columns)

This fetches the list of parquet files associated with the dataset and then uses Ray to load the data into a Ray dataset object.

Upsert the data

Next, we create a new serverless index where our vectors will be stored.

from pinecone.grpc import PineconeGRPC
from pinecone import ServerlessSpec

pc = PineconeGRPC()
index_name = 'cohere-wikipedia'

# make sure the index doesn't exist before creating it: 
indexes = pc.list_indexes().indexes
names = [_['name'] for _ in indexes]
if index_name not in names:
    pc.create_index(
        name=index_name,
        dimension=768,
        metric="cosine",
        spec=ServerlessSpec(cloud='aws', region='us-west-2') 
    )

After creating the index, we can perform the upserts either synchronously or asynchronously. If you intend to run the ingestion pipeline locally, you’d use the synchronous upsert path, and if you want to leverage multiple workers with Anyscale, you should use the asynchronous upsert code snippet.

Synchronous upserts

def upload(batch):
    client = PineconeGRPC()
    index = client.Index(index_name)
    total_vectors = 0
    num_failures = 0
    # data = process_data(large_batch).to_dict(orient='records')
    data = batch.to_dict(orient='records')
    
    @retry(stop=stop_after_attempt(2), wait=wait_exponential(multiplier=1, min=4, max=10))
    def send_batch(batch):
        return index.upsert(vectors=batch)
    
    try:
        result = send_batch(data)
        total_vectors += result.upserted_count
    except Exception as e:
        logging.exception(e)
        num_failures += len(data)
    return {'upsreted': np.array([total_vectors]), 'errors': np.array([num_failures])}

class Upserter:
    def __call__(self, large_batch):
        return upload(large_batch)

In this snippet, the upload function does the following:

  1. Initialization: It initializes a Pinecone client and an index. It also sets total_vectors and num_failures to 0. These will be used to keep track of the number of successfully uploaded vectors and the number of failures, respectively.
  2. Data Conversion: It converts the input batch of data into a dictionary format using the to_dict method with orient='records'. This format is required for the upsert method of the Pinecone index.
  3. Retry Logic: It defines a send_batch function that uploads a batch of vectors to the Pinecone index using the upsert method. This function is decorated with a retry decorator, which will automatically retry the function if it raises an exception. The retry will stop after 2 attempts, and the wait time between retries grows exponentially, bounded between a minimum of 4 seconds and a maximum of 10 seconds.
  4. Data Upload: It tries to upload the data using the send_batch function. If the upload is successful, it increments total_vectors by the number of vectors that were successfully uploaded (result.upserted_count). If an exception occurs during the upload, it logs the exception and increments num_failures by the length of the data batch.
  5. Finally, it returns a dictionary containing the total number of vectors that were successfully uploaded and the total number of failures. These are returned as numpy arrays.
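To make the retry behaviour in step 3 more concrete, here is a plain-Python sketch of a retry loop with a clamped exponential delay. It only approximates what the tenacity decorator does; backoff_delay and call_with_retry are hypothetical names, and the exact delay formula inside tenacity may differ between versions.

```python
import time


def backoff_delay(attempt: int, multiplier: float = 1.0,
                  min_wait: float = 4.0, max_wait: float = 10.0) -> float:
    """Delay before the next try: multiplier * 2**(attempt - 1), clamped to [min_wait, max_wait]."""
    return max(min_wait, min(multiplier * 2 ** (attempt - 1), max_wait))


def call_with_retry(fn, max_attempts: int = 2, sleep=time.sleep):
    """Call fn() up to max_attempts times, sleeping between failed attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            sleep(backoff_delay(attempt))
```

With two attempts there is only a single retry, so in this sketch the only wait that ever happens is the 4-second minimum. Raising max_attempts is what would let the delay grow toward the 10-second cap.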

Asynchronous upserts

If we want to accelerate the process, we can leverage Anyscale, which lets us parallelize the Ray operations across multiple workers, by using the following async_upload function:

batch_size = 350
concurrent_requests = 5

def chunker(seq, batch_size):
    return ((pos, seq[pos:pos + batch_size]) for pos in range(0, len(seq), batch_size))


def async_upload(large_batch):
    client = PineconeGRPC()
    index = client.Index(index_name)
    total_vectors = 0
    num_failures = 0
    data = large_batch.to_dict(orient='records')

    # The current implementation of `upsert(async_req=True)` uses a thread pool, not asyncio, so we need a threading Semaphore
    sem = Semaphore(concurrent_requests)

    def done(future):
        sem.release()
    
    def send_batch(batch):
        sem.acquire()
        future = index.upsert(vectors=batch, async_req=True)
        future.add_done_callback(done)
        return future
    
    # total_vectors and num_failures were already initialized at the top of the function
    tasks = [(pos, send_batch(batch)) for pos, batch in chunker(data, batch_size=batch_size)]
    for idx, task in tasks:
        try:
            result = task.result()
            total_vectors += result.upserted_count
        except Exception:
            failed_batch = data[idx: idx + batch_size]
            try:
                result = index.upsert(failed_batch)
                total_vectors += result.upserted_count
            except Exception as e:
                logging.exception(e)
                num_failures += len(failed_batch)
    return {'upsreted': np.array([total_vectors]), 'errors': np.array([num_failures])}
    
class UpserterConcurrent:
    def __call__(self, large_batch):
        return async_upload(large_batch)

Let’s discuss what’s happening in this snippet:

  1. Setting up constants: batch_size and concurrent_requests are set to control the size of each data batch and the number of concurrent requests, respectively.
  2. chunker function: This function splits a sequence into chunks of a specified size. It's used to divide the large data batch into smaller ones for processing.
  3. async_upload function: This function handles the asynchronous uploading of data to a Pinecone index. It does the following:
  • Initializes a Pinecone client and an index.
  • Converts the large batch of data into a dictionary format.
  • Uses a Semaphore to control the number of concurrent requests.
  • Defines a send_batch function that sends a batch of data to the Pinecone index and adds a callback function to release the Semaphore when the request is done.
  • Iterates over the chunks of data, sending each one to the Pinecone index. If an exception occurs during the upload, it tries to upload the failed batch again. If the second attempt also fails, it logs the exception and increments the failure count.
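The chunking and semaphore pattern described above can be tried without Pinecone. The sketch below uses only the standard library: fake_upsert stands in for index.upsert(..., async_req=True), and bounded_upload, fake_upsert, pool_size and the stats dictionary are all hypothetical names introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Semaphore
import time


def chunker(seq, batch_size):
    """Yield (start_position, chunk) pairs, as in the snippet above."""
    return ((pos, seq[pos:pos + batch_size]) for pos in range(0, len(seq), batch_size))


def bounded_upload(records, batch_size, max_in_flight, upsert, pool_size=8):
    """Submit chunks to a thread pool while keeping at most max_in_flight requests pending."""
    sem = Semaphore(max_in_flight)
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        tasks = []
        for pos, chunk in chunker(records, batch_size):
            sem.acquire()  # blocks while max_in_flight requests are still pending
            future = pool.submit(upsert, chunk)
            future.add_done_callback(lambda _f: sem.release())
            tasks.append((pos, future))
        # Collect results in submission order, like the loop over tasks above
        return sum(future.result() for _pos, future in tasks)


# Stand-in for a network call that records how many calls overlap
stats = {"in_flight": 0, "peak": 0}
lock = Lock()


def fake_upsert(chunk):
    with lock:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
    time.sleep(0.01)  # simulate request latency
    with lock:
        stats["in_flight"] -= 1
    return len(chunk)


total = bounded_upload(list(range(25)), batch_size=4, max_in_flight=2, upsert=fake_upsert)
```

Even though the pool has eight workers, the semaphore keeps the observed peak at or below max_in_flight. That is the role it plays in async_upload, where the thread pool belongs to the client rather than to our code.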

Then we apply the process_data function to the dataset, followed by the concurrent upserter:

num_workers = 5

new_ds = ds.map_batches(
    process_data, 
    batch_format="pandas"
).map_batches(
    UpserterConcurrent, 
    batch_size=batch_size * 20, 
    batch_format='pandas', 
    zero_copy_batch=True,
    concurrency=(1, num_workers)
)

before = datetime.now()
summary = new_ds.materialize().sum(['upsreted', 'errors'])
# summary
duration = datetime.now() - before
print({k: f"{v: ,}" for k,v in summary.items()})
print(duration)

In this snippet, the script:

  • Maps the process_data function to the batches of data in the dataset (ds).
  • Maps the UpserterConcurrent callable to the batches of data, specifying the batch size and the number of concurrent workers.
  • Materializes the dataset and sums up the number of successfully upserted items and errors.
  • Finally, it prints the summary of the operation and the duration it took.
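The process_data function itself is not shown in this post; it is part of the full code listing. As a rough idea of what such a step needs to produce, here is a sketch that converts one dataset row into the id / values / metadata shape that Pinecone upserts accept. The helper name to_pinecone_record and the choice of metadata fields are our assumptions, not the code from the listing.

```python
def to_pinecone_record(row: dict) -> dict:
    """Convert one dataset row into the id / values / metadata shape used for upserts."""
    return {
        "id": str(row["id"]),  # vector IDs are strings
        "values": [float(x) for x in row["emb"]],  # the embedding from the 'emb' column
        "metadata": {
            "title": row["title"],
            "text": row["text"],
            "url": row["url"],  # read back later through doc.metadata['url']
        },
    }


example = to_pinecone_record(
    {"id": 7, "title": "Ray", "text": "Ray is a framework.", "url": "https://example.org/?curid=7", "emb": [0, 1]}
)
```

A pandas-based process_data could apply a function like this to every row of the batch before the batch reaches the upserter.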

Creating a RAG Application with Langchain

Once the data is loaded, we can move on to creating a RAG application with Langchain, which makes the process very straightforward. Let’s jump into the application code (Full code listing). This will be a simple FastAPI application:

from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from langserve import add_routes
from app.chain import chain as pinecone_wiki_chain

app = FastAPI()

@app.get("/")
async def redirect_root_to_docs():
    return RedirectResponse("/docs")

# Edit this to add the chain you want to add
add_routes(app, pinecone_wiki_chain, path="/pinecone-wikipedia")

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

The heart of the application lies in the pinecone_wiki_chain (breakdown below):

import os

import requests  # needed by fetch_wikipedia_page below
from dotenv import load_dotenv
from langchain_community.chat_models import ChatOpenAI
from langchain_community.embeddings import CohereEmbeddings
from langchain_community.vectorstores import Pinecone
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda
from pinecone import Pinecone as PineconeClient

load_dotenv()

# Keys
PINECONE_API_KEY = os.environ["PINECONE_API_KEY"]
PINECONE_ENVIRONMENT = os.environ["PINECONE_ENVIRONMENT"]
PINECONE_INDEX_NAME = os.environ["PINECONE_INDEX_NAME"]

pinecone = PineconeClient(api_key=PINECONE_API_KEY)

embeddings = CohereEmbeddings(model="multilingual-22-12")
vectorstore = Pinecone.from_existing_index(index_name=PINECONE_INDEX_NAME,embedding=embeddings)

retriever = vectorstore.as_retriever()

def fetch_wikipedia_page(id):
    url = f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&format=json&pageids={id}"
    response = requests.get(url)
    data = response.json()
    page_content = list(data['query']['pages'].values())[0]['extract']
    return page_content

def fetch_url(x):
    urls = [doc.metadata['url'] for doc in x['context']]
    ids = [url.split('=')[-1] for url in urls]
    contents = [fetch_wikipedia_page(id)[:32000] for id in ids]    
    return {"context": contents, "question": x["question"]}

# RAG prompt
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

# RAG
model = ChatOpenAI(model="gpt-4-32k")
chain = (
    RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
    | RunnableLambda(fetch_url)  
    | prompt
    | model
    | StrOutputParser()
)

Here’s a step by step breakdown:

  1. Imports: We start by importing necessary modules and functions from various libraries such as os, dotenv, langchain, and pinecone.
  2. Environment Variables: We then load environment variables using load_dotenv(). These variables include API keys and other configuration details for the Pinecone service.
  3. Pinecone Client: We initialize the Pinecone client using the API key from the environment variables.
  4. Embeddings: We define the embedding model using the multilingual-22-12 Cohere Embeddings model. This matches the model used to create the embeddings in the wikipedia-22-12-en-embeddings dataset. This model is used to convert text into vectors that can then be used to query Pinecone.
  5. Vector Store: We create a Pinecone vectorstore from an existing index. To set it up, we pass the embedding model we defined before.
  6. Retriever: We create a retriever from the vector store. This retriever can be used to find contextually relevant information from the vector store based on a given query.
  7. Functions for retrieving content from Wikipedia: fetch_url and fetch_wikipedia_page fetch the content of the Wikipedia pages found by the retriever. fetch_url is wrapped in a RunnableLambda so it can run as a step in the chain.
  8. Prompt Template: We define a template for chat prompts. This template is used to format the input for the chat model.
  9. Chat Model: We initialize a ChatOpenAI model. Specifically we’ll use gpt-4-32k to support the longer context we intend to send the model. This is the main chatbot model that will generate responses to user queries.
  10. Pipeline: Finally, we create a pipeline that ties all these components together. The pipeline takes the user’s question as input. RunnableParallel passes the question to the retriever to produce the context, and also passes the question through as is. Both are handed to the RunnableLambda wrapping fetch_url, which expands the context retrieved from Pinecone with the content of the matching Wikipedia pages. The result is then passed to the prompt, which formats the input for the chat model. The chat model generates a response, which is then parsed into a string by the StrOutputParser.
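One detail in fetch_url worth a closer look is how the page ID is derived from the URL: url.split('=')[-1] takes whatever follows the last equals sign. Assuming the stored URLs look like https://en.wikipedia.org/wiki?curid=12345 (our assumption about the dataset's format), a slightly more defensive sketch parses the query string instead. The helper name page_id_from_url is hypothetical.

```python
from urllib.parse import parse_qs, urlparse


def page_id_from_url(url: str) -> str:
    """Extract the curid query parameter from a Wikipedia URL, or raise ValueError."""
    params = parse_qs(urlparse(url).query)
    ids = params.get("curid")
    if not ids:
        raise ValueError(f"No curid parameter in URL: {url}")
    return ids[0]
```

Unlike the split-based version, this still returns the right ID if another query parameter follows curid, and it fails loudly on URLs that carry no page ID at all.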

We can now test our application at http://localhost:8000/pinecone-wikipedia/playground.

Conclusion

While the thought of integrating large datasets into a RAG application can be intimidating, the process is more manageable than it initially appears. This guide was developed to demystify the process, providing clear and practical steps you can follow to accomplish this task. With tools like Ray, Anyscale, Pinecone serverless and Langchain, setting up a RAG application becomes attainable even for very large datasets.
