Browse Source

commit 批量解析接口

Hai Lin 1 week ago
parent
commit
70ebbd005c
1 changed files with 165 additions and 0 deletions
  1. 165 0
      app.py

+ 165 - 0
app.py

@@ -716,6 +716,171 @@ def parse_pdf(pdf_id):
 
     return jsonify({"success": True, "message": "PDF解析已开始,将在后台执行"})
 
@app.route('/manager/batch_ai_parse', methods=['GET'])
def batch_ai_parse():
    """Kick off batch AI parsing of all unprocessed records in the background."""
    # Require a logged-in session before triggering background work.
    if 'user_id' not in session:
        return jsonify({"success": False, "message": "Unauthorized"}), 401

    # Fire-and-forget daemon worker; progress is observable only through
    # the server logs and each record's ai_status column.
    worker = threading.Thread(target=batch_ai_parse_async, daemon=True)
    worker.start()
    return jsonify({"success": True, "message": "批量AI解析已开始,请稍候查看结果"})
+
def batch_ai_parse_async():
    """Background task: AI-parse every record with ai_status = 0.

    Fetches the unprocessed records, runs process_ai_task() on each with at
    most 5 concurrent workers, then calls check_failed_records() to retry
    anything that ended up in the failed state (ai_status = 3).
    Errors are logged to stdout; this function never raises.
    """
    # Function-scope import keeps this addition self-contained.
    from concurrent.futures import ThreadPoolExecutor

    def _log(msg):
        # Timestamp is taken at call time so long-running phases are ordered.
        print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [Batch AI Parse] {msg}")

    _log("Starting batch AI parse task...")

    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cursor:
            # page_number ordering processes pages in document order.
            cursor.execute(
                "SELECT id, oss_url FROM genealogy_records "
                "WHERE ai_status = 0 order by page_number"
            )
            unprocessed_records = cursor.fetchall()
    except Exception as e:
        # Matches original behavior: a fetch failure aborts the whole batch
        # (no processing, no retry pass).
        _log(f"Error: {e}")
        return
    finally:
        if conn:
            try:
                conn.close()
            except Exception:
                pass

    total_records = len(unprocessed_records)
    _log(f"Found {total_records} unprocessed records")
    if total_records == 0:
        _log("No unprocessed records found")
        return

    def _process_record(record):
        """Parse one record; failures are logged and picked up by the retry pass."""
        try:
            _log(f"Processing record {record['id']}")
            process_ai_task(record['id'], record['oss_url'])
        except Exception as e:
            _log(f"Error processing record {record['id']}: {e}")

    # A bounded pool replaces the original thread-per-record + semaphore
    # scheme: same 5-way concurrency without creating len(records) threads.
    # The `with` block joins all workers before continuing.
    with ThreadPoolExecutor(max_workers=5) as executor:
        list(executor.map(_process_record, unprocessed_records))

    _log(f"Batch processing completed. Processed {total_records} records")

    # Retry anything that failed during this pass.
    check_failed_records()
+
def check_failed_records():
    """Retry AI parsing for records stuck in the failed state (ai_status = 3).

    Each failed record is first reset to ai_status = 1 (processing) and then
    re-run through process_ai_task(), with at most 5 concurrent workers.
    Errors are logged to stdout; this function never raises.
    """
    # Function-scope import keeps this addition self-contained.
    from concurrent.futures import ThreadPoolExecutor

    def _log(msg):
        print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] [Batch AI Parse] {msg}")

    _log("Checking for failed records...")

    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cursor:
            cursor.execute("SELECT id, oss_url FROM genealogy_records WHERE ai_status = 3")
            failed_records = cursor.fetchall()
    except Exception as e:
        _log(f"Error checking failed records: {e}")
        return
    finally:
        if conn:
            try:
                conn.close()
            except Exception:
                pass

    total_failed = len(failed_records)
    _log(f"Found {total_failed} failed records")
    if total_failed == 0:
        _log("No failed records found")
        return

    def _retry_record(record):
        """Reset one failed record to 'processing' and re-run the AI task."""
        record_id = record['id']
        retry_conn = None
        try:
            _log(f"Retrying failed record {record_id}")
            # Mark as in-progress before re-running so concurrent scans
            # don't pick the same record up again.
            retry_conn = get_db_connection()
            with retry_conn.cursor() as cursor:
                cursor.execute(
                    "UPDATE genealogy_records SET ai_status = 1 WHERE id = %s",
                    (record_id,),
                )
            retry_conn.commit()
            process_ai_task(record_id, record['oss_url'])
        except Exception as e:
            _log(f"Error retrying record {record_id}: {e}")
        finally:
            if retry_conn:
                try:
                    retry_conn.close()
                except Exception:
                    pass

    # Bounded pool instead of one thread per failed record; the `with`
    # block joins all workers before the summary line is printed.
    with ThreadPoolExecutor(max_workers=5) as executor:
        list(executor.map(_retry_record, failed_records))

    _log(f"Retry processing completed. Retried {total_failed} failed records")
+
 @app.route('/manager/delete_pdf/<int:pdf_id>', methods=['POST'])
 def delete_pdf(pdf_id):
     if 'user_id' not in session: