Асинхронный воркер в Android WorkManager

Google недавно анонсировал новый архитектурный компонент WorkManager. Это упрощает планирование синхронной работы за счет реализации doWork() в классе Worker, но что, если я хочу выполнить некоторую асинхронную работу в фоновом режиме? Например, я хочу позвонить в сетевую службу с помощью Retrofit. Я знаю, что могу сделать синхронный сетевой запрос, но он заблокирует поток, и мне кажется, что это неправильно. Есть ли какое-то решение для этого или он просто не поддерживается на данный момент?


person Anton Tananaev    schedule 18.05.2018    source источник
comment
Вы имеете в виду, что MainThread заблокирован или текущий поток заблокирован?   -  person Sagar    schedule 18.05.2018
comment
Рабочий поток будет заблокирован.   -  person Anton Tananaev    schedule 18.05.2018
comment
Вместо того, чтобы создавать новый поток, вы можете поставить в очередь двух воркеров одновременно?   -  person Sagar    schedule 18.05.2018
comment
Пожалуйста, внимательно прочтите вопрос. Я не создаю новых тем.   -  person Anton Tananaev    schedule 18.05.2018
comment
Я имел в виду, что если вы хотите сделать что-то асинхронно, вам нужно создать поток, чтобы он не выполнялся в том же потоке. Я пытаюсь понять ваш вариант использования.   -  person Sagar    schedule 18.05.2018
comment
Что ж, внутренняя модернизация, вероятно, имеет отдельный поток.   -  person Anton Tananaev    schedule 18.05.2018


Ответы (8)


Согласно документам WorkManager:

По умолчанию WorkManager выполняет свои операции в фоновом потоке. Если вы уже работаете в фоновом потоке и вам нужны синхронные (блокирующие) вызовы WorkManager, используйте synchronous () для доступа к таким методам.

Следовательно, если вы не используете synchronous(), вы можете безопасно выполнять сетевые вызовы синхронизации из doWork(). Это также лучший подход с точки зрения дизайна, потому что обратные вызовы беспорядочные.

Тем не менее, если вы действительно хотите запускать асинхронные задания из doWork(), вам необходимо приостановить поток выполнения и возобновить его после завершения асинхронного задания с использованием механизма wait/notify (или какого-либо другого механизма управления потоками, например Semaphore). В большинстве случаев я бы не рекомендовал.

Кстати, WorkManager находится на очень ранней стадии альфа-тестирования.

person Vasiliy    schedule 18.05.2018

Я использовал обратный отсчет и ждал, пока он достигнет 0, что произойдет только после того, как асинхронный обратный вызов обновит его. Смотрите этот код:

public WorkerResult doWork() {

        final WorkerResult[] result = {WorkerResult.RETRY};
        CountDownLatch countDownLatch = new CountDownLatch(1);
        FirebaseFirestore db = FirebaseFirestore.getInstance();

        db.collection("collection").whereEqualTo("this","that").get().addOnCompleteListener(task -> {
            if(task.isSuccessful()) {
                task.getResult().getDocuments().get(0).getReference().update("field", "value")
                        .addOnCompleteListener(task2 -> {
                            if (task2.isSuccessful()) {
                                result[0] = WorkerResult.SUCCESS;
                            } else {
                                result[0] = WorkerResult.RETRY;
                            }
                            countDownLatch.countDown();
                        });
            } else {
                result[0] = WorkerResult.RETRY;
                countDownLatch.countDown();
            }
        });

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return result[0];

    }
person TomH    schedule 19.05.2018
comment
Что происходит при сбое ограничения. Ограничение средств. Для идеального состояния триггеры менеджера работы. и через некоторое время телефон вышел из идеального состояния. - person Nitish; 24.11.2018

К вашему сведению, теперь существует ListenableWorker, который предназначен для работы в асинхронном режиме.

Изменить: вот несколько примеров использования. Я вырезал большие куски кода, которые, на мой взгляд, не являются иллюстративными, так что есть большая вероятность, что здесь есть небольшая ошибка или две.

Это для задачи, которая принимает String photoKey, извлекает метаданные с сервера, выполняет некоторую работу по сжатию, а затем загружает сжатую фотографию. Это происходит вне основного потока. Вот как мы отправляем запрос на работу:

private void compressAndUploadFile(final String photoKey) {
    Data inputData = new Data.Builder()
            .putString(UploadWorker.ARG_PHOTO_KEY, photoKey)
            .build();
    Constraints constraints = new Constraints.Builder()
            .setRequiredNetworkType(NetworkType.CONNECTED)
            .build();
    OneTimeWorkRequest request = new OneTimeWorkRequest.Builder(UploadWorker.class)
            .setInputData(inputData)
            .setConstraints(constraints)
            .build();
    WorkManager.getInstance().enqueue(request);
}

