public abstract class

StateObservable<T>

extends java.lang.Object

implements Observable<T>

 java.lang.Object

↳androidx.camera.core.impl.StateObservable<T>

Subclasses:

MutableStateObservable<T>

Gradle dependencies

compile group: 'androidx.camera', name: 'camera-core', version: '1.2.0-alpha01'

  • groupId: androidx.camera
  • artifactId: camera-core
  • version: 1.2.0-alpha01

The artifact androidx.camera:camera-core:1.2.0-alpha01 is located in the Google Maven repository (https://maven.google.com/).

Overview

An observable which reports a dynamic state.

The state of a state observable is conflated. That is, the value received by an Observable.Observer will only be the latest state; some state updates may never be observed if the state changes quickly enough.

State observables require an initial state, and thus always have a state available for retrieval via StateObservable.fetchData(), which will return an already-complete ListenableFuture.

Errors are also possible as states, and when an error is present, any previous state information is lost. State observables may transition in and out of error states at any time, including the initial state.

All states, including errors, are conflated via Object.equals(Object). That is, if a new state is equal to the current state (equals returns true), it will be as if the state didn't change and no update will be sent to observers.

Summary

Methods
public void addObserver(java.util.concurrent.Executor executor, Observable.Observer<? super T> observer)

public ListenableFuture<T> fetchData()

Fetch the latest state.

public void removeObserver(Observable.Observer<? super T> observer)

Methods inherited from java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods

public ListenableFuture<T> fetchData()

Fetch the latest state.

For state observables, the future returned by fetchData() is guaranteed to be complete and will contain either the current state or an error state which will be thrown as an exception from ListenableFuture.get().

Returns:

A future which will contain the latest value or an error.

public void addObserver(java.util.concurrent.Executor executor, Observable.Observer<? super T> observer)

public void removeObserver(Observable.Observer<? super T> observer)

Source

/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.camera.core.impl;

import androidx.annotation.GuardedBy;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.RequiresApi;
import androidx.camera.core.impl.utils.futures.Futures;
import androidx.core.util.Preconditions;

import com.google.auto.value.AutoValue;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * An observable which reports a dynamic state.
 *
 * <p>The state of a state observable is conflated. That is, the value received by an
 * {@link androidx.camera.core.impl.Observable.Observer} will only be the latest state; some
 * state updates may never be observed if the state changes quickly enough.
 *
 * <p>State observables require an initial state, and thus always have a state available for
 * retrieval via {@link #fetchData()}, which will return an already-complete
 * {@link ListenableFuture}.
 *
 * <p>Errors are also possible as states, and when an error is present, any previous state
 * information is lost. State observables may transition in and out of error states at any time,
 * including the initial state.
 *
 * <p>All states, including errors, are conflated via {@link Object#equals(Object)}. That is, if
 * a new state is equal to the current state, it will be as if the state didn't change and no
 * update will be sent to observers.
 *
 * @param <T> The state type.
 */
@RequiresApi(21) // TODO(b/200306659): Remove and replace with annotation on package-info.java
public abstract class StateObservable<T> implements Observable<T> {
    /** Version in effect before any state change; also the version new observers start from. */
    private static final int INITIAL_VERSION = 0;

    /** Guards {@link #mVersion}, {@link #mUpdating} and mutation of the observer collections. */
    private final Object mLock = new Object();
    /**
     * Current state: either a value of type {@code T} (possibly {@code null}) or an
     * {@link ErrorWrapper}. The same reference is handed to every {@link ObserverWrapper}.
     */
    private final AtomicReference<Object> mState;
    /** Incremented each time the state is replaced by a non-equal state. */
    @GuardedBy("mLock")
    private int mVersion = INITIAL_VERSION;
    /** True while some thread is running the notification loop in updateStateInternal(). */
    @GuardedBy("mLock")
    private boolean mUpdating = false;

    // Must be updated together under lock
    @GuardedBy("mLock")
    private final Map<Observer<? super T>, ObserverWrapper<T>> mWrapperMap = new HashMap<>();
    // Copy-on-write so that the notification loop can iterate a snapshot without holding mLock.
    @GuardedBy("mLock")
    private final CopyOnWriteArraySet<ObserverWrapper<T>> mNotifySet = new CopyOnWriteArraySet<>();

    /**
     * Creates a state observable with the given initial state.
     *
     * @param initialState The initial value, or the initial error if {@code isError} is true.
     * @param isError      Whether {@code initialState} represents an error. When true,
     *                     {@code initialState} must be a {@link Throwable}; this is enforced
     *                     with {@code Preconditions.checkArgument}.
     */
    StateObservable(@Nullable Object initialState, boolean isError) {
        if (isError) {
            Preconditions.checkArgument(initialState instanceof Throwable, "Initial errors must "
                    + "be Throwable");
            mState = new AtomicReference<>(ErrorWrapper.wrap((Throwable) initialState));
        } else {
            mState = new AtomicReference<>(initialState);
        }
    }

    /**
     * Replaces the current state with a value and notifies observers if it differs from the
     * previous state.
     *
     * @param state The new state value.
     */
    void updateState(@Nullable T state) {
        updateStateInternal(state);
    }

    /**
     * Replaces the current state with an error and notifies observers if it differs from the
     * previous state.
     *
     * @param error The error that becomes the current state.
     */
    void updateStateAsError(@NonNull Throwable error) {
        updateStateInternal(ErrorWrapper.wrap(error));
    }

    /**
     * Stores {@code newState} and signals every registered observer wrapper.
     *
     * <p>Only one thread at a time runs the notification loop. A thread that changes the state
     * while another thread is notifying just bumps the version and returns; the notifying
     * thread detects the bump and loops again.
     *
     * @param newState The new value or {@link ErrorWrapper}.
     */
    private void updateStateInternal(@Nullable Object newState) {
        Iterator<ObserverWrapper<T>> notifyIter;
        int currentVersion;
        synchronized (mLock) {
            Object oldState = mState.getAndSet(newState);
            // If new state is equal to old state, no need to do anything.
            if (Objects.equals(oldState, newState)) return;
            currentVersion = ++mVersion; // State was updated. Next version.
            if (mUpdating) return; // Already updating. New state will get used due to version bump.
            mUpdating = true;
            notifyIter = mNotifySet.iterator();
        }

        while (true) {
            // Update observers unlocked in case of direct executor.
            while (notifyIter.hasNext()) {
                notifyIter.next().update(currentVersion);
            }

            // Check if a new version was added while updating
            synchronized (mLock) {
                if (mVersion == currentVersion) {
                    // Updating complete. Break out.
                    mUpdating = false;
                    break;
                }

                // A new version was added. Update again on next loop.
                // Get a new iterator in case the observers changed during update.
                notifyIter = mNotifySet.iterator();
                currentVersion = mVersion;
            }
        }
    }

    /**
     * Fetch the latest state.
     *
     * <p>For state observables, the future returned by {@code fetchData()} is guaranteed to be
     * complete and will contain either the current state or an error state which will be thrown
     * as an exception from {@link ListenableFuture#get()}.
     *
     * @return A future which will contain the latest value or an error.
     */
    @SuppressWarnings("unchecked")
    @NonNull
    @Override
    public ListenableFuture<T> fetchData() {
        Object state = mState.get();
        if (state instanceof ErrorWrapper) {
            return Futures.immediateFailedFuture(((ErrorWrapper) state).getError());
        } else {
            return Futures.immediateFuture((T) state);
        }
    }

    /**
     * Registers {@code observer} and immediately signals it so that it receives the current
     * state on {@code executor}.
     *
     * <p>If the observer is already registered, the previous registration is closed and
     * replaced, so the observer is notified of the current state again.
     *
     * @param executor The executor on which the observer's callbacks are run.
     * @param observer The observer to notify of state changes.
     */
    @Override
    public void addObserver(@NonNull Executor executor, @NonNull Observer<? super T> observer) {
        ObserverWrapper<T> wrapper;
        synchronized (mLock) {
            // If observer is already registered, remove it. It will get notified again immediately.
            removeObserverLocked(observer);

            wrapper = new ObserverWrapper<>(mState, executor, observer);
            mWrapperMap.put(observer, wrapper);
            mNotifySet.add(wrapper);
        }

        // INITIAL_VERSION won't necessarily match the current tracked version constant, but it
        // will be the initial version this wrapper receives. Any future version updates will
        // always be higher than INITIAL_VERSION.
        wrapper.update(INITIAL_VERSION);
    }

    /**
     * Unregisters {@code observer}. Has no effect if the observer is not registered.
     *
     * @param observer The observer to remove.
     */
    @Override
    public void removeObserver(@NonNull Observer<? super T> observer) {
        synchronized (mLock) {
            removeObserverLocked(observer);
        }
    }

    /** Removes and closes the wrapper registered for {@code observer}, if any. */
    @GuardedBy("mLock")
    private void removeObserverLocked(@NonNull Observable.Observer<? super T> observer) {
        ObserverWrapper<T> wrapper = mWrapperMap.remove(observer);
        if (wrapper != null) {
            wrapper.close();
            mNotifySet.remove(wrapper);
        }
    }

    /**
     * Pairs an observer with its executor and delivers conflated state updates to it.
     *
     * <p>{@link #update(int)} signals that a new version of the state exists; {@link #run()}
     * executes on the observer's executor and delivers the latest state.
     */
    private static final class ObserverWrapper<T> implements Runnable {
        /** Sentinel meaning that no state has been delivered to the observer yet. */
        private static final Object NOT_SET = new Object();
        /** Version held before the first call to {@link #update(int)}. */
        private static final int NO_VERSION = -1;

        private final Executor mExecutor;
        private final Observer<? super T> mObserver;
        /** Cleared by {@link #close()}; once false no further updates are scheduled. */
        private final AtomicBoolean mActive = new AtomicBoolean(true);
        private final AtomicReference<Object> mStateRef;

        // Since run() will always run sequentially, no need to lock for this variable.
        private Object mLastState = NOT_SET;
        @GuardedBy("this")
        private int mLatestSignalledVersion = NO_VERSION;
        /** True from the moment a dispatch is scheduled until that dispatch has finished. */
        @GuardedBy("this")
        private boolean mWrapperUpdating = false;
        /**
         * Identifies the most recent dispatch scheduled by {@link #update(int)}. Lets the
         * rejection handler in {@code update()} tell whether the dispatch it scheduled is still
         * the current one before clearing {@link #mWrapperUpdating}.
         */
        @GuardedBy("this")
        private int mScheduleToken = 0;

        ObserverWrapper(@NonNull AtomicReference<Object> stateRef, @NonNull Executor executor,
                @NonNull Observer<? super T> observer) {
            mStateRef = stateRef;
            mExecutor = executor;
            mObserver = observer;
        }

        /**
         * Delivers the latest state to the observer, repeating while newer versions are
         * signalled during delivery.
         *
         * <p>If delivery terminates abnormally (for example the observer callback throws), the
         * updating flag is still cleared so that later calls to {@link #update(int)} can
         * schedule a new dispatch; otherwise the wrapper would never notify its observer again.
         * The exception itself is propagated unchanged. A version signalled while the failed
         * callback was running is not delivered until the next {@code update()}.
         */
        @Override
        public void run() {
            boolean completedNormally = false;
            try {
                deliverPendingStates();
                completedNormally = true;
            } finally {
                if (!completedNormally) {
                    // Still marked as updating here: no other dispatch can have been scheduled
                    // while the flag was set, so clearing it cannot affect another run.
                    synchronized (this) {
                        mWrapperUpdating = false;
                    }
                }
            }
        }

        /**
         * Delivery loop for {@link #run()}. On every normal exit path the updating flag has
         * already been cleared under the wrapper's lock.
         */
        @SuppressWarnings("unchecked")
        private void deliverPendingStates() {
            Object newState;
            int currentVersion;
            synchronized (this) {
                // Only update if we're still active.
                if (!mActive.get()) {
                    mWrapperUpdating = false;
                    return;
                }
                // Get latest state.
                newState = mStateRef.get();
                currentVersion = mLatestSignalledVersion;
            }

            // Continue to check if we're active before updating
            while (true) {
                // Conflate notification using equality
                if (!Objects.equals(mLastState, newState)) {
                    // Recorded before the callback, so a throwing observer is not handed the
                    // same state again on the next dispatch.
                    mLastState = newState;
                    if (newState instanceof ErrorWrapper) {
                        mObserver.onError(((ErrorWrapper) newState).getError());
                    } else {
                        mObserver.onNewData((T) newState);
                    }
                }

                synchronized (this) {
                    if (currentVersion == mLatestSignalledVersion || !mActive.get()) {
                        // Updating complete or no longer active. Break out of update loop.
                        mWrapperUpdating = false;
                        break;
                    }

                    // Get state and version for next update.
                    newState = mStateRef.get();
                    currentVersion = mLatestSignalledVersion;
                }
            }
        }

        /**
         * Signals that the state has reached {@code version} and schedules a dispatch on the
         * executor unless one is already pending or running.
         *
         * @param version The version being signalled. Versions that are not newer than the
         *                latest signalled version are ignored.
         */
        void update(int version) {
            final int scheduleToken;
            synchronized (this) {
                // If no longer active, then don't attempt update.
                if (!mActive.get()) return;
                // No need to update (but this probably shouldn't happen anyways)
                if (version <= mLatestSignalledVersion) return;
                mLatestSignalledVersion = version;
                // No need to update if already updating. Version bump will cause update.
                if (mWrapperUpdating) return;
                mWrapperUpdating = true;
                scheduleToken = ++mScheduleToken;
            }

            try {
                mExecutor.execute(this);
            } catch (Throwable t) {
                // Unable to notify due to state of Executor. The update is lost, but there's
                // not much we can do here since the executor rejected the update. Note this
                // may also mean that any updates which occurred while mWrapperUpdating ==
                // true will have also been lost.
                //
                // With a direct executor this is also reached when run() itself threw. run()
                // has then already cleared the flag, and another thread may have scheduled a
                // newer dispatch since; the token check keeps us from clearing that one's flag.
                synchronized (this) {
                    if (scheduleToken == mScheduleToken) {
                        // Update mWrapperUpdating so the next update can try again
                        mWrapperUpdating = false;
                    }
                }
            }
        }

        /** Deactivates this wrapper so that no further updates are scheduled or delivered. */
        void close() {
            // Best effort cancellation. In progress updates will not be cancelled.
            mActive.set(false);
        }
    }

    /** Marks a state as an error so it can be told apart from a value that is a Throwable. */
    @AutoValue
    abstract static class ErrorWrapper {
        /** Wraps {@code error} as an error state. */
        @NonNull
        static ErrorWrapper wrap(@NonNull Throwable error) {
            return new AutoValue_StateObservable_ErrorWrapper(error);
        }

        /** Returns the wrapped error. */
        @NonNull
        public abstract Throwable getError();
    }
}