Coverage for app/main/routes.py: 84%
154 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-20 21:23 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-20 21:23 +0000
1from io import BytesIO
2from flask import jsonify, render_template, redirect, send_file, url_for, request, session
3from app.main import bp
4from app.models import User
5from app import db
6from flask import current_app
7from app.models import Document
9from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
10from langchain_core.messages import HumanMessage, AIMessage
11from langchain_ollama.llms import OllamaLLM
12from langchain_community.chat_message_histories import ChatMessageHistory
14import ollama
15from ollama import chat
16from ollama import ChatResponse
17from ollama import Client
19from app.main.forms import LoginForm, PDFUploadForm
20from app.doc_parsers.process_doc import process_doc
21from app.doc_indexer.retrieve_document import query_database
22from sqlalchemy import delete
23from sqlalchemy.exc import SQLAlchemyError
26from flask_login import login_required, current_user, logout_user, login_user
28"""
29Places for routes in the backend
30"""
# Shared LLM client piped into the prompt chain by the /chat route. Targets the
# "llama3.2" model on the Ollama server at http://ollama:11434.
llm = OllamaLLM(model="llama3.2", base_url="http://ollama:11434")
@bp.route("/", methods=["GET", "POST"])
@bp.route("/index", methods=["GET", "POST"])
def index():
    """
    Landing page that also hosts the admin login form.

    All stored documents are loaded and handed to the template on every
    request. When the login form validates, the username is looked up and the
    password checked: success logs the user in and redirects to the admin
    dashboard, failure re-renders the page with an error message.

    Original authors: Troy Witmer (02/06/2025, sample index endpoint) and
    Justin Truong (02/12/2025, admin login).
    """
    all_documents = db.session.query(Document).all()
    login_form = LoginForm()

    # Plain GETs and submissions that fail validation just show the page.
    if not login_form.validate_on_submit():
        return render_template(
            "main/index.html", form=login_form, documents=all_documents
        )

    account = User.query.filter_by(username=login_form.username.data).first()
    credentials_ok = account and account.check_password(login_form.password.data)
    if not credentials_ok:
        # Unknown user or wrong password: back to the index page with an error.
        return render_template(
            "main/index.html",
            form=login_form,
            error="Invalid username or password",
            documents=all_documents,
        )

    login_user(account)
    return redirect(url_for("main.admin"))
@bp.route("/admin", methods=["GET", "POST"])
@login_required
def admin():
    """
    Admin dashboard: lists stored documents and accepts PDF uploads.

    GET renders ``main/admin.html`` with the current user, every stored
    document and the upload form. POST validates the upload form, stores the
    PDF as a ``Document`` row and passes it to ``process_doc``.

    Returns:
        GET: the rendered dashboard template.
        POST: a JSON body and status code:
            200 - upload stored; body carries the new document's id, name,
                  type and size in bytes.
            400 - invalid form data, no file, or the file is not a PDF.
            409 - a document with the same name and type already exists.
    """
    # NOTE(review): this seeds an "admin" account with a hard-coded password
    # whenever none exists. Default credentials in a request handler look like
    # a development leftover -- confirm this is not relied on in production.
    user = User.query.filter_by(username="admin").first()
    if not user:
        user = User(username="admin")
        user.set_password("password")
        db.session.add(user)
        db.session.commit()

    form = PDFUploadForm()

    if request.method == "POST":  # Handle form submission
        if not form.validate_on_submit():
            return (
                jsonify({
                    "error": "Invalid form data. Please ensure all fields are filled correctly."
                }),
                400,
            )

        uploaded_file = form.pdf_file.data

        # Check if a file was uploaded
        if not uploaded_file:
            return jsonify({"error": "No file uploaded"}), 400

        # Both the declared MIME type and the file extension must say "PDF".
        if (
            uploaded_file.mimetype != "application/pdf"
            or not uploaded_file.filename.lower().endswith(".pdf")
        ):
            return (
                jsonify({"error": "Invalid file type. Only PDFs are allowed."}),
                400,
            )

        # The extension check above guarantees a dot, so this always yields
        # exactly (name without extension, extension).
        file_name, file_type = uploaded_file.filename.rsplit(".", 1)

        # Reject duplicates: same name and same type already stored.
        existing_document = (
            db.session.query(Document)
            .filter_by(document_name=file_name, document_type=file_type)
            .first()
        )
        if existing_document:
            return (
                jsonify({
                    "error": f"A document named '{uploaded_file.filename}' already exists."
                }),
                409,
            )

        new_document = Document(
            document_name=file_name,
            document_type=file_type,
            file_contents=uploaded_file.read(),  # Store binary PDF data
        )

        # Persist first so the document has an id, then parse and index it.
        # NOTE(review): if process_doc raises, the row stays committed but
        # presumably unindexed -- confirm whether that is acceptable.
        db.session.add(new_document)
        db.session.commit()
        process_doc(new_document)

        return (
            jsonify({
                "message": f"File '{uploaded_file.filename}' uploaded successfully!",
                "document": {
                    "id": new_document.id,
                    "name": file_name,
                    "type": file_type,
                    "size": len(new_document.file_contents),
                },
            }),
            200,
        )

    # GET: the document list is only needed here, so it is queried only here.
    documents = db.session.query(Document).all()
    return render_template(
        "main/admin.html", user=current_user, documents=documents, upload_form=form
    )
