diff --git a/scr/src/main/java/org/apache/felix/scr/impl/Activator.java b/scr/src/main/java/org/apache/felix/scr/impl/Activator.java index f54e70f7ca..66896b0d68 100644 --- a/scr/src/main/java/org/apache/felix/scr/impl/Activator.java +++ b/scr/src/main/java/org/apache/felix/scr/impl/Activator.java @@ -97,7 +97,7 @@ public class Activator extends AbstractExtender private ComponentRegistry m_componentRegistry; // thread acting upon configurations - private ComponentActorThread m_componentActor; + private ComponentActorExecutor m_componentActor; private ServiceRegistration m_runtime_reg; @@ -210,7 +210,8 @@ protected void doStart() throws Exception // prepare component registry m_componentBundles = new HashMap<>(); - m_componentRegistry = new ComponentRegistry( this.m_configuration, this.logger ); + m_componentActor = new ComponentActorExecutor( this.logger ); + m_componentRegistry = new ComponentRegistry( this.m_configuration, this.logger, this.m_componentActor ); final ServiceComponentRuntimeImpl runtime = new ServiceComponentRuntimeImpl( m_globalContext, m_componentRegistry ); m_runtime_reg = m_context.registerService( ServiceComponentRuntime.class, @@ -222,12 +223,6 @@ protected void doStart() throws Exception logger.log(Level.INFO, " Version = {0}", null, m_bundle.getVersion().toString() ); - // create and start the component actor - m_componentActor = new ComponentActorThread( this.logger ); - Thread t = new Thread( m_componentActor, "SCR Component Actor" ); - t.setDaemon( true ); - t.start(); - super.doStart(); m_componentCommands = new ComponentCommands(m_context, runtime, m_configuration); @@ -427,14 +422,13 @@ public void doStop() throws Exception // dispose component registry if ( m_componentRegistry != null ) { - m_componentRegistry.shutdown(); m_componentRegistry = null; } // terminate the actor thread if ( m_componentActor != null ) { - m_componentActor.terminate(); + m_componentActor.shutdownNow(); m_componentActor = null; } ClassUtils.setFrameworkWiring(null); diff --git a/scr/src/main/java/org/apache/felix/scr/impl/BundleComponentActivator.java b/scr/src/main/java/org/apache/felix/scr/impl/BundleComponentActivator.java index 8dbc277508..4ff4869792 100644 --- a/scr/src/main/java/org/apache/felix/scr/impl/BundleComponentActivator.java +++ b/scr/src/main/java/org/apache/felix/scr/impl/BundleComponentActivator.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.StringTokenizer; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -81,7 +82,7 @@ public class BundleComponentActivator implements ComponentActivator private final List> m_holders = new ArrayList<>(); // thread acting upon configurations - private final ComponentActorThread m_componentActor; + private final ScheduledExecutorService m_componentActor; // true as long as the dispose method is not called private final AtomicBoolean m_active = new AtomicBoolean( true ); @@ -196,7 +197,7 @@ public void removeServiceListener(String serviceFilterString, */ public BundleComponentActivator(final ScrLogger scrLogger, final ComponentRegistry componentRegistry, - final ComponentActorThread componentActor, + final ScheduledExecutorService componentActor, final BundleContext context, final ScrConfiguration configuration, final List cachedComponentMetadata, @@ -712,10 +713,10 @@ public void schedule(Runnable task) { if ( isActive() ) { - ComponentActorThread cat = m_componentActor; + ScheduledExecutorService cat = m_componentActor; if ( cat != null ) { - cat.schedule( task ); + cat.submit( task ); } else { @@ -762,7 +763,7 @@ public void leaveCreate(ServiceReference serviceReference) @Override public void missingServicePresent(ServiceReference serviceReference) { - m_componentRegistry.missingServicePresent( serviceReference, m_componentActor ); + m_componentRegistry.missingServicePresent( serviceReference ); } @Override diff --git a/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorExecutor.java b/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorExecutor.java new file mode 100644 index 0000000000..67786694bf --- /dev/null +++ b/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorExecutor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.scr.impl; + + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +import org.apache.felix.scr.impl.logger.InternalLogger.Level; +import org.apache.felix.scr.impl.logger.ScrLogger; + + +/** + * The ComponentActorExecutor is the thread used to act upon registered + * components of the service component runtime. + * This is also used by the ComponentRegistry to schedule service.changecount updates. + */ +class ComponentActorExecutor extends ScheduledThreadPoolExecutor +{ + + private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() + { + @Override + public Thread newThread(Runnable r) + { + Thread thread = new Thread(r, "SCR Component Actor"); + thread.setDaemon(true); + return thread; + } + }; + + private final ScrLogger logger; + + ComponentActorExecutor(final ScrLogger log ) + { + super( 1, THREAD_FACTORY ); + logger = log; + } + + @Override + protected void beforeExecute(Thread t, Runnable r) + { + logger.log(Level.DEBUG, "Running task: " + r, null); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) + { + if (t != null) + { + logger.log(Level.ERROR, "Unexpected problem executing task " + r, t); + } + } +} diff --git a/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorThread.java b/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorThread.java deleted file mode 100644 index d1aa3bafca..0000000000 --- a/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorThread.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.felix.scr.impl; - - -import java.util.LinkedList; - -import org.apache.felix.scr.impl.logger.InternalLogger.Level; -import org.apache.felix.scr.impl.logger.ScrLogger; - - -/** - * The ComponentActorThread is the thread used to act upon registered - * components of the service component runtime. - */ -class ComponentActorThread implements Runnable -{ - - // sentinel task to terminate this thread - private static final Runnable TERMINATION_TASK = new Runnable() - { - @Override - public void run() - { - } - - - @Override - public String toString() - { - return "Component Actor Terminator"; - } - }; - - // the queue of Runnable instances to be run - private final LinkedList tasks = new LinkedList<>(); - - private final ScrLogger logger; - - - ComponentActorThread( final ScrLogger log ) - { - logger = log; - } - - - // waits on Runnable instances coming into the queue. As instances come - // in, this method calls the Runnable.run method, logs any exception - // happening and keeps on waiting for the next Runnable. If the Runnable - // taken from the queue is this thread instance itself, the thread - // terminates. - @Override - public void run() - { - logger.log(Level.DEBUG, "Starting ComponentActorThread", null); - - for ( ;; ) - { - final Runnable task; - synchronized ( tasks ) - { - while ( tasks.isEmpty() ) - { - boolean interrupted = Thread.interrupted(); - try - { - tasks.wait(); - } - catch ( InterruptedException ie ) - { - interrupted = true; - // don't care - } - finally - { - if (interrupted) - { // restore interrupt status - Thread.currentThread().interrupt(); - } - } - } - - task = tasks.removeFirst(); - } - - try - { - // return if the task is this thread itself - if ( task == TERMINATION_TASK ) - { - logger.log(Level.DEBUG, "Shutting down ComponentActorThread", - null); - return; - } - - // otherwise execute the task, log any issues - logger.log(Level.DEBUG, "Running task: " + task, null); - task.run(); - } - catch ( Throwable t ) - { - logger.log(Level.ERROR, "Unexpected problem executing task " + task, - t); - } - finally - { - synchronized ( tasks ) - { - tasks.notifyAll(); - } - } - } - } - - - // cause this thread to terminate by adding this thread to the end - // of the queue - void terminate() - { - schedule( TERMINATION_TASK ); - synchronized ( tasks ) - { - while ( !tasks.isEmpty() ) - { - boolean interrupted = Thread.interrupted(); - try - { - tasks.wait(); - } - catch ( InterruptedException e ) - { - interrupted = true; - logger.log(Level.ERROR, - "Interrupted exception waiting for queue to empty", e); - } - finally - { - if (interrupted) - { // restore interrupt status - Thread.currentThread().interrupt(); - } - } - } - } - } - - - // queue the given runnable to be run as soon as possible - void schedule( Runnable task ) - { - synchronized ( tasks ) - { - // append to the task queue - tasks.add( task ); - - logger.log(Level.DEBUG, "Adding task [{0}] as #{1} in the queue", null, - task, tasks.size()); - - // notify the waiting thread - tasks.notifyAll(); - } - } -} diff --git a/scr/src/main/java/org/apache/felix/scr/impl/ComponentRegistry.java b/scr/src/main/java/org/apache/felix/scr/impl/ComponentRegistry.java index a4cb54dd01..718c18361c 100644 --- a/scr/src/main/java/org/apache/felix/scr/impl/ComponentRegistry.java +++ b/scr/src/main/java/org/apache/felix/scr/impl/ComponentRegistry.java @@ -28,10 +28,10 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.felix.scr.impl.inject.ComponentMethods; @@ -130,14 +130,16 @@ public class ComponentRegistry private final ScrConfiguration m_configuration; - public ComponentRegistry( final ScrConfiguration scrConfiguration, final ScrLogger logger ) + private final ScheduledExecutorService m_componentActor; + + public ComponentRegistry(final ScrConfiguration scrConfiguration, final ScrLogger logger, final ScheduledExecutorService componentActor ) { m_configuration = scrConfiguration; m_logger = logger; + m_componentActor = componentActor; m_componentHoldersByName = new HashMap<>(); m_componentHoldersByPid = new HashMap<>(); m_componentsById = new HashMap<>(); - } //---------- ComponentManager registration by component Id @@ -560,7 +562,7 @@ public void leaveCreate(final ServiceReference serviceReference) * @param serviceReference * @param actor */ - public synchronized void missingServicePresent( final ServiceReference serviceReference, ComponentActorThread actor ) + public synchronized void missingServicePresent( final ServiceReference serviceReference ) { final List> dependencyManagers = m_missingDependencies.remove( serviceReference ); if ( dependencyManagers != null ) @@ -590,7 +592,7 @@ public String toString() } ; m_logger.log(Level.DEBUG, "Scheduling runnable {0} asynchronously", null, runnable); - actor.schedule( runnable ); + m_componentActor.submit( runnable ); } } @@ -704,10 +706,6 @@ public void unregisterRegionConfigurationSupport( private final AtomicLong changeCount = new AtomicLong(); - private volatile Timer changeCountTimer; - - private final Object changeCountTimerLock = new Object(); - private volatile ServiceRegistration registration; public Dictionary getServiceRegistrationProperties() @@ -729,16 +727,9 @@ public void updateChangeCount() { final long count = this.changeCount.incrementAndGet(); - final Timer timer; - synchronized ( this.changeCountTimerLock ) { - if ( this.changeCountTimer == null ) { - this.changeCountTimer = new Timer("SCR Component Registry", true); - } - timer = this.changeCountTimer; - } try { - timer.schedule(new TimerTask() + m_componentActor.schedule(new Runnable() { @Override @@ -754,18 +745,9 @@ public void run() { // we ignore this as this might happen on shutdown } - synchronized ( changeCountTimerLock ) - { - if ( changeCount.get() == count ) - { - changeCountTimer.cancel(); - changeCountTimer = null; - } - } - } } - }, m_configuration.serviceChangecountTimeout()); + }, m_configuration.serviceChangecountTimeout(), TimeUnit.MILLISECONDS); } catch (Exception e) { m_logger.log(Level.WARN, @@ -774,11 +756,4 @@ public void run() } } } - - public void shutdown() { - final Timer timer = changeCountTimer; - if (timer != null) { - timer.cancel(); - } - } } diff --git a/scr/src/test/java/org/apache/felix/scr/integration/ComponentTestBase.java b/scr/src/test/java/org/apache/felix/scr/integration/ComponentTestBase.java index 19ee076e9d..b6164049a6 100644 --- a/scr/src/test/java/org/apache/felix/scr/integration/ComponentTestBase.java +++ b/scr/src/test/java/org/apache/felix/scr/integration/ComponentTestBase.java @@ -55,6 +55,7 @@ import javax.inject.Inject; import org.apache.felix.scr.impl.ComponentCommands; +import org.apache.felix.scr.impl.manager.ScrConfiguration; import org.apache.felix.scr.integration.components.SimpleComponent; import org.apache.felix.service.command.Converter; import org.junit.After; diff --git a/scr/src/test/java/org/apache/felix/scr/integration/Felix6778Test.java b/scr/src/test/java/org/apache/felix/scr/integration/Felix6778Test.java new file mode 100644 index 0000000000..dbc4079a7e --- /dev/null +++ b/scr/src/test/java/org/apache/felix/scr/integration/Felix6778Test.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.scr.integration; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.ops4j.pax.exam.Configuration; +import org.ops4j.pax.exam.Option; +import org.ops4j.pax.exam.OptionUtils; +import org.ops4j.pax.exam.junit.PaxExam; +import org.osgi.framework.BundleException; +import org.osgi.framework.Constants; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceEvent; +import org.osgi.framework.ServiceListener; +import org.osgi.service.component.runtime.ServiceComponentRuntime; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.ops4j.pax.exam.CoreOptions.systemProperty; + +@RunWith(PaxExam.class) +public class Felix6778Test extends ComponentTestBase implements ServiceListener +{ + + private static final long DS_SERVICE_CHANGECOUNT_TIMEOUT = 1000; + + static + { + descriptorFile = "/integration_test_simple_components.xml"; + } + + class RecordedScrChangeCount + { + private final Thread thread; + private final long changecount; + + RecordedScrChangeCount(ServiceEvent event) + { + this.thread = Thread.currentThread(); + this.changecount = (long) event.getServiceReference().getProperty(Constants.SERVICE_CHANGECOUNT); + } + } + + @Configuration + public static Option[] configuration() + { + return OptionUtils.combine(ComponentTestBase.configuration(), + systemProperty( "ds.service.changecount.timeout" ).value( Long.toString(DS_SERVICE_CHANGECOUNT_TIMEOUT) )); + } + + private List recordedEvents = new CopyOnWriteArrayList<>(); + + @Before + public void addServiceListener() throws InvalidSyntaxException + { + bundleContext.addServiceListener( + this, + "("+Constants.OBJECTCLASS + "=" + ServiceComponentRuntime.class.getName() + ")" + ); + } + + @After + public void removeServiceListener() + { + bundleContext.removeServiceListener(this); + } + + @Test + public void verify_changecount_updates() throws InterruptedException, BundleException + { + // Wait for 2x the changecount timeout`to account for the asynchronous service.changecount property update + Thread.sleep(DS_SERVICE_CHANGECOUNT_TIMEOUT * 2); + + // Check that the service.changecount update was recorded + assertEquals(1, recordedEvents.size()); + assertEquals(13L, recordedEvents.get(0).changecount); + assertEquals("SCR Component Actor", recordedEvents.get(0).thread.getName()); + + // Trigger a change by stopping the bundle with components + bundle.stop(); + + // Wait for 2x the changecount timeout`to account for the asynchronous service.changecount property update + Thread.sleep(DS_SERVICE_CHANGECOUNT_TIMEOUT * 2); + + // Check that another service.changecount update was recorded + assertEquals(2, recordedEvents.size()); + assertEquals(26L, recordedEvents.get(1).changecount); + assertEquals("SCR Component Actor", recordedEvents.get(1).thread.getName()); + + // Check if both events originate from the same thread + assertSame(recordedEvents.get(0).thread, recordedEvents.get(1).thread); + } + + @Override + public void serviceChanged(ServiceEvent event) + { + if (event.getType() == ServiceEvent.MODIFIED) + { + recordedEvents.add(new RecordedScrChangeCount(event)); + } + } + + +}