powered by simpleCommunicator - 2.0.30     © 2024 Programmizd 02
Map
Форумы / Java [игнор отключен] [закрыт для гостей] / Queue с поддержкой сериализации на диск.
25 сообщений из 200, страница 8 из 8
Queue с поддержкой сериализации на диск.
    #39909461
Фотография полудух
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
в профиле есть настройки форума, а там есть "Включить "быстрый" ответ и цитирование"
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39909550
PetroNotC Sharp
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
полудух
в профиле есть настройки форума, а там есть "Включить "быстрый" ответ и цитирование"

Точно. Спасибо.
Кстати очень странный параметр. Насколько понял, он по умолчанию при создании аккаунта не включается. И второе, зачем она выведена для юзверя? Какой прок от режима "выкл быстрое цитирование"?
Плюсов вроде никаких).
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39909609
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Еще сам себе ссылку кидаю. Kafka. Persistence.

https://kafka.apache.org/documentation/#persistence
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39909663
Фотография Valentin Kolesnikov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
Еще сам себе ссылку кидаю. Kafka. Persistence.

https://kafka.apache.org/documentation/#persistence


http://swaydb.io/?language=java/

Легковеснее.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39909858
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39909869
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Valentin Kolesnikov
mayton
Еще сам себе ссылку кидаю. Kafka. Persistence.

https://kafka.apache.org/documentation/#persistence


http://swaydb.io/?language=java/

Легковеснее.

Прости пожалуйста. Но где те самые весы на которые можно положить эти две технологии чтоб взвесить?
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39909871
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник

Вот за это - спасибос большой. Посмотрю.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910430
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Новый год был бурный. Но топик я не забыл. Я помню. Актуальность есть.

Промежуточный итог. На рассмотрении следующие системы.