@bp.route("/delete/<int:item_id>", methods=["DELETE"])
def delete_item(item_id):
    """
    Deletes a document and its associated vector embeddings from the database.

    This endpoint:
    - Looks up the document with the given `item_id`.
    - Removes embeddings from the `EmbeddingStore` whose `id` contains
      "<document_name>.<document_type>" as a literal substring.
    - Deletes the document row and commits both deletions together, rolling
      back if anything fails.

    Args:
        item_id (int): The unique identifier of the document to delete.

    Returns:
        Response (JSON): 200 with a success message, 404 if no document has
        that id, or 500 with the error text if the deletion fails.
    """
    # NOTE(review): unlike /admin this route has no @login_required, so any
    # client can delete documents -- confirm whether that is intended.
    try:
        # Retrieve the document by ID
        document = db.session.query(Document).get(item_id)

        if not document:
            return (
                jsonify({"success": False, "message": f"Item {item_id} not found"}),
                404,
            )

        vector_db = current_app.vector_db
        stored_name = f"{document.document_name}.{document.document_type}"

        # autoescape=True escapes any "%" or "_" in the document name, so the
        # name is matched literally and cannot act as a LIKE wildcard that
        # would also delete other documents' embeddings.
        db.session.execute(
            delete(vector_db.EmbeddingStore).where(
                vector_db.EmbeddingStore.id.contains(stored_name, autoescape=True)
            )
        )

        # Delete the document itself
        db.session.delete(document)

        # One commit covers both deletions.
        db.session.commit()

        return (
            jsonify(
                {"success": True, "message": f"Item {item_id} deleted successfully"}
            ),
            200,
        )

    except Exception as e:
        db.session.rollback()  # Rollback changes on failure
        return jsonify({'success': False, 'message': 'Failed to delete item', 'error': str(e)}), 500
@bp.route("/download/<int:item_id>", methods=["GET"])
def download_document(item_id):
    """
    Serves a stored document as a file download.

    The document row is looked up by `item_id`; its binary contents are sent
    as a PDF attachment named "<document_name>.<document_type>".

    Args:
        item_id (int): The unique identifier of the document to download.

    Returns:
        Response: The PDF file as an attachment, a 404 JSON error if no
        document has that id, or a 500 JSON error if the download fails.
    """
    try:
        record = db.session.query(Document).get(item_id)

        if record:
            # Rebuild the full file name from the stored name and type.
            attachment_name = f"{record.document_name}.{record.document_type}"
            payload = BytesIO(record.file_contents)
            return send_file(
                payload,
                mimetype="application/pdf",
                download_name=attachment_name,
                as_attachment=True,
            )

        # Nothing stored under this id.
        return jsonify({'success': False, 'message': f'Item {item_id} not found'}), 404

    except Exception as e:
        return jsonify({'success': False, 'message': 'Failed to download document', 'error': str(e)}), 500
