Best Practices for Using PigServer (Embedded Pig)

Using PigServer in your own Java application is a great way to leverage the simplicity of Pig scripts, especially if you generate your Pig scripts dynamically and then execute them on demand or via a scheduler.

But if you use multiple PigServer instances in a multi-threaded, long-running application, there are quite a few pitfalls you need to avoid.

Usage of ThreadLocal in PigServer

PigServer uses ThreadLocal objects internally to pass state along. This leads to strange behavior if you use multiple PigServer instances sequentially in the same thread (or within a fixed-size thread pool with long-lived threads), because metadata left over from a previous PigServer execution can interfere with your current script. We ended up using an internal thread pool to enable parallel execution, but within each worker thread we create a new thread for each PigServer execution and terminate that thread afterwards.
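The thread-per-execution idea can be sketched roughly as follows. This is a minimal illustration and not our production code: the class FreshThreadRunner, the LEFTOVER ThreadLocal and the runScript method are hypothetical stand-ins, and runScript only imitates the place where you would create a PigServer and run your script. No Pig classes are used, so the example runs on a plain JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class FreshThreadRunner {

    // Hypothetical stand-in for the per-thread state PigServer keeps internally.
    static final ThreadLocal<String> LEFTOVER = new ThreadLocal<>();

    // Runs the task on a brand-new thread and waits until that thread has died.
    static <T> T runInFreshThread(Callable<T> task) throws Exception {
        FutureTask<T> future = new FutureTask<>(task);
        Thread thread = new Thread(future, "pig-execution");
        thread.start();
        try {
            return future.get();   // task failures surface as ExecutionException
        } finally {
            thread.join();         // the thread and its ThreadLocal values are gone after this
        }
    }

    // Placeholder for "create a PigServer, register the script, run it".
    // Returns whatever state an earlier run left behind on the current thread.
    static String runScript(String name) {
        String before = LEFTOVER.get();
        LEFTOVER.set(name);
        return before;
    }

    // Wraps a script run, optionally pushing it onto a fresh thread.
    static Callable<String> job(String name, boolean freshThread) {
        Callable<String> script = () -> runScript(name);
        if (!freshThread) {
            return script;
        }
        return () -> runInFreshThread(script);
    }

    // Runs two scripts back to back on a single pooled worker
    // and reports the leftover state the second run observed.
    static String stateSeenBySecondRun(boolean freshThread) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            pool.submit(job("first", freshThread)).get();
            return pool.submit(job("second", freshThread)).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("reused pool thread: " + stateSeenBySecondRun(false)); // first
        System.out.println("fresh thread:       " + stateSeenBySecondRun(true));  // null
    }
}
```

The pool still limits how many executions run in parallel, while the short-lived inner thread makes sure that nothing stored in a ThreadLocal survives into the next execution. The price is one extra thread creation per script, which is negligible compared to a MapReduce job.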

Memory Leaks

Every PigServer adds a dedicated shutdown hook to the running JVM, so that if the JVM is terminated the hook will try to kill the associated MapReduce jobs. Sadly, this hook continues to exist even after the PigServer has finished successfully. Because the hook holds a reference to the full MapReduce job client and configuration, this adds up to approximately 250 KB of memory per hook.
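The underlying mechanism can be reproduced without Pig. The following sketch assumes a hypothetical hook class, JobKillerHook, that plays the role of Pig's HangingJobKiller and simply carries a 250 KB payload. It shows that a hook registered via Runtime.addShutdownHook stays registered, and therefore reachable, until someone calls Runtime.removeShutdownHook explicitly.

```java
class LingeringHookDemo {

    // Hypothetical stand-in for Pig's HangingJobKiller:
    // a hook thread that keeps job state reachable for as long as it is registered.
    static class JobKillerHook extends Thread {
        private final byte[] jobState;

        JobKillerHook(int bytes) {
            this.jobState = new byte[bytes];
        }

        @Override
        public void run() {
            // A real hook would kill the still-running job here.
        }
    }

    // Registers a hook, then removes it twice and reports both results.
    static boolean[] registerAndRemove() {
        Thread hook = new JobKillerHook(250 * 1024);
        Runtime.getRuntime().addShutdownHook(hook);

        // The "job" is finished at this point, but the JVM still references the hook
        // and its payload. Nothing removes it unless we do so ourselves.
        boolean removedFirst = Runtime.getRuntime().removeShutdownHook(hook);

        // A second attempt finds nothing, which confirms the hook is really gone.
        boolean removedAgain = Runtime.getRuntime().removeShutdownHook(hook);

        return new boolean[] {removedFirst, removedAgain};
    }

    public static void main(String[] args) {
        boolean[] result = registerAndRemove();
        System.out.println("first removal:  " + result[0]); // true
        System.out.println("second removal: " + result[1]); // false
    }
}
```

In this sketch the caller holds the hook reference and can remove it directly. With PigServer the hook is created internally, so you never get that reference handed to you.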

Getting rid of these shutdown hooks is not that easy, as you cannot really tell whether a shutdown hook is obsolete and can be removed. We used a periodic task that keeps track of all hooks using reflection and removes them after a configurable amount of time (120 minutes):

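import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Map;
import java.util.WeakHashMap;

import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.StatefulJob;
import org.quartz.UnableToInterruptJobException;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class PigShutdownHooksDrainer extends QuartzJobBean implements StatefulJob, InterruptableJob {

    private static final String HOOK_CLASS =
            "org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher$HangingJobKiller";

    // When each hook was first observed. Weak keys keep this map from pinning removed hooks.
    private static final WeakHashMap<Thread, Long> firstSeen = new WeakHashMap<Thread, Long>();

    private int minutesToKeep = 120;

    @Override
    public void interrupt() throws UnableToInterruptJobException {
        // Nothing to interrupt.
    }

    @SuppressWarnings("unchecked")
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            // The JVM keeps registered hooks in a private static map that is only reachable via reflection.
            // On JDK 16 and later this requires --add-opens java.base/java.lang=ALL-UNNAMED.
            final Field field = Class.forName("java.lang.ApplicationShutdownHooks").getDeclaredField("hooks");
            field.setAccessible(true);
            Map<Thread, Thread> shutdownHooks = (Map<Thread, Thread>) field.get(null);
            long now = System.currentTimeMillis();
            // Iterate over a copy because removeShutdownHook modifies the underlying map.
            for (Thread t : new ArrayList<Thread>(shutdownHooks.keySet())) {
                // Only touch Pig's hooks that were loaded by our own class loader.
                if (!t.getClass().getName().equals(HOOK_CLASS)
                        || !t.getClass().getClassLoader().equals(this.getClass().getClassLoader())) {
                    continue;
                }
                Long ts = firstSeen.get(t);
                if (ts == null) {
                    // First sighting: remember the time and leave the hook alone for now.
                    firstSeen.put(t, now);
                } else if (ts + minutesToKeep * 60000L < now) {
                    // The hook has outlived the grace period, so we treat it as obsolete.
                    Runtime.getRuntime().removeShutdownHook(t);
                    firstSeen.remove(t);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}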