multithreading - Java Fork Join Pool getting stuck
I have a data migration service that reads data from one database in chunks and migrates it to another database.
To work through the "chunks" of data, I am trying to use RecursiveAction and a fork/join pool. The idea is to execute the work on these "chunks" in parallel rather than chunk, execute, chunk, and so on.
What's happening is that the process just stops. I see no exceptions in the logs, and I see no deadlocked threads. The code is below; my questions are:
- Am I missing something in my worker? Do I need to return a value or call a method to free some resource?
- Is it dumb of me to be using RecursiveAction here? Should I be using something different to parallelize the chunks of work?
- I have one worker thread in the thread dump that seems to be waiting. Is this normal, or an indicator of a problem?
ForkJoinPool-1-worker-18 id=12191 state=WAITING
    - waiting on <0x1b5ca93e> (a java.util.concurrent.ForkJoinPool)
    - locked <0x1b5ca93e> (a java.util.concurrent.ForkJoinPool)
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at java.util.concurrent.ForkJoinPool.tryAwaitWork(ForkJoinPool.java:864)
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:647)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
Code:

    // Imports assumed from usage: Guava (Lists), SLF4J (Logger), Spring (@Component, @Autowired).
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;

    import com.google.common.collect.Lists;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class BulkMigrationService {

        final ForkJoinPool pool = new ForkJoinPool();

        private static final Logger log = LoggerFactory.getLogger(BulkMigrationService.class);

        private SourceDataApi api;
        private final Migrator migrator;
        private MetadataService metadataService;

        @Autowired
        public BulkMigrationService(SourceDataApi api, Migrator migrator, MetadataService metadataService) {
            this.api = api;
            this.migrator = migrator;
            this.metadataService = metadataService;
        }

        // Reads the source in batches and hands each batch to the fork/join pool.
        public void migrate(Integer batchSize, long max) throws MigrationException {
            long currentCount = 0L;
            Integer currentIndex = 0;
            while (currentCount < max) {
                List<String> itemsToMigrate = api.findItemRange(currentIndex, currentIndex + batchSize);
                if (itemsToMigrate.size() > 0) {
                    MigrateForkedWorker starter = new MigrateForkedWorker(itemsToMigrate);
                    pool.invoke(starter);
                }
                currentCount += itemsToMigrate.size();
                currentIndex += batchSize - 1;
                if (log.isDebugEnabled()) {
                    log.debug("Migrated " + currentCount + " items.");
                }
            }
        }

        // Splits a batch in half recursively until a piece holds at most `max` items.
        public class MigrateForkedWorker extends RecursiveAction {

            private int max = 10;
            private List<String> allItems;

            public MigrateForkedWorker(List<String> allItems) {
                this.allItems = allItems;
            }

            @Override
            protected void compute() {
                if (allItems.size() <= max) {
                    // Small enough: migrate the items directly in this thread.
                    for (String itemInfo : allItems) {
                        try {
                            migrator.migrateAsset(itemInfo);
                        } catch (MigrationException e) {
                            e.printStackTrace();
                        }
                    }
                } else {
                    // Too large: split into two halves (rounding up) and run both.
                    int targetSize = allItems.size() % 2 == 0 ? allItems.size() / 2 : (allItems.size() + 1) / 2;
                    List<List<String>> splits = Lists.partition(allItems, targetSize);
                    MigrateForkedWorker migrateForkedWorkerOne = new MigrateForkedWorker(splits.get(0));
                    MigrateForkedWorker migrateForkedWorkerTwo = new MigrateForkedWorker(splits.get(1));
                    invokeAll(migrateForkedWorkerOne, migrateForkedWorkerTwo);
                }
            }
        }
    }
Your first problem is using invokeAll(). That submits new requests to the pool and waits for them to complete. Follow the example in the Javadoc and use fork() instead:

    migrateForkedWorkerOne.fork();
    migrateForkedWorkerTwo.compute();
    migrateForkedWorkerOne.join();
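For reference, here is a minimal, self-contained sketch of the fork()/compute()/join() pattern suggested above. It is not the original service: `ForkJoinSketch`, `ChunkWorker`, `migrateAll`, and the `processed` counter are hypothetical names, and the counter merely stands in for `migrator.migrateAsset(...)`. The threshold of 10 mirrors the `max` field in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

public class ForkJoinSketch {

    // Hypothetical stand-in for migrator.migrateAsset(...): it only counts items.
    static final AtomicInteger processed = new AtomicInteger();

    static class ChunkWorker extends RecursiveAction {
        private static final int THRESHOLD = 10; // mirrors `max` in the question
        private final List<String> items;

        ChunkWorker(List<String> items) {
            this.items = items;
        }

        @Override
        protected void compute() {
            if (items.size() <= THRESHOLD) {
                // Small enough: handle every item directly in this thread.
                for (String item : items) {
                    processed.incrementAndGet();
                }
                return;
            }
            // Split into two read-only views of the list.
            int mid = items.size() / 2;
            ChunkWorker left = new ChunkWorker(items.subList(0, mid));
            ChunkWorker right = new ChunkWorker(items.subList(mid, items.size()));
            left.fork();     // queue the left half so another worker can take it
            right.compute(); // process the right half in the current thread
            left.join();     // wait until the left half has finished
        }
    }

    // Runs one batch through a fresh pool and returns how many items were handled.
    static int migrateAll(List<String> items) {
        processed.set(0);
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new ChunkWorker(items));
        } finally {
            pool.shutdown();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<>();
        for (int i = 0; i < 95; i++) {
            items.add("item-" + i);
        }
        System.out.println("processed " + migrateAll(items) + " items");
    }
}
```

The sketch creates and shuts down a pool per call only to keep it self-contained; a long-lived service such as the one in the question would more likely keep a single shared pool, as the original code does.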