Гость
Map
Форумы / Java [игнор отключен] [закрыт для гостей] / Подскажите по rxJava / 10 сообщений из 10, страница 1 из 1
09.06.2021, 11:52
    #40076569
bobo96
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Подскажите по rxJava
Всем привет, не могу разобраться с rxJava.
Элементарная (на первый взгляд) задача: пройтись по массиву с ip адресами и найти первый доступный.
Код:

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
        Subscription subscription = Observable
                .from(%ARRAY%)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.d("mylog", "onCompleted ");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d("mylog", "onError " + e.getMessage());
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d("mylog", "onNext " + s);

                        String[] serverData = s.split(":");
                        Socket socket = new Socket();

                        try {
                            socket.connect(new InetSocketAddress(
                                            InetAddress.getByName(serverData[0]),
                                            Integer.parseInt(serverData[1])), 2000);
                        } catch (IOException e) {
                        }
                    }
                });



В логах имеем:
D/mylog: onNext 192.168.0.103:7799
D/mylog: onError null

То есть доходит до первого недоступного и останавливается.
Интернеты говорят, что нужно вроде как использовать onExceptionResumeNext\onErrorResumeNext. Пробовал добавлять и .onExceptionResumeNext(Observable.from(%ARRAY%)) и .onErrorResumeNext(Observable.from(%ARRAY%)) и вместе - результат в логах такой же, как и без них.
Что я делаю не так ?
...
Рейтинг: 0 / 0
09.06.2021, 13:31
    #40076602
chpasha
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Подскажите по rxJava
я не специалист по rx от слова вообще, но по-моему оно в onError вообще не должно попадать по-хорошему, если ты все ошибки в onNext ловишь. Возможно у тебя там вываливается что-то окромя IOException, например в Integer.parseInt или прилетает null вместо строки или IndexOutOfBounds, если в адресах IP без порта. T.e. стремится нужно не к затыканию дыр в виде "как продолжить после ошибки", а к исправлению или учету этой конкретной ошибки

Кроме того из кода не видно, где тут "первый доступный"? Где остановка при успехе? По-хорошему проверка должна где-то в filter (или аналоге) происходить, а потом должно быть что-то типа findFirst. Но вот вопрос в том, как сделать в filter асинхронную проверку. Имхо RxJava тут либо вообще мимо кассы, либо нужно делать не так (как правильно на Rx я не знаю) - стартовать всю процедуру в отдельном потоке и уже в нем проверять каждый IP по очереди блокирующим запросом. На стримах я бы сделал так

Код: java
1.
2.
3.
4.
5.
6.
7.
@WorkerThread /*аннотация ничего не делает, только сигнализирует, что метод должен выполняться в фоне*/
Optional<String> findAvailable()
{
      Arrays.stream(%ARRAY%)
             .filter(/*здесь проверка из OnNext, возвращающая только рабочие IP*/)
             .findFirst()          
}



P. S. погуглил коротко, с rx нужно делать примерно так (правильный синтаксис может отличаться, но идея вроде понятна)

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
PublishSubject<Integer> stop = PublishSubject.create();

source
.takeUntil(stop)
.doOnNext(new Action1<Integer>() {
    int calls;
    @Override
    public void call(Integer t) {
        System.out.println("Saving " + t);
        if (++calls == 3) {
            stop.onNext(1);
        }
    }
})
.subscribe();
...
Рейтинг: 0 / 0
09.06.2021, 13:53
    #40076608
bobo96
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Подскажите по rxJava
chpasha,

Ок, будем вникать дальше, спасибо!
...
Рейтинг: 0 / 0
09.06.2021, 13:54
    #40076609
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Подскажите по rxJava
Возможно - долгий таймаут по недоступности. А добавь логгирование ошибок.

Код: java
1.
2.
3.
4.
5.
6.
7.
try {
                            socket.connect(new InetSocketAddress(
                                            InetAddress.getByName(serverData[0]),
                                            Integer.parseInt(serverData[1])), 2000);
                        } catch (IOException e) {
                            Log.error(.....);
                        }
...
Рейтинг: 0 / 0
09.06.2021, 14:13
    #40076620
Garrick
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Подскажите по rxJava
bobo96,

