Загрузка данных из файла в Apache Cassandra
#39388504
Ссылка:
Ссылка на сообщение:
Ссылка с названием темы:
|
|
|
Дедушка, Кассандра - требование научного руководителя.
Не могли бы вы еще подсказать, как в коде прописан путь к файлу с данными. Сама я разобраться в этом так и не смогла.
Код из файла-примера прилагаю. Заранее спасибо. Код:
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package bulkload;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigDecimal;
import java.net.HttpURLConnection;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;
import org.supercsv.io.CsvListReader;
import org.supercsv.prefs.CsvPreference;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
/**
 * Downloads historical price CSV data for each ticker symbol given on the
 * command line and writes it into SSTable files through {@link CQLSSTableWriter}.
 * <p>
 * The input is not read from a local file: each CSV is fetched over HTTP from
 * {@link #CSV_URL}. The generated SSTables are placed under
 * {@code DEFAULT_OUTPUT_DIR/KEYSPACE/TABLE}.
 * <p>
 * Usage: {@code java bulkload.BulkLoad <list of ticker symbols>}
 */
public class BulkLoad
{
    /** URL template of the CSV source; {@code %s} is replaced with the ticker symbol. */
    public static final String CSV_URL = "http://real-chart.finance.yahoo.com/table.csv?s=%s";

    /** Default output directory */
    public static final String DEFAULT_OUTPUT_DIR = "./data";

    /**
     * Parser for the date column of the CSV.
     * NOTE: SimpleDateFormat is not thread-safe; this program only uses it from the main thread.
     */
    public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");

    /** Keyspace name */
    public static final String KEYSPACE = "quote";

    /** Table name */
    public static final String TABLE = "historical_prices";

    /**
     * Schema for bulk loading table.
     * It is important not to forget adding keyspace name before table name,
     * otherwise CQLSSTableWriter throws exception.
     */
    public static final String SCHEMA = String.format("CREATE TABLE %s.%s (" +
                                                          "ticker ascii, " +
                                                          "date timestamp, " +
                                                          "open decimal, " +
                                                          "high decimal, " +
                                                          "low decimal, " +
                                                          "close decimal, " +
                                                          "volume bigint, " +
                                                          "adj_close decimal, " +
                                                          "PRIMARY KEY (ticker, date) " +
                                                      ") WITH CLUSTERING ORDER BY (date DESC)", KEYSPACE, TABLE);

    /**
     * INSERT statement to bulk load.
     * It is like prepared statement. You fill in place holder for each data.
     */
    public static final String INSERT_STMT = String.format("INSERT INTO %s.%s (" +
                                                               "ticker, date, open, high, low, close, volume, adj_close" +
                                                           ") VALUES (" +
                                                               "?, ?, ?, ?, ?, ?, ?, ?" +
                                                           ")", KEYSPACE, TABLE);

    /**
     * Entry point: builds the SSTable writer, loads every ticker passed in
     * {@code args}, and closes the writer even if loading fails part-way.
     *
     * @param args ticker symbols to download; prints a usage line and returns when empty
     * @throws RuntimeException if the output directory cannot be created or a
     *                          connection for a ticker cannot be opened
     */
    public static void main(String[] args)
    {
        if (args.length == 0)
        {
            System.out.println("usage: java bulkload.BulkLoad <list of ticker symbols>");
            return;
        }

        // NOTE(review): presumably lets the Cassandra classes run outside a server
        // node; confirm against the Cassandra version in use.
        Config.setClientMode(true);

        // Create output directory that has keyspace and table name in the path
        File outputDir = new File(DEFAULT_OUTPUT_DIR + File.separator + KEYSPACE + File.separator + TABLE);
        if (!outputDir.exists() && !outputDir.mkdirs())
        {
            throw new RuntimeException("Cannot create output directory: " + outputDir);
        }

        // Prepare SSTable writer
        CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
        // set output directory
        builder.inDirectory(outputDir)
               // set target schema
               .forTable(SCHEMA)
               // set CQL statement to put data
               .using(INSERT_STMT)
               // set partitioner if needed
               // default is Murmur3Partitioner so set if you use different one.
               .withPartitioner(new Murmur3Partitioner());
        CQLSSTableWriter writer = builder.build();

        try
        {
            for (String ticker : args)
            {
                loadTicker(writer, ticker);
            }
        }
        finally
        {
            // Close in finally so rows already added are not abandoned when an
            // unchecked exception (e.g. NumberFormatException) aborts the loop.
            try
            {
                writer.close();
            }
            catch (IOException e)
            {
                // A failed close is reported instead of being silently dropped.
                System.err.println("Failed to close SSTable writer; output in " + outputDir + " may be incomplete");
                e.printStackTrace();
            }
        }
    }

    /**
     * Downloads the CSV for one ticker and adds each data row to the writer.
     * Checked failures while reading or writing are printed and the ticker is
     * abandoned, so the caller can continue with the next one.
     *
     * @param writer SSTable writer that receives the rows
     * @param ticker ticker symbol substituted into {@link #CSV_URL}
     * @throws RuntimeException if the URL is malformed or the connection cannot be opened
     */
    private static void loadTicker(CQLSSTableWriter writer, String ticker)
    {
        HttpURLConnection conn;
        try
        {
            URL url = new URL(String.format(CSV_URL, ticker));
            conn = (HttpURLConnection) url.openConnection();
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }

        try
        {
            // Check the status BEFORE asking for the body: getInputStream() throws
            // on an error status, which would make this check unreachable.
            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK)
            {
                System.out.println("Historical data not found for " + ticker);
                return;
            }

            try (
                // Explicit charset so decoding does not depend on the platform default.
                BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), java.nio.charset.StandardCharsets.UTF_8));
                CsvListReader csvReader = new CsvListReader(reader, CsvPreference.STANDARD_PREFERENCE)
            )
            {
                // Skip the header row
                csvReader.getHeader(true);

                // Write to SSTable while reading data
                List<String> line;
                while ((line = csvReader.read()) != null)
                {
                    // We use Java types here based on
                    // http://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/DataType.Name.html#asJavaClass%28%29
                    // Values are passed in the column order of INSERT_STMT.
                    writer.addRow(ticker,
                                  DATE_FORMAT.parse(line.get(0)),
                                  new BigDecimal(line.get(1)),
                                  new BigDecimal(line.get(2)),
                                  new BigDecimal(line.get(3)),
                                  new BigDecimal(line.get(4)),
                                  Long.parseLong(line.get(5)),
                                  new BigDecimal(line.get(6)));
                }
            }
        }
        catch (InvalidRequestException | ParseException | IOException e)
        {
            e.printStackTrace();
        }
        finally
        {
            // Release the connection whether or not the download succeeded.
            conn.disconnect();
        }
    }
}
|
|