mirror of
https://github.com/NVIDIA/dgx-spark-playbooks.git
synced 2026-04-23 02:23:53 +00:00
Add document tracking to prevent duplicates
This commit is contained in:
parent
97e4be5772
commit
6e90701a9b
@ -160,6 +160,16 @@ export async function POST(req: NextRequest) {
|
||||
// Store triples in the graph database
|
||||
await graphDbService.importTriples(validTriples);
|
||||
|
||||
// Mark document as processed if documentName is provided (only for ArangoDB)
|
||||
if (graphDbType === 'arangodb' && documentName) {
|
||||
try {
|
||||
await (graphDbService as any).markDocumentAsProcessed(documentName, validTriples.length);
|
||||
} catch (error) {
|
||||
console.error('Error marking document as processed:', error);
|
||||
// Don't fail the request if marking fails
|
||||
}
|
||||
}
|
||||
|
||||
// Return success response
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
|
||||
@ -9,6 +9,7 @@ export class ArangoDBService {
|
||||
private static instance: ArangoDBService;
|
||||
private collectionName: string = 'entities';
|
||||
private edgeCollectionName: string = 'relationships';
|
||||
private documentsCollectionName: string = 'processed_documents';
|
||||
|
||||
private constructor() {}
|
||||
|
||||
@ -75,6 +76,16 @@ export class ArangoDBService {
|
||||
});
|
||||
}
|
||||
|
||||
// Create documents collection if it doesn't exist
|
||||
if (!collectionNames.includes(this.documentsCollectionName)) {
|
||||
await this.db.createCollection(this.documentsCollectionName);
|
||||
await this.db.collection(this.documentsCollectionName).ensureIndex({
|
||||
type: 'persistent',
|
||||
fields: ['documentName'],
|
||||
unique: true
|
||||
});
|
||||
}
|
||||
|
||||
console.log('ArangoDB initialized successfully');
|
||||
} catch (error) {
|
||||
console.error('Failed to initialize ArangoDB:', error);
|
||||
@ -250,6 +261,88 @@ export class ArangoDBService {
|
||||
return await collection.save({ name });
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a document has already been processed and stored in ArangoDB
|
||||
* @param documentName - Name of the document to check
|
||||
* @returns Promise resolving to true if document exists, false otherwise
|
||||
*/
|
||||
public async isDocumentProcessed(documentName: string): Promise<boolean> {
|
||||
if (!this.db) {
|
||||
throw new Error('ArangoDB connection not initialized. Call initialize() first.');
|
||||
}
|
||||
|
||||
try {
|
||||
const existing = await this.executeQuery(
|
||||
`FOR d IN ${this.documentsCollectionName} FILTER d.documentName == @documentName RETURN d`,
|
||||
{ documentName }
|
||||
);
|
||||
return existing.length > 0;
|
||||
} catch (error) {
|
||||
console.error('Error checking if document is processed:', error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a document as processed in ArangoDB
|
||||
* @param documentName - Name of the document
|
||||
* @param tripleCount - Number of triples stored for this document
|
||||
* @returns Promise resolving when the document is marked as processed
|
||||
*/
|
||||
public async markDocumentAsProcessed(documentName: string, tripleCount: number): Promise<void> {
|
||||
if (!this.db) {
|
||||
throw new Error('ArangoDB connection not initialized. Call initialize() first.');
|
||||
}
|
||||
|
||||
try {
|
||||
const collection = this.db.collection(this.documentsCollectionName);
|
||||
await collection.save({
|
||||
documentName,
|
||||
tripleCount,
|
||||
processedAt: new Date().toISOString()
|
||||
});
|
||||
console.log(`Marked document "${documentName}" as processed with ${tripleCount} triples`);
|
||||
} catch (error) {
|
||||
// If error is due to unique constraint (document already exists), update it instead
|
||||
if (error && typeof error === 'object' && 'errorNum' in error && error.errorNum === 1210) {
|
||||
console.log(`Document "${documentName}" already exists, updating...`);
|
||||
await this.executeQuery(
|
||||
`FOR d IN ${this.documentsCollectionName}
|
||||
FILTER d.documentName == @documentName
|
||||
UPDATE d WITH { tripleCount: @tripleCount, processedAt: @processedAt } IN ${this.documentsCollectionName}`,
|
||||
{
|
||||
documentName,
|
||||
tripleCount,
|
||||
processedAt: new Date().toISOString()
|
||||
}
|
||||
);
|
||||
} else {
|
||||
console.error('Error marking document as processed:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all processed documents from ArangoDB
|
||||
* @returns Promise resolving to array of processed document names
|
||||
*/
|
||||
public async getProcessedDocuments(): Promise<string[]> {
|
||||
if (!this.db) {
|
||||
throw new Error('ArangoDB connection not initialized. Call initialize() first.');
|
||||
}
|
||||
|
||||
try {
|
||||
const documents = await this.executeQuery(
|
||||
`FOR d IN ${this.documentsCollectionName} RETURN d.documentName`
|
||||
);
|
||||
return documents;
|
||||
} catch (error) {
|
||||
console.error('Error getting processed documents:', error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get graph data in a format compatible with the existing application
|
||||
* @returns Promise resolving to nodes and relationships
|
||||
|
||||
Loading…
Reference in New Issue
Block a user