Spring integration multithreading




Referring to my earlier question (Spring integration multithreading requirement), I think I may have figured out the root cause of the issue.
My requirement in brief -

Poll the database with a fixed delay of 1 second and publish a very limited subset of the data to a Tibco EMS queue. The messages on this EMS queue then have to be processed in a multithreaded fashion:
i) consume the messages,
ii) fetch the full data from the database, and
iii) convert this data into JSON format.



My design -


`<int:channel id="dbchannel"/>
<int-jdbc:inbound-channel-adapter id="dbchanneladapter"
channel="dbchannel" data-source="datasource"
query="$selectquery" update="$updatequery"
max-rows-per-poll="1000">
<int:poller id="dbchanneladapterpoller"
fixed-delay="1000">
<int:transactional transaction-manager="transactionmanager" />
</int:poller>
</int-jdbc:inbound-channel-adapter>
<int:service-activator input-channel="dbchannel"
output-channel="publishchannel" ref="jdbcmessagehandler" method="handleJdbcMessage" />
<bean id="jdbcmessagehandler" class="com.citigroup.handler.JdbcMessageHandler" />

<int:publish-subscribe-channel id="publishchannel"/>
<int-jms:outbound-channel-adapter id="publishchanneladapter"
channel="publishchannel" jms-template="publishrealtimefeedinternaljmstemplate" />

<int:channel id="subscribechannel"/>
<int-jms:message-driven-channel-adapter
id="subscribechanneladapter" destination="subscriberealtimeinternalqueue"
connection-factory="authenticationconnectionfactory" channel="subscribechannel"
concurrent-consumers="5" max-concurrent-consumers="5" />
<int:service-activator input-channel="subscribechannel"
ref="subscribemessagehandler" method="logJMSMessage" />
<bean id="subscribemessagehandler" class="com.citigroup.handler.SubscribeJMSMessageHandler" />
</beans>

<bean id="authenticationconnectionfactory"
class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="connectionFactory" />
<property name="username" value="test" />
<property name="password" value="test123" />
</bean>

<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
<property name="jndiName" value="app.jndi.testCF" />
</bean>

<bean id="subscriberealtimeinternalqueue" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
<property name="jndiName"
value="app.queue.testQueue" />
</bean>
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">com.tibco.tibjms.naming.TibjmsInitialContextFactory
</prop>
<prop key="java.naming.provider.url">tibjmsnaming://test01d.nam.nsroot.net:7222</prop>
</props>
</property>
</bean>`






Issue -

I am using a message-driven-channel-adapter with concurrent-consumers set to 5. However, it looks like only one consumer thread (container-2) is created and picks up the messages from the EMS queue. The log4j log is below -



16 Aug 2018 11:31:12,077 INFO SubscribeJMSMessageHandler [subscribechanneladapter.container-2]:
Total count of records read from Queue at this moment is 387

record#1:: [ID=7694066395]
record#2:: [ID=7694066423]
.. .. ..
record#387:: [ID=6147457333]



Probable root cause here -

Maybe it's the first step in the configuration, where I poll the database after a fixed delay, that is causing this multithreading issue. Going by the logs above, my assumption is that the 387 fetched records are bundled into a single List object (a List<Map<String, Object>> payload), so they are treated as 1 message/payload instead of 387 separate messages. That would explain why only one thread/container/consumer picks up this bundled message. The reason for this assumption is the log below -



GenericMessage [payload=["ID":7694066395,"ID":7694066423,"ID":6147457333],
headers=json__ContentTypeId__=class org.springframework.util.LinkedCaseInsensitiveMap, jms_redelivered=false, json__TypeId__=class java.util.ArrayList, jms_destination=Queue[app.queue.testQueue], id=e034ba73-7781-b62c-0307-170099263068, priority=4, jms_timestamp=1534820792064, contentType=application/json, jms_messageId=ID:test.21415B667C051:40C149C0, timestamp=1534820792481]




Question -

Is my understanding of the root cause correct? If yes, what can be done to treat these 387 records as individual messages (and not as one List object) and publish them one by one without impacting the transaction management?

I had discussed this issue with https://stackoverflow.com/users/2756547/artem-bilan in my earlier post on Stack Overflow, and I was asked to check this design by replacing Tibco EMS with ActiveMQ. However, the ActiveMQ infrastructure is still being analysed by our architecture team, so it can't be used until it's approved.




1 Answer



Oh! Now I see what your problem is. The <int-jdbc:inbound-channel-adapter> indeed returns a list of the records it could select from the DB, and this whole list is sent to JMS as a single message. That's why you see only one thread on the consumer side: there is only one message to get from the queue.





If you would like to have a separate message for each pulled record, consider using a <splitter> between the JDBC polling operation and the send to JMS.
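A minimal sketch of that change against the configuration in the question, not a definitive setup. It assumes handleJdbcMessage returns a List with one element per record; the channel id splitchannel is hypothetical and does not appear in the original configuration. A splitter declared without a ref falls back to the framework's default splitting, which emits one message per element of a Collection payload.

```xml
<!-- Service activator now hands the polled list to the splitter
     instead of sending it straight to publishchannel. -->
<int:service-activator input-channel="dbchannel"
    output-channel="splitchannel" ref="jdbcmessagehandler" method="handleJdbcMessage" />

<!-- Hypothetical intermediate channel (name is an assumption). -->
<int:channel id="splitchannel"/>

<!-- Default splitter: one outgoing message per element of the List payload. -->
<int:splitter input-channel="splitchannel" output-channel="publishchannel"/>

<!-- Unchanged from the question: each split message is sent to EMS separately. -->
<int:publish-subscribe-channel id="publishchannel"/>
<int-jms:outbound-channel-adapter id="publishchanneladapter"
    channel="publishchannel" jms-template="publishrealtimefeedinternaljmstemplate" />
```

Both channels here are plain channels without an executor or queue, so the splitter and the JMS send should still run on the polling thread.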







I hope using a <splitter> between the JDBC polling operation and the send to the EMS queue won't break the transaction. The way I have set up the transaction in my design is this: the transaction begins when the first poll of the database fetches, say, 10 records, and it ends when these records are sent to the Tibco EMS queue. Only after they are sent to the EMS queue should the second poll start, and with it the second transaction. Please confirm.
– Mitesh Parikh
Aug 24 at 13:13








That’s correct: the transaction is per poll and covers the whole sub-flow, as long as everything downstream is performed in the same polling thread.
– Artem Bilan
Aug 24 at 13:32





I have used <splitter/> to emit one message per pulled record, and I can see multiple threads now. Thanks. One question here - I am using <logging-channel-adapter logger-name="feedlogger" level="INFO" log-full-message="true"/> to log the messages with the log4j framework. Is there any way to log the 'id' of each message channel used in my design in the log file?
– Mitesh Parikh
Aug 27 at 11:18








Let’s arrange that as a separate SO question!
– Artem Bilan
Aug 27 at 11:48





sure, thanks Artem for all the help
– Mitesh Parikh
Aug 27 at 11:54