@bp.route("/test", methods=["GET"])
def test():
    """
    Connectivity check for the Flask/React link and the database.

    Looks up the user named "admin" and returns a JSON greeting with that
    username, or a placeholder message when no such user exists. Both cases
    answer with status 200.
    """
    admin_user = User.query.filter_by(username="admin").first()
    if not admin_user:
        return jsonify({"message": "No one is here :()."}), 200
    return jsonify({"message": f"Hello: {admin_user.username}"}), 200
271"""
272@bp.route("/upload", methods=["GET", "POST"])
273@login_required # Ensure user is logged in to access this route
274def upload_pdf():
276 form = PDFUploadForm()
278 if request.method == "POST": # Handle form submission
279 if form.validate_on_submit():
280 uploaded_file = form.pdf_file.data
282 # Check if a file was uploaded
283 if not uploaded_file:
284 return jsonify({"error": "No file uploaded"}), 400
286 # Check if the uploaded file is a PDF (MIME type and file extension)
287 if (
288 uploaded_file.mimetype != "application/pdf"
289 or not uploaded_file.filename.lower().endswith(".pdf")
290 ):
291 return (
292 jsonify({"error": "Invalid file type. Only PDFs are allowed."}),
293 400,
294 )
296 # Extract file name and type
297 file_name = uploaded_file.filename.rsplit(".", 1)[
298 0
299 ] # Name without extension
300 file_type = uploaded_file.filename.rsplit(".", 1)[
301 -1
302 ] # File extension (should be 'pdf')
304 # Check if a document with the same name and type already exists
305 existing_document = (
306 db.session.query(Document)
307 .filter_by(document_name=file_name, document_type=file_type)
308 .first()
309 )
311 if existing_document:
312 return (
313 jsonify(
314 {
315 "error": f"A document named '{uploaded_file.filename}' already exists."
316 }
317 ),
318 409,
319 )
321 # Create new document instance
322 new_document = Document(
323 document_name=file_name,
324 document_type=file_type,
325 file_contents=uploaded_file.read(), # Store binary PDF data
326 )
328 # Storing the document into the database
329 db.session.add(new_document)
330 db.session.commit()
331 # Process the upload doc to the parser and index
332 process_doc(new_document)
334 return (
335 jsonify(
336 {
337 "message": f"File '{uploaded_file.filename}' uploaded successfully!"
338 }
339 ),
340 200,
341 )
343 else:
344 return (
345 jsonify(
346 {
347 "error": "Invalid form data. Please ensure all fields are filled correctly."
348 }
349 ),
350 400,
351 )
353 # If it's a GET request, render the upload.html template
354 return render_template("main/upload.html", form=form)
356 """
@bp.route("/chat", methods=["POST"])
def chat_message():
    """
    Answers a user question with the LLM, using retrieved document chunks.

    Expects a JSON object with:
        message: the user's question.
        conversationHistory: a list of entries, each with a "sender"
            ("User" or "Chatbot"; other senders are ignored) and a "text".

    Flow: rebuild the chat history, fetch scored chunks for the question via
    `query_database`, keep chunks scoring at least 0.5, and send the joined
    chunks, the history and the question through the prompt template to the
    LLM.

    Returns:
        Response (JSON):
            200 {"response": <LLM output>} on success.
            200 {"response": "No document found", ...} when no chunk meets
                the score threshold.
            400 when the body is not a JSON object or a required key is
                missing.
            500 with the error text for any other failure.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising an HTTP error that the broad handler below would
        # report as a 500.
        data = request.get_json(silent=True)

        # A non-dict body (None, list, string, ...) cannot carry the keys.
        if not isinstance(data, dict) or "message" not in data:
            return jsonify({"error": "Message is required"}), 400

        if "conversationHistory" not in data:
            return jsonify({"error": "conversationHistory is required"}), 400

        user_message = data["message"]

        history = ChatMessageHistory()

        # Named `entry`, not `chat`, so the imported `ollama.chat` is not
        # shadowed inside this function.
        for entry in data["conversationHistory"]:
            if entry["sender"] == "User":
                history.add_user_message(entry["text"])
            elif entry["sender"] == "Chatbot":
                history.add_ai_message(entry["text"])
        print("Chat History:", history.messages, flush=True)

        # Getting the documentation (chunks) based on the query
        scored_docs = query_database(user_message)

        # Mock the scores only if in testing mode
        if current_app.config.get("TESTING", False):
            scored_docs = [(doc, 0.9) for doc, _ in scored_docs]  # Override scores to 0.9

        for doc, score in scored_docs:
            print(f"Score: {score}")
            print("---")

        # Keep only chunks with a score of at least 0.5.
        # NOTE(review): assumes a higher score means a closer match -- confirm
        # against what query_database returns.
        min_score = 0.5
        filtered_docs = [(doc, score) for doc, score in scored_docs if score >= min_score]

        # If no document meets the threshold, return a message to the frontend
        if not filtered_docs:
            return (
                jsonify(
                    {
                        "response": "No document found",
                        "message": "No relevant information available.",
                    }
                ),
                200,
            )

        # Joining the filtered chunks together
        context = "\n\n---\n\n".join([doc.page_content for doc, _ in filtered_docs])

        # Prompt template sent with every query: a system message that
        # restricts the model to the supplied context, then the conversation
        # history, then the user's question.
        prompt_template = ChatPromptTemplate.from_messages(
            [
                (
                    "system",  # System message to set the context for the model
                    "You are a Retrieval Augmented Generation (RAG) model.\n"
                    "You have access to a large set of documents regarding various subjects in BioInformatics.\n"
                    "You are only to answer questions based on the provided context.\n"
                    "You are not allowed to make up information.\n"
                    "You are not allowed to answer questions that are not in the context.\n"
                    "If a question is not in the context, you should say 'I don't know'.\n"
                    "Please give all responses in markdown (.md) format.\n"  # Markdown format for better readability
                    "---\n"
                    "Context:\n{context}\n"  # Insert relevant documents as 'context'
                    "---",
                ),
                MessagesPlaceholder(variable_name="history"),  # Insert conversation history
                ("human", "{user_message}"),  # Insert user query
            ]
        )

        chain = prompt_template | llm

        response = chain.invoke(
            {
                "context": context,
                "history": history.messages,
                "user_message": user_message,
            }
        )

        # Print the filtered documents
        print("Chunks:")
        for doc, score in filtered_docs:
            print(f"Document content: {doc.page_content}")
            print(f"Score: {score}")
            print("---")

        print(f"Response: {response}", flush=True)

        return jsonify({"response": response})

    except Exception as e:
        print(f"Error: {str(e)}", flush=True)
        return jsonify({"error": f"An error occurred: {str(e)}"}), 500
@bp.route("/logout")
@login_required  # Only a logged-in user can log out
def logout():
    """
    Logs the current user out, commits the database session and redirects to
    the index (login) page.
    """
    logout_user()
    db.session.commit()
    landing_url = url_for("main.index")
    return redirect(landing_url)
466from app.doc_parsers.parse_pdf import DATA_PATH, load_documents
467from app.doc_parsers.parse_pdf import split_documents
468from app.doc_indexer.index_doc import index_and_add_to_db
469from app.doc_indexer.retrieve_document import query_database
@bp.route("/test_indexing", methods=["GET"])
def test_indexing():
    """
    Smoke test for the indexing pipeline.

    Loads the documents under `DATA_PATH`, splits them into chunks, indexes
    the chunks, then queries the index for "cell cycle".

    Returns:
        200 with the page content of the first result, or 404 with an error
        message when the query returns no results.
    """
    documents = load_documents(DATA_PATH)
    chunks = split_documents(documents)
    index_and_add_to_db(chunks)
    results = query_database("cell cycle")
    print(results)

    # Without this guard an empty result would raise IndexError below.
    if not results:
        return {"error": "Indexing ran, but the test query returned no results."}, 404

    # Each result is indexed as [0] for the document, matching how the
    # original code read it.
    return {"awesome": "it works :)", "doc": f"{results[0][0].page_content}"}, 200