java - Deliver message from a local broker to a disconnected central broker


I have a specific requirement: I need to send messages to a server that won't always be available.

For this I used a network of brokers, specifically with ActiveMQ.

The goal is to have a local application A (producer only) push messages to a central application B (consumer only). However, the network won't always be available, so application A's broker has to store the messages and wait for a connection before it can send them to application B. In effect, A is a proxy that needs to forward messages to B when B is available.

Broker B's configuration includes a durable topic that B is listening on in order to consume the messages.

As said in ActiveMQ's documentation, I have to use a static network bridge to do that, which is what I did.

Note: I can't have B subscribe to A, because there are multiple instances of A and I can't configure all of them in B.

So here is my configuration (raw Spring) for the local application:

<!-- As said in http://activemq.apache.org/spring-support.html, use a pooled connection along with JmsTemplate -->
<amq:connectionFactory id="jmsfactory" brokerURL="${jms.broker.local.url}" />

<!-- Spring JmsTemplate -->
<bean id="myjmstemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsfactory" />
</bean>

<!-- Local embedded broker -->
<bean id="localbroker" class="org.apache.activemq.broker.BrokerService"
    init-method="start" destroy-method="stop">
    <property name="brokerName" value="localbroker" />
    <property name="transportConnectorURIs">
        <list>
            <value>${jms.broker.local.url}</value>
        </list>
    </property>
    <property name="networkConnectors">
        <list>
            <ref bean="networkconnector" />
        </list>
    </property>
</bean>

<amq:connectionFactory id="remotejmsfactory"
    brokerURL="${jms.broker.remote.url}" clientIDPrefix="bridged-test" />

<bean id="networkconnector" class="org.apache.activemq.network.DiscoveryNetworkConnector">
    <property name="uri" value="static:(${jms.broker.remote.url})"></property>
    <property name="staticallyIncludedDestinations">
        <list>
            <bean class="org.apache.activemq.command.ActiveMQTopic">
                <constructor-arg type="java.lang.String" value="${jms.topic.sample}"/>
            </bean>
        </list>
    </property>
    <!-- Deliver content even if there is no consumer; useful for a durable topic -->
    <property name="staticBridge" value="true"></property>
</bean>

The localbroker is an embedded broker connecting to a remote broker (the application you can download from the Apache ActiveMQ page).

Here is the central configuration:

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>

<bean id="logquery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
      lazy-init="false" scope="singleton"
      init-method="start" destroy-method="stop">
</bean>

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useVirtualDestSubs="true">

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" >
              <pendingMessageLimitStrategy>
                <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>
    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>
    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70" />
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    <transportConnectors>
        <transportConnector name="http" uri="http://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=10485760"/>
    </transportConnectors>
    <shutdownHooks>
        <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
    </shutdownHooks>

</broker>
<import resource="jetty.xml"/>

So here is what happens when I try to send/receive messages:

  • If the producer (A) and the consumer (B) are connected to their respective brokers, and the brokers are connected to each other, it works fine.
  • If the consumer (B) is connected to its broker and there are messages pending, while producer A's broker is disconnected, it works fine.
  • If the producer (A) is disconnected from the network, A's broker won't deliver the messages to B's broker when B is available again.

Before network connectors, I tried a jmsBridgeConnector using an outboundTopicBridge in the local broker configuration, without any luck.

Here is the question: how can I make the local broker A send its messages to the central broker B on reconnect, and be sure that no message is lost while B is not available?

Notes:

  • The network I work on is not always available (it can be down for days!), and I can only rely on the HTTP port, which is why it is the only one opened. This means no multicast discovery is possible.
  • Each message must be delivered only once.
  • The reason I use local brokers is so that I don't have to manage what to send myself. At the moment they are only used to store and forward to the central broker.

Edit: I have been able to make it work using a JMS bridge, but I have one last problem: if the connection is lost at application boot or during the application lifecycle, I need to restart my broker to be able to send messages.

I've been using the "store and forward" pattern with success using a JMS bridge.

I can not comment on the network connector bridge, but for the JMS bridge you have to:

  • use a recent version of ActiveMQ due to bug AMQ-5859
  • add an org.apache.activemq.network.jms.ReconnectionPolicy on the bridge
  • make sure to set reconnectOnException on the remote broker connection factory
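
As a rough illustration of the second point, a JMS topic bridge with a reconnection policy could be declared on the local broker along these lines. This is a sketch under assumptions, not a tested configuration: it reuses the remotejmsfactory bean and the ${jms.topic.sample} and ${jms.broker.local.url} placeholders from the question, the bean ids bridgereconnectionpolicy and topicbridge are made up for the example, the retry values are arbitrary, and the property names are the ones I know from SimpleJmsTopicConnector and ReconnectionPolicy, so check them against the ActiveMQ version you run.

```xml
<!-- Hypothetical bean id; the values below are illustrative, not recommendations -->
<bean id="bridgereconnectionpolicy" class="org.apache.activemq.network.jms.ReconnectionPolicy">
    <!-- -1 is assumed to mean "retry forever", at startup and after a lost connection -->
    <property name="maxInitialConnectAttempts" value="-1" />
    <property name="maxReconnectAttempts" value="-1" />
    <!-- Delays between attempts, assumed to be in milliseconds -->
    <property name="initialReconnectDelay" value="1000" />
    <property name="maximumReconnectDelay" value="30000" />
</bean>

<!-- Hypothetical bean id; forwards the local topic to the same topic on the remote broker -->
<bean id="topicbridge" class="org.apache.activemq.network.jms.SimpleJmsTopicConnector">
    <property name="outboundTopicConnectionFactory" ref="remotejmsfactory" />
    <property name="reconnectionPolicy" ref="bridgereconnectionpolicy" />
    <property name="outboundTopicBridges">
        <list>
            <bean class="org.apache.activemq.network.jms.OutboundTopicBridge">
                <constructor-arg value="${jms.topic.sample}" />
            </bean>
        </list>
    </property>
</bean>

<!-- Local embedded broker from the question, with the JMS bridge in place of the network connector -->
<bean id="localbroker" class="org.apache.activemq.broker.BrokerService"
    init-method="start" destroy-method="stop">
    <property name="brokerName" value="localbroker" />
    <property name="transportConnectorURIs">
        <list>
            <value>${jms.broker.local.url}</value>
        </list>
    </property>
    <property name="jmsBridgeConnectors">
        <list>
            <ref bean="topicbridge" />
        </list>
    </property>
</bean>
```

The unlimited retry counts are there because the link can be down for days; with a finite count the bridge would presumably give up and need a broker restart, which is the symptom described in the edit.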
