powered by simpleCommunicator - 2.0.30     © 2024 Programmizd 02
Map
Форумы / Java [игнор отключен] [закрыт для гостей] / Подскажите по rxJava
10 сообщений из 10, страница 1 из 1
Подскажите по rxJava
    #40076569
bobo96
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Всем привет, не могу разобраться с rxJava.
Элементарная (на первый взгляд) задача: пройтись по массиву с ip адресами и найти первый доступный.
Код:

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
        Subscription subscription = Observable
                .from(%ARRAY%)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d("mylog", "onCompleted ");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d("mylog", "onError " + e.getMessage());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d("mylog", "onNext " + s);

                        String[] serverData = s.split(":");
                        Socket socket = new Socket();

                        try {
                            socket.connect(new InetSocketAddress(
                                            InetAddress.getByName(serverData[0]),
                                            Integer.parseInt(serverData[1])), 2000);
                        } catch (IOException e) {
                        }
                    }
                });



В логах имеем:
D/mylog: onNext 192.168.0.103:7799
D/mylog: onError null

То есть доходит до первого недоступного и останавливается.
Интернеты говорят, что нужно вроде как использовать onExceptionResumeNext\onErrorResumeNext. Пробовал добавлять и .onExceptionResumeNext(Observable.from(%ARRAY%)) и .onErrorResumeNext(Observable.from(%ARRAY%)) и вместе - результат в логах такой же, как и без них.
Что я делаю не так ?
...
Рейтинг: 0 / 0
Подскажите по rxJava
    #40076602
chpasha
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
я не специалист по rx от слова вообще, но по-моему оно в onError вообще не должно попадать по-хорошему, если ты все ошибки в onNext ловишь. Возможно у тебя там вываливается что-то окромя IOException, например в Integer.parseInt или прилетает null вместо строки или IndexOutOfBounds, если в адресах IP без порта. T.e. стремится нужно не к затыканию дыр в виде "как продолжить после ошибки", а к исправлению или учету этой конкретной ошибки

Кроме того из кода не видно, где тут "первый доступный"? Где остановка при успехе? По-хорошему проверка должна где-то в filter (или аналоге) происходить, а потом должно быть что-то типа findFirst. Но вот вопрос в том, как сделать в filter асинхронную проверку. Имхо RxJava тут либо вообще мимо кассы, либо нужно делать не так (как правильно на Rx я не знаю) - стартовать всю процедуру в отдельном потоке и уже в нем проверять каждый IP по очереди блокирующим запросом. На стримах я бы сделал так

Код: java
1.
2.
3.
4.
5.
6.
7.
@WorkerThread /*аннотация ничего не делает, только сигнализирует, что метод должен выполняться в фоне*/
Optional<String> findAvailable()
{
      Arrays.stream(%ARRAY%)
             .filter(/*здесь проверка из OnNext, возвращающая только рабочие IP*/)
             .findFirst()          
}



P. S. погуглил коротко, с rx нужно делать примерно так (правильный синтаксис может отличаться, но идея вроде понятна)

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
PublishSubject<Integer> stop = PublishSubject.create();

source
.takeUntil(stop)
.doOnNext(new Action1<Integer>() {
    int calls;
    @Override
    public void call(Integer t) {
        System.out.println("Saving " + t);
        if (++calls == 3) {
            stop.onNext(1);
        }
    }
})
.subscribe();
...
Рейтинг: 0 / 0
Подскажите по rxJava
    #40076608
bobo96
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
chpasha,

Ок, будем вникать дальше, спасибо!
...
Рейтинг: 0 / 0
Подскажите по rxJava
    #40076609
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Возможно - долгий таймаут по недоступности. А добавь логгирование ошибок.

Код: java
1.
2.
3.
4.
5.
6.
7.
try {
                            socket.connect(new InetSocketAddress(
                                            InetAddress.getByName(serverData[0]),
                                            Integer.parseInt(serverData[1])), 2000);
                        } catch (IOException e) {
                            Log.error(.....);
                        }
...
Рейтинг: 0 / 0
Подскажите по rxJava
    #40076620
