powered by simpleCommunicator - 2.0.30     © 2024 Programmizd 02
Map
Форумы / Java [игнор отключен] [закрыт для гостей] / Queue с поддержкой сериализации на диск.
25 сообщений из 200, страница 4 из 8
Queue с поддержкой сериализации на диск.
    #39906276
вадя
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Каждый файл приближенно до 128Мб. Что диск рационально утилизировать.
для этого ещё надо размер кластера сделать максимальным - 128 к или больше.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906280
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Я беру за основу идею что мне не нужен 1 месседж ка единица использования.
Если при размере 1-4k средняя длина месседжа будет 2 к то в 128 Мегабайтном
куске у меня лежит фрагмент FIFO порядка 65 тысяч месседжей.

Это меня вполне устраивает. Все равно чтение и запись будет крупным потоком.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906324
Leonid Kudryavtsev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
В теме с буферизацией видосов речь идет о записях фиксированного размера.
Их легче поддерживать. Как арифметику указателей в С++

Mayton, фиксированные <=> не фиксированные - двадцать строчек кода

В момент записи: Разбиваем на сhunk'и и пишем в FIFO Circle Ring Buffer, у последней записи ставим признак последняя
В момент чтения:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
StringBuilder sb = new...; 
while ( true ) {
  MyFixedSizeElement tmp = myFileCircleRingBuffer.poll();
  sb.append( tmp.chunk );
  if ( tmp.послядняя ) {
    return sb.toString();
  }
}



Для вариативного размера... я пока придумал писать sequence из файлов. Типа

Чем сиквенс из 10 файлов по 128 Mb лучше одного файла в 1.2 Gb - мне не понятно
Чем он помогает при записях разной длина - мне так же не понятно.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906327
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev, предлагай свои варианты.

Базовый проткол остаётся тот-же самый. Но добавим метод poll. И несколько очередей (несколько экземпляров Q)
и бросание исключений и ошибки статусов для каждой команды.

Код: javascript
1.
2.
3.
4.
5.
interface Q {
   add(Object entity);
   Object poll();
   purge();
}
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906328
Leonid Kudryavtsev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
В теме с буферизацией видосов....

тут скорее вопрос:
1. допустимо и нужно ли "накапливать" входящие пакеты в памяти перед записью на диск?
2. какая максимальная задержка по времени допустима?
(пакет пришел и пауза... буфер не заполнен, нужен сброс по таймеру)
3. требование к надежности
(нужно ли и когда жестко flush'ить)
4. Так же не очень понятно, где хранить head - tail позицию. Вроде же они должны быть синхронезированы у читателя и писателя (когда очередь не заполнена и читатели ждут новых данных). Если на диске, когда их flush'ить
С последним пунктом не уверен, нужно думать, а что бы начинать думать, по хорошему нужно начинать кодить )))

совершенно не понятно, что Вы имели в виду под "удалением"
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906329
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev
mayton
В теме с буферизацией видосов....

тут скорее вопрос:
1. допустимо и нужно ли "накапливать" входящие пакеты в памяти перед записью на диск?

Да.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906331
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev

4. Так же не очень понятно, где хранить head - tail позицию. Вроде же они должны быть синхронезированы у читателя и писателя (когда очередь не заполнена и читатели ждут новых данных). Если на диске, когда их flush'ить
С последним пунктом не уверен, нужно думать, а что бы начинать думать, по хорошему нужно начинать кодить )))

Это самый интересный и сложный вопрос. Я как раз думаю что форум придумает пропозицию.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906332
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev

3. требование к надежности
(нужно ли и когда жестко flush'ить)

Нет жесткого требования. Если вы будете флашить через каждые 3 секунды с последнего event то я не против.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906333
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev

совершенно не понятно, что Вы имели в виду под "удалением"

Очистка? Ну... как вариант обнаружен баг и очередь надо сбросить.
Или в кластере java-приложений насмерть помер один из nodes и чтобы
ребаланс евентов сработал - надо просто рестаровать бизнес процессы
в новом окружении очередей. Старые данные что скопились в очередях
уже ценности не представляют. Их можно убить.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906378
Kachalov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Пятничный креатив: сбрасывайте сообщения в Lucene (если нужно легкое рукоблудное решение с нулевым администрированием) или в Elastic (тут с администрированием, зато и с кластеризацией) с индексацией по времени
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906396
Leonid Kudryavtsev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
В первом приближении
Многопоток не проверял и не делал
Проверок не делал
Гитхабом не заморачивался )))
Буферизацию не делал. Флегда флушится.

пришлось сделать свою реализацию FileOutputStream, FileInputStream трогать не стал )))


Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * Расширяет FileOutputStream. При close не закрывает поток, а просто перепозиционируется в начало
 */