И в UploadWorker:

public class UploadWorker extends ListenableWorker {
    private static final String TAG = "UploadWorker";
    public static final String ARG_PHOTO_KEY = "photo-key";

    private String mPhotoKey;

    /**
     * @param appContext   The application {@link Context}
     * @param workerParams Parameters to setup the internal state of this worker
     */
    public UploadWorker(@NonNull Context appContext, @NonNull WorkerParameters workerParams) {
        super(appContext, workerParams);
        mPhotoKey = workerParams.getInputData().getString(ARG_PHOTO_KEY);
    }

    @NonNull
    @Override
    public ListenableFuture<Payload> onStartWork() {
        SettableFuture<Payload> future = SettableFuture.create();
        Photo photo = getPhotoMetadataFromServer(mPhotoKey).addOnCompleteListener(task -> {
            if (!task.isSuccessful()) {
                Log.e(TAG, "Failed to retrieve photo metadata", task.getException());
                future.setException(task.getException());
                return;
            }
            MyPhotoType photo = task.getResult();
            File file = photo.getFile();
            Log.d(TAG, "Compressing " + photo);
            MyImageUtil.compressImage(file, MyConstants.photoUploadConfig).addOnCompleteListener(compressionTask -> {
                if (!compressionTask.isSuccessful()) {
                    Log.e(TAG, "Could not parse " + photo + " as an image.", compressionTask.getException());
                    future.set(new Payload(Result.FAILURE));
                    return;
                }
                byte[] imageData = compressionTask.getResult();
                Log.d(TAG, "Done compressing " + photo);
                UploadUtil.uploadToServer(photo, imageData);
                future.set(new Payload(Result.SUCCESS));
            });
        });
        return future;
    }
}

РЕДАКТИРОВАТЬ

В зависимости от того, что вы используете в своем приложении, вы также можете расширять RxWorker (если вы используете RxJava) или CoroutineWorker (если вы с использованием сопрограмм). Оба они являются производными от ListenableWorker.

person Bartholomew Furrow    schedule 15.10.2018
comment
Не могли бы вы добавить пример того, как использовать этот класс? - person idish; 27.11.2018
comment
@idish Я добавил пример. - person Bartholomew Furrow; 04.12.2018
comment
Я не могу использовать SettableFuture.create () в альфа-13, класс ограничен только той же группой библиотек. - person David Vávra; 15.12.2018
comment
Действительно, SettableFuture.create(); модуль является частным только для группы библиотек WorkManager. Не может быть использован. - person idish; 27.12.2018
comment
На самом деле это не личное. stackoverflow.com/questions/43656617/ Просто подавить предупреждение @SuppressLint (RestrictedApi) - person A.Sanchez.SD; 17.01.2019
comment
Задача выполняется в основном потоке developer.android.com/reference/androidx/work/ ListenableWorker. Они говорят, что The startWork() method is called on the main thread. Также я не вижу onStartWork в классе. Вы можете это объяснить? - person Abhay Pai; 03.02.2019
comment
Мы можем использовать ListenableWorker, вы можете увидеть документ здесь: developer.android.com / topic / libraries / architecture / workmanager / - person Duong.Nguyen; 23.03.2021

Если вы говорите об асинхронном задании, вы можете переместить свою работу в RxJava Observables / Singles.

Существует набор операторов, таких как .blockingGet() или .blockingFirst(), которые преобразуют Observable<T> в блокировку T.

Worker работает в фоновом потоке, поэтому не беспокойтесь о NetworkOnMainThreadException.

person Lukas    schedule 21.05.2018
comment
можете ли вы ответить на этот вопрос: stackoverflow.com/questions/50580106/ - person Usman Rana; 29.05.2018
comment
Использование синхронного по асинхронному API при работе из фонового потока не всегда достаточно. Например, некоторые асинхронные API имеют onProgress обратные вызовы, которые будут вызываться в основном потоке. - person idish; 27.12.2018
comment
можно ли выполнить команду ffmpeg с помощью RxJava? поскольку это уже асинхронный метод с обратным вызовом - person Usman Rana; 29.03.2019

Я использовал BlockingQueue, что упрощает синхронизацию потоков и передачу результата между потоками, вам понадобится только один объект

private var disposable = Disposables.disposed()

private val completable = Completable.fromAction { 
        //do some heavy computation
    }.subscribeOn(Schedulers.computation()) // you will do the work on background thread

override fun doWork(): Result {
    val result = LinkedBlockingQueue<Result>()

    disposable = completable.subscribe(
            { result.put(Result.SUCCESS) },
            { result.put(Result.RETRY) }
    )

    return try {
        result.take() //need to block this thread untill completable has finished
    } catch (e: InterruptedException) {
        Result.RETRY
    }
}