Garrick
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
bobo96,

Самое первое - неправильно вот это

автор
Код: java
1.
2.
 } catch (IOException e) {
 }



Поэтому ни фига и непонятно что не работает.
...
Рейтинг: 0 / 0
Подскажите по rxJava
    #40076627
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Возможно вот эта ловушка и не ловит никаких ошибок потому что не делается re-throw ошибк наверх.

Код: java
1.
2.
3.
                    public void onError(Throwable e) {
                        Log.d("mylog", "onError " + e.getMessage());
                    }
...
Рейтинг: 0 / 0
Подскажите по rxJava
    #40076774
bobo96
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
Спасибо за ответы. В общем ситуация такая (сам я тоже с этой либой раньше не работал): изначально у меня была такая зависимость - implementation group: 'io.reactivex', name: 'rxjava', version: '1.3.8' (кодинг под андроид если что), потом, попав вроде как на офф. сайт проекта заимплементил другие либы - implementation 'io.reactivex.rxjava3:rxandroid:3.0.0', implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
С ними все ок, при ошибке остановки нет! Но проблема в другом: как остановиться, когда найден ip, который отвечает ?)))
Код немного другой, на всякий случай с зависимостями:

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;

        Observable.fromArray(mKeeper.getConfig().getSERVERS())
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
                        Log.d("mylog", "onSubscribe ");
                    }

                    @Override
                    public void onNext(@io.reactivex.rxjava3.annotations.NonNull String s) {
                        String[] serverData = s.split(":");

                        Socket socket = new Socket();

                        try {
                            Log.d("mylog", "onNext " + s);
                            socket.connect(new InetSocketAddress(
                                            InetAddress.getByName(serverData[0]),
                                            Integer.parseInt(serverData[1])),
                                    mKeeper.getConfig().getSERVER_CONNECT_TIMEOUT());
                            Log.d("mylog", "Connect! ");
                        } catch (IOException e) {
                            e.printStackTrace();
                            Log.d("mylog", "catch " + e.getMessage());
                        } finally {
                            try {
                                socket.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }

                    @Override
                    public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                        Log.d("mylog", "onError " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("mylog", "onComplete ");
                    }
                });



Лог:
D/mylog: onSubscribe
D/mylog: onNext 192.168.43.21:7001
D/mylog: catch failed to connect to /192.168.43.21 (port 7001) from /192.168.43.1 (port 49872) after 2000ms: isConnected failed: ECONNREFUSED (Connection refused)
D/mylog: onNext 192.168.0.103:7001
D/mylog: catch failed to connect to /192.168.0.103 (port 7001) from /10.88.209.250 (port 45876) after 2000ms
D/mylog: onNext 85.12.240.55:7001
D/mylog: catch failed to connect to /85.12.xxx.xxx (port 7001) from /10.88.209.250 (port 39630) after 2000ms
D/mylog: onNext 109.195.107.114:7001
D/mylog: catch failed to connect to /109.195.xxx.xxx (port 7001) from /10.88.209.250 (port 42966) after 1999ms: isConnected failed: ECONNREFUSED (Connection refused)
D/mylog: onComplete
...
Рейтинг: 0 / 0
Подскажите по rxJava
    #40076812
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
А "реактивность" вообще предполагает такое понятие как остановиться?
...
Рейтинг: 0 / 0
Подскажите по rxJava
    #40076859
bobo96
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Гость
mayton, без понятия, но логично предположить, что такая возможность должна быть.
...
Рейтинг: 0 / 0
Подскажите по rxJava
    #40076860
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Можно вызвать System.exit(). Это прервёт java-процесс и процесс операционки. Но можно просто
посмотреть в сторону отказа от реактивности как от шаблона подобных разработок. Я не имею
ничего против Р. но очевидно что данная задача имеет строго выраженое начало и конец. Массив - конечен.
...
Рейтинг: 0 / 0
10 сообщений из 10, страница 1 из 1
Форумы / Java [игнор отключен] [закрыт для гостей] / Подскажите по rxJava
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]