SaveState=false not working as expected - Spring batch -
we using multi threaded step in spring batch following. read multiple rows in table. each row in table processed , written separate excel file.
to implement thread safety in jdbccursoritemreader - have done following. 1) created synchronizeditemreader delegates synchronized call jdbccursritemreader's read method. 2) set savestate=false in jdbcitemreader bean. 3) added process indicator in input table
i have following questions 1)per understanding if set savestate=false spring batch not storing state in execution context i.e. read_count, commit_count, write_count etc stay @ 0.
however, savestate=false counts being updated. have given xml config below. ideas why happening?
2) understand jdbcbatchitemwriter thread safe. mean, 1 thread doing business data write @ time? or thread safe maintaining state purpose? if 1 thread writing db @ time not sure if performance good
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:util="http://www.springframework.org/schema/util" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.2.xsd "> <!-- database settings --> <import resource="../config/database-context.xml" /> <import resource="../config/launch-context.xml" /> <batch:job id="multithreadedjob"> <batch:step id="readwritemultithreadedstep"> <batch:tasklet task-executor="taskexecutor"> <batch:chunk reader="synchronizingitemreader" processor="itemprocessor" writer="excelwriter" commit-interval="1" /> </batch:tasklet> </batch:step> </batch:job> <bean id="taskexecutor" class="org.springframework.scheduling.concurrent.threadpooltaskexecutor"> <property name="corepoolsize" value="5" /> <property name="maxpoolsize" value="5" /> </bean> <bean id="synchronizingitemreader" class="org.multithreadedstep.synchronizingitemreader"> <property name="delegate" ref="jdbcitemreader" /> </bean> <!-- inject stepexecutioncontext --> <bean id="itemprocessor" class="org.multithreadedstep.userprocessor" scope="step"> <property name="threadname" value="#{stepexecutioncontext[name]}" /> <property name="basedao" ref="basedao" /> </bean> <bean id="jdbcitemreader" class="org.springframework.batch.item.database.jdbccursoritemreader" scope="step"> <property name="datasource" ref="datasource" /> <property name="sql"> <value> <![cdata[ select id, user_login, user_pass, age, name users_test processed='n' order to_number (id) ]]> </value> </property> <property name="rowmapper" ref="userrowmapper" /> <property name="savestate" value="false"/> </bean> <bean id="userrowmapper" class="org.multithreadedstep.userrowmapper" /> <bean id="excelwriter" class="org.multithreadedstep.excelwriter" scope="step"> </bean> <bean id="docname" class="java.lang.string" scope="prototype"/> <bean id="basedao" class="org.multithreadedstep.basedao"> <property name="jdbctemplate" ref="jdbctemplate" /> </bean> <bean id="jdbctemplate" class="org.springframework.jdbc.core.jdbctemplate"> <constructor-arg ref="datasource"/>
synchronizingitemreader.java package org.multithreadedstep; import org.springframework.batch.item.executioncontext; import org.springframework.batch.item.itemreader; import org.springframework.batch.item.itemstream; import org.springframework.batch.item.itemstreamexception; public class synchronizingitemreader implements itemreader<user>, itemstream { private itemreader<user> delegate; public synchronized user read() throws exception { return delegate.read(); } public void close() throws itemstreamexception { if (this.delegate instanceof itemstream) { ((itemstream) this.delegate).close(); } } public void open(executioncontext context) throws itemstreamexception { if (this.delegate instanceof itemstream) { ((itemstream) this.delegate).open(context); } } public void update(executioncontext context) throws itemstreamexception { if (this.delegate instanceof itemstream) { ((itemstream) this.delegate).update(context); } } /** * @return delegate */ public itemreader<user> getdelegate() { return delegate; } /** * @param delegate delegate set */ public void setdelegate(itemreader<user> delegate) { this.delegate = delegate; }
}
Comments
Post a Comment