тест QueueFile - в мьютексах не шарю
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35. 36. 37. 38. 39. 40. 41. 42. 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53. 54. 55. 56. 57. 58. 59. 60. 61. 62. 63. 64. 65. 66. 67. 68. 69. 70. 71. 72. 73. 74. 75. 76. 77. 78. 79. 80. 81. 82. 83. 84. 85. 86. 87. 88. 89. 90. 91. 92. 93. 94. 95. 96. 97. 98. 99. 100. 101. 102. 103. 104. 105. 106. 107. 108. 109. 110. 111. 112. 113. 114. 115. 116. 117. 118. 119. 120. 121. 122. 123. 124. 125. 126. 127. 128. 129. 130. 131. 132. 133.
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TestQueueFile {
private final int CNT_SD = 10_000;//cnt send data peer thread
private final int CNT_TX = 2;//cnt writer
private final int CNT_RX = 2;//cnt reader
private final Object LOCK = new Object();
private QueueFile queueFile = null;
private int ttl = 0;
public TestQueueFile() {
try {
//new File("./testFile.txt").deleteOnExit();
queueFile = new QueueFile.Builder(new File("./testFile.txt")).
zero(true).
forceLegacy(true).
build();
for (int i = 0; i < CNT_TX; i++) {
executorService.submit(new Writer());
}
while (writerCnt < 1) {
Thread.sleep(1);
}
for (int i = 0; i < CNT_RX; i++) {
executorService.submit(new Reader());
}
executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.MINUTES);
System.out.println(queueFile.toString());
System.out.println("Total: " + ttl);
queueFile.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* @param args the command line arguments
*/
public static void main(String[] args) {
new TestQueueFile();
}
private ExecutorService executorService = Executors.newFixedThreadPool(CNT_RX + CNT_TX);
private int readerCnt = 0, writerCnt = 0;
private class Writer implements Runnable {
@Override
public void run() {
synchronized (LOCK) {
writerCnt++;
}
Thread.currentThread().setName("writer" + writerCnt);
System.out.printf("%10s started.\n", Thread.currentThread().getName());
try {
for (int i = 0; i < CNT_SD; i++) {
synchronized (LOCK) {
queueFile.add((i + " " + Thread.currentThread().getName() + " text\n").getBytes());
}
Thread.sleep(1);
}
} catch (Exception ex) {
ex.printStackTrace();
}
synchronized (LOCK) {
writerCnt--;
}
System.out.printf("%10s finished!\n", Thread.currentThread().getName());
}
}
private class Reader implements Runnable {
@Override
public void run() {
synchronized (LOCK) {
readerCnt++;
}
Thread.currentThread().setName("reader" + readerCnt);
System.out.printf("%10s started.\n", Thread.currentThread().getName());
int cnt0 = 0, cnt1 = 0;
try {
boolean empty = false;
/*while*/
for (int i = 0; i < CNT_SD * 1_000_000/*:D*/ && (writerCnt > 0 || !empty); i++) {
synchronized (LOCK) {
empty = queueFile.isEmpty();
if (!empty) {
cnt1++;
String s = new String(queueFile.pool());
//int x = Integer.parseInt(s.split(" ")[0].trim());
//System.out.println(Thread.currentThread().getName() + " " + s);
}
}
//Thread.sleep((long) (Math.random() * 100));
if (empty) {
cnt0++;
Thread.sleep(2);
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
synchronized (LOCK) {
ttl += cnt1;
readerCnt--;
}
System.out.printf("%10s finished! idle =%4s, data =%4s\n", Thread.currentThread().getName(), cnt0, cnt1);
}
}
}
QueueFile - покоцал 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35. 36. 37. 38. 39. 40. 41. 42. 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53. 54. 55. 56. 57. 58. 59. 60. 61. 62. 63. 64. 65. 66. 67. 68. 69. 70. 71. 72. 73. 74. 75. 76. 77. 78. 79. 80. 81. 82. 83. 84. 85. 86. 87. 88. 89. 90. 91. 92. 93. 94. 95. 96. 97. 98. 99. 100. 101. 102. 103. 104. 105. 106. 107. 108. 109. 110. 111. 112. 113. 114. 115. 116. 117. 118. 119. 120. 121. 122. 123. 124. 125. 126. 127. 128. 129. 130. 131. 132. 133. 134. 135. 136. 137. 138. 139. 140. 141. 142. 143. 144. 145. 146. 147. 148. 149. 150. 151. 152. 153. 154. 155. 156. 157. 158. 159. 160. 161. 162. 163. 164. 165. 166. 167. 168. 169. 170. 171. 172. 173. 174. 175. 176. 177. 178. 179. 180. 181. 182. 183. 184. 185. 186. 187. 188. 189. 190. 191. 192. 193. 194. 195. 196. 197. 198. 199. 200. 201. 202. 203. 204. 205. 206. 207. 208. 209. 210. 211. 212. 213. 214. 215. 216. 217. 218. 219. 220. 221. 222. 223. 224. 225. 226. 227. 228. 229. 230. 231. 232. 233. 234. 235. 236. 237. 238. 239. 240. 241. 242. 243. 244. 245. 246. 247. 248. 249. 250. 251. 252. 253. 254. 255. 256. 257. 258. 259. 260. 261. 262. 263. 264. 265. 266. 267. 268. 269. 270. 271. 272. 273. 274. 275. 276. 277. 278. 279. 280. 281. 282. 283. 284. 285. 286. 287. 288. 289. 290. 291. 292. 293. 294. 295. 296. 297. 298. 299. 300. 301. 302. 303. 304. 305. 306. 307. 308. 309. 310. 311. 312. 313. 314. 315. 316. 317. 318. 319. 320. 321. 322. 323. 324. 325. 326. 327. 328. 329. 330. 331. 332. 333. 334. 335. 336. 337. 338. 339. 340. 341. 342. 343. 344. 345. 346. 347. 348. 349. 350. 351. 352. 353. 354. 355. 356. 357. 358. 359. 360. 361. 362. 363. 364. 365. 366. 367. 368. 369. 370. 371. 372. 373. 374. 375. 376. 377. 378. 379. 380. 381. 382. 383. 384. 385. 386. 387. 388. 389. 390. 391. 392. 393. 394. 395. 396. 397. 398. 399. 400. 401. 402. 403. 404. 405. 406. 407. 408. 409. 410. 411. 412. 413. 414. 415. 416. 417. 418. 419. 420. 421. 422. 423. 424. 425. 426. 427. 428. 429. 430. 431. 432. 433. 434. 435. 436. 437. 438. 439. 440. 441. 442. 443. 444. 445. 446. 447. 448. 449. 450. 451. 452. 453. 454. 455. 456. 457. 458. 459. 460. 461. 462. 463. 464. 465. 466. 467. 468. 469. 470. 471. 472. 473. 474. 475. 476. 477. 478. 479. 480. 481. 482. 483. 484. 485. 486. 487. 488. 489. 490. 491. 492. 493. 494. 495. 496. 497. 498. 499. 500. 501. 502. 503. 504. 505. 506. 507. 508. 509. 510. 511. 512. 513. 514. 515. 516. 517. 518. 519. 520. 521. 522. 523. 524. 525. 526. 527. 528. 529. 530. 531. 532. 533. 534. 535. 536. 537. 538. 539. 540. 541. 542. 543. 544. 545. 546. 547. 548. 549. 550. 551. 552. 553. 554. 555. 556. 557. 558. 559. 560. 561. 562. 563. 564. 565. 566. 567. 568. 569. 570. 571. 572. 573. 574. 575. 576. 577. 578. 579. 580. 581. 582. 583. 584. 585. 586. 587. 588. 589. 590. 591. 592. 593. 594. 595. 596. 597. 598. 599. 600. 601. 602. 603. 604. 605. 606. 607. 608. 609. 610. 611. 612. 613. 614. 615. 616. 617. 618. 619. 620. 621. 622. 623. 624. 625. 626. 627. 628. 629. 630. 631. 632. 633. 634. 635. 636. 637. 638. 639. 640. 641. 642. 643. 644. 645. 646. 647. 648. 649. 650. 651. 652. 653. 654. 655. 656. 657. 658. 659. 660. 661. 662. 663. 664. 665. 666. 667. 668. 669. 670. 671. 672. 673. 674. 675. 676. 677. 678. 679. 680. 681. 682. 683. 684. 685. 686. 687. 688. 689. 690. 691. 692. 693. 694. 695. 696. 697. 698. 699. 700. 701. 702. 703. 704. 705. 706. 707. 708. 709. 710. 711. 712. 713. 714. 715. 716. 717. 718. 719. 720. 721. 722. 723. 724. 725. 726. 727. 728. 729. 730. 731. 732. 733. 734. 735. 736. 737. 738. 739. 740. 741. 742. 743. 744. 745. 746. 747. 748. 749. 750. 751. 752. 753. 754. 755. 756. 757. 758. 759. 760. 761. 762. 763. 764. 765. 766. 767. 768. 769. 770. 771. 772. 773. 774. 775. 776. 777. 778. 779. 780. 781. 782. 783. 784. 785. 786. 787. 788. 789. 790. 791. 792. 793. 794. 795. 796. 797. 798. 799. 800. 801. 802. 803. 804. 805. 806. 807. 808. 809. 810. 811. 812. 813. 814. 815. 816. 817. 818. 819. 820. 821. 822. 823. 824. 825. 826. 827. 828. 829. 830. 831. 832. 833. 834. 835. 836. 837. 838. 839. 840. 841. 842. 843. 844. 845. 846. 847. 848. 849. 850. 851. 852. 853. 854. 855. 856. 857. 858. 859. 860. 861. 862. 863. 864. 865. 866. 867. 868. 869. 870. 871. 872. 873. 874. 875. 876. 877. 878. 879. 880. 881. 882. 883. 884. 885. 886. 887. 888. 889. 890. 891. 892. 893. 894. 895. 896. 897. 898. 899. 900. 901. 902. 903. 904. 905. 906. 907. 908. 909. 910. 911. 912. 913. 914. 915. 916. 917. 918. 919.
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* Copyright (C) 2010 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
//import javax.annotation.Nullable;
import static java.lang.Math.min;
/**
* A reliable, efficient, file-based, FIFO queue. Additions and removals are
* O(1). All operations are atomic. Writes are synchronous; data will be written
* to disk before an operation returns. The underlying file is structured to
* survive process and even system crashes. If an I/O exception is thrown during
* a mutating change, the change is aborted. It is safe to continue to use a
* {@code QueueFile} instance after an exception.
*
* <p>
* <strong>Note that this implementation is not synchronized.</strong>
*
* <p>
* In a traditional queue, the remove operation returns an element. In this
* queue, {@link #peek} and {@link #remove} are used in conjunction. Use
* {@code peek} to retrieve the first element, and then {@code remove} to remove
* it after successful processing. If the system crashes after {@code peek} and
* during processing, the element will remain in the queue, to be processed when
* the system restarts.
*
* <p>
* <strong>NOTE:</strong> The current implementation is built for file systems
* that support atomic segment writes (like YAFFS). Most conventional file
* systems don't support this; if the power goes out while writing a segment,
* the segment will contain garbage and the file will be corrupt. We'll add
* journaling support so this class can be used with more file systems later.
*
* Construct instances with {@link Builder}.
*
* @author Bob Lee (bob@squareup.com)
*/
public final class QueueFile implements Closeable, Iterable<byte[]> {
/**
* Leading bit set to 1 indicating a versioned header and the version of 1.
*/
private static final int VERSIONED_HEADER = 0x80000001;
/**
* Initial file size in bytes.
*/
//static final int INITIAL_LENGTH = 4096; // one file system block
static final int INITIAL_LENGTH = 4096; // one file system block
/**
* A block of nothing to write over old data.
*/
private static final byte[] ZEROES = new byte[INITIAL_LENGTH];
/**
* The underlying file. Uses a ring buffer to store entries. Designed so
* that a modification isn't committed or visible until we write the header.
* The header is much smaller than a segment. So long as the underlying file
* system supports atomic segment writes, changes to the queue are atomic.
* Storing the file length ensures we can recover from a failed expansion
* (i.e. if setting the file length succeeds but the process dies before the
* data can be copied).
* <p>
* This implementation supports two versions of the on-disk format.
* <pre>
* Format:
* 16-32 bytes Header
* ... Data
*
* Header (32 bytes):
* 1 bit Versioned indicator [0 = legacy (see "Legacy Header"), 1 = versioned]
* 31 bits Version, always 1
* 8 bytes File length
* 4 bytes Element count
* 8 bytes Head element position
* 8 bytes Tail element position
*
* Legacy Header (16 bytes):
* 1 bit Legacy indicator, always 0 (see "Header")
* 31 bits File length
* 4 bytes Element count
* 4 bytes Head element position
* 4 bytes Tail element position
*
* Element:
* 4 bytes Data length
* ... Data
* </pre>
*/
final RandomAccessFile raf;
/**
* Keep file around for error reporting.
*/
final File file;
/**
* True when using the versioned header format. Otherwise use the legacy
* format.
*/
final boolean versioned;
/**
* The header length in bytes: 16 or 32.
*/
final int headerLength;
/**
* Cached file length. Always a power of 2.
*/
long fileLength;
/**
* Number of elements.
*/
private int elementCount;
/**
* Pointer to first (or eldest) element.
*/
private Element first;
/**
* Pointer to last (or newest) element.
*/
private Element last;
/**
* In-memory buffer. Big enough to hold the header.
*/
private final byte[] buffer = new byte[32];
/**
* The number of times this file has been structurally modified — it is
* incremented during {@link #remove(int)} and
* {@link #add(byte[], int, int)}. Used by {@link ElementIterator} to guard
* against concurrent modification.
*/
private int modCount = 0;
/**
* When true, removing an element will also overwrite data with zero bytes.
*/
private final boolean zero;
private boolean closed;
private static RandomAccessFile initializeFromFile(File file, boolean forceLegacy)
throws IOException {
if (!file.exists()) {
// Use a temp file so we don't leave a partially-initialized file.
File tempFile = new File(file.getPath() + ".tmp");
try (RandomAccessFile raf = open(tempFile)) {
raf.setLength(INITIAL_LENGTH);
raf.seek(0);
if (forceLegacy) {
raf.writeInt(INITIAL_LENGTH);
} else {
raf.writeInt(VERSIONED_HEADER);
raf.writeLong(INITIAL_LENGTH);
}
}
// A rename is atomic.
if (!tempFile.renameTo(file)) {
throw new IOException("Rename failed!");
}
}
return open(file);
}
/**
* Opens a random access file that writes synchronously.
*/
private static RandomAccessFile open(File file) throws FileNotFoundException {
return new RandomAccessFile(file, "rwd");
}
QueueFile(File file, RandomAccessFile raf, boolean zero, boolean forceLegacy) throws IOException {
this.file = file;
this.raf = raf;
this.zero = zero;
raf.seek(0);
raf.readFully(buffer);
versioned = !forceLegacy && (buffer[0] & 0x80) != 0;
long firstOffset;
long lastOffset;
if (versioned) {
headerLength = 32;
int version = readInt(buffer, 0) & 0x7FFFFFFF;
if (version != 1) {
throw new IOException(
"Unable to read version " + version + " format. Supported versions are 1 and legacy.");
}
fileLength = readLong(buffer, 4);
elementCount = readInt(buffer, 12);
firstOffset = readLong(buffer, 16);
lastOffset = readLong(buffer, 24);
} else {
headerLength = 16;
fileLength = readInt(buffer, 0);
elementCount = readInt(buffer, 4);
firstOffset = readInt(buffer, 8);
lastOffset = readInt(buffer, 12);
}
if (fileLength > raf.length()) {
throw new IOException(
"File is truncated. Expected length: " + fileLength + ", Actual length: " + raf.length());
} else if (fileLength <= headerLength) {
throw new IOException(
"File is corrupt; length stored in header (" + fileLength + ") is invalid.");
}
first = readElement(firstOffset);
last = readElement(lastOffset);
}
/**
* Stores an {@code int} in the {@code byte[]}. The behavior is equivalent
* to calling {@link RandomAccessFile#writeInt}.
*/
private static void writeInt(byte[] buffer, int offset, int value) {
buffer[offset] = (byte) (value >> 24);
buffer[offset + 1] = (byte) (value >> 16);
buffer[offset + 2] = (byte) (value >> 8);
buffer[offset + 3] = (byte) value;
}
/**
* Reads an {@code int} from the {@code byte[]}.
*/
private static int readInt(byte[] buffer, int offset) {
return ((buffer[offset] & 0xff) << 24)
+ ((buffer[offset + 1] & 0xff) << 16)
+ ((buffer[offset + 2] & 0xff) << 8)
+ (buffer[offset + 3] & 0xff);
}
/**
* Stores an {@code long} in the {@code byte[]}. The behavior is equivalent
* to calling {@link RandomAccessFile#writeLong}.
*/
private static void writeLong(byte[] buffer, int offset, long value) {
buffer[offset] = (byte) (value >> 56);
buffer[offset + 1] = (byte) (value >> 48);
buffer[offset + 2] = (byte) (value >> 40);
buffer[offset + 3] = (byte) (value >> 32);
buffer[offset + 4] = (byte) (value >> 24);
buffer[offset + 5] = (byte) (value >> 16);
buffer[offset + 6] = (byte) (value >> 8);
buffer[offset + 7] = (byte) value;
}
/**
* Reads an {@code long} from the {@code byte[]}.
*/
private static long readLong(byte[] buffer, int offset) {
return ((buffer[offset] & 0xffL) << 56)
+ ((buffer[offset + 1] & 0xffL) << 48)
+ ((buffer[offset + 2] & 0xffL) << 40)
+ ((buffer[offset + 3] & 0xffL) << 32)
+ ((buffer[offset + 4] & 0xffL) << 24)
+ ((buffer[offset + 5] & 0xffL) << 16)
+ ((buffer[offset + 6] & 0xffL) << 8)
+ (buffer[offset + 7] & 0xffL);
}
/**
* Writes header atomically. The arguments contain the updated values. The
* class member fields should not have changed yet. This only updates the
* state in the file. It's up to the caller to update the class member
* variables *after* this call succeeds. Assumes segment writes are atomic
* in the underlying file system.
*/
private void writeHeader(long fileLength, int elementCount, long firstPosition, long lastPosition) throws IOException {
raf.seek(0);
if (versioned) {
writeInt(buffer, 0, VERSIONED_HEADER);
writeLong(buffer, 4, fileLength);
writeInt(buffer, 12, elementCount);
writeLong(buffer, 16, firstPosition);
writeLong(buffer, 24, lastPosition);
raf.write(buffer, 0, 32);
return;
}
// Legacy queue header.
writeInt(buffer, 0, (int) fileLength); // Signed, so leading bit is always 0 aka legacy.
writeInt(buffer, 4, elementCount);
writeInt(buffer, 8, (int) firstPosition);
writeInt(buffer, 12, (int) lastPosition);
raf.write(buffer, 0, 16);
}
private Element readElement(long position) throws IOException {
if (position == 0) {
return Element.NULL;
}
ringRead(position, buffer, 0, Element.HEADER_LENGTH);
int length = readInt(buffer, 0);
return new Element(position, length);
}
/**
* Wraps the position if it exceeds the end of the file.
*/
private long wrapPosition(long position) {
return position < fileLength ? position
: headerLength + position - fileLength;
}
/**
* Writes count bytes from buffer to position in file. Automatically wraps
* write if position is past the end of the file or if buffer overlaps it.
*
* @param position in file to write to
* @param buffer to write from
* @param count # of bytes to write
*/
private void ringWrite(long position, byte[] buffer, int offset, int count) throws IOException {
position = wrapPosition(position);
if (position + count <= fileLength) {
raf.seek(position);
raf.write(buffer, offset, count);
} else {
// The write overlaps the EOF.
// # of bytes to write before the EOF. Guaranteed to be less than Integer.MAX_VALUE.
int beforeEof = (int) (fileLength - position);
raf.seek(position);
raf.write(buffer, offset, beforeEof);
raf.seek(headerLength);
raf.write(buffer, offset + beforeEof, count - beforeEof);
}
}
private void ringErase(long position, long length) throws IOException {
while (length > 0) {
int chunk = (int) min(length, ZEROES.length);
ringWrite(position, ZEROES, 0, chunk);
length -= chunk;
position += chunk;
}
}
/**
* Reads count bytes into buffer from file. Wraps if necessary.
*
* @param position in file to read from
* @param buffer to read into
* @param count # of bytes to read
*/
private void ringRead(long position, byte[] buffer, int offset, int count) throws IOException {
position = wrapPosition(position);
if (position + count <= fileLength) {
raf.seek(position);
raf.readFully(buffer, offset, count);
} else {
// The read overlaps the EOF.
// # of bytes to read before the EOF. Guaranteed to be less than Integer.MAX_VALUE.
int beforeEof = (int) (fileLength - position);
raf.seek(position);
raf.readFully(buffer, offset, beforeEof);
raf.seek(headerLength);
raf.readFully(buffer, offset + beforeEof, count - beforeEof);
}
}
/**
* Adds an element to the end of the queue.
*
* @param data to copy bytes from
*/
public void add(byte[] data) throws IOException {
add(data, 0, data.length);
}
/**
* Adds an element to the end of the queue.
*
* @param data to copy bytes from
* @param offset to start from in buffer
* @param count number of bytes to copy
* @throws IndexOutOfBoundsException if {@code offset < 0} or
* {@code count < 0}, or if {@code
* offset + count} is bigger than the length of {@code buffer}.
*/
public void add(byte[] data, int offset, int count) throws IOException {
if (data == null) {
throw new NullPointerException("data == null");
}
if ((offset | count) < 0 || count > data.length - offset) {
throw new IndexOutOfBoundsException();
}
if (closed) {
throw new IllegalStateException("closed");
}
expandIfNecessary(count);
// Insert a new element after the current last element.
boolean wasEmpty = isEmpty();
long position = wasEmpty ? headerLength
: wrapPosition(last.position + Element.HEADER_LENGTH + last.length);
Element newLast = new Element(position, count);
// Write length.
writeInt(buffer, 0, count);
ringWrite(newLast.position, buffer, 0, Element.HEADER_LENGTH);
// Write data.
ringWrite(newLast.position + Element.HEADER_LENGTH, data, offset, count);
// Commit the addition. If wasEmpty, first == last.
long firstPosition = wasEmpty ? newLast.position : first.position;
writeHeader(fileLength, elementCount + 1, firstPosition, newLast.position);
last = newLast;
elementCount++;
modCount++;
if (wasEmpty) {
first = last; // first element
}
}
private long usedBytes() {
if (elementCount == 0) {
return headerLength;
}
if (last.position >= first.position) {
// Contiguous queue.
return (last.position - first.position) // all but last entry
+ Element.HEADER_LENGTH + last.length // last entry
+ headerLength;
} else {
// tail < head. The queue wraps.
return last.position // buffer front + header
+ Element.HEADER_LENGTH + last.length // last entry
+ fileLength - first.position; // buffer end
}
}
private long remainingBytes() {
return fileLength - usedBytes();
}
/**
* Returns true if this queue contains no entries.
*/
public boolean isEmpty() {
return elementCount == 0;
}
/**
* If necessary, expands the file to accommodate an additional element of
* the given length.
*
* @param dataLength length of data being added
*/
private void expandIfNecessary(long dataLength) throws IOException {
long elementLength = Element.HEADER_LENGTH + dataLength;
long remainingBytes = remainingBytes();
if (remainingBytes >= elementLength) {
return;
}
// Expand.
long previousLength = fileLength;
long newLength;
// Double the length until we can fit the new data.
do {
remainingBytes += previousLength;
newLength = previousLength << 1;
previousLength = newLength;
} while (remainingBytes < elementLength);
setLength(newLength);
// Calculate the position of the tail end of the data in the ring buffer
long endOfLastElement = wrapPosition(last.position + Element.HEADER_LENGTH + last.length);
long count = 0;
// If the buffer is split, we need to make it contiguous
if (endOfLastElement <= first.position) {
FileChannel channel = raf.getChannel();
channel.position(fileLength); // destination position
count = endOfLastElement - headerLength;
if (channel.transferTo(headerLength, count, channel) != count) {
throw new AssertionError("Copied insufficient number of bytes!");
}
}
// Commit the expansion.
if (last.position < first.position) {
long newLastPosition = fileLength + last.position - headerLength;
writeHeader(newLength, elementCount, first.position, newLastPosition);
last = new Element(newLastPosition, last.length);
} else {
writeHeader(newLength, elementCount, first.position, last.position);
}
fileLength = newLength;
if (zero) {
ringErase(headerLength, count);
}
}
/**
* Sets the length of the file.
*/
private void setLength(long newLength) throws IOException {
// Set new file length (considered metadata) and sync it to storage.
raf.setLength(newLength);
raf.getChannel().force(true);
}
/**
* Reads the eldest element. Returns null if the queue is empty.
*/
public byte[] peek() throws IOException {
if (closed) {
throw new IllegalStateException("closed");
}
if (isEmpty()) {
return null;
}
int length = first.length;
byte[] data = new byte[length];
ringRead(first.position + Element.HEADER_LENGTH, data, 0, length);
return data;
}
public byte[] pool() throws IOException {
byte[] data = peek();
remove();
return data;
}
/**
* Returns an iterator over elements in this QueueFile.
*
* <p>
* The iterator disallows modifications to be made to the QueueFile during
* iteration. Removing elements from the head of the QueueFile is permitted
* during iteration using {@link Iterator#remove()}.
*
* <p>
* The iterator may throw an unchecked {@link IOException} during
* {@link Iterator#next()} or {@link Iterator#remove()}.
*/
@Override
public Iterator<byte[]> iterator() {
return new ElementIterator();
}
private final class ElementIterator implements Iterator<byte[]> {
/**
* Index of element to be returned by subsequent call to next.
*/
int nextElementIndex = 0;
/**
* Position of element to be returned by subsequent call to next.
*/
private long nextElementPosition = first.position;
/**
* The {@link #modCount} value that the iterator believes that the
* backing QueueFile should have. If this expectation is violated, the
* iterator has detected concurrent modification.
*/
int expectedModCount = modCount;
private ElementIterator() {
}
private void checkForComodification() {
if (modCount != expectedModCount) {
throw new ConcurrentModificationException();
}
}
@Override
public boolean hasNext() {
if (closed) {
throw new IllegalStateException("closed");
}
checkForComodification();
return nextElementIndex != elementCount;
}
@Override
public byte[] next() {
if (closed) {
throw new IllegalStateException("closed");
}
checkForComodification();
if (isEmpty()) {
throw new NoSuchElementException();
}
if (nextElementIndex >= elementCount) {
throw new NoSuchElementException();
}
try {
// Read the current element.
Element current = readElement(nextElementPosition);
byte[] buffer = new byte[current.length];
nextElementPosition = wrapPosition(current.position + Element.HEADER_LENGTH);
ringRead(nextElementPosition, buffer, 0, current.length);
// Update the pointer to the next element.
nextElementPosition
= wrapPosition(current.position + Element.HEADER_LENGTH + current.length);
nextElementIndex++;
// Return the read element.
return buffer;
} catch (IOException e) {
throw QueueFile.<Error>getSneakyThrowable(e);
}
}
@Override
public void remove() {
checkForComodification();
if (isEmpty()) {
throw new NoSuchElementException();
}
if (nextElementIndex != 1) {
throw new UnsupportedOperationException("Removal is only permitted from the head.");
}
try {
QueueFile.this.remove();
} catch (IOException e) {
throw QueueFile.<Error>getSneakyThrowable(e);
}
expectedModCount = modCount;
nextElementIndex--;
}
}
/**
* Returns the number of elements in this queue.
*/
public int size() {
return elementCount;
}
/**
* Removes the eldest element.
*
* @throws NoSuchElementException if the queue is empty
*/
public void remove() throws IOException {
remove(1);
}
/**
* Removes the eldest {@code n} elements.
*
* @throws NoSuchElementException if the queue is empty
*/
public void remove(int n) throws IOException {
if (n < 0) {
throw new IllegalArgumentException("Cannot remove negative (" + n + ") number of elements.");
}
if (n == 0) {
return;
}
if (n == elementCount) {
clear();
return;
}
if (isEmpty()) {
throw new NoSuchElementException();
}
if (n > elementCount) {
throw new IllegalArgumentException(
"Cannot remove more elements (" + n + ") than present in queue (" + elementCount + ").");
}
long eraseStartPosition = first.position;
long eraseTotalLength = 0;
// Read the position and length of the new first element.
long newFirstPosition = first.position;
int newFirstLength = first.length;
for (int i = 0; i < n; i++) {
eraseTotalLength += Element.HEADER_LENGTH + newFirstLength;
newFirstPosition = wrapPosition(newFirstPosition + Element.HEADER_LENGTH + newFirstLength);
ringRead(newFirstPosition, buffer, 0, Element.HEADER_LENGTH);
newFirstLength = readInt(buffer, 0);
}
// Commit the header.
writeHeader(fileLength, elementCount - n, newFirstPosition, last.position);
elementCount -= n;
modCount++;
first = new Element(newFirstPosition, newFirstLength);
if (zero) {
ringErase(eraseStartPosition, eraseTotalLength);
}
}
/**
* Clears this queue. Truncates the file to the initial size.
*/
public void clear() throws IOException {
if (closed) {
throw new IllegalStateException("closed");
}
// Commit the header.
writeHeader(INITIAL_LENGTH, 0, 0, 0);
if (zero) {
// Zero out data.
raf.seek(headerLength);
raf.write(ZEROES, 0, INITIAL_LENGTH - headerLength);
}
elementCount = 0;
first = Element.NULL;
last = Element.NULL;
if (fileLength > INITIAL_LENGTH) {
setLength(INITIAL_LENGTH);
}
fileLength = INITIAL_LENGTH;
modCount++;
}
/**
* The underlying {@link File} backing this queue.
*/
public File file() {
return file;
}
@Override
public void close() throws IOException {
try (raf) {
writeHeader(fileLength, elementCount, first.position, last.position);
closed = true;
}
}
@Override
public String toString() {
try {
writeHeader(fileLength, elementCount, first.position, last.position);
} catch (IOException ex) {
ex.printStackTrace();
}
return "QueueFile{"
+ "file=" + file
+ ", zero=" + zero
+ ", versioned=" + versioned
+ ", length=" + fileLength
+ ", size=" + elementCount
+ ", first=" + first
+ ", last=" + last
+ '}';
}
/**
* A pointer to an element.
*/
static class Element {
static final Element NULL = new Element(0, 0);
/**
* Length of element header in bytes.
*/
static final int HEADER_LENGTH = 4;
/**
* Position in file.
*/
final long position;
/**
* The length of the data.
*/
final int length;
/**
* Constructs a new element.
*
* @param position within file
* @param length of data
*/
Element(long position, int length) {
this.position = position;
this.length = length;
}
@Override
public String toString() {
return getClass().getSimpleName()
+ "[position=" + position
+ ", length=" + length
+ "]";
}
}
/**
* Fluent API for creating {@link QueueFile} instances.
*/
public static final class Builder {
final File file;
boolean zero = true;
boolean forceLegacy = false;
/**
* Start constructing a new queue backed by the given file.
*/
public Builder(File file) {
if (file == null) {
throw new NullPointerException("file == null");
}
this.file = file;
}
/**
* When true, removing an element will also overwrite data with zero
* bytes.
*
* @return
*/
public Builder zero(boolean zero) {
this.zero = zero;
return this;
}
/**
* When true, only the legacy (Tape 1.x) format will be used.
*/
public Builder forceLegacy(boolean forceLegacy) {
this.forceLegacy = forceLegacy;
return this;
}
/**
* Constructs a new queue backed by the given builder. Only one instance
* should access a given file at a time.
*/
public QueueFile build() throws IOException {
RandomAccessFile raf = initializeFromFile(file, forceLegacy);
QueueFile qf = null;
try {
qf = new QueueFile(file, raf, zero, forceLegacy);
return qf;
} finally {
if (qf == null) {
raf.close();
}
}
}
}
/**
* Use this to throw checked exceptions from iterator methods that do not
* declare that they throw checked exceptions.
*/
@SuppressWarnings({"unchecked", "TypeParameterUnusedInFormals"})
static <T extends Throwable> T getSneakyThrowable(Throwable t) throws T {
throw (T) t;
}
}
|