- Berkeley Db (точно есть)
- KahaDB (есть но хер доступишся до API запрятано внутри)
- Leonid's Queue (хорошо но мне надо попроще. С ротацией файлов).
- RocksDB (посмотреть есть ли поддержка)
- Tape ( https://github.com/square/tape) посмотреть что это как работает и какие есть лимиты.

Как дойдут руки я попробую сделать бенчмарк хотя-бы для 2-3 их них и выбрать одну либу.

Прочие системы которые я отбросил не содержали нативной поддержки FIDO(Queue) как
дисковой структуры данных или нигде не деларировали ее.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910657
Sergei.Agalakov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
А зачем всегда на диск писать всю очередь? Ну выросла очередь в памяти, пусть её ОС на диск своппит, когда припрёт. Кольцевой буфер теоретически правильный подход, но для ССД это хороший способ портить диски, впрочем, как и куча отдельных файлов. Почему, кстати, ext4, а не xfs или что-то другое? И да, куски меньше 4к при записи всё равно смысла не имеют.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910658
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Ext4 это просто дефолтная амазонская ec2 конфигурация.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910660
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
По поводу свопа. Опять же... Мы влезем в глубокий технический спор на тему как выбирать Xmx для известной конфигурации памяти.

Я думаю тут выбор очевиден. А дисковая очередь - это просто моя перестраховки. Тем более что я знаю характер и род нагрузки. И он не совпадает с Heap. Он редкий. И шквальный.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910668
Sergei.Agalakov
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
То-есть у вас вся нагрузка взрывная и заморачиваться с трешхолдом писать в память/писать на диск нет смысла?
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910670
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Есть смысл писать на диск.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910834
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
тест QueueFile - в мьютексах не шарю
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestQueueFile {

    private final int CNT_SD = 10_000;//cnt send data peer thread

    private final int CNT_TX = 2;//cnt writer
    private final int CNT_RX = 2;//cnt reader

    private final Object LOCK = new Object();

    private QueueFile queueFile = null;

    private int ttl = 0;

    public TestQueueFile() {

        try {
            //new File("./testFile.txt").deleteOnExit();

            queueFile = new QueueFile.Builder(new File("./testFile.txt")).
                    zero(true).
                    forceLegacy(true).
                    build();

            for (int i = 0; i < CNT_TX; i++) {
                executorService.submit(new Writer());
            }

            while (writerCnt < 1) {
                Thread.sleep(1);
            }

            for (int i = 0; i < CNT_RX; i++) {
                executorService.submit(new Reader());
            }

            executorService.shutdown();
            executorService.awaitTermination(30, TimeUnit.MINUTES);

            System.out.println(queueFile.toString());
            System.out.println("Total: " + ttl);

            queueFile.close();

        } catch (Exception ex) {
            ex.printStackTrace();
        }

    }

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        new TestQueueFile();
    }

    private ExecutorService executorService = Executors.newFixedThreadPool(CNT_RX + CNT_TX);

    private int readerCnt = 0, writerCnt = 0;

    private class Writer implements Runnable {

        @Override
        public void run() {
            synchronized (LOCK) {
                writerCnt++;
            }
            Thread.currentThread().setName("writer" + writerCnt);
            System.out.printf("%10s started.\n", Thread.currentThread().getName());
            try {
                for (int i = 0; i < CNT_SD; i++) {
                    synchronized (LOCK) {
                        queueFile.add((i + " " + Thread.currentThread().getName() + " text\n").getBytes());
                    }
                    Thread.sleep(1);
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            synchronized (LOCK) {
                writerCnt--;
            }

            System.out.printf("%10s finished!\n", Thread.currentThread().getName());
        }
    }

    private class Reader implements Runnable {

        @Override
        public void run() {
            synchronized (LOCK) {
                readerCnt++;
            }
            Thread.currentThread().setName("reader" + readerCnt);
            System.out.printf("%10s started.\n", Thread.currentThread().getName());
            int cnt0 = 0, cnt1 = 0;
            try {
                boolean empty = false;
                /*while*/
                for (int i = 0; i < CNT_SD * 1_000_000/*:D*/ && (writerCnt > 0 || !empty); i++) {

                    synchronized (LOCK) {
                        empty = queueFile.isEmpty();
                        if (!empty) {
                            cnt1++;
                            String s = new String(queueFile.pool());
                            //int x = Integer.parseInt(s.split(" ")[0].trim());
                            //System.out.println(Thread.currentThread().getName() + " " + s);
                        }
                    }
                    //Thread.sleep((long) (Math.random() * 100));
                    if (empty) {
                        cnt0++;
                        Thread.sleep(2);
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            synchronized (LOCK) {
                ttl += cnt1;
                readerCnt--;
            }
            System.out.printf("%10s finished! idle =%4s, data =%4s\n", Thread.currentThread().getName(), cnt0, cnt1);
        }
    }
}

QueueFile - покоцал
Код: java
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
55.
56.
57.
58.
59.
60.
61.
62.
63.
64.
65.
66.
67.
68.
69.
70.
71.
72.
73.
74.
75.
76.
77.
78.
79.
80.
81.
82.
83.
84.
85.
86.
87.
88.
89.
90.
91.
92.
93.
94.
95.
96.
97.
98.
99.
100.
101.
102.
103.
104.
105.
106.
107.
108.
109.
110.
111.
112.
113.
114.
115.
116.
117.
118.
119.
120.
121.
122.
123.
124.
125.
126.
127.
128.
129.
130.
131.
132.
133.
134.
135.
136.
137.
138.
139.
140.
141.
142.
143.
144.
145.
146.
147.
148.
149.
150.
151.
152.
153.
154.
155.
156.
157.
158.
159.
160.
161.
162.
163.
164.
165.
166.
167.
168.
169.
170.
171.
172.
173.
174.
175.
176.
177.
178.
179.
180.
181.
182.
183.
184.
185.
186.
187.
188.
189.
190.
191.
192.
193.
194.
195.
196.
197.
198.
199.
200.
201.
202.
203.
204.
205.
206.
207.
208.
209.
210.
211.
212.
213.
214.
215.
216.
217.
218.
219.
220.
221.
222.
223.
224.
225.
226.
227.
228.
229.
230.
231.
232.
233.
234.
235.
236.
237.
238.
239.
240.
241.
242.
243.
244.
245.
246.
247.
248.
249.
250.
251.
252.
253.
254.
255.
256.
257.
258.
259.
260.
261.
262.
263.
264.
265.
266.
267.
268.
269.
270.
271.
272.
273.
274.
275.
276.
277.
278.
279.
280.
281.
282.
283.
284.
285.
286.
287.
288.
289.
290.
291.
292.
293.
294.
295.
296.
297.
298.
299.
300.
301.
302.
303.
304.
305.
306.
307.
308.
309.
310.
311.
312.
313.
314.
315.
316.
317.
318.
319.
320.
321.
322.
323.
324.
325.
326.
327.
328.
329.
330.
331.
332.
333.
334.
335.
336.
337.
338.
339.
340.
341.
342.
343.
344.
345.
346.
347.
348.
349.
350.
351.
352.
353.
354.
355.
356.
357.
358.
359.
360.
361.
362.
363.
364.
365.
366.
367.
368.
369.
370.
371.
372.
373.
374.
375.
376.
377.
378.
379.
380.
381.
382.
383.
384.
385.
386.
387.
388.
389.
390.
391.
392.
393.
394.
395.
396.
397.
398.
399.
400.
401.
402.
403.
404.
405.
406.
407.
408.
409.
410.
411.
412.
413.
414.
415.
416.
417.
418.
419.
420.
421.
422.
423.
424.
425.
426.
427.
428.
429.
430.
431.
432.
433.
434.
435.
436.
437.
438.
439.
440.
441.
442.
443.
444.
445.
446.
447.
448.
449.
450.
451.
452.
453.
454.
455.
456.
457.
458.
459.
460.
461.
462.
463.
464.
465.
466.
467.
468.
469.
470.
471.
472.
473.
474.
475.
476.
477.
478.
479.
480.
481.
482.
483.
484.
485.
486.
487.
488.
489.
490.
491.
492.
493.
494.
495.
496.
497.
498.
499.
500.
501.
502.
503.
504.
505.
506.
507.
508.
509.
510.
511.
512.
513.
514.
515.
516.
517.
518.
519.
520.
521.
522.
523.
524.
525.
526.
527.
528.
529.
530.
531.
532.
533.
534.
535.
536.
537.
538.
539.
540.
541.
542.
543.
544.
545.
546.
547.
548.
549.
550.
551.
552.
553.
554.
555.
556.
557.
558.
559.
560.
561.
562.
563.
564.
565.
566.
567.
568.
569.
570.
571.
572.
573.
574.
575.
576.
577.
578.
579.
580.
581.
582.
583.
584.
585.
586.
587.
588.
589.
590.
591.
592.
593.
594.
595.
596.
597.
598.
599.
600.
601.
602.
603.
604.
605.
606.
607.
608.
609.
610.
611.
612.
613.
614.
615.
616.
617.
618.
619.
620.
621.
622.
623.
624.
625.
626.
627.
628.
629.
630.
631.
632.
633.
634.
635.
636.
637.
638.
639.
640.
641.
642.
643.
644.
645.
646.
647.
648.
649.
650.
651.
652.
653.
654.
655.
656.
657.
658.
659.
660.
661.
662.
663.
664.
665.
666.
667.
668.
669.
670.
671.
672.
673.
674.
675.
676.
677.
678.
679.
680.
681.
682.
683.
684.
685.
686.
687.
688.
689.
690.
691.
692.
693.
694.
695.
696.
697.
698.
699.
700.
701.
702.
703.
704.
705.
706.
707.
708.
709.
710.
711.
712.
713.
714.
715.
716.
717.
718.
719.
720.
721.
722.
723.
724.
725.
726.
727.
728.
729.
730.
731.
732.
733.
734.
735.
736.
737.
738.
739.
740.
741.
742.
743.
744.
745.
746.
747.
748.
749.
750.
751.
752.
753.
754.
755.
756.
757.
758.
759.
760.
761.
762.
763.
764.
765.
766.
767.
768.
769.
770.
771.
772.
773.
774.
775.
776.
777.
778.
779.
780.
781.
782.
783.
784.
785.
786.
787.
788.
789.
790.
791.
792.
793.
794.
795.
796.
797.
798.
799.
800.
801.
802.
803.
804.
805.
806.
807.
808.
809.
810.
811.
812.
813.
814.
815.
816.
817.
818.
819.
820.
821.
822.
823.
824.
825.
826.
827.
828.
829.
830.
831.
832.
833.
834.
835.
836.
837.
838.
839.
840.
841.
842.
843.
844.
845.
846.
847.
848.
849.
850.
851.
852.
853.
854.
855.
856.
857.
858.
859.
860.
861.
862.
863.
864.
865.
866.
867.
868.
869.
870.
871.
872.
873.
874.
875.
876.
877.
878.
879.
880.
881.
882.
883.
884.
885.
886.
887.
888.
889.
890.
891.
892.
893.
894.
895.
896.
897.
898.
899.
900.
901.
902.
903.
904.
905.
906.
907.
908.
909.
910.
911.
912.
913.
914.
915.
916.
917.
918.
919.
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */

/*
 * Copyright (C) 2010 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
//import javax.annotation.Nullable;

import static java.lang.Math.min;

/**
 * A reliable, efficient, file-based, FIFO queue. Additions and removals are
 * O(1). All operations are atomic. Writes are synchronous; data will be written
 * to disk before an operation returns. The underlying file is structured to
 * survive process and even system crashes. If an I/O exception is thrown during
 * a mutating change, the change is aborted. It is safe to continue to use a
 * {@code QueueFile} instance after an exception.
 *
 * <p>
 * <strong>Note that this implementation is not synchronized.</strong>
 *
 * <p>
 * In a traditional queue, the remove operation returns an element. In this
 * queue, {@link #peek} and {@link #remove} are used in conjunction. Use
 * {@code peek} to retrieve the first element, and then {@code remove} to remove
 * it after successful processing. If the system crashes after {@code peek} and
 * during processing, the element will remain in the queue, to be processed when
 * the system restarts.
 *
 * <p>
 * <strong>NOTE:</strong> The current implementation is built for file systems
 * that support atomic segment writes (like YAFFS). Most conventional file
 * systems don't support this; if the power goes out while writing a segment,
 * the segment will contain garbage and the file will be corrupt. We'll add
 * journaling support so this class can be used with more file systems later.
 *
 * Construct instances with {@link Builder}.
 *
 * @author Bob Lee (bob@squareup.com)
 */
public final class QueueFile implements Closeable, Iterable<byte[]> {

    /**
     * Leading bit set to 1 indicating a versioned header and the version of 1.
     */
    private static final int VERSIONED_HEADER = 0x80000001;

    /**
     * Initial file size in bytes.
     */
    //static final int INITIAL_LENGTH = 4096; // one file system block
    static final int INITIAL_LENGTH = 4096; // one file system block

    /**
     * A block of nothing to write over old data.
     */
    private static final byte[] ZEROES = new byte[INITIAL_LENGTH];

    /**
     * The underlying file. Uses a ring buffer to store entries. Designed so
     * that a modification isn't committed or visible until we write the header.
     * The header is much smaller than a segment. So long as the underlying file
     * system supports atomic segment writes, changes to the queue are atomic.
     * Storing the file length ensures we can recover from a failed expansion
     * (i.e. if setting the file length succeeds but the process dies before the
     * data can be copied).
     * <p>
     * This implementation supports two versions of the on-disk format.
     * <pre>
     * Format:
     *   16-32 bytes      Header
     *   ...              Data
     *
     * Header (32 bytes):
     *   1 bit            Versioned indicator [0 = legacy (see "Legacy Header"), 1 = versioned]
     *   31 bits          Version, always 1
     *   8 bytes          File length
     *   4 bytes          Element count
     *   8 bytes          Head element position
     *   8 bytes          Tail element position
     *
     * Legacy Header (16 bytes):
     *   1 bit            Legacy indicator, always 0 (see "Header")
     *   31 bits          File length
     *   4 bytes          Element count
     *   4 bytes          Head element position
     *   4 bytes          Tail element position
     *
     * Element:
     *   4 bytes          Data length
     *   ...              Data
     * </pre>
     */
    final RandomAccessFile raf;

    /**
     * Keep file around for error reporting.
     */
    final File file;

    /**
     * True when using the versioned header format. Otherwise use the legacy
     * format.
     */
    final boolean versioned;

    /**
     * The header length in bytes: 16 or 32.
     */
    final int headerLength;

    /**
     * Cached file length. Always a power of 2.
     */
    long fileLength;

    /**
     * Number of elements.
     */
    private int elementCount;

    /**
     * Pointer to first (or eldest) element.
     */
    private Element first;

    /**
     * Pointer to last (or newest) element.
     */
    private Element last;

    /**
     * In-memory buffer. Big enough to hold the header.
     */
    private final byte[] buffer = new byte[32];

    /**
     * The number of times this file has been structurally modified — it is
     * incremented during {@link #remove(int)} and
     * {@link #add(byte[], int, int)}. Used by {@link ElementIterator} to guard
     * against concurrent modification.
     */
    private int modCount = 0;

    /**
     * When true, removing an element will also overwrite data with zero bytes.
     */
    private final boolean zero;

    private boolean closed;

    private static RandomAccessFile initializeFromFile(File file, boolean forceLegacy)
            throws IOException {
        if (!file.exists()) {
            // Use a temp file so we don't leave a partially-initialized file.
            File tempFile = new File(file.getPath() + ".tmp");
            try (RandomAccessFile raf = open(tempFile)) {
                raf.setLength(INITIAL_LENGTH);

                raf.seek(0);
                if (forceLegacy) {
                    raf.writeInt(INITIAL_LENGTH);
                } else {
                    raf.writeInt(VERSIONED_HEADER);
                    raf.writeLong(INITIAL_LENGTH);
                }

            }

            // A rename is atomic.
            if (!tempFile.renameTo(file)) {
                throw new IOException("Rename failed!");
            }
        }

        return open(file);
    }

    /**
     * Opens a random access file that writes synchronously.
     */
    private static RandomAccessFile open(File file) throws FileNotFoundException {
        return new RandomAccessFile(file, "rwd");
    }

    QueueFile(File file, RandomAccessFile raf, boolean zero, boolean forceLegacy) throws IOException {
        this.file = file;
        this.raf = raf;
        this.zero = zero;

        raf.seek(0);
        raf.readFully(buffer);

        versioned = !forceLegacy && (buffer[0] & 0x80) != 0;
        long firstOffset;
        long lastOffset;
        if (versioned) {
            headerLength = 32;

            int version = readInt(buffer, 0) & 0x7FFFFFFF;
            if (version != 1) {
                throw new IOException(
                        "Unable to read version " + version + " format. Supported versions are 1 and legacy.");
            }
            fileLength = readLong(buffer, 4);
            elementCount = readInt(buffer, 12);
            firstOffset = readLong(buffer, 16);
            lastOffset = readLong(buffer, 24);
        } else {
            headerLength = 16;

            fileLength = readInt(buffer, 0);
            elementCount = readInt(buffer, 4);
            firstOffset = readInt(buffer, 8);
            lastOffset = readInt(buffer, 12);
        }

        if (fileLength > raf.length()) {
            throw new IOException(
                    "File is truncated. Expected length: " + fileLength + ", Actual length: " + raf.length());
        } else if (fileLength <= headerLength) {
            throw new IOException(
                    "File is corrupt; length stored in header (" + fileLength + ") is invalid.");
        }

        first = readElement(firstOffset);
        last = readElement(lastOffset);
    }

    /**
     * Stores an {@code int} in the {@code byte[]}. The behavior is equivalent
     * to calling {@link RandomAccessFile#writeInt}.
     */
    private static void writeInt(byte[] buffer, int offset, int value) {
        buffer[offset] = (byte) (value >> 24);
        buffer[offset + 1] = (byte) (value >> 16);
        buffer[offset + 2] = (byte) (value >> 8);
        buffer[offset + 3] = (byte) value;
    }

    /**
     * Reads an {@code int} from the {@code byte[]}.
     */
    private static int readInt(byte[] buffer, int offset) {
        return ((buffer[offset] & 0xff) << 24)
                + ((buffer[offset + 1] & 0xff) << 16)
                + ((buffer[offset + 2] & 0xff) << 8)
                + (buffer[offset + 3] & 0xff);
    }

    /**
     * Stores an {@code long} in the {@code byte[]}. The behavior is equivalent
     * to calling {@link RandomAccessFile#writeLong}.
     */
    private static void writeLong(byte[] buffer, int offset, long value) {
        buffer[offset] = (byte) (value >> 56);
        buffer[offset + 1] = (byte) (value >> 48);
        buffer[offset + 2] = (byte) (value >> 40);
        buffer[offset + 3] = (byte) (value >> 32);
        buffer[offset + 4] = (byte) (value >> 24);
        buffer[offset + 5] = (byte) (value >> 16);
        buffer[offset + 6] = (byte) (value >> 8);
        buffer[offset + 7] = (byte) value;
    }

    /**
     * Reads an {@code long} from the {@code byte[]}.
     */
    private static long readLong(byte[] buffer, int offset) {
        return ((buffer[offset] & 0xffL) << 56)
                + ((buffer[offset + 1] & 0xffL) << 48)
                + ((buffer[offset + 2] & 0xffL) << 40)
                + ((buffer[offset + 3] & 0xffL) << 32)
                + ((buffer[offset + 4] & 0xffL) << 24)
                + ((buffer[offset + 5] & 0xffL) << 16)
                + ((buffer[offset + 6] & 0xffL) << 8)
                + (buffer[offset + 7] & 0xffL);
    }

    /**
     * Writes header atomically. The arguments contain the updated values. The
     * class member fields should not have changed yet. This only updates the
     * state in the file. It's up to the caller to update the class member
     * variables *after* this call succeeds. Assumes segment writes are atomic
     * in the underlying file system.
     */
    private void writeHeader(long fileLength, int elementCount, long firstPosition, long lastPosition) throws IOException {

        raf.seek(0);
        if (versioned) {

            writeInt(buffer, 0, VERSIONED_HEADER);
            writeLong(buffer, 4, fileLength);
            writeInt(buffer, 12, elementCount);
            writeLong(buffer, 16, firstPosition);
            writeLong(buffer, 24, lastPosition);

            raf.write(buffer, 0, 32);
            return;
        }

        // Legacy queue header.
        writeInt(buffer, 0, (int) fileLength); // Signed, so leading bit is always 0 aka legacy.
        writeInt(buffer, 4, elementCount);
        writeInt(buffer, 8, (int) firstPosition);
        writeInt(buffer, 12, (int) lastPosition);

        raf.write(buffer, 0, 16);

    }

    private Element readElement(long position) throws IOException {
        if (position == 0) {
            return Element.NULL;
        }
        ringRead(position, buffer, 0, Element.HEADER_LENGTH);
        int length = readInt(buffer, 0);
        return new Element(position, length);
    }

    /**
     * Wraps the position if it exceeds the end of the file.
     */
    private long wrapPosition(long position) {
        return position < fileLength ? position
                : headerLength + position - fileLength;
    }

    /**
     * Writes count bytes from buffer to position in file. Automatically wraps
     * write if position is past the end of the file or if buffer overlaps it.
     *
     * @param position in file to write to
     * @param buffer to write from
     * @param count # of bytes to write
     */
    private void ringWrite(long position, byte[] buffer, int offset, int count) throws IOException {
        position = wrapPosition(position);
        if (position + count <= fileLength) {
            raf.seek(position);
            raf.write(buffer, offset, count);
        } else {
            // The write overlaps the EOF.
            // # of bytes to write before the EOF. Guaranteed to be less than Integer.MAX_VALUE.
            int beforeEof = (int) (fileLength - position);
            raf.seek(position);
            raf.write(buffer, offset, beforeEof);
            raf.seek(headerLength);
            raf.write(buffer, offset + beforeEof, count - beforeEof);
        }
    }

    private void ringErase(long position, long length) throws IOException {
        while (length > 0) {
            int chunk = (int) min(length, ZEROES.length);
            ringWrite(position, ZEROES, 0, chunk);
            length -= chunk;
            position += chunk;
        }
    }

    /**
     * Reads count bytes into buffer from file. Wraps if necessary.
     *
     * @param position in file to read from
     * @param buffer to read into
     * @param count # of bytes to read
     */
    private void ringRead(long position, byte[] buffer, int offset, int count) throws IOException {
        position = wrapPosition(position);
        if (position + count <= fileLength) {
            raf.seek(position);
            raf.readFully(buffer, offset, count);
        } else {
            // The read overlaps the EOF.
            // # of bytes to read before the EOF. Guaranteed to be less than Integer.MAX_VALUE.
            int beforeEof = (int) (fileLength - position);
            raf.seek(position);
            raf.readFully(buffer, offset, beforeEof);
            raf.seek(headerLength);
            raf.readFully(buffer, offset + beforeEof, count - beforeEof);
        }
    }

    /**
     * Adds an element to the end of the queue.
     *
     * @param data to copy bytes from
     */
    public void add(byte[] data) throws IOException {
        add(data, 0, data.length);
    }

    /**
     * Adds an element to the end of the queue.
     *
     * @param data to copy bytes from
     * @param offset to start from in buffer
     * @param count number of bytes to copy
     * @throws IndexOutOfBoundsException if {@code offset < 0} or
     * {@code count < 0}, or if {@code
     * offset + count} is bigger than the length of {@code buffer}.
     */
    public void add(byte[] data, int offset, int count) throws IOException {
        if (data == null) {
            throw new NullPointerException("data == null");
        }
        if ((offset | count) < 0 || count > data.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (closed) {
            throw new IllegalStateException("closed");
        }

        expandIfNecessary(count);

        // Insert a new element after the current last element.
        boolean wasEmpty = isEmpty();
        long position = wasEmpty ? headerLength
                : wrapPosition(last.position + Element.HEADER_LENGTH + last.length);
        Element newLast = new Element(position, count);

        // Write length.
        writeInt(buffer, 0, count);
        ringWrite(newLast.position, buffer, 0, Element.HEADER_LENGTH);

        // Write data.
        ringWrite(newLast.position + Element.HEADER_LENGTH, data, offset, count);

        // Commit the addition. If wasEmpty, first == last.
        long firstPosition = wasEmpty ? newLast.position : first.position;

        writeHeader(fileLength, elementCount + 1, firstPosition, newLast.position);
        last = newLast;
        elementCount++;
        modCount++;
        if (wasEmpty) {
            first = last; // first element
        }
    }

    private long usedBytes() {
        if (elementCount == 0) {
            return headerLength;
        }

        if (last.position >= first.position) {
            // Contiguous queue.
            return (last.position - first.position) // all but last entry
                    + Element.HEADER_LENGTH + last.length // last entry
                    + headerLength;
        } else {
            // tail < head. The queue wraps.
            return last.position // buffer front + header
                    + Element.HEADER_LENGTH + last.length // last entry
                    + fileLength - first.position;        // buffer end
        }
    }

    private long remainingBytes() {
        return fileLength - usedBytes();
    }

    /**
     * Returns true if this queue contains no entries.
     */
    public boolean isEmpty() {
        return elementCount == 0;
    }

    /**
     * If necessary, expands the file to accommodate an additional element of
     * the given length.
     *
     * @param dataLength length of data being added
     */
    private void expandIfNecessary(long dataLength) throws IOException {
        long elementLength = Element.HEADER_LENGTH + dataLength;
        long remainingBytes = remainingBytes();
        if (remainingBytes >= elementLength) {
            return;
        }

        // Expand.
        long previousLength = fileLength;
        long newLength;
        // Double the length until we can fit the new data.
        do {
            remainingBytes += previousLength;
            newLength = previousLength << 1;
            previousLength = newLength;
        } while (remainingBytes < elementLength);

        setLength(newLength);

        // Calculate the position of the tail end of the data in the ring buffer
        long endOfLastElement = wrapPosition(last.position + Element.HEADER_LENGTH + last.length);
        long count = 0;
        // If the buffer is split, we need to make it contiguous
        if (endOfLastElement <= first.position) {
            FileChannel channel = raf.getChannel();
            channel.position(fileLength); // destination position
            count = endOfLastElement - headerLength;
            if (channel.transferTo(headerLength, count, channel) != count) {
                throw new AssertionError("Copied insufficient number of bytes!");
            }
        }

        // Commit the expansion.
        if (last.position < first.position) {
            long newLastPosition = fileLength + last.position - headerLength;
            writeHeader(newLength, elementCount, first.position, newLastPosition);
            last = new Element(newLastPosition, last.length);
        } else {
            writeHeader(newLength, elementCount, first.position, last.position);
        }

        fileLength = newLength;

        if (zero) {
            ringErase(headerLength, count);
        }
    }

    /**
     * Sets the length of the file.
     */
    private void setLength(long newLength) throws IOException {
        // Set new file length (considered metadata) and sync it to storage.
        raf.setLength(newLength);
        raf.getChannel().force(true);
    }

    /**
     * Reads the eldest element. Returns null if the queue is empty.
     */
    public byte[] peek() throws IOException {
        if (closed) {
            throw new IllegalStateException("closed");
        }
        if (isEmpty()) {
            return null;
        }
        int length = first.length;
        byte[] data = new byte[length];
        ringRead(first.position + Element.HEADER_LENGTH, data, 0, length);
        return data;
    }

    public byte[] pool() throws IOException {
        byte[] data = peek();
        remove();
        return data;
    }

    /**
     * Returns an iterator over elements in this QueueFile.
     *
     * <p>
     * The iterator disallows modifications to be made to the QueueFile during
     * iteration. Removing elements from the head of the QueueFile is permitted
     * during iteration using {@link Iterator#remove()}.
     *
     * <p>
     * The iterator may throw an unchecked {@link IOException} during
     * {@link Iterator#next()} or {@link Iterator#remove()}.
     */
    @Override
    public Iterator<byte[]> iterator() {
        return new ElementIterator();
    }

    private final class ElementIterator implements Iterator<byte[]> {

        /**
         * Index of element to be returned by subsequent call to next.
         */
        int nextElementIndex = 0;

        /**
         * Position of element to be returned by subsequent call to next.
         */
        private long nextElementPosition = first.position;

        /**
         * The {@link #modCount} value that the iterator believes that the
         * backing QueueFile should have. If this expectation is violated, the
         * iterator has detected concurrent modification.
         */
        int expectedModCount = modCount;

        private ElementIterator() {
        }

        private void checkForComodification() {
            if (modCount != expectedModCount) {
                throw new ConcurrentModificationException();
            }
        }

        @Override
        public boolean hasNext() {
            if (closed) {
                throw new IllegalStateException("closed");
            }
            checkForComodification();
            return nextElementIndex != elementCount;
        }

        @Override
        public byte[] next() {
            if (closed) {
                throw new IllegalStateException("closed");
            }
            checkForComodification();
            if (isEmpty()) {
                throw new NoSuchElementException();
            }
            if (nextElementIndex >= elementCount) {
                throw new NoSuchElementException();
            }

            try {
                // Read the current element.
                Element current = readElement(nextElementPosition);
                byte[] buffer = new byte[current.length];
                nextElementPosition = wrapPosition(current.position + Element.HEADER_LENGTH);
                ringRead(nextElementPosition, buffer, 0, current.length);

                // Update the pointer to the next element.
                nextElementPosition
                        = wrapPosition(current.position + Element.HEADER_LENGTH + current.length);
                nextElementIndex++;

                // Return the read element.
                return buffer;
            } catch (IOException e) {
                throw QueueFile.<Error>getSneakyThrowable(e);
            }
        }

        @Override
        public void remove() {
            checkForComodification();

            if (isEmpty()) {
                throw new NoSuchElementException();
            }
            if (nextElementIndex != 1) {
                throw new UnsupportedOperationException("Removal is only permitted from the head.");
            }

            try {
                QueueFile.this.remove();
            } catch (IOException e) {
                throw QueueFile.<Error>getSneakyThrowable(e);
            }

            expectedModCount = modCount;
            nextElementIndex--;
        }
    }

    /**
     * Returns the number of elements in this queue.
     */
    public int size() {
        return elementCount;
    }

    /**
     * Removes the eldest element.
     *
     * @throws NoSuchElementException if the queue is empty
     */
    public void remove() throws IOException {
        remove(1);
    }

    /**
     * Removes the eldest {@code n} elements.
     *
     * @throws NoSuchElementException if the queue is empty
     */
    public void remove(int n) throws IOException {
        if (n < 0) {
            throw new IllegalArgumentException("Cannot remove negative (" + n + ") number of elements.");
        }
        if (n == 0) {
            return;
        }
        if (n == elementCount) {
            clear();
            return;
        }
        if (isEmpty()) {
            throw new NoSuchElementException();
        }
        if (n > elementCount) {
            throw new IllegalArgumentException(
                    "Cannot remove more elements (" + n + ") than present in queue (" + elementCount + ").");
        }

        long eraseStartPosition = first.position;
        long eraseTotalLength = 0;

        // Read the position and length of the new first element.
        long newFirstPosition = first.position;
        int newFirstLength = first.length;
        for (int i = 0; i < n; i++) {
            eraseTotalLength += Element.HEADER_LENGTH + newFirstLength;
            newFirstPosition = wrapPosition(newFirstPosition + Element.HEADER_LENGTH + newFirstLength);
            ringRead(newFirstPosition, buffer, 0, Element.HEADER_LENGTH);
            newFirstLength = readInt(buffer, 0);
        }

        // Commit the header.
        writeHeader(fileLength, elementCount - n, newFirstPosition, last.position);
        elementCount -= n;
        modCount++;
        first = new Element(newFirstPosition, newFirstLength);

        if (zero) {
            ringErase(eraseStartPosition, eraseTotalLength);
        }
    }

    /**
     * Clears this queue. Truncates the file to the initial size.
     */
    public void clear() throws IOException {
        if (closed) {
            throw new IllegalStateException("closed");
        }

        // Commit the header.
        writeHeader(INITIAL_LENGTH, 0, 0, 0);

        if (zero) {
            // Zero out data.
            raf.seek(headerLength);
            raf.write(ZEROES, 0, INITIAL_LENGTH - headerLength);
        }

        elementCount = 0;
        first = Element.NULL;
        last = Element.NULL;
        if (fileLength > INITIAL_LENGTH) {
            setLength(INITIAL_LENGTH);
        }
        fileLength = INITIAL_LENGTH;
        modCount++;
    }

    /**
     * The underlying {@link File} backing this queue.
     */
    public File file() {
        return file;
    }

    @Override
    public void close() throws IOException {
        try (raf) {
            writeHeader(fileLength, elementCount, first.position, last.position);
            closed = true;
        }
    }

    @Override
    public String toString() {
        try {
            writeHeader(fileLength, elementCount, first.position, last.position);
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        return "QueueFile{"
                + "file=" + file
                + ", zero=" + zero
                + ", versioned=" + versioned
                + ", length=" + fileLength
                + ", size=" + elementCount
                + ", first=" + first
                + ", last=" + last
                + '}';
    }

    /**
     * A pointer to an element.
     */
    static class Element {

        static final Element NULL = new Element(0, 0);

        /**
         * Length of element header in bytes.
         */
        static final int HEADER_LENGTH = 4;

        /**
         * Position in file.
         */
        final long position;

        /**
         * The length of the data.
         */
        final int length;

        /**
         * Constructs a new element.
         *
         * @param position within file
         * @param length of data
         */
        Element(long position, int length) {
            this.position = position;
            this.length = length;
        }

        @Override
        public String toString() {
            return getClass().getSimpleName()
                    + "[position=" + position
                    + ", length=" + length
                    + "]";
        }
    }

    /**
     * Fluent API for creating {@link QueueFile} instances.
     */
    public static final class Builder {

        final File file;
        boolean zero = true;
        boolean forceLegacy = false;

        /**
         * Start constructing a new queue backed by the given file.
         */
        public Builder(File file) {
            if (file == null) {
                throw new NullPointerException("file == null");
            }
            this.file = file;
        }

        /**
         * When true, removing an element will also overwrite data with zero
         * bytes.
         *
         * @return
         */
        public Builder zero(boolean zero) {
            this.zero = zero;
            return this;
        }

        /**
         * When true, only the legacy (Tape 1.x) format will be used.
         */
        public Builder forceLegacy(boolean forceLegacy) {
            this.forceLegacy = forceLegacy;
            return this;
        }

        /**
         * Constructs a new queue backed by the given builder. Only one instance
         * should access a given file at a time.
         */
        public QueueFile build() throws IOException {
            RandomAccessFile raf = initializeFromFile(file, forceLegacy);
            QueueFile qf = null;
            try {
                qf = new QueueFile(file, raf, zero, forceLegacy);
                return qf;
            } finally {
                if (qf == null) {
                    raf.close();
                }
            }
        }
    }

    /**
     * Use this to throw checked exceptions from iterator methods that do not
     * declare that they throw checked exceptions.
     */
    @SuppressWarnings({"unchecked", "TypeParameterUnusedInFormals"})
    static <T extends Throwable> T getSneakyThrowable(Throwable t) throws T {
        throw (T) t;
    }
}

...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910845
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky, ого тут букв. А есть ссылка на гитхаб?
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910851
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton, git чего?

я на коленке)

это давал - https://github.com/square/tape

от туда выдрал - QueueFile.java
покоцал - покоцаный файл/*код*/ под спойлером (-аннотации Private, +pool)
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910854
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Victor Nevsky, спасибо конешно. Но мне такие простыни кода в браузер вобщем-то не надо.

Я вообще листинги больше чем 20 строк не воспринимаю. Неужели ты думаешь что я их щас
быстро прочту и осознаю и решу что это рабчий материал?

Я либо смотрю структурно. Либо по юзкейсам. По описаниям. По документации.

Мне в данном топике исходники не нужны. Я и сам много всего пишу и могу наводнить этот форум
своими сорцами. Мне нужны какие-то гарантии что это работает.

Ты сам проверил что это работает?

Понимаешь? Чтоб я не тратил своё время на изучение чуждого кода. Поэтому и мне были интересны
в первую очередь BerkeleyDb, Kaha(ApacheMQ) потому-что это 100% рабочий материал.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910855
Victor Nevsky
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton, даже не знаю, что, Вам, ответить ....
код рабочий, готовый тест ... многопоточный ...
проверял ...
95,1%

.... непонятки указал - мьютексы.

слабое звено - читающий блокирует пишущего, пишущий - читающего ...
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910856
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Хорошо. Я потрачу на ваш сорц 15 минут времени. Но если что-то не склеится - я отставлю в сторону.

Вы не против?
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39910857
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
До сорца Леонида руки так и не дошли. Последнее что я там видел - NPE и моё пожелание упростить
интеракцию файловой системы до нескольких файлов. Без повторного использования.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39911067
Бумбараш
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
а почему для задачи очереди не использовать очереди? я чо то так и не понял
простите, что такое неординарное предложение

кафки, рабиты и вот это всё
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39911070
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
mayton
MQ системы обычно связаны с поднятием сетевых сервисов. У меня пока все локально. 1 процесс все делает.
Поэтому поднимать еще MQ не вижу смысла.
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39911079
Бумбараш
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
ну так их локально можно поставить

базы данные тоже обычно связаны с поднятием сетевых сервисов
ну и вообще
обычно любой %program_name% связан с поднятием сетевых сервисов

или стоит цель zaebatsya, но написать что-нибудь своё?
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39911080
Бумбараш
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Бумбараш
ну так их локально можно поставить

базы данных тоже обычно связаны с поднятием сетевых сервисов
ну и вообще
обычно любой %program_name% связан с поднятием сетевых сервисов

или стоит цель zaebatsya, но написать что-нибудь своё?
...
Рейтинг: 0 / 0
Queue с поддержкой сериализации на диск.
    #39911081
Фотография mayton
Скрыть профиль Поместить в игнор-лист Сообщения автора в теме
Участник
Отвечу в течение недели. Пока нет времени.
...
Рейтинг: 0 / 0
25 сообщений из 200, страница 8 из 8
Форумы / Java [игнор отключен] [закрыт для гостей] / Queue с поддержкой сериализации на диск.
Целевая тема:
Создать новую тему:
Автор:
Закрыть
Цитировать
Найденые пользователи ...
Разблокировать пользователей ...
Читали форум (0):
Пользователи онлайн (0):
x
x
Закрыть


Просмотр
0 / 0
Close
Debug Console [Select Text]