Также не забывайте освобождать ресурсы, если ваш Worker был остановлен, это главное преимущество перед .blockingGet(), так как теперь вы можете правильно отменить свою задачу Rx.

override fun onStopped(cancelled: Boolean) {
    disposable.dispose()
}
person Roman Nazarevych    schedule 24.07.2018
comment
Не могли бы вы добавить еще код для того же самого? Это довольно абстрактно. - person Vinayak; 11.08.2020

С помощью сопрограмм вы можете «синхронизировать» doWork() следующим образом:

Приостановить метод получения местоположения (асинхронно):

private suspend fun getLocation(): Location = suspendCoroutine { continuation ->
    val mFusedLocationClient = LocationServices.getFusedLocationProviderClient(appContext)
    mFusedLocationClient.lastLocation.addOnSuccessListener {
        continuation.resume(it)
    }.addOnFailureListener {
        continuation.resumeWithException(it)
    }
}

Пример вызова в doWork():

override fun doWork(): Result {
    val loc = runBlocking {
        getLocation()
    }
    val latitude = loc.latitude
}
person Webfreak    schedule 04.07.2019

Я также предпочел бы подход, рекомендованный @TomH. Однако я использовал его с Firebase Storage. Использование WorkManager вместе с CountDownlatch помогло мне. Вот фрагмент кода. Журналы ведутся с использованием древесины.

Он возвращает downloadUrl из Firebase в виде строки после завершения задачи, но до того, как рабочий вернет успех.

@NonNull
@Override
public Result doWork() {
    mFirebaseStorage = mFirebaseStorage.getInstance();
    mTriviaImageStorageReference = mFirebaseStorage.getReference().child("images");

    CountDownLatch countDown = new CountDownLatch(2);
    Uri imageUri = Uri.parse(getInputData().getString(KEY_IMAGE_URI));

    try {

    // get the image reference
    final StorageReference imageRef = mTriviaImageStorageReference.child(imageUri.getLastPathSegment());

    // upload the image to Firebase
    imageRef.putFile(imageUri).continueWithTask(new Continuation<UploadTask.TaskSnapshot, Task<Uri>>() {
        @Override
        public Task<Uri> then(@NonNull Task<UploadTask.TaskSnapshot> task) throws Exception {
            if (!task.isSuccessful()) {
                throw task.getException();
            }
            countDown.countDown();
            return imageRef.getDownloadUrl();
        }
    }).addOnCompleteListener(new OnCompleteListener<Uri>() {
        @Override
        public void onComplete(@NonNull Task<Uri> task) {
            if (task.isSuccessful()) {
                Timber.d("Image was successfully uploaded to Firebase");
                Uri downloadUri = task.getResult();
                String imageUrl = downloadUri.toString();

                Timber.d(("URl of the image is: " + imageUrl));

                mOutputData = new Data.Builder()
                        .putString(KEY_FIREBASE_IMAGE_URL, imageUrl)
                        .build();
                countDown.countDown();
            } else {
                Toast.makeText(getApplicationContext(), "upload failed", Toast.LENGTH_SHORT).show();
                countDown.countDown();
            }
        }
    });
    countDown.await();
    return Result.success(mOutputData);

    } catch (Throwable throwable) {
        Timber.e(throwable, "Error uploading image");
        return Result.failure();
    }
}
person madfree    schedule 15.05.2020
comment
Я пробовал ваш путь, но мой doWork вызывается несколько раз, - person gr_aman; 24.05.2020

Это поздно, но это может помочь другим людям,

Вы можете использовать CoroutineWorker и внутри doWork () использовать что-то под названием suspendCancellableCoroutine, оно специально предназначено для этой цели.

Ниже приведен фрагмент кода:

class FileDownloader(private val appContext: Context, params: WorkerParameters) :
CoroutineWorker(appContext, params) {

   override suspend fun doWork(): Result {

       try {

          suspendCancellableCoroutine<Int> { cancellableContinuation ->

              // Here you can call your asynchronous callback based network

                override fun onComplete() {
                        cancellableContinuation.resumeWith(
                            kotlin.Result.success(100))
                }

                override fun onError(error: Error?) {

                        cancellableContinuation.resumeWithException(
                            error?.connectionException ?: Throwable()
                        )
                   
               }
               
     }

     }catch (e: Exception) {
           return Result.failure()
      }

  return Result.success()
}
}

Здесь Coroutine будет остановлен, пока вы не вызовете cancellableContinuation.resumeWith.

person Abhishek Kumar    schedule 01.10.2020