24 March, 2009

Highly Parallel LINQ To Lucene Indexer

I've been wanting to post about this for a while, but haven't been able to because I was away vacationing in Hawaii. :)

I was recently given the assignment to index a couple of our tables that hold large amounts of data. Each table has well over 3 million records, and each record is fairly large, as it represents email correspondence with our clients.

I knew from the get-go I'd have to write a highly parallel process in order for the task to finish promptly - just bringing the data over the wire from the database takes about 45 minutes.

I have to show a lot of code now, so please bear with me for just a second. The key here is to use several RAMDirectory instances for the small data chunks and merge them into an FSDirectory index that will hold all the data:


using System;
using System.Linq;
using System.Threading;

namespace Indexer
{
    public class BatchIndexer<T> : ParallelIndexer<T>
    {
        IQueryable<T> items;
        bool isMergingFinished = false;
        private static object lockObject = new object();

        public override bool Stopped
        {
            get
            {
                //the batch isn't done until the children have stopped AND the merge has run
                if (base.Stopped)
                {
                    return isMergingFinished;
                }
                return false;
            }
        }

        //stop setting the temp directory here and force the client to pass it in
        public BatchIndexer(string path, IQueryable<T> items)
            : base(new Index<T>(path))
        {
            this.items = items;
            QueueChildren();
        }

        private void QueueChildren()
        {
            if (null != items && items.Count() > 0)
            {
                const int pageSize = 100;
                int pageIndex = 0;

                var currentPage = items.Take(pageSize);
                //ceiling division, so an exact multiple of pageSize doesn't queue an empty trailing page
                int pageCount = (items.Count() + pageSize - 1) / pageSize;
                while (pageIndex < pageCount)
                {
                    ThreadPool.QueueUserWorkItem(new WaitCallback(QueueChild), currentPage);
                    pageIndex++;
                    currentPage = items.Skip(pageIndex * pageSize).Take(pageSize);
                }

                //wait for all children to be added to the parent
                while (Children.Count() < pageCount)
                {
                    Thread.Sleep(2000);
                }
            }
        }

        private void QueueChild(object stateInfo)
        {
            var currentPage = stateInfo as IQueryable<T>;
            lock (lockObject)
            {
                AddChild(new BatchIndexerChild<T>(new Index<T>(), currentPage.ToList()));
            }
        }

        public override void Run()
        {
            //start indexing the children
            foreach (var item in this.Children)
            {
                var child = item as BatchIndexerChild<T>;
                ThreadPool.QueueUserWorkItem(new WaitCallback(child.AsyncRun));
            }

            //wait for all of the children to finish indexing
            while (!IsChildrenStopped)
            {
                Thread.Sleep(1000);
            }

            //merge all indices into the main index
            this.Merge((from c in this.Children select c.Index.Directory).ToArray());
            isMergingFinished = true;
        }

        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
            if (disposing)
            {
                //release managed resources
                if (this.Index != null)
                {
                    this.Index.Dispose();
                }
            }
        }
    }
}
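In case the Merge call at the end of Run looks like magic: in raw Lucene.Net terms it boils down to opening an IndexWriter on the on-disk FSDirectory and pulling in the children's RAMDirectory instances. The sketch below is not the actual Merge implementation from my ParallelIndexer/Index classes; it's just what that step looks like against a Lucene.Net 3.0.x-style API, and IndexMerger/MergeIntoDisk are names I made up for the example.

using Lucene.Net.Analysis.Standard;
using Lucene.Net.Index;
using Lucene.Net.Store;
using Version = Lucene.Net.Util.Version;

public static class IndexMerger
{
    //pull every child's in-memory index into the on-disk index in one shot
    public static void MergeIntoDisk(string path, Directory[] childDirectories)
    {
        var fsDirectory = FSDirectory.Open(new System.IO.DirectoryInfo(path));
        var analyzer = new StandardAnalyzer(Version.LUCENE_30);

        //create: true wipes whatever index was already at that path
        using (var writer = new IndexWriter(fsDirectory, analyzer, true, IndexWriter.MaxFieldLength.UNLIMITED))
        {
            //copies the segments from the RAMDirectory instances without forcing a full merge up front
            writer.AddIndexesNoOptimize(childDirectories);
            writer.Optimize();   //optional: collapse segments for faster searches
            writer.Commit();
        }
    }
}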

I particularly like the approach I used because it threads the chunking of the data into smaller pieces AND also threads the actual indexing of each chunk.
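I didn't show BatchIndexerChild<T> above, but the idea behind each worker is simple: every child owns its own RAMDirectory and writes its page of records into it, so no two threads ever share an IndexWriter. Here's a rough, hypothetical stand-in written against plain Lucene.Net (again assuming a 3.0.x-style API); the real child class uses my Index<T> wrapper instead, and the Func<T, Document> mapping is just a placeholder for however you turn a record into Lucene fields.

using System;
using System.Collections.Generic;
using Lucene.Net.Analysis.Standard;
using Lucene.Net.Documents;
using Lucene.Net.Index;
using Lucene.Net.Store;
using Version = Lucene.Net.Util.Version;

public class RamChildIndexer<T>
{
    private readonly IList<T> page;
    private readonly Func<T, Document> toDocument;   //caller decides how a record maps to fields

    public RAMDirectory Directory { get; private set; }
    public bool Stopped { get; private set; }

    public RamChildIndexer(IList<T> page, Func<T, Document> toDocument)
    {
        this.page = page;
        this.toDocument = toDocument;
        Directory = new RAMDirectory();
    }

    //matches the WaitCallback shape used with ThreadPool.QueueUserWorkItem
    public void AsyncRun(object state)
    {
        var analyzer = new StandardAnalyzer(Version.LUCENE_30);
        using (var writer = new IndexWriter(Directory, analyzer, true, IndexWriter.MaxFieldLength.UNLIMITED))
        {
            foreach (var record in page)
            {
                writer.AddDocument(toDocument(record));
            }
            writer.Commit();
        }
        Stopped = true;   //lets the parent's wait loop move on
    }
}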

If you're going to index as much data as I had to index, however, you still have to pre-chunk the data and feed it into several instances of BatchIndexer; I found that batches of 100,000 records were optimal. You'll also want to make sure you release all database resources ASAP: you don't want to be hogging a whole bunch of memory!
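To make that concrete, a driver loop along these lines is what I mean by pre-chunking. Email, EmailContext, and the index path are placeholders for your own entity, data context, and location, and I'm assuming ParallelIndexer<T> implements IDisposable (BatchIndexer overrides Dispose(bool) above); adjust to taste.

using System.Linq;

public static class EmailIndexingJob
{
    public static void IndexAllEmails()
    {
        const int batchSize = 100000;

        int total;
        using (var db = new EmailContext())
        {
            total = db.Emails.Count();
        }

        for (int batch = 0; batch * batchSize < total; batch++)
        {
            //a fresh context per batch releases database resources as soon as the slice is done
            using (var db = new EmailContext())
            {
                var slice = db.Emails
                              .OrderBy(e => e.Id)          //stable ordering so Skip/Take pages don't overlap
                              .Skip(batch * batchSize)
                              .Take(batchSize);

                using (var indexer = new BatchIndexer<Email>(@"C:\indexes\emails", slice))
                {
                    indexer.Run();   //blocks until this batch's children are indexed and merged
                }
            }
        }
    }
}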

Anyhow, I hope this code is useful... it should at least give you a start in the right direction. Let me know if you have any questions.