Самое первое - неправильно вот это

автор
Код: java
1.
2.
 } catch (IOException e) {
 }



Поэтому ни фига и непонятно что не работает.
...
Рейтинг: 0 / 0
09.06.2021, 14:22
    #40076627
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Подскажите по rxJava
Возможно вот эта ловушка и не ловит никаких ошибок потому что не делается re-throw ошибк наверх.

Код: java
1.
2.
3.
                    public void onError(Throwable e) {
                        Log.d("mylog", "onError " + e.getMessage());
                    }
...
Рейтинг: 0 / 0
10.06.2021, 07:22
    #40076774
bobo96
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Подскажите по rxJava
Спасибо за ответы. В общем ситуация такая (сам я тоже с этой либой раньше не работал): изначально у меня была такая зависимость - implementation group: 'io.reactivex', name: 'rxjava', version: '1.3.8' (кодинг под андроид если что), потом, попав вроде как на офф. сайт проекта заимплементил другие либы - implementation 'io.reactivex.rxjava3:rxandroid:3.0.0', implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
С ними все ок, при ошибке остановки нет! Но проблема в другом: как остановиться, когда найден ip, который отвечает ?)))
Код немного другой, на всякий случай с зависимостями:

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;

        Observable.fromArray(mKeeper.getConfig().getSERVERS())
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
                        Log.d("mylog", "onSubscribe ");
                    }

                    @Override
                    public void onNext(@io.reactivex.rxjava3.annotations.NonNull String s) {
                        String[] serverData = s.split(":");

                        Socket socket = new Socket();

                        try {
                            Log.d("mylog", "onNext " + s);
                            socket.connect(new InetSocketAddress(
                                            InetAddress.getByName(serverData[0]),
                                            Integer.parseInt(serverData[1])),
                                    mKeeper.getConfig().getSERVER_CONNECT_TIMEOUT());
                            Log.d("mylog", "Connect! ");
                        } catch (IOException e) {
                            e.printStackTrace();
                            Log.d("mylog", "catch " + e.getMessage());
                        } finally {
                            try {
                                socket.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }

                    @Override
                    public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                        Log.d("mylog", "onError " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("mylog", "onComplete ");
                    }
                });



Лог:
D/mylog: onSubscribe
D/mylog: onNext 192.168.43.21:7001
D/mylog: catch failed to connect to /192.168.43.21 (port 7001) from /192.168.43.1 (port 49872) after 2000ms: isConnected failed: ECONNREFUSED (Connection refused)
D/mylog: onNext 192.168.0.103:7001
D/mylog: catch failed to connect to /192.168.0.103 (port 7001) from /10.88.209.250 (port 45876) after 2000ms
D/mylog: onNext 85.12.240.55:7001
D/mylog: catch failed to connect to /85.12.xxx.xxx (port 7001) from /10.88.209.250 (port 39630) after 2000ms
D/mylog: onNext 109.195.107.114:7001
D/mylog: catch failed to connect to /109.195.xxx.xxx (port 7001) from /10.88.209.250 (port 42966) after 1999ms: isConnected failed: ECONNREFUSED (Connection refused)
D/mylog: onComplete
...
Рейтинг: 0 / 0
10.06.2021, 11:26
    #40076812
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Подскажите по rxJava
А "реактивность" вообще предполагает такое понятие как остановиться?
...
Рейтинг: 0 / 0
10.06.2021, 12:54
    #40076859
bobo96
Гость
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Подскажите по rxJava
mayton, без понятия, но логично предположить, что такая возможность должна быть.
...
Рейтинг: 0 / 0
10.06.2021, 12:59
    #40076860
mayton
Участник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Подскажите по rxJava
Можно вызвать System.exit(). Это прервёт java-процесс и процесс операционки. Но можно просто
посмотреть в сторону отказа от реактивности как от шаблона подобных разработок. Я не имею
ничего против Р. но очевидно что данная задача имеет строго выраженое начало и конец. Массив - конечен.
...
Рейтинг: 0 / 0
Форумы / Java [игнор отключен] [закрыт для гостей] / Подскажите по rxJava / 10 сообщений из 10, страница 1 из 1
Целевая тема:
Создать новую тему:
Автор:
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]