public class MyFileOutputStream extends FileOutputStream {
    public MyFileOutputStream( String fname ) throws FileNotFoundException {
        super(fname);
    }

    @Override
    public void close() throws IOException {
        this.getChannel().position( 0 );
    }
}



Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
import java.io.*;

/**
 * Класс представляет из себя Circle Ring Buffer для работы с файлом
 */
public class FileRing {
    int maxFileSize;
    int reserv;
    String fname;
    long readPos;
    long flushedWritePos;
    long flushedFileSize;
    FileInputStream inStream;
    MyFileOutputStream outStream;
    ObjectInputStream in;
    ObjectOutputStream out;
    DataInput inData;
    DataOutput outData;
    FileRing(String fname, int maxFileSize, int reserv ) throws IOException {
        this.fname = fname;
        this.maxFileSize = maxFileSize;
        this.reserv = reserv;
        outStream = new MyFileOutputStream( fname );
        out = new  ObjectOutputStream( outStream );
        inStream = new FileInputStream( fname );
        in = new ObjectInputStream( inStream );
    }

    public void add(Object entity) throws IOException {
        out.writeObject( entity );
        this.flush();
    }

    /* Достигли конца очереди при записи, переходим в начало */
    private void writeEndOfCircle() throws IOException {
        //
System.out.println( "writeEndOfCircle!" );
        this.flushedFileSize = this.flushedWritePos;
System.out.println( "this.flushedFileSize="+this.flushedFileSize );
        // позиционируемся в начало
        // "Как-бы" закрываем поток и пересоздаем его
        this.out.close();
        this.out = new ObjectOutputStream( this.outStream );
    }

    public void flush() throws IOException {
        out.flush();
        this.flushedWritePos = outStream.getChannel().position();
System.out.println( "flushedWritePos="+flushedWritePos );
        // достигли конец файла
        if ( this.flushedWritePos > (this.maxFileSize-this.reserv) ) {
            this.writeEndOfCircle();
        }
    }

    public boolean isEmpty() {
        return this.readPos == this.flushedWritePos;
    }

    public Object poll() throws IOException, ClassNotFoundException {
        Object o;
        // Достигли конца файла
        if ( this.readPos==this.flushedFileSize ) {
            System.out.println( "end of file at read. readPos="+ this.readPos );
            // Достигли конца файла, переоткрываем его
            this.in.close();
            this.inStream = new FileInputStream( this.fname );
            this.in = new ObjectInputStream( this.inStream );
            this.readPos = 0;
        }
        if (isEmpty()) {
           return null;
        }
        o = in.readObject();
        this.readPos = this.inStream.getChannel().position();
System.out.println( "readPos="+readPos );
        return o;
    }

    public void purge() {
        this.readPos = this.flushedWritePos = 0;
    }
}



простейший тест на запись и чтение:
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
import java.io.IOException;

public class Test {
    public static void  main( String argv[] ) throws IOException, ClassNotFoundException {
        FileRing r;
        r = new FileRing( "my.ring", 200, 40 );
        // Простое тестирование записи/чтения
        for ( int i = 0; i<10; i++ ) {
            String s= "My String "+i+".";
            r.add( s );
        }
        for ( int i = 0; i<10; i++ ) {
            String s = (String) r.poll();
            System.out.println( s );
        }
        // На очередных строчках, будет переход через границу буффера
        for ( int i = 0; i<10; i++ ) {
            String s= "My String "+i+".";
            r.add( s );
        }
        for ( int i = 0; i<10; i++ ) {
            String s = (String) r.poll();
            System.out.println( s );
        }

    }
}


...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906418
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev,
И как его закрыть тогда?
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906419
забыл ник
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
забыл ник
Leonid Kudryavtsev,
И как его закрыть тогда?

Что если вдруг эксепшен? Ну так себе идея..
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906434
Фотография Valentin Kolesnikov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Есть ещё memcached и rocksdb.

https://db-engines.com/en/system/Memcached;RocksDB;SwayDB

Хорошего вам дня!
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906460
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev,

Спасибо. Попробуем.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906599
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Хм. Кое что не юзается. Убрал.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
import java.io.*;

/**
 * Класс представляет из себя Circle Ring Buffer для работы с файлом
 */
