Hi there,
I am using the Atmosphere GWT extension (version 1.1.0 beta2) in my application.
Since we use Spring and Spring does not support WebSocket yet, I have disabled it in the configurations.
Furthermore, not using WebSockets means that long polling has to work very reliably, so I set the BroadcasterCache to SessionBroadcasterCache (HeaderBroadcasterCache does not work with GWT yet; see issue #14).
Unfortunately, when I log out of the session, Atmosphere does not take this into account and I get the following errors:
01/18 13:54:50.027 [/atmosphereGwt-AsyncWrite-16] [n/a] WARN o.atmosphere.cpr.DefaultBroadcaster - This message Import.ImportJobStateChanged will be lost, adding it to the BroadcasterCache
01/18 13:54:50.029 [/atmosphereGwt-BroadcasterConfig-17] [n/a] WARN o.atmosphere.cpr.DefaultBroadcaster - Unable to track messages Import.ImportJobStateChanged
java.lang.IllegalStateException: setAttribute: Session [151FF3FB3BD6D539AD7322A8D7051079] has already been invalidated
at org.apache.catalina.session.StandardSession.setAttribute(StandardSession.java:1437) ~[catalina.jar:7.0.27]
at org.apache.catalina.session.StandardSession.setAttribute(StandardSession.java:1402) ~[catalina.jar:7.0.27]
at org.apache.catalina.session.StandardSessionFacade.setAttribute(StandardSessionFacade.java:156) ~[catalina.jar:7.0.27]
at org.atmosphere.cache.SessionBroadcasterCache.addToCache(SessionBroadcasterCache.java:88) ~[atmosphere-runtime-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.cpr.DefaultBroadcaster.trackBroadcastMessage(DefaultBroadcaster.java:936) [atmosphere-runtime-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.cpr.DefaultBroadcaster.perRequestFilter(DefaultBroadcaster.java:813) [atmosphere-runtime-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.cpr.DefaultBroadcaster.deliverPush(DefaultBroadcaster.java:738) [atmosphere-runtime-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.cpr.DefaultBroadcaster.push(DefaultBroadcaster.java:649) [atmosphere-runtime-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.cpr.DefaultBroadcaster$2.run(DefaultBroadcaster.java:526) [atmosphere-runtime-1.1.0.beta2.jar:1.1.0.beta2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) [na:1.6.0_35]
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) [na:1.6.0_35]
at java.util.concurrent.FutureTask.run(FutureTask.java:138) [na:1.6.0_35]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_35]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_35]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_35]
So I tried implementing my own BroadcasterCache:
/**
 * A {@code SessionBroadcasterCache} that tolerates an HTTP session which has already been
 * invalidated (for example by a logout) when a broadcast message is about to be cached.
 *
 * <p>The parent implementation stores tracking data via {@code HttpSession.setAttribute}, which
 * throws {@link IllegalStateException} once the session is invalidated. Instead of testing the
 * session up front, this class delegates and handles that exception: a pre-check cannot close
 * the window between "check" and "setAttribute", and
 * {@code getRequest().isRequestedSessionIdValid()} was observed to never return {@code true}
 * for these resources, which disabled the cache altogether.
 */
public class SessionAwareSessionBroadcasterCache extends SessionBroadcasterCache {

    /**
     * Adds the message to the session-backed cache, skipping it quietly when the session has
     * been invalidated in the meantime.
     *
     * @param broadcasterId id of the broadcaster the message belongs to
     * @param resource      resource the message was destined for
     * @param message       message to cache
     */
    @Override
    public void addToCache( String broadcasterId, AtmosphereResource resource, Message message ) {
        try {
            super.addToCache( broadcasterId, resource, message );
            //log info
        } catch ( IllegalStateException sessionInvalidated ) {
            // Deliberately not rethrown: the user logged out, so nobody is left to replay the
            // cached message to.
            // NOTE(review): this also hides any other IllegalStateException raised by the
            // parent class - confirm that is acceptable.
            //log warning
        }
    }
}
But the statement
resource.getRequest().isRequestedSessionIdValid()
is never true, and occasionally I get the following error:
01/22 14:05:50.225 [/atmosphereGwt-BroadcasterConfig-0] [n/a] WARN c.h.h.c.u.s.SessionAwareSessionBroadcasterCache - Session invalid, do not add to cache
01/22 14:05:50.229 [/atmosphereGwt-AsyncWrite-4] [n/a] WARN o.a.h.AbstractReflectorAtmosphereHandler - Serializer exception: message: 4dc5bdb9-edc8-4edf-8833-ab478326d8c9
org.apache.catalina.connector.ClientAbortException: null
at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:346) ~[catalina.jar:7.0.27]
at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:306) ~[catalina.jar:7.0.27]
at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:101) ~[catalina.jar:7.0.27]
at org.atmosphere.gwt.server.impl.CountOutputStream.flush(CountOutputStream.java:79) ~[atmosphere-gwt-server-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.gwt.server.deflate.DeflaterOutputStream.flush(DeflaterOutputStream.java:108) ~[atmosphere-gwt-server-1.1.0.beta2.jar:1.1.0.beta2]
at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:278) ~[na:1.6.0_35]
at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:122) ~[na:1.6.0_35]
at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:212) ~[na:1.6.0_35]
at org.atmosphere.gwt.server.impl.GwtResponseWriterImpl.flush(GwtResponseWriterImpl.java:261) ~[atmosphere-gwt-server-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.gwt.server.impl.GwtResponseWriterImpl.heartbeat(GwtResponseWriterImpl.java:250) ~[atmosphere-gwt-server-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.gwt.server.impl.ManagedStreamResponseWriter.heartbeat(ManagedStreamResponseWriter.java:92) ~[atmosphere-gwt-server-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.gwt.server.impl.GwtResponseWriterImpl.write(GwtResponseWriterImpl.java:226) ~[atmosphere-gwt-server-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.gwt.server.impl.ManagedStreamResponseWriter.write(ManagedStreamResponseWriter.java:86) ~[atmosphere-gwt-server-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.gwt.server.impl.GwtResponseWriterImpl.write(GwtResponseWriterImpl.java:204) ~[atmosphere-gwt-server-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.gwt.server.impl.GwtAtmosphereResourceImpl$3.write(GwtAtmosphereResourceImpl.java:300) ~[atmosphere-gwt-server-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.handler.AbstractReflectorAtmosphereHandler.onStateChange(AbstractReflectorAtmosphereHandler.java:100) ~[atmosphere-runtime-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.cpr.DefaultBroadcaster.invokeOnStateChange(DefaultBroadcaster.java:944) [atmosphere-runtime-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.cpr.DefaultBroadcaster.executeAsyncWrite(DefaultBroadcaster.java:866) [atmosphere-runtime-1.1.0.beta2.jar:1.1.0.beta2]
at org.atmosphere.cpr.DefaultBroadcaster$3.run(DefaultBroadcaster.java:567) [atmosphere-runtime-1.1.0.beta2.jar:1.1.0.beta2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) [na:1.6.0_35]
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) [na:1.6.0_35]
at java.util.concurrent.FutureTask.run(FutureTask.java:138) [na:1.6.0_35]
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_35]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_35]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_35]
Caused by: java.io.IOException: null
at org.apache.coyote.http11.InternalAprOutputBuffer.flushBuffer(InternalAprOutputBuffer.java:205) ~[tomcat-coyote.jar:7.0.27]
at org.apache.coyote.http11.InternalAprOutputBuffer.flush(InternalAprOutputBuffer.java:109) ~[tomcat-coyote.jar:7.0.27]
at org.apache.coyote.http11.AbstractHttp11Processor.action(AbstractHttp11Processor.java:789) ~[tomcat-coyote.jar:7.0.27]
at org.apache.coyote.Response.action(Response.java:174) ~[tomcat-coyote.jar:7.0.27]
at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:341) ~[catalina.jar:7.0.27]
... 24 common frames omitted
Note that Atmosphere tries to make an async write even though I have disabled that support.
In the Atmosphere part of my web.xml I have:
- disabled async-supported and also
- disabled useWebSocket and
- org.atmosphere.disableOnStateEvent is set to true
My AtmosphereGwtHandler:
/**
 * Atmosphere GWT handler that attaches every incoming comet connection to a single broadcaster
 * (looked up once in {@link #init}) and registers a per-connection {@code CometWriter} with the
 * {@code CometRegistration} service.
 */
public class HSPAtmosphereGwtHandler extends AtmosphereGwtHandler {

    private static final Logger LOG = LoggerFactory.getLogger( HSPAtmosphereGwtHandler.class );

    /** Value returned from {@code doComet}; presumably a timeout of two hours in milliseconds - confirm. */
    private static final int TIMEOUT = 120 * 60 * 1000;

    /** Session attribute under which the connection UUID of the latest comet connection is stored. */
    private static final String COMET_ID = HSPAtmosphereGwtHandler.class.getName() + "#cometResponseId";

    /** Indirection for obtaining the {@code BroadcasterFactory}; the field holding it is replaceable. */
    interface BroadcasterFactoryProvider {
        BroadcasterFactory getFactory();
    }

    /**
     * {@code CometWriter} that broadcasts a message to the wrapped resource, unless that resource
     * belongs to the user whose request is being processed on the calling thread.
     */
    private static class AtmosphereResponseWriter implements CometWriter {

        private final GwtAtmosphereResource resource;

        public AtmosphereResponseWriter( GwtAtmosphereResource resource ) {
            this.resource = resource;
        }

        /**
         * Broadcasts {@code message} to the wrapped resource, skipping the broadcast when the
         * receiver is the current user.
         */
        @Override
        public void write( String message ) {
            if( !receiverIsCurrentUser() ) {
                resource.getBroadcaster().broadcast( message, resource.getAtmosphereResource() );
            }
        }

        /**
         * @return {@code true} when the comet id stored in the current request's session equals
         *         the wrapped resource's connection UUID; {@code false} when there is no current
         *         request or that request has no session
         */
        private boolean receiverIsCurrentUser() {
            boolean result = false;
            HttpServletRequest request = RequestFactoryServlet.getThreadLocalRequest();
            // If started from a QuartzJob, no request exists
            if( requestExists( request ) && hasSession( request ) ) {
                // getSession( false ) on purpose: a broadcast must never create a brand-new
                // session as a side effect (e.g. right after the user logged out).
                String currentCometId = ( String )request.getSession( false ).getAttribute( COMET_ID );
                String receiverCometId = resource.getConnectionUUID();
                result = receiverCometId.equals( currentCometId );
            }
            return result;
        }

        private static boolean requestExists( HttpServletRequest request ) {
            return request != null;
        }

        /** Checks for an existing session without creating one. */
        private static boolean hasSession( HttpServletRequest request ) {
            return request.getSession( false ) != null;
        }
    }

    CometRegistration cometService;
    Broadcaster broadcaster;
    BroadcasterFactoryProvider broadcasterFactoryProvider;

    /** Creates a handler whose provider returns {@code BroadcasterFactory.getDefault()}. */
    public HSPAtmosphereGwtHandler() {
        this.broadcasterFactoryProvider = new BroadcasterFactoryProvider() {
            @Override
            public BroadcasterFactory getFactory() {
                return BroadcasterFactory.getDefault();
            }
        };
    }

    /**
     * Resolves the {@code CometRegistration} bean from the web application context and looks up
     * the broadcaster for the event space.
     */
    @Override
    public void init( ServletConfig servletConfig ) throws ServletException {
        super.init( servletConfig );
        cometService = getBeanFromContext( CometRegistration.class );
        String eventSpace = getEventSpace();
        // second argument presumably means "create if absent" - confirm against BroadcasterFactory
        broadcaster = getBroadcasterFactory().lookup( eventSpace, true );
    }

    /**
     * Binds the new connection to the broadcaster, stores its UUID in the HTTP session and
     * registers a writer for it with the comet service.
     *
     * @return {@link #TIMEOUT}
     */
    @Override
    public int doComet( GwtAtmosphereResource resource ) throws ServletException, IOException {
        resource.getAtmosphereResource().setBroadcaster( broadcaster );
        String connectionId = resource.getConnectionUUID();
        resource.getRequest().getSession().setAttribute( COMET_ID, connectionId );
        cometService.sessionCreated( connectionId, new AtmosphereResponseWriter( resource ) );
        return TIMEOUT;
    }

    /**
     * Unregisters the connection from the comet service. {@code serverInitiated} is not used here.
     */
    @Override
    public void terminate( GwtAtmosphereResource cometResponse, boolean serverInitiated ) {
        String sessionId = cometResponse.getConnectionUUID();
        cometService.sessionDestroyed( sessionId );
        LOG.debug( "Atmosphere session destroyed at cometService with id {}", sessionId );
    }

    BroadcasterFactory getBroadcasterFactory() {
        return broadcasterFactoryProvider.getFactory();
    }

    private String getEventSpace() {
        return "/atmosphereGwt";
    }

    /**
     * Fetches the bean of the given type from the web application context.
     * Assumes {@code ApplicationContextUtils.getUniqueBean} is generic in the requested class - confirm.
     */
    private <T> T getBeanFromContext( Class<T> clazz ) {
        return ApplicationContextUtils.getUniqueBean( getWebApplicationContext(), clazz );
    }

    private WebApplicationContext getWebApplicationContext() {
        return WebApplicationContextUtils.getWebApplicationContext( getServletContext() );
    }
}
Thanks in advance for any help.