class Ingest:
"""
Contains all methods for building and using vector databases.
"""
def __init__(self):
"""
    Initialize clients: OpenAI, Qdrant (vector DB), AWS S3, Supabase, Azure OpenAI (LLM), and PostHog.
"""
openai.api_key = os.getenv("OPENAI_API_KEY")
# vector DB
self.qdrant_client = QdrantClient(
url=os.getenv('QDRANT_URL'),
api_key=os.getenv('QDRANT_API_KEY'),
)
self.vectorstore = Qdrant(client=self.qdrant_client,
collection_name=os.environ['QDRANT_COLLECTION_NAME'],
embeddings=OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE))
# S3
self.s3_client = boto3.client(
's3',
aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
)
# Create a Supabase client
self.supabase_client = supabase.create_client( # type: ignore
supabase_url=os.environ['SUPABASE_URL'], supabase_key=os.environ['SUPABASE_API_KEY'])
self.llm = AzureChatOpenAI(
temperature=0,
deployment_name=os.getenv('AZURE_OPENAI_ENGINE'), #type:ignore
openai_api_base=os.getenv('AZURE_OPENAI_ENDPOINT'), #type:ignore
openai_api_key=os.getenv('AZURE_OPENAI_KEY'), #type:ignore
openai_api_version=os.getenv('OPENAI_API_VERSION'), #type:ignore
openai_api_type=OPENAI_API_TYPE)
self.posthog = Posthog(sync_mode=True,
project_api_key=os.environ['POSTHOG_API_KEY'],
host='https://app.posthog.com')
return None
def __del__(self):
# Gracefully shutdown the Posthog client -- this was a main cause of dangling threads.
# Since I changed Posthog to be sync, no need to shutdown.
# try:
# self.posthog.shutdown()
# except Exception as e:
# print("Failed to shutdown PostHog. Probably fine. Error: ", e)
try:
self.qdrant_client.close()
except Exception as e:
print("Failed to shutdown Qdrant. Probably fine. Error: ", e)
try:
del self.supabase_client
except Exception as e:
print("Failed delete supabase_client. Probably fine. Error: ", e)
try:
del self.s3_client
except Exception as e:
print("Failed to delete s3_client. Probably fine. Error: ", e)
def bulk_ingest(self, s3_paths: Union[List[str], str], course_name: str, **kwargs) -> Dict[str, List[str]]:
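    """Ingest a batch of S3 files into the vector DB, routing each file to the right ingest method.
    Routing order: exact file extension -> MIME category -> last-ditch UTF-8 text ingest.
    Args:
      s3_paths: A single S3 key or a list of S3 keys to ingest.
      course_name: The course these documents belong to.
      **kwargs: Passed through to the per-filetype ingest methods (e.g. readable_filename, url, base_url).
    Returns:
      Dict with 'success_ingest' and 'failure_ingest' lists of S3 paths (failures may also be error strings).
    Illustrative usage (the S3 key and course name below are made-up examples, not real data):
      ingest = Ingest()
      ingest.bulk_ingest(['courses/demo-course/<uuid>lecture1.pdf'], course_name='demo-course')
    """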
def _ingest_single(ingest_method: Callable, s3_path, *args, **kwargs):
"""Handle running an arbitrary ingest function for an individual file."""
# RUN INGEST METHOD
ret = ingest_method(s3_path, *args, **kwargs)
if ret == "Success":
success_status['success_ingest'].append(s3_path)
else:
success_status['failure_ingest'].append(s3_path)
# 👇👇👇👇 ADD NEW INGEST METHODS HERE 👇👇👇👇🎉
file_ingest_methods = {
'.html': self._ingest_html,
'.py': self._ingest_single_py,
'.pdf': self._ingest_single_pdf,
'.txt': self._ingest_single_txt,
'.md': self._ingest_single_txt,
'.srt': self._ingest_single_srt,
'.vtt': self._ingest_single_vtt,
'.docx': self._ingest_single_docx,
'.ppt': self._ingest_single_ppt,
'.pptx': self._ingest_single_ppt,
'.xlsx': self._ingest_single_excel,
'.xls': self._ingest_single_excel,
'.csv': self._ingest_single_csv,
'.png': self._ingest_single_image,
'.jpg': self._ingest_single_image,
}
# Ingest methods via MIME type (more general than filetype)
mimetype_ingest_methods = {
'video': self._ingest_single_video,
'audio': self._ingest_single_video,
'text': self._ingest_single_txt,
'image': self._ingest_single_image,
}
    # 👆👆👆👆 ADD NEW INGEST METHODS HERE 👆👆👆👆🎉
print(f"Top of ingest, Course_name {course_name}. S3 paths {s3_paths}")
success_status = {"success_ingest": [], "failure_ingest": []}
try:
if isinstance(s3_paths, str):
s3_paths = [s3_paths]
for s3_path in s3_paths:
file_extension = Path(s3_path).suffix
with NamedTemporaryFile(suffix=file_extension) as tmpfile:
self.s3_client.download_fileobj(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path, Fileobj=tmpfile)
mime_type = str(mimetypes.guess_type(tmpfile.name, strict=False)[0])
          mime_category = mime_type.split('/')[0]  # e.g. 'video' from 'video/mp4'; avoids unpacking errors when the MIME type is unknown
if file_extension in file_ingest_methods:
            # Use a specialized ingest function when the extension matches; otherwise fall back to MIME type, then to plain UTF-8 text.
ingest_method = file_ingest_methods[file_extension]
_ingest_single(ingest_method, s3_path, course_name, **kwargs)
elif mime_category in mimetype_ingest_methods:
# fallback to MimeType
print("mime category", mime_category)
ingest_method = mimetype_ingest_methods[mime_category]
_ingest_single(ingest_method, s3_path, course_name, **kwargs)
else:
# No supported ingest... Fallback to attempting utf-8 decoding, otherwise fail.
try:
self._ingest_single_txt(s3_path, course_name)
success_status['success_ingest'].append(s3_path)
print("✅ FALLBACK TO UTF-8 INGEST WAS SUCCESSFUL :) ")
except Exception as e:
print(
f"We don't have a ingest method for this filetype: {file_extension}. As a last-ditch effort, we tried to ingest the file as utf-8 text, but that failed too. File is unsupported: {s3_path}. UTF-8 ingest error: {e}"
)
success_status['failure_ingest'].append(
f"We don't have a ingest method for this filetype: {file_extension} (with generic type {mime_type}), for file: {s3_path}"
)
return success_status
except Exception as e:
success_status['failure_ingest'].append(f"MAJOR ERROR IN /bulk_ingest: Error: {str(e)}")
sentry_sdk.capture_exception(e)
return success_status
def ingest_single_web_text(self, course_name: str, base_url: str, url: str, content: str, title: str):
"""Crawlee integration
"""
self.posthog.capture('distinct_id_of_the_user',
event='ingest_single_web_text_invoked',
properties={
'course_name': course_name,
'base_url': base_url,
'url': url,
'content': content,
'title': title
})
try:
      # ingest the web text as a single document
text = [content]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': '',
'readable_filename': title,
'pagenumber': '',
'timestamp': '',
'url': url,
'base_url': base_url,
}]
self.split_and_upload(texts=text, metadatas=metadatas)
self.posthog.capture('distinct_id_of_the_user',
event='ingest_single_web_text_succeeded',
properties={
'course_name': course_name,
'base_url': base_url,
'url': url,
'title': title
})
return "Success"
except Exception as e:
err = f"❌❌ Error in (web text ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
) # type: ignore
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_single_py(self, s3_path: str, course_name: str, **kwargs):
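    """Ingest a single .py file from S3: download to ./media, load with LangChain's PythonLoader, then chunk & upload via split_and_upload()."""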
try:
file_name = s3_path.split("/")[-1]
file_path = "media/" + file_name # download from s3 to local folder for ingest
self.s3_client.download_file(os.getenv('S3_BUCKET_NAME'), s3_path, file_path)
loader = PythonLoader(file_path)
documents = loader.load()
texts = [doc.page_content for doc in documents]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
} for doc in documents]
#print(texts)
os.remove(file_path)
success_or_failure = self.split_and_upload(texts=texts, metadatas=metadatas)
print("Python ingest: ", success_or_failure)
return success_or_failure
except Exception as e:
err = f"❌❌ Error in (Python ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_single_vtt(self, s3_path: str, course_name: str, **kwargs):
"""
Ingest a single .vtt file from S3.
"""
try:
with NamedTemporaryFile() as tmpfile:
        # download from S3 into the tmpfile
self.s3_client.download_fileobj(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path, Fileobj=tmpfile)
loader = TextLoader(tmpfile.name)
documents = loader.load()
texts = [doc.page_content for doc in documents]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
} for doc in documents]
success_or_failure = self.split_and_upload(texts=texts, metadatas=metadatas)
return success_or_failure
except Exception as e:
err = f"❌❌ Error in (VTT ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_html(self, s3_path: str, course_name: str, **kwargs) -> str:
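    """Ingest a single .html file from S3: extract visible text with BeautifulSoup, derive a readable title from the S3 key, then chunk & upload via split_and_upload()."""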
print(f"IN _ingest_html s3_path `{s3_path}` kwargs: {kwargs}")
try:
response = self.s3_client.get_object(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path)
raw_html = response['Body'].read().decode('utf-8')
soup = BeautifulSoup(raw_html, 'html.parser')
title = s3_path.replace("courses/" + course_name, "")
title = title.replace(".html", "")
title = title.replace("_", " ")
title = title.replace("/", " ")
title = title.strip()
title = title[37:] # removing the uuid prefix
text = [soup.get_text()]
metadata: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': str(title), # adding str to avoid error: unhashable type 'slice'
'url': kwargs.get('url', ''),
'base_url': kwargs.get('base_url', ''),
'pagenumber': '',
'timestamp': '',
}]
success_or_failure = self.split_and_upload(text, metadata)
print(f"_ingest_html: {success_or_failure}")
return success_or_failure
except Exception as e:
err: str = f"ERROR IN _ingest_html: {e}\nTraceback: {traceback.extract_tb(e.__traceback__)}❌❌ Error in {inspect.currentframe().f_code.co_name}:{e}" # type: ignore
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_single_video(self, s3_path: str, course_name: str, **kwargs) -> str:
"""
Ingest a single video file from S3.
"""
print("Starting ingest video or audio")
try:
# check for file extension
file_ext = Path(s3_path).suffix
openai.api_key = os.getenv('OPENAI_API_KEY')
transcript_list = []
with NamedTemporaryFile(suffix=file_ext) as video_tmpfile:
# download from S3 into an video tmpfile
self.s3_client.download_fileobj(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path, Fileobj=video_tmpfile)
# extract audio from video tmpfile
mp4_version = AudioSegment.from_file(video_tmpfile.name, file_ext[1:])
# save the extracted audio as a temporary webm file
with NamedTemporaryFile(suffix=".webm", dir="media", delete=False) as webm_tmpfile:
mp4_version.export(webm_tmpfile, format="webm")
# check file size
file_size = os.path.getsize(webm_tmpfile.name)
# split the audio into 25MB chunks
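            # 26,214,400 bytes == 25 MB, which matches OpenAI's per-file upload limit for Whisper transcription.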
if file_size > 26214400:
# load the webm file into audio object
full_audio = AudioSegment.from_file(webm_tmpfile.name, "webm")
file_count = file_size // 26214400 + 1
split_segment = 35 * 60 * 1000
start = 0
count = 0
while count < file_count:
with NamedTemporaryFile(suffix=".webm", dir="media", delete=False) as split_tmp:
if count == file_count - 1:
# last segment
audio_chunk = full_audio[start:]
else:
                  audio_chunk = full_audio[start:start + split_segment]
audio_chunk.export(split_tmp.name, format="webm")
# transcribe the split file and store the text in dictionary
with open(split_tmp.name, "rb") as f:
transcript = openai.Audio.transcribe("whisper-1", f)
transcript_list.append(transcript['text']) # type: ignore
                start += split_segment
count += 1
os.remove(split_tmp.name)
else:
# transcribe the full audio
with open(webm_tmpfile.name, "rb") as f:
transcript = openai.Audio.transcribe("whisper-1", f)
transcript_list.append(transcript['text']) # type: ignore
os.remove(webm_tmpfile.name)
      text = transcript_list
      metadatas: List[Dict[str, Any]] = [{
          'course_name': course_name,
          's3_path': s3_path,
          'readable_filename': kwargs.get('readable_filename',
                                          Path(s3_path).name[37:]),
          'pagenumber': '',
          'timestamp': chunk_index,  # index of this transcript chunk within the video
          'url': '',
          'base_url': '',
      } for chunk_index, _ in enumerate(text)]
self.split_and_upload(texts=text, metadatas=metadatas)
return "Success"
except Exception as e:
err = f"❌❌ Error in (VIDEO ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_single_docx(self, s3_path: str, course_name: str, **kwargs) -> str:
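    """Ingest a single .docx file from S3 via LangChain's Docx2txtLoader, then chunk & upload via split_and_upload()."""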
try:
with NamedTemporaryFile() as tmpfile:
self.s3_client.download_fileobj(Bucket=os.getenv('S3_BUCKET_NAME'), Key=s3_path, Fileobj=tmpfile)
loader = Docx2txtLoader(tmpfile.name)
documents = loader.load()
texts = [doc.page_content for doc in documents]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
} for doc in documents]
self.split_and_upload(texts=texts, metadatas=metadatas)
return "Success"
except Exception as e:
err = f"❌❌ Error in (DOCX ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_single_srt(self, s3_path: str, course_name: str, **kwargs) -> str:
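    """Ingest a single .srt subtitle file from S3 via LangChain's SRTLoader, then chunk & upload via split_and_upload()."""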
try:
with NamedTemporaryFile() as tmpfile:
        # download from S3 into the tmpfile
self.s3_client.download_fileobj(Bucket=os.getenv('S3_BUCKET_NAME'), Key=s3_path, Fileobj=tmpfile)
loader = SRTLoader(tmpfile.name)
documents = loader.load()
texts = [doc.page_content for doc in documents]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
} for doc in documents]
self.split_and_upload(texts=texts, metadatas=metadatas)
return "Success"
except Exception as e:
err = f"❌❌ Error in (SRT ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_single_excel(self, s3_path: str, course_name: str, **kwargs) -> str:
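    """Ingest a single .xls/.xlsx file from S3 via UnstructuredExcelLoader in "elements" mode, then chunk & upload via split_and_upload()."""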
try:
with NamedTemporaryFile() as tmpfile:
        # download from S3 into the tmpfile
self.s3_client.download_fileobj(Bucket=os.getenv('S3_BUCKET_NAME'), Key=s3_path, Fileobj=tmpfile)
loader = UnstructuredExcelLoader(tmpfile.name, mode="elements")
# loader = SRTLoader(tmpfile.name)
documents = loader.load()
texts = [doc.page_content for doc in documents]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
} for doc in documents]
self.split_and_upload(texts=texts, metadatas=metadatas)
return "Success"
except Exception as e:
err = f"❌❌ Error in (Excel/xlsx ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_single_image(self, s3_path: str, course_name: str, **kwargs) -> str:
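    """Ingest a single .png/.jpg file from S3: OCR the image with pytesseract, then chunk & upload via split_and_upload()."""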
try:
with NamedTemporaryFile() as tmpfile:
        # download from S3 into the tmpfile
self.s3_client.download_fileobj(Bucket=os.getenv('S3_BUCKET_NAME'), Key=s3_path, Fileobj=tmpfile)
"""
# Unstructured image loader makes the install too large (700MB --> 6GB. 3min -> 12 min build times). AND nobody uses it.
# The "hi_res" strategy will identify the layout of the document using detectron2. "ocr_only" uses pdfminer.six. https://unstructured-io.github.io/unstructured/core/partition.html#partition-image
loader = UnstructuredImageLoader(tmpfile.name, unstructured_kwargs={'strategy': "ocr_only"})
documents = loader.load()
"""
res_str = pytesseract.image_to_string(Image.open(tmpfile.name))
print("IMAGE PARSING RESULT:", res_str)
documents = [Document(page_content=res_str)]
texts = [doc.page_content for doc in documents]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
} for doc in documents]
self.split_and_upload(texts=texts, metadatas=metadatas)
return "Success"
except Exception as e:
err = f"❌❌ Error in (png/jpg ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_single_csv(self, s3_path: str, course_name: str, **kwargs) -> str:
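    """Ingest a single .csv file from S3 via LangChain's CSVLoader (one Document per row), then chunk & upload via split_and_upload()."""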
try:
with NamedTemporaryFile() as tmpfile:
        # download from S3 into the tmpfile
self.s3_client.download_fileobj(Bucket=os.getenv('S3_BUCKET_NAME'), Key=s3_path, Fileobj=tmpfile)
loader = CSVLoader(file_path=tmpfile.name)
documents = loader.load()
texts = [doc.page_content for doc in documents]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
} for doc in documents]
self.split_and_upload(texts=texts, metadatas=metadatas)
return "Success"
except Exception as e:
err = f"❌❌ Error in (CSV ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_single_pdf(self, s3_path: str, course_name: str, **kwargs):
"""
    OCR the PDF and upload the first page as a PNG thumbnail to S3.
LangChain `Documents` have .metadata and .page_content attributes.
Be sure to use TemporaryFile() to avoid memory leaks!
"""
print("IN PDF ingest: s3_path: ", s3_path, "and kwargs:", kwargs)
try:
with NamedTemporaryFile() as pdf_tmpfile:
# download from S3 into pdf_tmpfile
self.s3_client.download_fileobj(Bucket=os.getenv('S3_BUCKET_NAME'), Key=s3_path, Fileobj=pdf_tmpfile)
### READ OCR of PDF
doc = fitz.open(pdf_tmpfile.name) # type: ignore
# improve quality of the image
zoom_x = 2.0 # horizontal zoom
zoom_y = 2.0 # vertical zoom
mat = fitz.Matrix(zoom_x, zoom_y) # zoom factor 2 in each dimension
pdf_pages_OCRed: List[Dict] = []
for i, page in enumerate(doc): # type: ignore
# UPLOAD FIRST PAGE IMAGE to S3
if i == 0:
with NamedTemporaryFile(suffix=".png") as first_page_png:
pix = page.get_pixmap(matrix=mat)
pix.save(first_page_png) # store image as a PNG
s3_upload_path = str(Path(s3_path)).rsplit('.pdf')[0] + "-pg1-thumb.png"
first_page_png.seek(0) # Seek the file pointer back to the beginning
with open(first_page_png.name, 'rb') as f:
print("Uploading image png to S3")
self.s3_client.upload_fileobj(f, os.getenv('S3_BUCKET_NAME'), s3_upload_path)
# Extract text
text = page.get_text().encode("utf8").decode("utf8", errors='ignore') # get plain text (is in UTF-8)
pdf_pages_OCRed.append(dict(text=text, page_number=i, readable_filename=Path(s3_path).name[37:]))
metadatas: List[Dict[str, Any]] = [
{
'course_name': course_name,
's3_path': s3_path,
'pagenumber': page['page_number'] + 1, # +1 for human indexing
'timestamp': '',
'readable_filename': kwargs.get('readable_filename', page['readable_filename']),
'url': kwargs.get('url', ''),
'base_url': kwargs.get('base_url', ''),
} for page in pdf_pages_OCRed
]
pdf_texts = [page['text'] for page in pdf_pages_OCRed]
success_or_failure = self.split_and_upload(texts=pdf_texts, metadatas=metadatas)
return success_or_failure
except Exception as e:
err = f"❌❌ Error in (PDF ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
) # type: ignore
print(err)
sentry_sdk.capture_exception(e)
return err
return "Success"
def _ingest_single_txt(self, s3_path: str, course_name: str, **kwargs) -> str:
"""Ingest a single .txt or .md file from S3.
Args:
s3_path (str): A path to a .txt file in S3
course_name (str): The name of the course
Returns:
str: "Success" or an error message
"""
print("In text ingest")
try:
      # NOTE: .txt files are handled slightly differently: no local download needed, we read the S3 object body directly.
response = self.s3_client.get_object(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path)
print("s3 Resonse:", response)
text = response['Body'].read().decode('utf-8')
print("Text from s3:", text)
text = [text]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
}]
print("Prior to ingest", metadatas)
success_or_failure = self.split_and_upload(texts=text, metadatas=metadatas)
return success_or_failure
except Exception as e:
err = f"❌❌ Error in (TXT ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
print(err)
sentry_sdk.capture_exception(e)
return err
def _ingest_single_ppt(self, s3_path: str, course_name: str, **kwargs) -> str:
"""
Ingest a single .ppt or .pptx file from S3.
"""
try:
with NamedTemporaryFile() as tmpfile:
        # download from S3 into the tmpfile
#print("in ingest PPTX")
self.s3_client.download_fileobj(Bucket=os.environ['S3_BUCKET_NAME'], Key=s3_path, Fileobj=tmpfile)
loader = UnstructuredPowerPointLoader(tmpfile.name)
documents = loader.load()
texts = [doc.page_content for doc in documents]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': '',
'url': '',
'base_url': '',
} for doc in documents]
self.split_and_upload(texts=texts, metadatas=metadatas)
return "Success"
except Exception as e:
err = f"❌❌ Error in (PPTX ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
)
print(err)
sentry_sdk.capture_exception(e)
return err
def list_files_recursively(self, bucket, prefix):
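    """Return every object key under `prefix` in `bucket`, following S3 list_objects_v2 pagination via continuation tokens.
    Illustrative usage (bucket and prefix are made-up examples):
      keys = self.list_files_recursively(bucket='my-bucket', prefix='courses/demo-course/')
    """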
all_files = []
continuation_token = None
while True:
list_objects_kwargs = {
'Bucket': bucket,
'Prefix': prefix,
}
if continuation_token:
list_objects_kwargs['ContinuationToken'] = continuation_token
response = self.s3_client.list_objects_v2(**list_objects_kwargs)
if 'Contents' in response:
for obj in response['Contents']:
all_files.append(obj['Key'])
if response['IsTruncated']:
continuation_token = response['NextContinuationToken']
else:
break
return all_files
def ingest_coursera(self, coursera_course_name: str, course_name: str) -> str:
""" Download all the files from a coursera course and ingest them.
1. Download the coursera content.
2. Upload to S3 (so users can view it)
3. Run everything through the ingest_bulk method.
Args:
coursera_course_name (str): The name of the coursera course.
course_name (str): The name of the course in our system.
Returns:
_type_: Success or error message.
"""
certificate = "-ca 'FVhVoDp5cb-ZaoRr5nNJLYbyjCLz8cGvaXzizqNlQEBsG5wSq7AHScZGAGfC1nI0ehXFvWy1NG8dyuIBF7DLMA.X3cXsDvHcOmSdo3Fyvg27Q.qyGfoo0GOHosTVoSMFy-gc24B-_BIxJtqblTzN5xQWT3hSntTR1DMPgPQKQmfZh_40UaV8oZKKiF15HtZBaLHWLbpEpAgTg3KiTiU1WSdUWueo92tnhz-lcLeLmCQE2y3XpijaN6G4mmgznLGVsVLXb-P3Cibzz0aVeT_lWIJNrCsXrTFh2HzFEhC4FxfTVqS6cRsKVskPpSu8D9EuCQUwJoOJHP_GvcME9-RISBhi46p-Z1IQZAC4qHPDhthIJG4bJqpq8-ZClRL3DFGqOfaiu5y415LJcH--PRRKTBnP7fNWPKhcEK2xoYQLr9RxBVL3pzVPEFyTYtGg6hFIdJcjKOU11AXAnQ-Kw-Gb_wXiHmu63veM6T8N2dEkdqygMre_xMDT5NVaP3xrPbA4eAQjl9yov4tyX4AQWMaCS5OCbGTpMTq2Y4L0Mbz93MHrblM2JL_cBYa59bq7DFK1IgzmOjFhNG266mQlC9juNcEhc'"
always_use_flags = "-u kastanvday@gmail.com -p hSBsLaF5YM469# --ignore-formats mp4 --subtitle-language en --path ./coursera-dl"
try:
subprocess.run(
f"coursera-dl {always_use_flags} {certificate} {coursera_course_name}",
check=True,
shell=True, # nosec -- reasonable bandit error suppression
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) # capture_output=True,
dl_results_path = os.path.join('coursera-dl', coursera_course_name)
s3_paths: Union[List, None] = upload_data_files_to_s3(course_name, dl_results_path)
if s3_paths is None:
return "Error: No files found in the coursera-dl directory"
print("starting bulk ingest")
start_time = time.monotonic()
self.bulk_ingest(s3_paths, course_name)
print("completed bulk ingest")
print(f"⏰ Runtime: {(time.monotonic() - start_time):.2f} seconds")
# Cleanup the coursera downloads
shutil.rmtree(dl_results_path)
return "Success"
except Exception as e:
err: str = f"Traceback: {traceback.extract_tb(e.__traceback__)}❌❌ Error in {inspect.currentframe().f_code.co_name}:{e}" # type: ignore
print(err)
return err
def ingest_github(self, github_url: str, course_name: str) -> str:
"""
Clones the given GitHub URL and uses Langchain to load data.
1. Clone the repo
2. Use Langchain to load the data
3. Pass to split_and_upload()
Args:
github_url (str): The Github Repo URL to be ingested.
course_name (str): The name of the course in our system.
Returns:
_type_: Success or error message.
"""
try:
repo_path = "media/cloned_repo"
repo = Repo.clone_from(github_url, to_path=repo_path, depth=1, clone_submodules=False)
branch = repo.head.reference
loader = GitLoader(repo_path="media/cloned_repo", branch=str(branch))
data = loader.load()
shutil.rmtree("media/cloned_repo")
# create metadata for each file in data
for doc in data:
texts = doc.page_content
metadatas: Dict[str, Any] = {
'course_name': course_name,
's3_path': '',
'readable_filename': doc.metadata['file_name'],
'url': f"{github_url}/blob/main/{doc.metadata['file_path']}",
'pagenumber': '',
'timestamp': '',
}
self.split_and_upload(texts=[texts], metadatas=[metadatas])
return "Success"
except Exception as e:
err = f"❌❌ Error in (GITHUB ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n{traceback.format_exc()}"
print(err)
sentry_sdk.capture_exception(e)
return err
def split_and_upload(self, texts: List[str], metadatas: List[Dict[str, Any]]):
""" This is usually the last step of document ingest. Chunk & upload to Qdrant (and Supabase.. todo).
Takes in Text and Metadata (from Langchain doc loaders) and splits / uploads to Qdrant.
good examples here: https://langchain.readthedocs.io/en/latest/modules/utils/combine_docs_examples/textsplitter.html
Args:
texts (List[str]): _description_
metadatas (List[Dict[str, Any]]): _description_
"""
self.posthog.capture('distinct_id_of_the_user',
event='split_and_upload_invoked',
properties={
'course_name': metadatas[0].get('course_name', None),
's3_path': metadatas[0].get('s3_path', None),
'readable_filename': metadatas[0].get('readable_filename', None),
'url': metadatas[0].get('url', None),
'base_url': metadatas[0].get('base_url', None),
})
print("In split and upload")
print(f"metadatas: {metadatas}")
print(f"Texts: {texts}")
assert len(texts) == len(
metadatas
), f'must have equal number of text strings and metadata dicts. len(texts) is {len(texts)}. len(metadatas) is {len(metadatas)}'
try:
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=1000,
chunk_overlap=150,
separators=[
"\n\n", "\n", ". ", " ", ""
] # try to split on paragraphs... fallback to sentences, then chars, ensure we always fit in context window
)
contexts: List[Document] = text_splitter.create_documents(texts=texts, metadatas=metadatas)
input_texts = [{'input': context.page_content, 'model': 'text-embedding-ada-002'} for context in contexts]
# check for duplicates
is_duplicate = self.check_for_duplicates(input_texts, metadatas)
if is_duplicate:
self.posthog.capture('distinct_id_of_the_user',
event='split_and_upload_succeeded',
properties={
'course_name': metadatas[0].get('course_name', None),
's3_path': metadatas[0].get('s3_path', None),
'readable_filename': metadatas[0].get('readable_filename', None),
'url': metadatas[0].get('url', None),
'base_url': metadatas[0].get('base_url', None),
'is_duplicate': True,
})
return "Success"
# adding chunk index to metadata for parent doc retrieval
for i, context in enumerate(contexts):
context.metadata['chunk_index'] = i
oai = OpenAIAPIProcessor(
input_prompts_list=input_texts,
request_url='https://api.openai.com/v1/embeddings',
api_key=os.getenv('VLADS_OPENAI_KEY'),
# request_url='https://uiuc-chat-canada-east.openai.azure.com/openai/deployments/text-embedding-ada-002/embeddings?api-version=2023-05-15',
# api_key=os.getenv('AZURE_OPENAI_KEY'),
max_requests_per_minute=5_000,
max_tokens_per_minute=300_000,
max_attempts=20,
logging_level=logging.INFO,
token_encoding_name='cl100k_base') # nosec -- reasonable bandit error suppression
asyncio.run(oai.process_api_requests_from_file())
# parse results into dict of shape page_content -> embedding
embeddings_dict: dict[str, List[float]] = {
item[0]['input']: item[1]['data'][0]['embedding'] for item in oai.results
}
### BULK upload to Qdrant ###
vectors: list[PointStruct] = []
for context in contexts:
# !DONE: Updated the payload so each key is top level (no more payload.metadata.course_name. Instead, use payload.course_name), great for creating indexes.
upload_metadata = {**context.metadata, "page_content": context.page_content}
vectors.append(
PointStruct(id=str(uuid.uuid4()), vector=embeddings_dict[context.page_content], payload=upload_metadata))
self.qdrant_client.upsert(
collection_name=os.environ['QDRANT_COLLECTION_NAME'], # type: ignore
points=vectors # type: ignore
)
### Supabase SQL ###
contexts_for_supa = [{
"text": context.page_content,
"pagenumber": context.metadata.get('pagenumber'),
"timestamp": context.metadata.get('timestamp'),
"chunk_index": context.metadata.get('chunk_index'),
"embedding": embeddings_dict[context.page_content]
} for context in contexts]
document = {
"course_name": contexts[0].metadata.get('course_name'),
"s3_path": contexts[0].metadata.get('s3_path'),
"readable_filename": contexts[0].metadata.get('readable_filename'),
"url": contexts[0].metadata.get('url'),
"base_url": contexts[0].metadata.get('base_url'),
"contexts": contexts_for_supa,
}
response = self.supabase_client.table(
os.getenv('NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE')).insert(document).execute() # type: ignore
# add to Nomic document map
if len(response.data) > 0:
inserted_data = response.data[0]
res = log_to_document_map(inserted_data)
self.posthog.capture('distinct_id_of_the_user',
event='split_and_upload_succeeded',
properties={
'course_name': metadatas[0].get('course_name', None),
's3_path': metadatas[0].get('s3_path', None),
'readable_filename': metadatas[0].get('readable_filename', None),
'url': metadatas[0].get('url', None),
'base_url': metadatas[0].get('base_url', None),
})
print("successful END OF split_and_upload")
return "Success"
except Exception as e:
err: str = f"ERROR IN split_and_upload(): Traceback: {traceback.extract_tb(e.__traceback__)}❌❌ Error in {inspect.currentframe().f_code.co_name}:{e}" # type: ignore
print(err)
sentry_sdk.capture_exception(e)
return err
def delete_entire_course(self, course_name: str):
"""Delete entire course.
Delete materials from S3, Supabase SQL, Vercel KV, and QDrant vector DB
Args:
      course_name (str): The course whose materials should be deleted from every backend.
"""
print(f"Deleting entire course: {course_name}")
try:
# Delete file from S3
print("Deleting from S3")
objects_to_delete = self.s3_client.list_objects(Bucket=os.getenv('S3_BUCKET_NAME'),
Prefix=f'courses/{course_name}/')
      for s3_object in objects_to_delete['Contents']:
        self.s3_client.delete_object(Bucket=os.getenv('S3_BUCKET_NAME'), Key=s3_object['Key'])
except Exception as e:
err: str = f"ERROR IN delete_entire_course(): Traceback: {traceback.extract_tb(e.__traceback__)}❌❌ Error in {inspect.currentframe().f_code.co_name}:{e}" # type: ignore
print(err)
sentry_sdk.capture_exception(e)
pass
try:
# Delete from Qdrant
# docs for nested keys: https://qdrant.tech/documentation/concepts/filtering/#nested-key
# Qdrant "points" look like this: Record(id='000295ca-bd28-ac4a-6f8d-c245f7377f90', payload={'metadata': {'course_name': 'zotero-extreme', 'pagenumber_or_timestamp': 15, 'readable_filename': 'Dunlosky et al. - 2013 - Improving Students’ Learning With Effective Learni.pdf', 's3_path': 'courses/zotero-extreme/Dunlosky et al. - 2013 - Improving Students’ Learning With Effective Learni.pdf'}, 'page_content': '18 \nDunlosky et al.\n3.3 Effects in representative educational contexts. Sev-\neral of the large summarization-training studies have been \nconducted in regular classrooms, indicating the feasibility of \ndoing so. For example, the study by A. King (1992) took place \nin the context of a remedial study-skills course for undergrad-\nuates, and the study by Rinehart et al. (1986) took place in \nsixth-grade classrooms, with the instruction led by students \nregular teachers. In these and other cases, students benefited \nfrom the classroom training. We suspect it may actually be \nmore feasible to conduct these kinds of training studies in \nclassrooms than in the laboratory, given the nature of the time \ncommitment for students. Even some of the studies that did \nnot involve training were conducted outside the laboratory; for \nexample, in the Bednall and Kehoe (2011) study on learning \nabout logical fallacies from Web modules (see data in Table 3), \nthe modules were actually completed as a homework assign-\nment. Overall, benefits can be observed in classroom settings; \nthe real constraint is whether students have the skill to suc-\ncessfully summarize, not whether summarization occurs in the \nlab or the classroom.\n3.4 Issues for implementation. Summarization would be \nfeasible for undergraduates or other learners who already \nknow how to summarize. For these students, summarization \nwould constitute an easy-to-implement technique that would \nnot take a lot of time to complete or understand. The only \nconcern would be whether these students might be better \nserved by some other strategy, but certainly summarization \nwould be better than the study strategies students typically \nfavor, such as highlighting and rereading (as we discuss in the \nsections on those strategies below). A trickier issue would \nconcern implementing the strategy with students who are not \nskilled summarizers. Relatively intensive training programs \nare required for middle school students or learners with learn-\ning disabilities to benefit from summarization. Such efforts \nare not misplaced; training has been shown to benefit perfor-\nmance on a range of measures, although the training proce-\ndures do raise practical issues (e.g., Gajria & Salvia, 1992: \n6.511 hours of training used for sixth through ninth graders \nwith learning disabilities; Malone & Mastropieri, 1991: 2 \ndays of training used for middle school students with learning \ndisabilities; Rinehart et al., 1986: 4550 minutes of instruc-\ntion per day for 5 days used for sixth graders). Of course, \ninstructors may want students to summarize material because \nsummarization itself is a goal, not because they plan to use \nsummarization as a study technique, and that goal may merit \nthe efforts of training.\nHowever, if the goal is to use summarization as a study \ntechnique, our question is whether training students would be \nworth the amount of time it would take, both in terms of the \ntime required on the part of the instructor and in terms of the \ntime taken away from students other activities. 
For instance, \nin terms of efficacy, summarization tends to fall in the middle \nof the pack when compared to other techniques. In direct \ncomparisons, it was sometimes more useful than rereading \n(Rewey, Dansereau, & Peel, 1991) and was as useful as note-\ntaking (e.g., Bretzing & Kulhavy, 1979) but was less powerful \nthan generating explanations (e.g., Bednall & Kehoe, 2011) or \nself-questioning (A. King, 1992).\n3.5 Summarization: Overall assessment. On the basis of the \navailable evidence, we rate summarization as low utility. It can \nbe an effective learning strategy for learners who are already \nskilled at summarizing; however, many learners (including \nchildren, high school students, and even some undergraduates) \nwill require extensive training, which makes this strategy less \nfeasible. Our enthusiasm is further dampened by mixed find-\nings regarding which tasks summarization actually helps. \nAlthough summarization has been examined with a wide \nrange of text materials, many researchers have pointed to fac-\ntors of these texts that seem likely to moderate the effects of \nsummarization (e.g'}, vector=None),
print("deleting from qdrant")
self.qdrant_client.delete(
collection_name=os.environ['QDRANT_COLLECTION_NAME'],
points_selector=models.Filter(must=[
models.FieldCondition(
key="course_name",
match=models.MatchValue(value=course_name),
),
]),
)
except Exception as e:
err: str = f"ERROR IN delete_entire_course(): Traceback: {traceback.extract_tb(e.__traceback__)}❌❌ Error in {inspect.currentframe().f_code.co_name}:{e}" # type: ignore
print(err)
sentry_sdk.capture_exception(e)
pass
try:
# Delete from Supabase
print("deleting from supabase")
response = self.supabase_client.from_(os.environ['NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE']).delete().eq(
'course_name', course_name).execute()
print("supabase response: ", response)
return "Success"
except Exception as e:
err: str = f"ERROR IN delete_entire_course(): Traceback: {traceback.extract_tb(e.__traceback__)}❌❌ Error in {inspect.currentframe().f_code.co_name}:{e}" # type: ignore
print(err)
sentry_sdk.capture_exception(e)
      # todo: delete from Vercel KV to fully make the course not exist. Last DB to delete from (as of now, Aug 15)
def delete_data(self, course_name: str, s3_path: str, source_url: str):
"""Delete file from S3, Qdrant, and Supabase."""
print(f"Deleting {s3_path} from S3, Qdrant, and Supabase for course {course_name}")
# add delete from doc map logic here
try:
# Delete file from S3
bucket_name = os.getenv('S3_BUCKET_NAME')
# Delete files by S3 path
if s3_path:
try:
self.s3_client.delete_object(Bucket=bucket_name, Key=s3_path)
except Exception as e:
print("Error in deleting file from s3:", e)
sentry_sdk.capture_exception(e)
# Delete from Qdrant
# docs for nested keys: https://qdrant.tech/documentation/concepts/filtering/#nested-key
# Qdrant "points" look like this: Record(id='000295ca-bd28-ac4a-6f8d-c245f7377f90', payload={'metadata': {'course_name': 'zotero-extreme', 'pagenumber_or_timestamp': 15, 'readable_filename': 'Dunlosky et al. - 2013 - Improving Students’ Learning With Effective Learni.pdf', 's3_path': 'courses/zotero-extreme/Dunlosky et al. - 2013 - Improving Students’ Learning With Effective Learni.pdf'}, 'page_content': '18 \nDunlosky et al.\n3.3 Effects in representative educational contexts. Sev-\neral of the large summarization-training studies have been \nconducted in regular classrooms, indicating the feasibility of \ndoing so. For example, the study by A. King (1992) took place \nin the context of a remedial study-skills course for undergrad-\nuates, and the study by Rinehart et al. (1986) took place in \nsixth-grade classrooms, with the instruction led by students \nregular teachers. In these and other cases, students benefited \nfrom the classroom training. We suspect it may actually be \nmore feasible to conduct these kinds of training ...
try:
self.qdrant_client.delete(
collection_name=os.environ['QDRANT_COLLECTION_NAME'],
points_selector=models.Filter(must=[
models.FieldCondition(
key="s3_path",
match=models.MatchValue(value=s3_path),
),
]),
)
except Exception as e:
if "timed out" in str(e):
# Timed out is fine. Still deletes.
# https://github.com/qdrant/qdrant/issues/3654#issuecomment-1955074525
pass
else:
print("Error in deleting file from Qdrant:", e)
sentry_sdk.capture_exception(e)
try:
          # fetch the matching Supabase record so we can build the Nomic ids to delete
response = self.supabase_client.from_(
os.environ['NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE']).select("id, s3_path, contexts").eq(
's3_path', s3_path).eq('course_name', course_name).execute()
data = response.data[0] #single record fetched
nomic_ids_to_delete = []
context_count = len(data['contexts'])
for i in range(1, context_count + 1):
nomic_ids_to_delete.append(str(data['id']) + "_" + str(i))
# delete from Nomic
res = delete_from_document_map(course_name, nomic_ids_to_delete)
except Exception as e:
print("Error in deleting file from Nomic:", e)
sentry_sdk.capture_exception(e)
try:
self.supabase_client.from_(os.environ['NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE']).delete().eq(
's3_path', s3_path).eq('course_name', course_name).execute()
except Exception as e:
print("Error in deleting file from supabase:", e)
sentry_sdk.capture_exception(e)
# Delete files by their URL identifier
elif source_url:
try:
# Delete from Qdrant
self.qdrant_client.delete(
collection_name=os.environ['QDRANT_COLLECTION_NAME'],
points_selector=models.Filter(must=[
models.FieldCondition(
key="url",
match=models.MatchValue(value=source_url),
),
]),
)
except Exception as e:
if "timed out" in str(e):
# Timed out is fine. Still deletes.
# https://github.com/qdrant/qdrant/issues/3654#issuecomment-1955074525
pass
else:
print("Error in deleting file from Qdrant:", e)
sentry_sdk.capture_exception(e)
try:
          # fetch the matching Supabase record so we can build the Nomic ids to delete
response = self.supabase_client.from_(
os.environ['NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE']).select("id, url, contexts").eq(
'url', source_url).eq('course_name', course_name).execute()
data = response.data[0] #single record fetched
nomic_ids_to_delete = []
context_count = len(data['contexts'])
for i in range(1, context_count + 1):
nomic_ids_to_delete.append(str(data['id']) + "_" + str(i))
# delete from Nomic
res = delete_from_document_map(course_name, nomic_ids_to_delete)
except Exception as e:
print("Error in deleting file from Nomic:", e)
sentry_sdk.capture_exception(e)
try:
# delete from Supabase
self.supabase_client.from_(os.environ['NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE']).delete().eq(
'url', source_url).eq('course_name', course_name).execute()
except Exception as e:
print("Error in deleting file from supabase:", e)
sentry_sdk.capture_exception(e)
return "Success"
except Exception as e:
err: str = f"ERROR IN delete_data: Traceback: {traceback.extract_tb(e.__traceback__)}❌❌ Error in {inspect.currentframe().f_code.co_name}:{e}" # type: ignore
print(err)
sentry_sdk.capture_exception(e)
return err
def getAll(
self,
course_name: str,
):
"""Get all course materials based on course name.
Args:
course_name (as uploaded on supabase)
Returns:
list of dictionaries with distinct s3 path, readable_filename and course_name, url, base_url.
"""
response = self.supabase_client.table(os.environ['NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE']).select(
'course_name, s3_path, readable_filename, url, base_url').eq('course_name', course_name).execute()
data = response.data
unique_combinations = set()
distinct_dicts = []
for item in data:
combination = (item['s3_path'], item['readable_filename'], item['course_name'], item['url'], item['base_url'])
if combination not in unique_combinations:
unique_combinations.add(combination)
distinct_dicts.append(item)
return distinct_dicts
def vector_search(self, search_query, course_name):
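    """Embed the search query with OpenAIEmbeddings and return up to top_n matching chunks from Qdrant (filtered by course_name) as LangChain Documents.
    Illustrative usage (course name is a made-up example):
      docs = self.vector_search(search_query="What is dynamic programming?", course_name="demo-course")
    """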
top_n = 80
# EMBED
openai_start_time = time.monotonic()
o = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE)
user_query_embedding = o.embed_query(search_query)
openai_embedding_latency = time.monotonic() - openai_start_time
# SEARCH
myfilter = models.Filter(must=[
models.FieldCondition(key='course_name', match=models.MatchValue(value=course_name)),
])
self.posthog.capture('distinct_id_of_the_user',
event='vector_search_invoked',
properties={
'user_query': search_query,
'course_name': course_name,
})
qdrant_start_time = time.monotonic()
search_results = self.qdrant_client.search(
collection_name=os.environ['QDRANT_COLLECTION_NAME'],
query_filter=myfilter,
with_vectors=False,
query_vector=user_query_embedding,
limit=top_n, # Return n closest points
# In a system with high disk latency, the re-scoring step may become a bottleneck: https://qdrant.tech/documentation/guides/quantization/
search_params=models.SearchParams(quantization=models.QuantizationSearchParams(rescore=False)))
found_docs: list[Document] = []
for d in search_results:
try:
metadata = d.payload
page_content = metadata['page_content']
del metadata['page_content']
if "pagenumber" not in metadata.keys() and "pagenumber_or_timestamp" in metadata.keys(): # type: ignore
# aiding in the database migration...
metadata["pagenumber"] = metadata["pagenumber_or_timestamp"] # type: ignore
found_docs.append(Document(page_content=page_content, metadata=metadata)) # type: ignore
except Exception as e:
print(f"Error in vector_search(), for course: `{course_name}`. Error: {e}")
sentry_sdk.capture_exception(e)
self.posthog.capture('distinct_id_of_the_user',
event='vector_search_succeded',
properties={
'user_query': search_query,
'course_name': course_name,
'qdrant_latency_sec': time.monotonic() - qdrant_start_time,
'openai_embedding_latency_sec': openai_embedding_latency
})
# print("found_docs", found_docs)
return found_docs
def getTopContexts(self, search_query: str, course_name: str, token_limit: int = 4_000) -> Union[List[Dict], str]:
"""Here's a summary of the work.
/GET arguments
course name (optional) str: A json response with TBD fields.
Returns
JSON: A json response with TBD fields. See main.py:getTopContexts docs.
or
String: An error message with traceback.
"""
try:
start_time_overall = time.monotonic()
found_docs: list[Document] = self.vector_search(search_query=search_query, course_name=course_name)
pre_prompt = "Please answer the following question. Use the context below, called your documents, only if it's helpful and don't use parts that are very irrelevant. It's good to quote from your documents directly, when you do always use Markdown footnotes for citations. Use react-markdown superscript to number the sources at the end of sentences (1, 2, 3...) and use react-markdown Footnotes to list the full document names for each number. Use ReactMarkdown aka 'react-markdown' formatting for super script citations, use semi-formal style. Feel free to say you don't know. \nHere's a few passages of the high quality documents:\n"
# count tokens at start and end, then also count each context.
token_counter, _ = count_tokens_and_cost(pre_prompt + '\n\nNow please respond to my query: ' +
search_query) # type: ignore
valid_docs = []
num_tokens = 0
for doc in found_docs:
doc_string = f"Document: {doc.metadata['readable_filename']}{', page: ' + str(doc.metadata['pagenumber']) if doc.metadata['pagenumber'] else ''}\n{str(doc.page_content)}\n"
num_tokens, prompt_cost = count_tokens_and_cost(doc_string) # type: ignore
print(
f"tokens used/limit: {token_counter}/{token_limit}, tokens in chunk: {num_tokens}, total prompt cost (of these contexts): {prompt_cost}. 📄 File: {doc.metadata['readable_filename']}"
)
if token_counter + num_tokens <= token_limit:
token_counter += num_tokens
valid_docs.append(doc)
else:
# filled our token size, time to return
break
print(f"Total tokens used: {token_counter}. Docs used: {len(valid_docs)} of {len(found_docs)} docs retrieved")
print(f"Course: {course_name} ||| search_query: {search_query}")
print(f"⏰ ^^ Runtime of getTopContexts: {(time.monotonic() - start_time_overall):.2f} seconds")
if len(valid_docs) == 0:
return []
self.posthog.capture('distinct_id_of_the_user',
event='success_get_top_contexts_OG',
properties={
'user_query': search_query,
'course_name': course_name,
'token_limit': token_limit,
'total_tokens_used': token_counter,
'total_contexts_used': len(valid_docs),
'total_unique_docs_retrieved': len(found_docs),
'getTopContext_total_latency_sec': time.monotonic() - start_time_overall,
})
return self.format_for_json(valid_docs)
except Exception as e:
# return full traceback to front end
err: str = f"ERROR: In /getTopContexts. Course: {course_name} ||| search_query: {search_query}\nTraceback: {traceback.extract_tb(e.__traceback__)}❌❌ Error in {inspect.currentframe().f_code.co_name}:\n{e}" # type: ignore
print(err)
sentry_sdk.capture_exception(e)
return err
def batch_vector_search(self, search_queries: List[str], course_name: str, top_n: int = 50):
"""
Perform a similarity search for all the generated queries at once.
"""
start_time = time.monotonic()
from qdrant_client.http import models as rest
o = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE)
# Prepare the filter for the course name
myfilter = rest.Filter(must=[
rest.FieldCondition(key='course_name', match=rest.MatchValue(value=course_name)),
])
# Prepare the search requests
search_requests = []
for query in search_queries:
user_query_embedding = o.embed_query(query)
search_requests.append(
rest.SearchRequest(vector=user_query_embedding,
filter=myfilter,
limit=top_n,
with_payload=True,
params=models.SearchParams(quantization=models.QuantizationSearchParams(rescore=False))))
# Perform the batch search
search_results = self.qdrant_client.search_batch(
collection_name=os.environ['QDRANT_COLLECTION_NAME'],
requests=search_requests,
)
# process search results
found_docs: list[list[Document]] = []
for result in search_results:
docs = []
for doc in result:
try:
metadata = doc.payload
page_content = metadata['page_content']
del metadata['page_content']
if "pagenumber" not in metadata.keys() and "pagenumber_or_timestamp" in metadata.keys():
metadata["pagenumber"] = metadata["pagenumber_or_timestamp"]
docs.append(Document(page_content=page_content, metadata=metadata))
except Exception:
print(traceback.print_exc())
found_docs.append(docs)
print(f"⏰ Qdrant Batch Search runtime: {(time.monotonic() - start_time):.2f} seconds")
return found_docs
def reciprocal_rank_fusion(self, results: list[list], k=60):
"""
Since we have multiple queries, and n documents returned per query, we need to go through all the results
and collect the documents with the highest overall score, as scored by qdrant similarity matching.
"""
fused_scores = {}
count = 0
unique_count = 0
for docs in results:
# Assumes the docs are returned in sorted order of relevance
count += len(docs)
for rank, doc in enumerate(docs):
doc_str = dumps(doc)
if doc_str not in fused_scores:
fused_scores[doc_str] = 0
unique_count += 1
fused_scores[doc_str] += 1 / (rank + k)
# Uncomment for debugging
# previous_score = fused_scores[doc_str]
#print(f"Change score for doc: {doc_str}, previous score: {previous_score}, updated score: {fused_scores[doc_str]} ")
print(f"Total number of documents in rank fusion: {count}")
print(f"Total number of unique documents in rank fusion: {unique_count}")
reranked_results = [
(loads(doc), score) for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
]
return reranked_results
def getTopContextsWithMQR(self,
search_query: str,
course_name: str,
token_limit: int = 4_000) -> Union[List[Dict], str]:
"""
New info-retrieval pipeline that uses multi-query retrieval + filtering + reciprocal rank fusion + context padding.
1. Generate multiple queries based on the input search query.
2. Retrieve relevant docs for each query.
3. Filter the relevant docs based on the user query and pass them to the rank fusion step.
4. [CANCELED BEC POINTLESS] Rank the docs based on the relevance score.
5. Parent-doc-retrieval: Pad just the top 5 docs with expanded context from the original document.
"""
return 'fail'
# try:
# top_n_per_query = 40 # HARD CODE TO ENSURE WE HIT THE MAX TOKENS
# start_time_overall = time.monotonic()
# mq_start_time = time.monotonic()
# # 1. GENERATE MULTIPLE QUERIES
# generate_queries = (
# MULTI_QUERY_PROMPT | self.llm | StrOutputParser() | (lambda x: x.split("\n")) |
# (lambda x: list(filter(None, x))) # filter out non-empty strings
# )
# generated_queries = generate_queries.invoke({"original_query": search_query})
# print("generated_queries", generated_queries)
# # 2. VECTOR SEARCH FOR EACH QUERY
# batch_found_docs_nested: list[list[Document]] = self.batch_vector_search(search_queries=generated_queries,
# course_name=course_name,
# top_n=top_n_per_query)
# # 3. RANK REMAINING DOCUMENTS -- good for parent doc padding of top 5 at the end.
# found_docs = self.reciprocal_rank_fusion(batch_found_docs_nested)
# found_docs = [doc for doc, score in found_docs]
# print(f"Num docs after re-ranking: {len(found_docs)}")
# if len(found_docs) == 0:
# return []
# print(f"⏰ Total multi-query processing runtime: {(time.monotonic() - mq_start_time):.2f} seconds")
# # 4. FILTER DOCS
# filtered_docs = filter_top_contexts(contexts=found_docs, user_query=search_query, timeout=30, max_concurrency=180)
# if len(filtered_docs) == 0:
# return []
# # 5. TOP DOC CONTEXT PADDING // parent document retriever
# final_docs = context_parent_doc_padding(filtered_docs, search_query, course_name)
# print(f"Number of final docs after context padding: {len(final_docs)}")
# pre_prompt = "Please answer the following question. Use the context below, called your documents, only if it's helpful and don't use parts that are very irrelevant. It's good to quote from your documents directly, when you do always use Markdown footnotes for citations. Use react-markdown superscript to number the sources at the end of sentences (1, 2, 3...) and use react-markdown Footnotes to list the full document names for each number. Use ReactMarkdown aka 'react-markdown' formatting for super script citations, use semi-formal style. Feel free to say you don't know. \nHere's a few passages of the high quality documents:\n"
# token_counter, _ = count_tokens_and_cost(pre_prompt + '\n\nNow please respond to my query: ' +
# search_query) # type: ignore
# valid_docs = []
# num_tokens = 0
# for doc in final_docs:
# doc_string = f"Document: {doc['readable_filename']}{', page: ' + str(doc['pagenumber']) if doc['pagenumber'] else ''}\n{str(doc['text'])}\n"
# num_tokens, prompt_cost = count_tokens_and_cost(doc_string) # type: ignore
# print(f"token_counter: {token_counter}, num_tokens: {num_tokens}, max_tokens: {token_limit}")
# if token_counter + num_tokens <= token_limit:
# token_counter += num_tokens
# valid_docs.append(doc)
# else:
# # filled our token size, time to return
# break
# print(f"Total tokens used: {token_counter} Used {len(valid_docs)} of total unique docs {len(found_docs)}.")
# print(f"Course: {course_name} ||| search_query: {search_query}")
# print(f"⏰ ^^ Runtime of getTopContextsWithMQR: {(time.monotonic() - start_time_overall):.2f} seconds")
# if len(valid_docs) == 0:
# return []
# self.posthog.capture('distinct_id_of_the_user',
# event='filter_top_contexts_succeeded',
# properties={
# 'user_query': search_query,
# 'course_name': course_name,
# 'token_limit': token_limit,
# 'total_tokens_used': token_counter,
# 'total_contexts_used': len(valid_docs),
# 'total_unique_docs_retrieved': len(found_docs),
# })
# return self.format_for_json_mqr(valid_docs)
# except Exception as e:
# # return full traceback to front end
# err: str = f"ERROR: In /getTopContextsWithMQR. Course: {course_name} ||| search_query: {search_query}\nTraceback: {traceback.format_exc()}❌❌ Error in {inspect.currentframe().f_code.co_name}:\n{e}" # type: ignore
# print(err)
# sentry_sdk.capture_exception(e)
# return err
def format_for_json_mqr(self, found_docs) -> List[Dict]:
"""
Same as format_for_json, but for the new MQR pipeline.
"""
for found_doc in found_docs:
if "pagenumber" not in found_doc.keys():
print("found no pagenumber")
found_doc['pagenumber'] = found_doc['pagenumber_or_timestamp']
contexts = [
{
'text': doc['text'],
'readable_filename': doc['readable_filename'],
            'course_name': doc['course_name'],
's3_path': doc['s3_path'],
'pagenumber': doc['pagenumber'],
'url': doc['url'], # wouldn't this error out?
'base_url': doc['base_url'],
} for doc in found_docs
]
return contexts
def get_context_stuffed_prompt(self, user_question: str, course_name: str, top_n: int, top_k_to_search: int) -> str:
"""
Get a stuffed prompt for a given user question and course name.
Args:
user_question (str)
course_name (str) : used for metadata filtering
Returns : str
a very long "stuffed prompt" with question + summaries of top_n most relevant documents.
"""
# MMR with metadata filtering based on course_name
vec_start_time = time.monotonic()
found_docs = self.vectorstore.max_marginal_relevance_search(user_question, k=top_n, fetch_k=top_k_to_search)
print(
f"⏰ MMR Search runtime (top_n_to_keep: {top_n}, top_k_to_search: {top_k_to_search}): {(time.monotonic() - vec_start_time):.2f} seconds"
)
requests = []
for doc in found_docs:
print("doc", doc)
dictionary = {
"model": "gpt-3.5-turbo",
"messages": [{
"role":
"system",
"content":
"You are a factual summarizer of partial documents. Stick to the facts (including partial info when necessary to avoid making up potentially incorrect details), and say I don't know when necessary."
}, {
"role":
"user",
"content":
f"Provide a comprehensive summary of the given text, based on this question:\n{doc.page_content}\nQuestion: {user_question}\nThe summary should cover all the key points that are relevant to the question, while also condensing the information into a concise format. The length of the summary should be as short as possible, without losing relevant information.\nMake use of direct quotes from the text.\nFeel free to include references, sentence fragments, keywords or anything that could help someone learn about it, only as it relates to the given question.\nIf the text does not provide information to answer the question, please write 'None' and nothing else.",
}],
"n": 1,
"max_tokens": 600,
"metadata": doc.metadata
}
requests.append(dictionary)
oai = OpenAIAPIProcessor(
input_prompts_list=requests,
request_url='https://api.openai.com/v1/chat/completions',
api_key=os.getenv("OPENAI_API_KEY"),
max_requests_per_minute=1500,
max_tokens_per_minute=90000,
token_encoding_name='cl100k_base', # nosec -- reasonable bandit error suppression
max_attempts=5,
logging_level=20)
chain_start_time = time.monotonic()
asyncio.run(oai.process_api_requests_from_file())
results = oai.results  # each result's last element is the request's metadata dict (used below for pagenumber/filename)
print(f"⏰ EXTREME context stuffing runtime: {(time.monotonic() - chain_start_time):.2f} seconds")
print(f"Cleaned results: {oai.cleaned_results}")
all_texts = ""
separator = '---' # between each context
token_counter = 0  # running total of tokens across the accepted summaries
max_tokens = 7_500  # limit; stop adding summaries once the total would exceed 7,500 tokens
for i, text in enumerate(oai.cleaned_results):
if text is None:
# skip empty results entirely
continue
if text.lower().startswith('none') or text.lower().endswith('none.') or text.lower().endswith('none'):
# no useful text, it replied with a summary of "None"
continue
if "pagenumber" not in results[i][-1].keys(): # type: ignore
results[i][-1]['pagenumber'] = results[i][-1].get('pagenumber_or_timestamp') # type: ignore
num_tokens, prompt_cost = count_tokens_and_cost(text) # type: ignore
if token_counter + num_tokens > max_tokens:
print(f"Total tokens yet in loop {i} is {num_tokens}")
break # Stop building the string if it exceeds the maximum number of tokens
token_counter += num_tokens
filename = str(results[i][-1].get('readable_filename', '')) # type: ignore
pagenumber_or_timestamp = str(results[i][-1].get('pagenumber', '')) # type: ignore
pagenumber = f", page: {pagenumber_or_timestamp}" if pagenumber_or_timestamp else ''
doc = f"Document : filename: {filename}" + pagenumber
summary = f"\nSummary: {text}"
all_texts += doc + summary + '\n' + separator + '\n'
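# Back-of-the-envelope: with max_tokens = 7,500 and each summary capped at 600 completion tokens
# (see "max_tokens": 600 in the request dicts above), roughly a dozen summaries fit before the
# loop breaks, more if the individual summaries are short.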
stuffed_prompt = """Please answer the following question.
Use the context below, called 'your documents', only if it's helpful and don't use parts that are very irrelevant.
It's good to quote 'your documents' directly using informal citations, like "in document X it says Y". Try to avoid giving false or misleading information. Feel free to say you don't know.
Try to be helpful, polite, honest, sophisticated, emotionally aware, and humble-but-knowledgeable.
That said, be practical and really do your best, and don't let caution get too much in the way of being useful.
To help answer the question, here's a few passages of high quality documents:\n{all_texts}
Now please respond to my question: {user_question}"""
# "Please answer the following question. It's good to quote 'your documents' directly, something like 'from ABS source it says XYZ' Feel free to say you don't know. \nHere's a few passages of the high quality 'your documents':\n"
return stuffed_prompt
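# Hedged usage sketch (question, course name, and variable names are illustrative):
# ingester = Ingest()
# prompt = ingester.get_context_stuffed_prompt(
#     user_question="What is a finite state machine?", course_name="ECE120", top_n=5, top_k_to_search=25)
# The returned string is a complete user message that can be sent to a chat model as-is.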
def get_stuffed_prompt(self, search_query: str, course_name: str, token_limit: int = 7_000) -> str:
"""
Returns
String: A fully formatted prompt string.
"""
try:
top_n = 90
start_time_overall = time.monotonic()
o = OpenAIEmbeddings(openai_api_type=OPENAI_API_TYPE)
user_query_embedding = o.embed_documents([search_query])[0]
myfilter = models.Filter(must=[
models.FieldCondition(key='course_name', match=models.MatchValue(value=course_name)),
])
found_docs = self.qdrant_client.search(
collection_name=os.environ['QDRANT_COLLECTION_NAME'],
query_filter=myfilter,
with_vectors=False,
query_vector=user_query_embedding,
limit=top_n # return up to top_n closest points
)
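# Each entry in found_docs is a Qdrant ScoredPoint; only its .payload dict
# (course_name, readable_filename, pagenumber, page_content, ...) is used below.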
print("Search results: ", found_docs)
if len(found_docs) == 0:
return search_query
pre_prompt = "Please answer the following question. Use the context below, called your documents, only if it's helpful and don't use parts that are very irrelevant. It's good to quote from your documents directly, when you do always use Markdown footnotes for citations. Use react-markdown superscript to number the sources at the end of sentences (1, 2, 3...) and use react-markdown Footnotes to list the full document names for each number. Use ReactMarkdown aka 'react-markdown' formatting for super script citations, use semi-formal style. Feel free to say you don't know. \nHere's a few passages of the high quality documents:\n"
# count tokens at start and end, then also count each context.
token_counter, _ = count_tokens_and_cost(pre_prompt + '\n\nNow please respond to my query: ' +
search_query) # type: ignore
valid_docs = []
for d in found_docs:
if d.payload is not None:
if "pagenumber" not in d.payload.keys():
d.payload["pagenumber"] = d.payload["pagenumber_or_timestamp"]
doc_string = f"---\nDocument: {d.payload['readable_filename']}{', page: ' + str(d.payload['pagenumber']) if d.payload['pagenumber'] else ''}\n{d.payload.get('page_content')}\n"
num_tokens, prompt_cost = count_tokens_and_cost(doc_string) # type: ignore
# print(f"Page: {d.payload.get('page_content', ' '*100)[:100]}...")
print(
f"tokens used/limit: {token_counter}/{token_limit}, tokens in chunk: {num_tokens}, prompt cost of chunk: {prompt_cost}. 📄 File: {d.payload.get('readable_filename', '')}"
)
if token_counter + num_tokens <= token_limit:
token_counter += num_tokens
valid_docs.append(
Document(page_content=d.payload.get('page_content', '<Missing page content>'), metadata=d.payload))
else:
continue
# Convert the valid_docs to full prompt
separator = '---\n' # between each context
context_text = separator.join(
f"Document: {d.metadata['readable_filename']}{', page: ' + str(d.metadata['pagenumber']) if d.metadata['pagenumber'] else ''}\n{d.page_content}\n"
for d in valid_docs)
# Create the stuffedPrompt
stuffedPrompt = (pre_prompt + context_text + '\n\nNow please respond to my query: ' + search_query)
TOTAL_num_tokens, prompt_cost = count_tokens_and_cost(stuffedPrompt, openai_model_name='gpt-4') # type: ignore
print(f"Total tokens: {TOTAL_num_tokens}, prompt_cost: {prompt_cost}")
print("total docs: ", len(found_docs))
print("num docs used: ", len(valid_docs))
print(f"⏰ ^^ Runtime of getTopContexts: {(time.monotonic() - start_time_overall):.2f} seconds")
return stuffedPrompt
except Exception as e:
# return full traceback to front end
err: str = f"Traceback: {traceback.extract_tb(e.__traceback__)}❌❌ Error in {inspect.currentframe().f_code.co_name}:{e}" # type: ignore
print(err)
sentry_sdk.capture_exception(e)
return err
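# Hedged usage sketch (values are illustrative):
# ingester = Ingest()
# prompt = ingester.get_stuffed_prompt(search_query="Explain FSMs", course_name="ECE120", token_limit=7_000)
# Unlike get_context_stuffed_prompt, this packs raw document chunks (no per-document LLM summaries)
# until token_limit is reached.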
def format_for_json(self, found_docs: List[Document]) -> List[Dict]:
"""Formatting only.
{'course_name': course_name, 'contexts': [{'source_name': 'Lumetta_notes', 'source_location': 'pg. 19', 'text': 'In FSM, we do this...'}, {'source_name': 'Lumetta_notes', 'source_location': 'pg. 20', 'text': 'In Assembly language, the code does that...'},]}
Args:
found_docs (List[Document]): _description_
Raises:
Exception: _description_
Returns:
List[Dict]: _description_
"""
for found_doc in found_docs:
if "pagenumber" not in found_doc.metadata.keys():
print("found no pagenumber")
found_doc.metadata['pagenumber'] = found_doc.metadata['pagenumber_or_timestamp']
contexts = [
{
'text': doc.page_content,
'readable_filename': doc.metadata['readable_filename'],
'course_name': doc.metadata['course_name'],
's3_path': doc.metadata['s3_path'],
'pagenumber': doc.metadata['pagenumber'], # kept under this key because the vector DB schema is older
# OPTIONAL PARAMS -- .get() returns None instead of raising KeyError when these are absent
'url': doc.metadata.get('url'),
'base_url': doc.metadata.get('base_url'),
} for doc in found_docs
]
return contexts
def check_for_duplicates(self, texts: List[Dict], metadatas: List[Dict[str, Any]]) -> bool:
"""
For given metadata, fetch docs from Supabase based on S3 path or URL.
If docs exists, concatenate the texts and compare with current texts, if same, return True.
"""
doc_table = os.getenv('NEW_NEW_NEWNEW_MATERIALS_SUPABASE_TABLE', '')
course_name = metadatas[0]['course_name']
incoming_s3_path = metadatas[0]['s3_path']
url = metadatas[0]['url']
# check if a uuid prefix exists in the s3 filename -- not all s3_paths have uuids!
incoming_filename = incoming_s3_path.split('/')[-1]
pattern = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}',
re.I) # UUID v4 pattern (v4 only)
if bool(pattern.search(incoming_filename)):
# uuid prefix exists -- strip the 36-char uuid plus its separator (37 chars total) and proceed with duplicate checking
original_filename = incoming_filename[37:]
else:
# no uuid prefix -- keep the filename as-is and proceed with duplicate checking
original_filename = incoming_filename
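# Illustrative example (filename is hypothetical): an s3 key like
# "courses/ECE120/123e4567-e89b-42d3-a456-426614174000-lecture01.pdf" yields
# original_filename == "lecture01.pdf" (36-char uuid + 1 separator = 37 chars stripped),
# while "courses/ECE120/lecture01.pdf" is left untouched.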
if incoming_s3_path:
filename = incoming_s3_path
supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq(
'course_name', course_name).like('s3_path', '%' + original_filename + '%').order('id', desc=True).execute()
supabase_contents = supabase_contents.data
elif url:
filename = url
supabase_contents = self.supabase_client.table(doc_table).select('id', 'contexts', 's3_path').eq(
'course_name', course_name).eq('url', url).order('id', desc=True).execute()
supabase_contents = supabase_contents.data
else:
filename = None
supabase_contents = []
supabase_whole_text = ""
if len(supabase_contents) > 0: # if a doc with same filename exists in Supabase
# concatenate texts
supabase_contexts = supabase_contents[0]
for text in supabase_contexts['contexts']:
supabase_whole_text += text['text']
current_whole_text = ""
for text in texts:
current_whole_text += text['input']
if supabase_whole_text == current_whole_text: # matches the previous file
print(f"Duplicate ingested! 📄 s3_path: {filename}.")
return True
else: # the file is updated
print(f"Updated file detected! Same filename, new contents. 📄 s3_path: {filename}")
# call the delete function on older docs
for content in supabase_contents:
print("older s3_path to be deleted: ", content['s3_path'])
delete_status = self.delete_data(course_name, content['s3_path'], '')
print("delete_status: ", delete_status)
return False
else: # filename does not already exist in Supabase, so it's a brand-new file
print(f"NOT a duplicate! 📄s3_path: {filename}")
return False
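# Summary of the decision table above (a restatement, not new behavior):
#   same filename + identical concatenated text  -> True  (skip re-ingest)
#   same filename + different text               -> False (older rows deleted via delete_data, then re-ingest)
#   no matching filename or url                  -> False (brand-new file)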