public class FileRing {
    int maxFileSize;
    int reserv;
    String fname;
    long readPos;
    long flushedWritePos;
    long flushedFileSize;
    FileInputStream inStream;
    MyFileOutputStream outStream;
    ObjectInputStream in;
    ObjectOutputStream out;
    DataInput inData;
    DataOutput outData;
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906614
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Надо еще потестить в 2 потока. Один добавляет. Другой poll-ит.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906689
Leonid Kudryavtsev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Надо еще потестить в 2 потока. Один добавляет. Другой poll-ит.

я никак не могу придумать, как сделать проверку на переполнение буфера

если ее не делать - то все просто, но как-то хочется, что бы если пишешь больше размера буфера и читалка не успевает - был бы эксепшен
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906697
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Давайте отложим пока. Это не бизнес-кейс.

Главное что больше чем heap положили в голову очереди - уже хорошо.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906776
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Заменил Object на Serializable. Изменил тест пока на синхронное отрабатывание сначала продюсера а потом потребителя.

Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
package mayton;

import org.apache.log4j.Logger;

import java.io.Serializable;
import java.util.Random;

public class Test {

    static Logger logger = org.apache.log4j.Logger.getLogger(Test.class);

    public Test() throws Exception {
        final FileRing r = new FileRing("out/my.ring", 16_384, 40);

        Thread procuder = new Thread(() -> {
            Random random = new Random();
            try {
                for (int i = 0; i < 20_000; i++) {
                    r.add(Integer.valueOf(i));
                }
                logger.info("Successfully wrote 10_000 Integer objects");
            } catch (Exception e) {
                logger.error(e);
            }
        });

        procuder.start();

        procuder.join();

        Thread consumer = new Thread(() -> {
            try {
                int sum = 0;
                for (int i = 0; i < 20_000; i++) {
                    Serializable object = r.poll();
                    sum += (Integer) object;
                }
                logger.info("Successfully read 20_000 Integer objects with sum = " + sum);
            } catch (Exception e) {
                logger.error("Exception during consumer loop", e);
            }
        });

        consumer.start();

        consumer.join();
    }

    public static void main(String argv[]) throws Exception {

        new Test();

    }

}


Код: java
1.
2.
3.
4.
5.
2019-12-22 22:42:10,512 [Thread-0] INFO  mayton.Test - Successfully wrote 10_000 Integer objects
2019-12-22 22:42:10,526 [Thread-1] ERROR mayton.Test - Exception during consumer loop
java.lang.NullPointerException
	at mayton.Test.lambda$new$1(Test.java:36)
	at java.base/java.lang.Thread.run(Thread.java:834)
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906815
Фотография crutchmaster
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Даже не могу представить как ты по работе проблемы решаешь. Всё - сам кодишь.

Ну почему всё. Либы для оракла и раббита я не велосипедил
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39906816
Фотография crutchmaster
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev
Честно говоря, если делать велосипед, я бы делал что нибудь простое. Типа Circle Ring Buffer на диске. Chunk фиксированного размера (байтов 30-60). Большие записи разбивать на несколько chunk'ов и просто писать последовательно.

Структура хранения будет тривиальная. Поскольку FIFO и Circle Ring Buffer то не будет фрагментации и все общение с диском исключительно последовательное - максимальная производительность на любом жетком диске.

Да, думал, пришел к тому же самому. Разделители - не нужно, запись кусками с метками (принято/отправленно/кусок/заголовок) настраиваемого размера.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39907281
Leonid Kudryavtsev
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Заменил Object на Serializable

AFAIK:
Serializable - тупое и медленное г...но
Externalizable - rulezzzzz
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39907294
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Leonid Kudryavtsev
mayton
Заменил Object на Serializable

AFAIK:
Serializable - тупое и медленное г...но
Externalizable - rulezzzzz

Согласен.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39907295
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
crutchmaster
Leonid Kudryavtsev
Честно говоря, если делать велосипед, я бы делал что нибудь простое. Типа Circle Ring Buffer на диске. Chunk фиксированного размера (байтов 30-60). Большие записи разбивать на несколько chunk'ов и просто писать последовательно.

Структура хранения будет тривиальная. Поскольку FIFO и Circle Ring Buffer то не будет фрагментации и все общение с диском исключительно последовательное - максимальная производительность на любом жетком диске.

Да, думал, пришел к тому же самому. Разделители - не нужно, запись кусками с метками (принято/отправленно/кусок/заголовок) настраиваемого размера.

Если использовать мой подход
Код: java
1.
2.
file0001.dat
file0002.dat


то кольцо делать не нужно. Файловая система предоставляет нам все что нужно.

А чтоб readPos не догонял writePos их надо синхронизировать через Приложение.
Как вариант - два Atomic счетчика.

Long указателя должно хватить на десятилетия. Инстанс приложения столько не живет
обычно.
...
Рейтинг: 0 / 0
25 сообщений из 200, страница 4 из 8
Форумы / Java [игнор отключен] [закрыт для гостей] / Queue с поддержкой сериализации на диск.
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]